Доброго времени суток, %username%.
Скажу сразу, на 99% данный пост про Java EE Connector Architecture, с примерами кода. Откуда взялся 1% про Fidonet вы поймете в самом конце.
Минимальный пакет для входящего соединения — 4 класса, для исходящего — 8 классов и настройка адаптера на стороне сервера приложений.
Дальше — только подробности и боль
Для начала — история вопроса и решение бизнес-задачи.
Постановка задачи
Мне поставили задачу об интеграции существующей бизнес-системы («система А» ) с другой системой, которая была разработана много лет назад и понимает только собственный протокол передачи данных («система B»). Модифицировать чужие системы нельзя, соответственно задача свелась к написанию некой шины/прокси. Интеграция состоит в передаче туда-сюда сообщений с конвертацией их в процессе из одного формата в другой.
Система «А» имела много современных механизмов интеграции, самым простым для использования были признаны веб-сервисы. Под это дело был оперативно запилен стандартный интеграционный скелет для JEE — JAX-WS+EJB+JMS для гарантированной доставки сообщения.
А вот для работы с системой «B» стандартных средств не было. Робкие попытки поработать с сетью из контекста EJB успехом не увенчались, гугл подсказал два варианта решения проблемы: костылить сервлеты для работы с non-http или написать JCA-адаптер. Понятно, что был выбран второй путь — с JCA я до этого не работал, а узнать что-то новое всегда интересно.
Исследование
Начав копать гугл, я достаточно сильно обломался. Везде писали, ЧТО именно нужно сделать ( коннектор, менеджер, адаптер итд ), но почти нигде не писали, КАК это сделать. Стандартный способ «посмотреть чужой код и понять процесс» дал сбой — чужого кода был такой мизер, что понять что-то мне не представлялось возможным.
Спасли меня две вещи: JSR 322 и единственный нагугленный адаптер на google code. Собственно, это и стало отправной точкой — задеплоив примеры из jca-sockets и открыв pdf, я начал разбираться и путем научного тыка понимать, как оно собственно работает.
Потратив около 16 часов на исследование и эксперименты, я выяснил следующее:
JCA-модуль имеет внутри себя две независимых части: «Входящие» и «Исходящие». Эти части могут быть как вместе, так и по-отдельности. Более того, их может быть несколько. Сам модуль регистритуется классом, реализующим javax.resource.spi.ResourceAdapter и указанным в META-INF/ra.xml, при этом ResourceAdapter нужен в первую очередь для работы с Входящими; Для Исходящих адаптер ничего не делает и его скелет можно даже не заполнять.
Входящие
Входящий канал биндится к MessageEndpoint'у ( обычно это @MessageDrivenBean; да-да, JCA это кишки JMS ) и активируется ActivationSpec'ом.
META-INF/ra.xml — описание ResourceAdapter'а и inbound потоков
<connector xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
http://xmlns.jcp.org/xml/ns/javaee/connector_1_7.xsd"
version="1.7" metadata-complete="true">
<vendor-name>kreon-services</vendor-name>
<eis-type>FidoNet</eis-type>
<resourceadapter-version>2.5</resourceadapter-version>
<resourceadapter>
<!-- Класс, который реализует javax.resource.spi.ResourceAdapter; config-property - поля, должны быть доступны через геттеры/сеттеры -->
<resourceadapter-class>in.fidonode.binkp.ra.BinkpServerResourceAdapter</resourceadapter-class>
<config-property>
<config-property-name>version</config-property-name>
<config-property-type>java.lang.String</config-property-type>
<config-property-value>jnode-jee 2.5 binkp/1.1</config-property-value>
</config-property>
<!-- Описание входящего потока -->
<inbound-resourceadapter>
<messageadapter>
<messagelistener>
<!-- Интерфейс, который должен реализовать @MessageDrivenBean для того, чтоб получать сообщения с этого адаптера -->
<messagelistener-type>in.fidonode.binkp.ra.BinkpMessageListener</messagelistener-type>
<activationspec>
<!-- Класс-холдер параметров, которая передается через @ActivationConfigProperty, должны быть доступны геттеры и сеттеры параметров -->
<activationspec-class>in.fidonode.binkp.ra.BinkpActivationSpec</activationspec-class>
<!-- Список обязательных параметров -->
<required-config-property>
<config-property-name>listenPort</config-property-name>
</required-config-property>
<!-- Описание параметров параметров -->
<config-property>
<config-property-name>listenPort</config-property-name>
<config-property-type>java.lang.Integer</config-property-type>
<config-property-value>24554</config-property-value>
</config-property>
</activationspec>
</messagelistener>
</messageadapter>
</inbound-resourceadapter>
</resourceadapter>
</connector>
Интерфейс BinkpMessageListener — для клиентов и должен быть в classpath;
Приведу его тут:
public interface BinkpMessageListener {
public void onMessage(FidoMessage message);
}
Теперь рассмотрим простейшую реализацию ResourceAdapter
public class BinkpServerResourceAdapter implements ResourceAdapter, Serializable {
private static final long serialVersionUID = 1L;
private static Logger log = Logger.getLogger(BinkpServerResourceAdapter.class
.getName());
private ConcurrentHashMap<BinkpActivationSpec, BinkpEndpoint> activationMap =
new ConcurrentHashMap<BinkpActivationSpec, BinkpEndpoint>();
private BootstrapContext ctx;
private String version;
@Override
public void endpointActivation(MessageEndpointFactory endpointFactory,
ActivationSpec spec) throws ResourceException {
BinkpEndpoint activation = new BinkpEndpoint(ctx.getWorkManager(),
(BinkpActivationSpec) spec, endpointFactory);
activationMap.put((BinkpActivationSpec) spec, activation);
activation.start();
log.info("endpointActivation(" + activation + ")");
}
@Override
public void endpointDeactivation(MessageEndpointFactory endpointFactory,
ActivationSpec spec) {
BinkpEndpoint activation = activationMap.remove(spec);
if (activation != null)
activation.stop();
log.info("endpointDeactivation(" + activation + ")");
}
@Override
public void start(BootstrapContext ctx)
throws ResourceAdapterInternalException {
this.ctx = ctx;
log.info("start()");
}
@Override
public void stop() {
for (BinkpEndpoint act : activationMap.values()) {
act.stop();
}
activationMap.clear();
log.info("stop()");
}
@Override
public XAResource[] getXAResources(ActivationSpec[] arg0)
throws ResourceException {
return null;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
}
Что тут происходит? При загрузке JCA-модуля создается экземпляр класса BinkpServerResourceAdapter, у него заполняются параметры ( в данном случае — поле version) и вызывается метод start().
На самом деле, внутри метода start() можно делать много всего, но в данном примере мы просто сохраняем контекст для получения из него в дальнейшем WorkManager'а.
Когда сервер приложений находит @MessageDrivenBean, он пытается найти адаптер, который отправляет сообщения на тот интерфейс, который реализует бин. Для JMS это MessageListener, у нас это BinkpMessageListener. Создается ActivationSpec ( у нас это BinkpActivationSpec, реализующий javax.resource.spi.ActivationSpec), поля в котором заполняются согласно данным в activationConfig, создается MessageEndpointFactory и вызывается ResourceAdapter.endpointActivation(). В этой функции необходимо создать тот «сервер», который будет принимать входящие соединения, будь то tcp/ip сервер или поток для работы с unix-socket, создать на основе того конфига который был в MDB. Класс BinkpEndpoint — это и есть тот самый «сервер».
public class BinkpEndpoint implements Work, FidoMessageListener {
private static final Logger logger = Logger.getLogger(BinkpEndpoint.class
.getName());
private BinkpServer server;
private final WorkManager workManager;
private final MessageEndpointFactory messageEndpointFactory;
public BinkpEndpoint(WorkManager workManager,
BinkpActivationSpec activationSpec,
MessageEndpointFactory messageEndpointFactory) {
this.workManager = workManager;
this.messageEndpointFactory = messageEndpointFactory;
server = new BinkpServer(activationSpec.getListenPort(), this);
}
public void start() throws ResourceException {
workManager.scheduleWork(this);
}
public void stop() {
if (server != null) {
server.stop();
}
}
/** из FidoMessageListener **/
@Override
public Message incomingMessage(FidoMessage message) {
String message = msg.encode();
BinkpMessageListener listener = (BinkpMessageListener) messageEndpointFactory
.createEndpoint(null);
listener.onMessage(message);
}
/** из Work **/
@Override
public void run() {
server.start();
}
/** из Work **/
@Override
public void release() {
stop();
}
}
Можно заметить, что везде фигурируют некие endpoint'ы. У меня был с этим некоторый затык, поэтому расшифрую:
Endpoint — это то, что слушает «входящий» поток. Именно к нему относятся функции endpointActication
MessageEndpoint — экземпляр MDB, который обрабатывает то или иное сообщение. Получается вызовом MessageEndpointFactory.createEndpoint() ( Эту функцию нельзя вызывать из основного треда ). Он легко кастится к интерфейсу MDB.
Собственно, все. Реализацию BinkpServer за ненадобностью опущу, но принцип должен быть понятен, минимальный «Входящий» JCA делается из четырех классов ( ResourceAdapter, MessageListener, ActivationSpec, Endpoint )
Создание Endpoint'а и обработка входящих:
@MessageDriven(messageListenerInterface = BinkpMessageListener.class,
activationConfig = { @ActivationConfigProperty(propertyName = "listenPort", propertyValue = "24554") })
public class ReceiveMessageBean implements BinkpMessageListener {
@Override
public void onMessage(FidoMessage msg) {
// do smth with mesaage
}
}
Исходящие
А вот тут — все веселее, минимальный «Исходящий» JCA делается аж из 8 классов, что в 2 раза больше чем «Входящий». Но давайте по-порядку.
META-INF/ra.xml — описание ResourceAdapter'а и outbound потоков
<connector xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
http://xmlns.jcp.org/xml/ns/javaee/connector_1_7.xsd"
version="1.7" metadata-complete="true">
<vendor-name>kreon-services</vendor-name>
<eis-type>FidoNet</eis-type>
<resourceadapter-version>2.5</resourceadapter-version>
<resourceadapter>
<!-- Класс, который реализует javax.resource.spi.ResourceAdapter; config-property - поля, должны быть доступны через геттеры/сеттеры -->
<resourceadapter-class>in.fidonode.binkp.ra.BinkpServerResourceAdapter</resourceadapter-class>
<config-property>
<config-property-name>version</config-property-name>
<config-property-type>java.lang.String</config-property-type>
<config-property-value>jnode-jee 2.5 binkp/1.1</config-property-value>
</config-property>
<!-- Описание исходящего потока. Всегда ПЕРЕД входящим -->
<outbound-resourceadapter>
<connection-definition>
<!-- Фабрика с которой будет работать JEE-сервер, она создает фабрику для создания соединений -->
<managedconnectionfactory-class>in.fidonode.binkp.ra.ManagedConnectionFactory</managedconnectionfactory-class>
<!-- Фабика соединений, которая будет отдаваться по запросу. Интерфейс должен быть у клиента в classpath -->
<connectionfactory-interface>in.fidonode.binkp.ra.ConnectionFactory</connectionfactory-interface>
<connectionfactory-impl-class>in.fidonode.binkp.ra.ConnectionFactoryImpl</connectionfactory-impl-class>
<!-- Соединение, которое будет отдавать фабрика соединений. Интерфейс должен быть у клиента в classpath -->
<connection-interface>in.fidonode.binkp.ra.Connection</connection-interface>
<connection-impl-class>in.fidonode.binkp.ra.ConnectionImpl</connection-impl-class>
</connection-definition>
<!-- Про транзакции и аутентификацию есть отдельный талмуд, я этот момент пропустил -->
<transaction-support>NoTransaction</transaction-support>
<reauthentication-support>false</reauthentication-support>
</outbound-resourceadapter>
<!-- Описание входящего потока -->
<inbound-resourceadapter>
<!-- ... -->
</inbound-resourceadapter>
</resourceadapter>
</connector>
Интерфейсы Connection и ConnectionFactory — для клиентов и должны быть в classpath. Сразу приведу их тут, там ничего интересного.
BinkpClient приводить не буду :-)
public interface Connection {
public BinkpClient connect(String hostname, int port);
}
public interface ConnectionFactory {
public Connection createConnection();
}
Соединения бывают Managed и Unmanaged. Первые — со свистелками, listener'ами и другим, вторые — без.
Класс, реализующий ManagedConnectionFactory, должен уметь создавать оба типа соединений.
public class ManagedConnectionFactory implements
javax.resource.spi.ManagedConnectionFactory {
private PrintWriter logwriter;
private static final long serialVersionUID = 1L;
/**
* Создание фабрики для unmanaged-соединений
*/
@Override
public Object createConnectionFactory() throws ResourceException {
return new ConnectionFactoryImpl();
}
/**
* Создание managed-фабрики для managed-connection
*/
@Override
public Object createConnectionFactory(ConnectionManager cxManager)
throws ResourceException {
return new ManagedConnectionFactoryImpl(this, cxManager);
}
/**
* Создание managed-соединения
*/
@Override
public ManagedConnection createManagedConnection(Subject subject,
ConnectionRequestInfo cxRequestInfo) throws ResourceException {
return new in.fidonode.binkp.ra.ManagedConnection();
}
@Override
public PrintWriter getLogWriter() throws ResourceException {
return logwriter;
}
@SuppressWarnings("rawtypes")
@Override
public ManagedConnection matchManagedConnections(Set connectionSet,
Subject subject, ConnectionRequestInfo cxRequestInfo)
throws ResourceException {
ManagedConnection result = null;
Iterator it = connectionSet.iterator();
while (result == null && it.hasNext()) {
ManagedConnection mc = (ManagedConnection) it.next();
if (mc instanceof in.fidonode.binkp.ra.ManagedConnection) {
result = mc;
}
}
return result;
}
@Override
public void setLogWriter(PrintWriter out) throws ResourceException {
logwriter = out;
}
}
Когда приложение запрашивает у JEE-сервера тот или иной коннектор, сервер приложений просит ManagedConnectionFactory создать ConnectionFactory и отдает его приложению.
Как можно заметить, ConnectionFactory тоже бывает Managed и Unmanaged. В принципе все это можно свести к одному классу, но это сильно зависит от того, что именно и как мы передаем, есть ли там транзакции итд.
ConnectionFactoryIml просто делает new ConnectionImpl(), а вот ManagedConnectionFactoryImpl чуть посложнее:
public class ManagedConnectionFactoryImpl implements ConnectionFactory {
private ManagedConnectionFactory factory;
private ConnectionManager manager;
public ManagedConnectionFactoryImpl(ManagedConnectionFactory factory,
ConnectionManager manager) {
super();
this.factory = factory;
this.manager = manager;
}
/** создает managed-соединение через родителя-ManagedConnectionFactory **/
@Override
public Connection createConnection() {
try {
return (Connection) manager.allocateConnection(factory, null);
} catch (ResourceException e) {
return null;
}
}
}
ManagedConnection, реализующий javax.resource.spi.ManagedConnection — это обертка для интерфейса Connection, которая как-раз добавляет свистелок и listener'ов. Именно этот класс возвращает ManagedConnectionFactory.createManagedConnection(), которую мы вызываем при создании соединения из ManagedConnectionFactoryImpl.createConnection() через ConnectionManager.allocateConnection()
public class ManagedConnection implements javax.resource.spi.ManagedConnection {
private PrintWriter logWriter;
private Connection connection;
private List<ConnectionEventListener> listeners;
public ManagedConnection() {
listeners = Collections
.synchronizedList(new ArrayList<ConnectionEventListener>());
}
@Override
public void associateConnection(Object connection) throws ResourceException {
if (connection != null && connection instanceof Connection) {
this.connection = (Connection) connection;
}
}
@Override
public Object getConnection(Subject subject,
ConnectionRequestInfo cxRequestInfo) throws ResourceException {
if (connection == null) {
connection = new ManagedConnectionImpl();
}
return connection;
}
@Override
public void cleanup() throws ResourceException {
}
@Override
public void destroy() throws ResourceException {
}
@Override
public PrintWriter getLogWriter() throws ResourceException {
return logWriter;
}
@Override
public ManagedConnectionMetaData getMetaData() throws ResourceException {
throw new NotSupportedException();
}
@Override
public XAResource getXAResource() throws ResourceException {
throw new NotSupportedException();
}
@Override
public LocalTransaction getLocalTransaction() throws ResourceException {
return null;
}
@Override
public void setLogWriter(PrintWriter out) throws ResourceException {
logWriter = out;
}
@Override
public void addConnectionEventListener(ConnectionEventListener listener) {
if (listener != null) {
listeners.add(listener);
}
}
@Override
public void removeConnectionEventListener(ConnectionEventListener listener) {
if (listener != null) {
listeners.remove(listener);
}
}
}
Ну вот, теперь мы подошли к самому простому — реализации соединения :-)
public class ConnectionImpl implements Connection {
@Override
public BinkpClient connect(String hostname, int port) {
return new BinkpClient(hostname, port);
}
}
Итоговая цепочка вызовов для установления исходящего соединения
ManagedConnectionFactory.createConnectionFactory()
->ManagedConnectionFactoryImpl.createConnection()
-->СonnectionManager.allocateConnection()
--->ManagedConnectionFactory.createManagedConnection()
---->ManagedConnection.getConnection()
----->ManagedConnectionImpl.connect()
Ну и не забудьте настроить сервер приложений для работы с этим адаптером, ну и jndi указать.
Код для вызова:
private BinkpClient createBinkpClient(String host, int port) {
ConnectionFactory cf = ((ConnectionFactory) new InitialContext().lookup("java:eis/BinkpConnectionFactory"));
Connection conn = cf.getConnection();
return conn.connect(host, port);
}
А причем тут Фидо?
А почти непричем. Дело в том, что изначальная задача была вовсе не о binkp, но она была рабочей, а значит попадала под NDA. Поэтому, разобравшись с JCA и решив что нужно написать статью на Хабре ( кстати, оглядываясь назад я начинаю понимать, почему никто такую статью еще не написал. И это еще без транзакций! ), я разморозил старую идею — форк jnode для JEE-серверов, для запуска ноды в виде одного ear. В свое время именно знаний JCA мне не хватило для того, чтобы запустить проект :-)
Под это все я написал вышеприведенные примеры, и они даже заработали. Так что если вы хотите потренироваться в java ee вообще и рефакторинге из java se в частности — пишите письма и коммитьте код. Да, пойнтов все еще беру.
Спасибо за внимание, оставайтесь с нами. Опечатки можно писать в комментариях, я даже не сомневаюсь что их тут десятки.
Автор: kreon