Сценарии интеграции с Apache Camel
Сколько вы знаете шаблонов интеграции приложений(EIP)? Сколько из них вы можете использовать?
Симпатичный «верблюд» снова здесь, а значит, представляю вам продолжение серии статей об Apache Camel. В этой статье найдутся как самые необходимые, так и очень любопытные шаблоны интеграции. Расскажу о том, как они ложатся на нашу интеграцию.
Если вы знакомы с шаблонами, но решаете стоит ли связываться с «верблюдом», то наши примеры помогут разобраться. Если вам интересен путь от сценариев использования к реализации интеграции, то эта статья как раз об этом. Прошу под кат.
Напомню, мы построили свою сервисную шину на Apache Camel. Предыстория описывались в предыдущей части. Сейчас Camel для нас уже данность, с которой надо бороться. В нашем “зоопарке” прибавилось, кроме последнего, мы имеем две системы. Первая — это наша основная система, которая построена на классической трёхуровневой “клиент-серверной” архитектуре. Она представляет собой BPMS систему, сложность которой обусловлена многолетним процессом “допиливания” мелких “хотелок”. Вторая — небольшая и простая, как валенок, система той же архитектуры. Её будем называть офисом продаж. Разбирать варианты использования будем как раз на её примере.
Анкетная система с потребностью интеграции.
Наш офис продаж — приложение, которое не имеет сложной логики. Оно реализует процесс регистрации клиентов и ввод документов, необходимых для заказа. Эта система открыта для клиентов после регистрации, поэтому требования ко времени отклика и пропускной способности более высокого порядка, чем к основной системе. Ограничения безопасности, пропускной способности, независимой настройки и необходимость поддержания обособленного цикла разработки привели к выделению офиса продаж в самостоятельное приложение. Для обслуживания процесса продаж нам требовалась его интеграция в ИТ-инфраструктуру нашего заказчика.
Знакомство с интеграцией начнём с вариантов использования офиса продаж. Мы выделили в бизнес-процессе нашего приложения две основные роли: покупателя и администратора. Именно эти группы пользователей с ним работают.
На схеме разобраны основные варианты использования. Обратите внимание на правую часть схемы, несколько прецедентов выходят за рамки офиса продаж и требуют определённых действий со стороны основной системы. Именно эти прецеденты мы будем разбирать дальше.
Для формирования целостного представления о нашем офисе разберём схему. Наши покупатели подают заявки для участия в аукционах. Проведение аукционов — основной процесс офиса продаж. Для участия в них покупатели должны проходить регистрацию и оформлять заявки, заполняя большое количество скучных регистрационных форм. Администратор выполняет функции подготовки и проведения аукционов. Для этого по каждому из аукционов подготавливается специальный шаблон заявки — анкета, поэтому класс таких систем иногда называют анкетными. Процесс проведения аукциона завершается для офиса продаж передачей информации по поданным заявкам в основную систему для выбора победителя и оформления сделки.
Качественные требования к процессам интеграции:
- Потери нормативно-справочной информации допустимы и не должны приводить к нарушению процесса функционирования подсистем;
- Обращение во внешние системы не должно нарушать процесс проведения аукциона в случае отказа внешних систем и/или каналов связи;
- Заявки должны гарантировано передаваться и доставляться между подсистемами;
- Файлы должны гарантировано передаваться и доставляться между подсистемами;
- Пользовательская нагрузка на проведение аукционов должна полностью ложиться на офис продаж, разгружая основную систему. Поэтому, взаимодействия между системами должны быть сведены к минимуму;
- В распоряжении пользователя офиса продаж не должно быть механизмов воздействия на основную систему.
При разборе обозначенных прецедентов возникли следующие архитектурные задачи:
- Организация транспорта;
- Распределение функций между подсистемами;
- Синхронизация справочников;
- Передача зависимых сущностей;
- Наблюдение за процессом обмена;
- Передача файлов.
Разберём решения этих архитектурных задач на примере сценариев использования и соотнесём с шаблонами EIP.
Организация транспорта.
В состав транспорта входят обе описанных выше системы и Camel. О последнем уже много было сказано в предыдущей части статьи, поэтому пойдём дальше.
Все три системы связанны брокером ActiveMQ по протоколу AMQP.
Напомню, системы обмениваются пакетами, используя JMS. Полезной нагрузкой этих пакетов решили сделать XML и сериализовать в него объекты одной из систем, используя JAXB. Но какие объекты взять за основу, чтобы при этом минимизировать временные затраты на создание интеграции? Систем две — значит, и форматов может быть два. Решили остановится на объектах офиса продаж, эта система более легковесная, объекты области домена связаны с другими архитектурными слоями этого приложения только JPA аннотациями. Выделение на её основе транспортных объектов не составляло труда. Альтернативное решение (использовать объекты основной системы) представлялось практически невыполнимым из-за наличия большого количества метаданных и сложных связей с другими бизнес-сущностями, выходящими за границы интересующего нас процесса обслуживания офиса продаж. Ещё один оставшийся вариант — создание новых транспортных объектов. Его даже не рассматривали, так как он требовал реализацию процедуры и импорта, и экспорта в обеих системах, от которых в первом случае для подсистемы офиса продаж можно было отказаться.
Наверное, вас удивило, что мы выбрали XML в качестве полезной нагрузки наших пакетов? Но, уверяю, этому были причины. Сериализация, использующая стандарт JAXB, включена в JVM — это упрощает работу с ней и не требует дополнительных модулей. На момент разработки интеграции у нас уже был опыт работы с JAXB, так что никакого overhead-а на ознакомление не требовалось. Ещё одна “плюшка” — то, что XML — текстовый формат, а значит, при возникновении нештатных ситуаций можно вмешиваться в его структуру и вносить необходимые коррективы. Но были и недостатки: известно, что структура формата XML заметно увеличивает объём данных. Однако, информация, которой мы планировали обмениваться, по первоначальным оценкам не превышала для справочной информации — 100 мб, для заявок — 1 мб на заявку. Это, согласитесь, не очень большие цифры. Кроме этого, обмен информацией должен был происходить единовременно, и требований к оперативности не предъявлялось.
Немного подробностей об архитектуре формирования и разбора сообщений.
До этого мы описывали механизмы передачи и работы с сообщениями, не касаясь непосредственно Camel-а. Пора рассказать, какие функции были возложены на него, и каким образом он был встроен в систему интеграции.
Итак, функции:
- Независимая настройка маршрутизации сообщений;
- Передача сообщений из очередей офиса в очереди основной системы и наоборот;
- Согласование форматов для унификации конечных точек.
Для упрощения связывания систем единым брокером сообщений названия очередей и топиков необходимо унифицировать. Мы используем такое соглашение: [наименование системы].[наименование функционального блока].[направление передачи данных в канале]
Впрочем, вскоре оказалось, что чем меньше точек в сложной системе, тем проще настройка. Весь функционал по разбору сообщений и маршрутизации легко перебрасывается в сервисную шину. Так что для нашей основной системы очень скоро осталась одна конечная точка для отправки данных: bpms.office.export
Распределение функций между подсистемами.
Посмотрим на это распределение с точки зрения архитектуры интеграции. Офис продаж отвечает за:
- обработку, настройку и размещение лотов;
- все задачи формирования и проверки данных пакетов с информацией о заявках;
- подготовку и передачу дополнительных файлов зарегистрированных заявок;
- синхронизацию нормативно-справочной информации.
Сервисная шина:
- передачу нормативно-справочной информации;
- передачу информации о лотах и аукционах;
- передача данных заявок.
Основная система:
- подготовку и отправку нормативно-справочной информации;
- инициацию и отправку задания на проведение аукциона;
- построение дальнейшего бизнес-процесса обработки заявок;
Идём дальше, теперь реальные примеры.
Синхронизация справочников.
Процесс синхронизации справочников составлен на основе приведённых ранее прецедентов. Схема получается такая:
Видно, что по сравнению со схемой прецедентов здесь синхронизация выполняется на стороне офиса продаж. Мы уже упоминали об этом раньше, но повторюсь, офисов продаж может быть много, а это значит, что и выгружать нормативную информацию надо в каждый из них. Так как требования к надёжности передачи, а также обеспечения гарантированной доставки здесь не столь строги, можно использовать JMS каналы типа “публикация-подписка”. Далее приведена схема передачи данных в виде шаблонов EIP.
На схеме видно, что в сервисной шине используются два роута. Сообщения проходят по единому выходному каналу основной системы (BPMS), фильтруются и передаются в специализированный канал, отвечающий за работу со справочной информацией. Здесь формат сообщений согласуется с системой офиса, они объединяются в каталоги (агрегируются) и передаются в систему офиса продаж по ещё одному JMS каналу.
На схеме используются несколько паттернов EIP:
- Message channel — канал сообщений;
- Endpoint — конечная точка;
- Message filter — фильтр сообщений;
- Message translator — шаблон компонента преобразования формата сообщения;
- Aggregator — шаблон, который позволяет объединять несколько сообщений в одно.
Почему так сложно? Давайте пойдём по порядку. Использование фильтра сообщений необходимо для упрощения настройки и появилось здесь после объединения выходных конечных точек в одну. Минусы подхода:
- возможны взаимные блокировки передачи сообщений. Высокая интенсивность обмена сообщениями одного из обслуживаемых типов (например, со справочной информацией) может переполнить буфер единого канала и заблокировать передачу сообщений другого типа. Получается, что обмен сообщениями одного типа зависит от сообщений другого типа.
Полностью исключить такую возможность нельзя, но можно уменьшить эффект, создав дополнительные буферные каналы для каждого типа сообщений. Брокер ActiveMQ поддерживает раздельную настройку выделения ресурсов на каждый такой канал, благодаря чему можно позаботиться о необходимом запасе памяти и дискового пространства с учётом интенсивности обмена.
Вернёмся к нашему роуту, дальше сообщения передаются как раз по такому буферному JMS каналу. Следующим интересным моментом является компонент, объединяющий пакеты. Необходим он из-за того, что для выполнения синхронизации требуется полный набор элементов справочника. Самая простая возможность обеспечить полноту — это собрать все элементы в одно сообщение. Критичный минус объединения сообщений — это большой объём полученного сообщения, который может стать проблемой и для брокера, и сервисной шины.
К счастью, наши справочники не столь велики, и простота вышла на первое место. Но эту проблему не стоит недооценивать, в настройках брокера для каждого канала выделяется ограниченный объём оперативной памяти, превышение которого может привести к критической ошибке.
Пример роутов:
from("jms:topic:bpms.office.request").routeId("catalog-synchronization-filter")
.filter( header("destination").isEqualTo( "portal.export.catalog" ) )
.setHeader("catalogCode", simple("${header.catalog}"))
.inOnly("jms:topic:catalog.synchronized");
from("jms:topic:catalog.synchronized").routeId("catalog-synchronization-topic")
.filter( header("catalogCode").in(
...
"GRNTI","OKATO","OKFS","OKOGU","OKOPF","OKVED",
… ) )
.setHeader("catalogCode")
.groovy( "switch( request.getHeaders().get('catalogCode') ){" +
...
" case "GRNTI": return "GRNTICatalog"" +
" case "OKATO": return "okatoCatalog"" +
" case "OKFS": return "okfsCatalog"" +
" case "OKOGU": return "okoguCatalog"" +
" case "OKOPF": return "okopfCatalog"" +
" case "OKVED": return "okvedCatalog"" +
...
"}")
.inOnly( "direct:office.synchronization");
from("direct:office.synchronized").routeId("catalog-import-office-filter")
.aggregate(header("catalogCode"), new CatalogItemAggregationStrategy()).completionTimeout(3000)
.inOnly("jms:topic:office.catalog.synchronization?timeToLive=200000");
private static class CatalogItemAggregationStrategy implements AggregationStrategy
{
public Exchange aggregate(Exchange oldExchange, Exchange newExchange)
{
String newBody = newExchange.getIn().getBody(String.class);
if (oldExchange != null)
{
String oldCode = (String)oldExchange.getIn().getHeader("catalogCode");
String newCode = (String)newExchange.getIn().getHeader("catalogCode");
if( StringUtils.equals( oldCode, newCode) )
{
String oldBody = oldExchange.getIn().getBody(String.class);
oldExchange.getIn().setBody(
StringUtils.substringBeforeLast( oldBody, "n</catalogs>") +
StringUtils.substringAfter( newBody, "?>") + "n</catalogs>" );
return oldExchange;
}
}
StringBuilder builder = new StringBuilder( newBody );
builder.insert( builder.indexOf("n") + 1, "<catalogs>n");
builder.append( "</catalogs>n" );
newExchange.getIn().setBody( builder.toString());
return newExchange;
}
}
Передача зависимых сущностей.
Для начала, составим схему процесса публикации лота с оглядкой на диаграмму прецедентов.
Как видно процесс передачи сравнительно прост. Можно переложить схему процесса на EIP, получится уже известная по предыдущей части схема:
В простоте есть слабое место, обратите внимание на подробную схему публикации лотов в нотации BPMN.
Вот и получается, что выгрузить лот можно только после того, как будет создан аукцион. Процесс публикации аукциона изображён на первой схеме этого раздела серым цветом. Для решения проблемы связанных процессов предлагались два решения:
- Реализовать поддержку распределённых транзакции и обернуть транзакцией группу сообщений;
- Передавать все данные одним сообщением.
Второй вариант показался в реализации проще и быстрее, на нём и остановились. Нам потребовалось создавать заглушки и передавать минимальную информацию о недостающих объектах. Пример, сообщения:
<office>
<lot>
<id>fs000000000is3ro0d708me9ms</id>
<code>2011-1.5-051-001</code>
<theme>Тема</theme>
<aims>Цель тестового лота</aims>
<auction>
<id>fs000000000gtk9f5oa05h426o</id>
<topic>Название аукциона</topic>
</auction>
</lot>
</office>
На примере видно, что сообщение содержит информацию о лоте и минимальную информацию об аукционе.
В итоге, использованное нами решение — передавать связанные объекты для сохранения полноты изменений -, стало хорошей альтернативой распределённым транзакциям. Идём дальше.
Наблюдение за процессом обмена.
Разберём эту задачу на примере процесса получения и регистрации результатов, который остаётся ключевым и для интеграции, и для офиса продаж. Схема процесса продемонстрирована на рисунке:
Процесс начинается в основной системе отправкой запроса на получение предложений по лоту, эта часть не представлена на схеме потому, что подобный маршрут уже обсуждался в предыдущем разделе. Далее, когда запрос приходит в удалённый офис, начинается сбор и проверка поданных покупателями заявки по лоту. Обрабатываются только полностью заполненные и проверенные заявки на момент поступления запроса от основной системы. Заявки делятся на фрагменты. Разделение на фрагменты позволяет использовать меньше памяти и ускорять процесс обработки. Когда пакет с заявками попадет в основную систему, начинается его разбор и создание на его основе сущностей основной системы (импорт). Мы не стали здесь экспериментировать с параллельностью, чтобы не сталкиваться с проблемами консистентности БД в параллельных транзакциях. Все длительные операции передачи и импорта заявок разбиты на фазы, каждая фаза завершается отправкой уведомления и пользователь имеет возможность наблюдать за процессом импорта.
Схема передачи заявок в паттернах EIP.
Пример настойки роутов:
from("jms:topic:bpms.office.request").routeId("bpms-request-order")
.filter( header("destination").isEqualTo( "office.order.request" ) )
.inOnly("jms:topic:bpms-to-office.order.request");
…
from("jms:queue:office.order.export").routeId("bpms-responce-order")
.log("going to bpms import: ${headers.JMSDestination}")
.wireTap("direct:order.audit")
.choice()
.when( header("customer").isEqualTo("bpms.import"))
.log("filling conumer trying recieve: ${headers.JMSDestination}, ${headers.importType}")
.inOnly("jms:queue:from.office.to.bpms.order.import")
.otherwise()
.to("log:office?multiline=true");
Процесс передачи запроса в целом такой же, как и в других кейсах. Новый шаблон, использованный в ней — "WireTap". Этот компонент позволяет добавить наблюдателя в процесс обмена сообщений, в нашем примере он пересылает пакет с заявкой в канал аудита. Пример:
from("direct:order.audit")
.split( xpath("//*[local-name()='demand']") )
.process( new Processor() {
@Override
public void process(Exchange item ) throws Exception {
Message in = item.getIn();
Message out = item.getOut();
out.setHeaders( in.getHeaders() );
out.setHeader("cliendId", in.getMessageId());
out.setHeader("level", "DEBUG");
out.setBody( String.format(
"Получена заявка №%sn " +
"Передаётся на:%sn " +
"Зарегистрированный callback:%snn",
xpath("//*[local-name()='fullNumber']/text()").evaluate(item, String.class),
in.getHeader("customer"), in.getHeader("callbackUUID")) );
}
})
.inOnly("jms:topic:system.audit");
Этот роут намного интереснее предыдущего, сообщения здесь — это заявки, сериализованные в XML. Мы разбиваем входящий пакет на отдельные заявки, используя специальный компонент splitter и xpath выражение. Далее нам уже не нужна вся заявка, поэтому оставляем только информацию, нужную для аудита, и пересылаем пакет в JMS канал общего аудита jms:topic:system.audit
. В этом канале накапливаются уведомления о состоянии передачи и импорта заявок, и обо всех нештатных ситуациях. Сообщения возвращаются в основную систему и связываются с инициатором получения результатов по свойству “callbackUUID”. Пример, роута возвращающего часть сообщений аудита в основную систему:
from("jms:topic:system.audit")
.filter( PredicateBuilder.and(
header("callbackUUID").isNotNull(),
header("fcntp.audit").isNull() ) )
.setHeader("system.audit", simple( "true", Boolean.class ))
.inOnly("fcntpJms:topic:fcntp.audit?timeToLive=10000");
Плюсы подхода:
- все сообщения накапливаются в одном канале и обрабатываются единообразно.
- требуется настроить только один канал — это значит меньше настроек в брокере сообщений.
Новые использованные шаблоны EIP:
- Wire Tap — маршрутизация копии сообщения;
- Splitter — компонент, который позволяет разделять большое сообщение на фрагменты.
Передача файлов.
Передача файлов — задача не очень простая, особенно если решать её средствами JMS. Разберём этот кейс. В требованиях к этой задаче ничего формализованного не было, нужно было передавать данные неограниченного объёма. В Camel-е не нашлось готового решения для того, чтобы перегнать файл с одного сервера на другой. Реализация разбиения файлов на фиксированные куски с последующей склейкой встречала ряд сложностей:
- надо было контролировать порядок фрагментов;
- реализовать собственный механизм восстановления файла из фрагментов;
- отладить передачу.
Мы решили пойти другим путём: в Camel копирование файлов между локальными папками реализовано просто и гибко, этот механизм нам подходил. Полная схема обработки файлов, следующая: файл кладётся одной системой в сетевую папку, Camel находит его и копирует в другую папку, а после копирования сообщает основной системе, что файл был передан. Вот схема:
Для полноты картины — вот роут, который мы используем:
RouteDefinition fileExport = (RouteDefinition)from("file:{{office.transport.dir}}?delete=true&exclude=.*\.tmp")
.onException(IOException.class)
.maximumRedeliverise(1)
.handled(true)
.useOriginalMessage()
.wireTap("jms:topic:system.audit")
.transform(exceptionMessage())
.end()
.to("file:{{office.failed.dir}}").end()
.process( TimsetampProcsseor.newInstance("sendtimsetamp") )
.wireTap("jms:queue:office.files.import")
.newExchangeHeader("fileName", simple("${headers.CamelFileName}"))
.newExchangeBody( constant("progress") )
.end()
.to("file:{{bpms.transport.dir}}");
// notify bpms by topic:office.files.import
fileExport
.transform().constant("ok")
.delay( 20 )
.process( TimsetampProcessor.newInstance("sendtimsetamp") )
.setHeader("fileName", simple("${headers.CamelFileName}"))
// send notify
.inOnly("jms:queue:office.files.import");
Как видно на рисунке выше, файлы копируются в три этапа. На первом — файлы копируются из файлового хранилища в транспортную папку офиса продаж. Маршрутизацию в сервисной шине поясняет роут, он запускается только тогда, когда файлы становятся доступны в транспортной папке подсистемы удаленного офиса. Сервисная шина постоянно сканирует эту папку и, как только файл туда попадает, сразу же перемещает его в транспортную папку основной системы. Далее сервисная шина создаёт уведомление о перемещении файла и отправляет его в очередь office.files.import
. В этом роуте мы используем механизм обработки исключений, он гарантирует, что если файл попал в транспортную папку офиса продаж, то основная система получит оповещение независимо от успеха или неудачи перемещения файла. Пора подводить итоги.
Итоги.
На примере одной из наших систем мы познакомились с основными сценариями интеграции. Все приведённые здесь примеры выдержки реального приложения и сейчас они нами используются. Наши примеры показывают, c какой простотой роуты Camel-а ложатся на паттерны интеграции. Приведённые в статье основные архитектурные задачи покрываются средствами, предлагаемыми Camel-ом. Даже такие сложные задачи как, например, копирование файлов под силу решать используя Camel. Напомню, что в статье делается акцент на интеграцию основанную на сообщениях. Рассчитываю, что вы, прочитав статью, смогли убедиться в том, что “Верблюд” действительно хорош.
Об ошибках и способах их решения я планирую написать в следующей статье (своего рода “Продолжение следует…”). Поэтому до новых встреч.
Полезные ссылки:
- Поддерживаемые Apache Camel компоненты и протоколы;
- Шаблоны интеграции
- Плезная литература по теме
- Исходные коды примеров из книги Camel in Action
Автор: coriollon