Типичная ситуация: один сервис добавил сообщение в очередь, другой сервис его прочитал, но из-за ошибки или рестарта сервиса не смог передать дальше. Сообщение потеряно.
В логах при этом не всегда есть что-то полезное, и часто всё заканчивается тем, что разработчики идут ругаться: «Почему в нашей очереди нет вашего сообщения?»
Такие кейсы почти никогда не отлавливаются во время тестирования, и юнит-тесты тут тоже не помогают, поэтому проблема часто всплывает уже на проде.
При этом JMS API позволяет выбирать такой подход к обработке сообщений, который позволит избежать потери сообщения в случае ошибки или рестарта сервиса.
Как это работает?
Популярные брокеры сообщений, такие как Apache Kafka, ActiveMQ, Oracle AQ и другие, поддерживают формат JMS, а он позволяет использовать разные режимы подтверждения обработки сообщений. Вот какие подходы можно использовать:
AUTO_ACKNOWLEDGE (по умолчанию)
-
Сообщение автоматически удаляется из очереди после успешного прочтения слушателем.
-
Это наиболее распространённый режим для стандартных случаев.
CLIENT_ACKNOWLEDGE
-
Сообщение остаётся в очереди, пока вы явно не вызовете метод Message.acknowledge() в коде.
-
Используется, если приложение должно самостоятельно управлять подтверждением.
DUPS_OK_ACKNOWLEDGE
-
Сообщение подтверждается с некоторой задержкой. Возможны повторные доставки в случае сбоя.
-
Обычно используется для сценариев, где допустима некоторая избыточность. Этот режим я не буду рассматривать, т.к. он подходит для более специфических случаев.
TRANSACTED
-
Сообщение удаляется только после успешного завершения транзакции.
-
Если транзакция откатывается, сообщение снова становится доступным для слушателей очереди (redelivery).
Соответственно, по умолчанию брокер будет сразу удалять сообщение после прочтения сервисом. Но он может также оставлять его в очереди до завершения транзакции или дожидаться явного подтверждения от сервиса для того, чтобы удалить сообщение. При этом большинство брокеров обеспечивают защиту от повторного прочтения сообщения другим потоком, пока транзакция не была завершена, либо не было получено подтверждение от клиента.
У каждого брокера есть свои особенности работы и важно следить за настройками, чтобы избежать повторного прочтения сообщений до завершения транзакции (такое возможно, например, если в kafka режим read_committed для транзакций был изменён на read_uncommitted).
Какой подход к обработке сообщений выбрать?
Давайте разберём, как это работает, на примере сервиса на Spring Boot, который занимается формированием документов.
Пусть в очередь ActiveMQ Artemis попадают сообщения с набором параметров для формирования документа. Сервис получает сообщение из ActiveMQ, формирует документ и кладёт его в хранилище. Для сервиса важно обеспечить доставку документа в хранилище.
Предположим, для ActiveMQ используется такая конфигурация:
@Configuration
public class JmsConfig {
@Bean
public ConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory();
}
@Bean
public JmsListenerContainerFactory<DefaultMessageListenerContainer> jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
}
За обработку сообщений отвечает метод, помеченный аннотацией: @JmsListener(destination = "queue.example", containerFactory = "jmsListenerContainerFactory")
. Этот метод читает сообщение, формирует документ и отправляет документ в хранилище.
В конфигурации не был указан acknowledgement mode, транзакции не были включены. Значит, будут применены настройки по умолчанию, а это режим AUTO_ACKNOWLEDGE. Это значит, что, как только сервис получит сообщение, оно будет удалено из очереди. Если сервис рестартует в момент формирования документа, документ не попадёт в хранилище, а сообщение не вернётся в очередь.
Транзакции
Что произойдёт, если для jmsListenerContainerFactory
был установлен транзакционный режим?
@Bean
public JmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSessionTransacted(true);
return factory;
}
Транзакция откатится, если метод, помеченный аннотацией @JmsListener выбросит исключение. В случае рестарта сервиса транзакция также завершится, по таймауту.
Ситуация с исключением требует дополнительных уточнений. Часто ошибки обрабатываются без выброса исключений, например, так:
@JmsListener(destination = "${spring.queue.example}", containerFactory = "jmsListenerContainerFactory")
public void onMessage(TextMessage message) {
try {
//реализация логики, передача сообщения дальше
} catch (Exception e) {
log.error("Error", e);
}
}
В этом случае, несмотря на транзакции, сообщение не вернётся в очередь. С точки зрения JMS метод завершил свою работу успешно.
Значит, для того, чтобы транзакция откатилась, в случае ошибки нужно выбрасывать исключение. Правда, если ошибка возникла, например, из-за неверного формата сообщения, то возвращать его в очередь не имеет смысла, поэтому лучше всего сначала отлавливать такие ошибки и обрабатывать их без выброса исключения.
Также, если метод запускает какую-то задачу в другом потоке или асинхронно, то транзакция также не откатится, даже если задача была завершена неуспешно. Транзакция считается успешно завершённой, ведь метод завершился без исключения.
Важный момент при работе с транзакциями — таймаут. В ActiveMQ это настройка transaction timeout, и по умолчанию это 5 минут. Это значит, что, если ваш метод не завершил транзакцию в течение 5 минут, то транзакция откатится брокером, и сообщение будет снова доступно для чтения.
Что будет, если наш сервис будет выполнять формирование какого-то документа 10 минут? За это время транзакция откатится, сообщение снова станет доступно для чтения. При этом, если транзакция откатывается из-за истечения таймаута, и сервис всё-таки завершает формирование документа спустя время, действия сервиса больше не будут связаны с брокером сообщений и транзакцией. Транзакции-то нет, она откатилась.
Получается, если какой-то документ формируется больше пяти минут, то сообщение возвращается в очередь, а хранилище заполняется одинаковыми документами до тех пор, пока не будет превышен лимит попыток повторной доставки сообщения.
CLIENT_ACKNOWLEDGE
Хотя таймаут можно настраивать на уровне брокера, и также есть способы настраивать таймаут на уровне клиента, транзакции плохо подходят для выполнения задач, требующих неопределённо долгого времени выполнения.
Поэтому для сервиса, который делает тяжёлую работу и подолгу формирует документы, лучше отказаться от транзакций и использовать подход CLIENT_ACKNOWLEDGE:
@Bean
public JmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
}
В этом случае сообщение не будет удалено из очереди до того момента, пока клиент не вызовет метод message.acknowledge()
. При этом CLIENT_ACKNOWLEDGE не мешает обработке сообщений в несколько потоков: хотя сообщение остаётся в очереди, оно находится в статусе "неподтверждённое". В этом статусе оно недоступно для других клиентов для чтения.
Также, хотя при подходе CLIENT_ACKNOWLEDGE не используется таймаут, сообщения всё равно возвращаются в очередь при рестарте сервиса. Дело в том, что брокер использует heartbeat для того, чтобы убедиться, что соединение поддерживается. Это значит, что, например, раз в 60 секунд брокер будет проверять, что соединение живо. Как только выясняется, что соединение разорвано, например, из-за рестарта сервиса, все неподтверждённые сообщения снова становятся доступными для чтения другими слушателями.
Вот пример, в котором в случае ошибки сообщение остаётся неподтверждённым:
@JmsListener(destination = "${spring.queue.example}", containerFactory = "jmsListenerContainerFactory")
public void onMessage(Message message) {
try {
//логика
message.acknowledge();
} catch (Exception e) {
log.error("Error", e);
// Сообщение останется неподтверждённым
}
}
Что здесь происходит в случае ошибки? Так как в результате работы метода не было выброшено исключение и не был вызван метод acknowledge()
, сообщение останется неподтверждённым. Оно сохранится в очереди в таком статусе до тех пор, пока соединение не будет разорвано, при этом другие сообщения будут обрабатываться.
Если в результате работы метода было выброшено исключение, сообщение снова становится доступным для чтения:
@JmsListener(destination = "${spring.queue.example}", containerFactory = "jmsListenerContainerFactory")
public void onMessage(Message message) {
try {
//логика
message.acknowledge();
} catch (Exception e) {
log.error("Error", e);
//Сообщение возвращается в очередь
throw new RuntimeException(e);
}
}
Итог:
-
При стандартных настройках сообщения удаляются из очереди сразу после прочтения. Это значит, что, если произошёл сбой или рестарт сервиса во время обработки сообщения, сообщение будет потеряно.
-
Для того, чтобы гарантировать обработку сообщений, можно использовать транзакции либо подход CLIENT_ACKNOWLEDGE. Важно следить за тем, как завершает свою работу метод в случае ошибки: с выбросом исключения или без. Особенно это актуально, если используются транзакции. В случае, если исключение было отловлено в блоке
catch
, а новое исключение не было выброшено, транзакция завершится успешно, и сообщение удалится из очереди. -
Если с момента прочтения сообщения до завершения работы метода может пройти неопределённо долгое время, как в случае сервиса, который формирует тяжёлые документы, лучше использовать подход CLIENT_ACKNOWLEDGE и отказаться от транзакций. В этом случае сообщения не будут возвращаться в очередь по таймауту, что позволит избежать неприятных ошибок.
Автор: progrbobr