Архитектуры, управляемые событиями (Event Driven Architecture), в целом, и Apache Kafka, в частности, привлекли в последнее время большое внимание. Для реализации всех преимуществ архитектуры, управляемой событиями, механизм делегирования событий должен быть по своей сути асинхронным. Тем не менее, могут существовать некоторые особые сценарии/потоки использования, в которых требуется семантика Синхронного Запроса-Ответа. В этом выпуске показано, как реализовать "Запрос-Ответ" с помощью Apache Kafka.
Перевел @middle_java
Дата оригинальной статьи: 26 October 2018
Apache Kafka по своей природе асинхронна. Следовательно, семантика «Запрос-Ответ» для Apache Kafka не является естественной. Тем не менее эта задача не нова. Паттерн Интеграции Корпоративных приложений (Enterprise Integration Pattern) Request-Reply обеспечивает проверенный механизм синхронного обмена сообщениями по асинхронным каналам:
Паттерн Return Address (Адрес Возврата) дополняет паттерн Request-Reply механизмом указания запрашивающей стороной адреса, на который должен быть отправлен ответ:
Недавно в Spring Kafka 2.1.3 была добавлена поддержка из коробки паттерна «Request Reply», а в версии 2.2 были отполированы некоторые из его шероховатостей. Рассмотрим, как работает эта поддержка:
На стороне клиента: ReplyingKafkaTemplate
Хорошо известная абстракция Template (Шаблон) формирует в Spring основу для клиентской части механизма Request-Reply.
@Bean
public ReplyingKafkaTemplate < String, Request, Reply > replyKafkaTemplate(
ProducerFactory < String, Request > pf,
KafkaMessageListenerContainer < String, Reply > lc) {
return new ReplyingKafkaTemplate < > (pf, lc);
}
Здесь все довольно прямолинейно: мы настраиваем ReplyingKafkaTemplate, который отправляет сообщения-запросы со String ключами и получает сообщения-ответы со String ключами. Вместе с тем ReplyingKafkaTemplate должен быть основан на ProducerFactory Запроса, ConsumerFactory Ответа и MessageListenerContainer с соответствующими конфигурациями консюмера и продюсера. Следовательно, необходимая конфигурация довольно развесиста:
@Value("${kafka.topic.car.reply}")
private String replyTopic;
@Bean
public Map < String, Object > consumerConfigs() {
Map < String, Object > props = new HashMap < > ();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return props;
}
@Bean
public Map < String, Object > producerConfigs() {
Map < String, Object > props = new HashMap < > ();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory < String, Request > requestProducerFactory() {
return new DefaultKafkaProducerFactory < > (producerConfigs());
}
@Bean
public ConsumerFactory < String, Reply > replyConsumerFactory() {
return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(),
new JsonSerializer < Reply > ());
}
@Bean
public KafkaMessageListenerContainer < String, Reply > replyListenerContainer() {
ContainerProperties containerProperties = new ContainerProperties(replyTopic);
return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties);
}
В этом случае использование replyKafkaTemplate для отправки синхронного запроса и получения ответа выглядит следующим образом:
@Value("${kafka.topic.car.request}")
private String requestTopic;
@Value("${kafka.topic.car.reply}")
private String replyTopic;
@Autowired
private ReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate;
...
RequestReply request = RequestReply.request(...);
//создаем producer record
ProducerRecord < String, Request > record = new ProducerRecord < String, Request > (requestTopic, request);
// устанавливаем топик для ответа в заголовке
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
// отправляем запрос в топик Kafka и асинхронно получаем ответ в указанный топик для ответа
RequestReplyFuture < String, Request, Reply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record);
sendAndReceive.addCallback(new ListenableFutureCallback < ConsumerRecord < String, Reply >> () {
@Override
public void onSuccess(ConsumerRecord < String, Reply > result) {
// получаем значение consumer record
Reply reply = result.value();
System.out.println("Reply: " + reply.toString());
}
});
Здесь также много бойлерплейта и низкоуровневого API, да еще этот устаревший API ListenableFuture вместо современной CompletableFuture.
requestReplyKafkaTemplate заботится о генерации и установке заголовка KafkaHeaders.CORRELATION_ID, но мы должны явно задать заголовок KafkaHeaders.REPLY_TOPIC для запроса. Обратите также внимание, что этот же топик для ответа был излишне заинжектен выше в replyListenerContainer. Гадость какая-то. Не совсем то, чего я ожидал от абстракции Spring.
Серверная сторона: @SendTo
На стороне сервера обычный KafkaListener, прослушивающий топик для запроса, дополнительно декорирован аннотацией @SendTo, чтобы предоставить сообщение-ответ. Объект, возвращаемый методом слушателя, автоматически оборачивается (wrapped) в ответное сообщение, добавляется CORRELATION_ID и ответ публикуется в топике, указанном в заголовке REPLY_TOPIC.
@Bean
public Map < String, Object > consumerConfigs() {
Map < String, Object > props = new HashMap < > ();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return props;
}
@Bean
public Map < String, Object > producerConfigs() {
Map < String, Object > props = new HashMap < > ();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ConsumerFactory < String, Request > requestConsumerFactory() {
return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(),
new JsonSerializer < Request > ());
}
@Bean
public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, Request >> requestListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory < String, Request > factory =
new ConcurrentKafkaListenerContainerFactory < > ();
factory.setConsumerFactory(requestConsumerFactory());
factory.setReplyTemplate(replyTemplate());
return factory;
}
@Bean
public ProducerFactory < String, Reply > replyProducerFactory() {
return new DefaultKafkaProducerFactory < > (producerConfigs());
}
@Bean
public KafkaTemplate < String, Reply > replyTemplate() {
return new KafkaTemplate < > (replyProducerFactory());
}
Здесь также требуется некоторая конфигурация, но конфигурация слушателя проще:
@KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory")
@SendTo()
public Reply receive(Request request) {
Reply reply = ...;
return reply;
}
Но как насчет нескольких экземпляров консюмера?
Все вроде работает, пока мы не используем несколько экземпляров консюмера. Если у нас есть несколько экземпляров клиента, мы должны убедиться, что ответ отправляется в корректный экземпляр клиента. Документация Spring Kafka предполагает, что каждый консюмер может использовать уникальный топик или, что с запросом отправляется дополнительное значение заголовка KafkaHeaders.RESPONSE_PARTITION — четырехбайтовое поле, содержащее BIG-ENDIAN-представление целочисленного номера раздела. Использование раздельных топиков для разных клиентов явно не очень гибко, поэтому мы выбираем явную настройку REPLY_PARTITION. Тогда клиент должен знать, на какую партицию он назначен. В документации предлагается использовать явную конфигурацию для выбора конкретной партиции. Давайте добавим ее к нашему примеру:
@Value("${kafka.topic.car.reply.partition}")
private int replyPartition;
...
@Bean
public KafkaMessageListenerContainer < String, RequestReply > replyListenerContainer() {
ContainerProperties containerProperties = new ContainerProperties(replyTopic);
TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition);
return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties, initialOffset);
}
private static byte[] intToBytesBigEndian(final int data) {
return new byte[] {
(byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff),
(byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff),
};
}
...
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition)));
RequestReplyFuture < String, RequestReply, RequestReply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record);
...
Не очень красиво, но это работает. Требуемая конфигурация обширна и API выглядит низкоуровнево. Необходимость явной настройки партиций усложняет процесс динамического масштабирования количества клиентов. Очевидно, можно сделать лучше.
Инкапсулирование обработки топика для ответа и партиции
Давайте начнем с инкапсуляции паттерна Return Address, передавая вместе топик для ответа и партицию. Топик для ответа должен быть заинжектен в RequestReplyTemplate и, следовательно, вообще не должен присутствовать в API. Когда речь идет о партициях для ответа, сделаем наоборот: извлечем партицию (партиции), назначенную слушателю топика для ответа, и передадим эту партицию автоматически. Это избавляет клиента от необходимости заботиться об этих заголовках.
При этом, давайте также сделаем API таким, чтобы он напоминал стандартный KafkaTemplate (перегрузим метод sendAndReceive() упрощенными параметрами и добавим соответствующие перегруженные методы, использующие настроенный по умолчанию топик):
public class PartitionAwareReplyingKafkaTemplate < K, V, R > extends ReplyingKafkaTemplate < K, V, R > {
public PartitionAwareReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory,
GenericMessageListenerContainer < K, R > replyContainer) {
super(producerFactory, replyContainer);
}
private TopicPartition getFirstAssignedReplyTopicPartition() {
if (getAssignedReplyTopicPartitions() != null &&
getAssignedReplyTopicPartitions().iterator().hasNext()) {
TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next();
if (this.logger.isDebugEnabled()) {
this.logger.debug("Using partition " + replyPartition.partition());
}
return replyPartition;
} else {
throw new KafkaException("Illegal state: No reply partition is assigned to this instance");
}
}
private static byte[] intToBytesBigEndian(final int data) {
return new byte[] {
(byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff),
(byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff),
};
}
public RequestReplyFuture < K,
V,
R > sendAndReceiveDefault(@Nullable V data) {
return sendAndReceive(getDefaultTopic(), data);
}
public RequestReplyFuture < K,
V,
R > sendAndReceiveDefault(K key, @Nullable V data) {
return sendAndReceive(getDefaultTopic(), key, data);
}
...
public RequestReplyFuture < K,
V,
R > sendAndReceive(String topic, @Nullable V data) {
ProducerRecord < K, V > record = new ProducerRecord < > (topic, data);
return doSendAndReceive(record);
}
public RequestReplyFuture < K,
V,
R > sendAndReceive(String topic, K key, @Nullable V data) {
ProducerRecord < K, V > record = new ProducerRecord < > (topic, key, data);
return doSendAndReceive(record);
}
...
@Override
public RequestReplyFuture < K,
V,
R > sendAndReceive(ProducerRecord < K, V > record) {
return doSendAndReceive(record);
}
protected RequestReplyFuture < K,
V,
R > doSendAndReceive(ProducerRecord < K, V > record) {
TopicPartition replyPartition = getFirstAssignedReplyTopicPartition();
record.headers()
.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes()))
.add(new RecordHeader(KafkaHeaders.REPLY_PARTITION,
intToBytesBigEndian(replyPartition.partition())));
return super.sendAndReceive(record);
}
}
Следующий шаг: Адаптируем ListenableFuture к более современной CompletableFuture.
public class CompletableFutureReplyingKafkaTemplate < K, V, R > extends PartitionAwareReplyingKafkaTemplate < K, V, R > {
public CompletableFutureReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory,
GenericMessageListenerContainer < K, R > replyContainer) {
super(producerFactory, replyContainer);
}
public CompletableFuture < R > requestReplyDefault(V value) {
return adapt(sendAndReceiveDefault(value));
}
public CompletableFuture < R > requestReplyDefault(K key, V value) {
return adapt(sendAndReceiveDefault(key, value));
}
...
public CompletableFuture < R > requestReply(String topic, V value) {
return adapt(sendAndReceive(topic, value));
}
public CompletableFuture < R > requestReply(String topic, K key, V value) {
return adapt(sendAndReceive(topic, key, value));
}
...
private CompletableFuture < R > adapt(RequestReplyFuture < K, V, R > requestReplyFuture) {
CompletableFuture < R > completableResult = new CompletableFuture < R > () {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean result = requestReplyFuture.cancel(mayInterruptIfRunning);
super.cancel(mayInterruptIfRunning);
return result;
}
};
// добавим коллбек к результату отправки запроса
requestReplyFuture.getSendFuture().addCallback(new ListenableFutureCallback < SendResult < K, V >> () {
@Override
public void onSuccess(SendResult < K, V > sendResult) {
// NOOP
}
@Override
public void onFailure(Throwable t) {
completableResult.completeExceptionally(t);
}
});
// добавим коллбек к ответу
requestReplyFuture.addCallback(new ListenableFutureCallback < ConsumerRecord < K, R >> () {
@Override
public void onSuccess(ConsumerRecord < K, R > result) {
completableResult.complete(result.value());
}
@Override
public void onFailure(Throwable t) {
completableResult.completeExceptionally(t);
}
});
return completableResult;
}
}
Упакуем это в утилитную библиотеку и теперь у нас есть API, который намного больше соответствует основной философии проектирования Spring «Соглашение над Конфигурацией» («Convention over Configuration»). Вот итоговый код клиента:
@Autowired
private CompletableFutureReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate;
...
requestReplyKafkaTemplate.requestReply(request).thenAccept(reply - >
System.out.println("Reply: " + reply.toString());
);
Подводим итоги
Подводя итог, Spring для Kafka 2.2 обеспечивает полностью функциональную реализацию паттерна Request-Reply в Apache Kafka, но API по-прежнему имеет некоторые шероховатости. В этом выпуске мы увидели, что некоторые дополнительные абстракции и адаптации API могут дать более логичный высокоуровневый API.
Предупреждение 1:
Одним из главных преимуществ архитектуры, управляемой событиями, является разделение (decoupling) продюсеров и консюмеров событий, что позволяет создавать гораздо более гибкие и эволюционирующие системы. Использование синхронной семантики «Запрос-Ответ» является полной противоположностью, когда запрашивающая и отвечающая стороны сильно связаны между собой. Следовательно, ее следует использовать только в случае необходимости.
Предупреждение 2:
Если требуется синхронный Запрос-Ответ, то протокол на основе HTTP намного проще и эффективнее, чем использование асинхронного канала типа Apache Kafka.
Тем не менее, могут быть сценарии, когда синхронный Запрос-Ответ через Kafka имеет смысл. Разумно выбирайте лучший инструмент для работы.
Полностью рабочий пример можно найти на сайте github.com/callistaenterprise/blog-synchronous-kafka.
Комментарии
Federico • 7 месяцев назад
А когда у нас есть гибридные потребности, допустим, в 50% кейсов нам нужен Запрос-Ответ и в 50% нам нужно событийное управление? Как нам это сделать? Конфигурация, необходимая Spring Kafka, выглядит довольно ужасно.
Jehanzeb Qayyum • 6 месяцев назад
Теперь Spring имеет дефолтную поддержку с использованием партиций в одном общем топике для ответа.
Начиная с версии 2.2, шаблон пытается определить топик для ответа или партицию из сконфигурированного контейнера ответа (reply container).
https://docs.spring.io/spring-kafka/reference/html/#replying-template
nir rozenberg • 8 месяцев назад
Привет,
В последнем абзаце вы написали, что могут быть сценарии, когда синхронный Запрос-Ответ через Kafka имеет смысл по сравнению с HTTP.
Можно привести примеры таких сценариев?
Спасибо,
Nir
Janne Keskitalo nir rozenberg • 8 месяцев назад
Мы собираемся разделить систему обработки транзакций большого объема на несколько микросервисов и у меня есть идея использовать обмен сообщениями Kafka «Запрос-Ответ» для достижения похожих способов обработки (processing affinity). В основном Kafka используется для маршрутизации всех вызовов одного клиента в один и тот же процесс обработчика транзакций, который затем последовательно их выполняет по одному. Такой вид обработки гарантирует линеаризируемость (https://stackoverflow.com/a/19515375/7430325), причинно-следственную связность, а также позволяет эффективное кэширование. По сути, усилия по координации были бы перенесены из базы данных в Kafka и мы могли бы запустить базу данных в строгом режиме изоляции Serializable.
Мне еще предстоит углубиться в детали нашей семантики транзакций, чтобы увидеть, где здесь не дотягивает, так что это пока просто идея.
Перевел @middle_java
Автор: TorinoSM