В этом посте я хочу показать как использовать отложенные сообщения в RabbitMQ. В качестве примера задачи, где удобно использовать отложенную очередь, возьму механизм постбеков (s2s ping back, s2s pixel).
В двух словах о механизме постбеков:
1. Происходит некое событие
2. Ваше приложение должно уведомить об этом событии сторонний сервис
3. Если сторонний сервис оказался недоступен, то необходимо повторить уведомление еще раз через несколько минут
Для повторного уведомления я буду использовать отложенную очередь.
RabbitMQ по умолчанию не умеет задерживать сообщения, они доставляются сразу после публикации. Функционал отложенной доставки доступен в виде плагина rabbitmq-delayed-message-exchange.
Сразу хочу отметить, что плагин экспериментальный. Не смотря на то, что в целом он достаточно стабилен, использовать в продакшене его нужно на свой страх и риск.
Собираем Docker контейнер с RMQ и плагином
За основу я возьму официальный образ с management plugin, пригодится для тестирования.
Dockerfile:
FROM rabbitmq:3.6-management
RUN apt-get update && apt-get install -y curl
RUN curl http://www.rabbitmq.com/community-plugins/v3.6.x/rabbitmq_delayed_message_exchange-0.0.1.ez > $RABBITMQ_HOME/plugins/rabbitmq_delayed_message_exchange-0.0.1.ez
RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange
docker build --tag=x25/rmq-delayed-message-exchange .
docker run -d --name rmq -p 5672:5672 -p 15672:15672 x25/rmq-delayed-message-exchange
Spring AMQP
Spring Framework полностью поддерживает плагин в проекте spring-rabbit
. Начиная с версии 1.6.4 можно пользоваться как xml/bean конфигурациями так и аннотациями.
Я буду использовать Spring Boot Amqp Starter.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
compile "org.springframework.boot:spring-boot-starter-amqp"
Конфигурация через аннотации
При использовании бутстраппера и аннотаций Spring берет большую часть работы на себя. Достаточно лишь написать:
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = DELAY_QUEUE_NAME),
exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = DELAY_QUEUE_NAME))
public void onMessage(Message<?> message) {
//...
}
И при запуске приложения RabbitAdmin автоматически объявит delayed exchange
, queue
, создаст обработчики событий и привяжет их к аннотированному методу.
Нужно больше потоков для обработки сообщений? Это настраивается через файл внешней конфигурации (свойство spring.rabbitmq.listener.concurrency) или через параметр containerFactory у аннотации:
//Создаем конфигурацию:
@Configuration
public class RabbitConfiguration {
@Bean(name = "containerFactory")
@Autowired
public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(10);
factory.setPrefetchCount(30);
return factory;
}
}
//Добавляем параметр:
@RabbitListener(containerFactory = "containerFactory", ...)
Для отправки отложенного сообщения удобно использовать RabbitTemplate:
rabbitTemplate.send(
DELAY_EXCHANGE_NAME,
DELAY_QUEUE_NAME,
MessageBuilder
.withBody(data)
.setHeader("x-delay", MESSAGE_DELAY_MS).build()
);
Отправлено оно будет моментально, но доставлено будет с задержкой, указанной в заголовке x-delay
. Максимально допустимое время задержки (2^32-1) мс.
Готовый пример приложения:
@SpringBootApplication
public class Application {
private final Logger log = LoggerFactory.getLogger(Application.class);
private final static String DELAY_QUEUE_NAME = "delayed.queue";
private final static String DELAY_EXCHANGE_NAME = "delayed.exchange";
private final static String DELAY_HEADER = "x-delay";
private final static String NUM_ATTEMPT_HEADER = "x-num-attempt";
private final static long RETRY_BACKOFF = 5000;
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = DELAY_QUEUE_NAME),
exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = DELAY_QUEUE_NAME))
public void onMessage(Message<byte[]> message) {
String endpointUrl = new String(message.getPayload());
Long numAttempt = (Long) message.getHeaders().getOrDefault(NUM_ATTEMPT_HEADER, 1L);
log.info("Message received, url={}, attempt={}", endpointUrl, numAttempt);
if (!doNotifyEndpoint(endpointUrl) && numAttempt < 3) {
rabbitTemplate.send(
DELAY_EXCHANGE_NAME,
DELAY_QUEUE_NAME,
MessageBuilder
.withBody(message.getPayload())
.setHeader(DELAY_HEADER, numAttempt * RETRY_BACKOFF)
.setHeader(NUM_ATTEMPT_HEADER, numAttempt + 1)
.build()
);
}
}
private boolean doNotifyEndpoint(String url) {
try {
HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
/* @todo: set up connection timeouts */
return (connection.getResponseCode() == 200);
} catch (Exception e) {
log.error(e.getMessage());
return false;
}
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
listener:
prefetch: 10
concurrency: 50
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:1.4.2.RELEASE")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
jar {
baseName = 'rmq-delayed-demo'
version = '0.1.0'
}
repositories {
mavenCentral()
}
sourceCompatibility = 1.8
targetCompatibility = 1.8
dependencies {
compile("org.springframework.boot:spring-boot-starter-amqp")
testCompile("org.springframework.boot:spring-boot-starter-test")
}
Проверяем отложенную доставку через панель управления (rmq-management), она доступна на порту 15672:
Логи:
2016-12-21 14:27:25.927: Message received, url=http://localhost:1234, attempt=1
2016-12-21 14:27:25.941: Connection refused (Connection refused)
2016-12-21 14:27:30.946: Message received, url=http://localhost:1234, attempt=2
2016-12-21 14:27:30.951: Connection refused (Connection refused)
2016-12-21 14:27:40.954: Message received, url=http://localhost:1234, attempt=3
Конфигурация через XML
При использовании XML конфигураций нужно просто установить у exchange-бина свойство delayed
в true
и RabbitAdmin сделает все остальное за вас.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<int:channel id="to-delayed-rmq" />
<int-amqp:outbound-channel-adapter channel="to-delayed-rmq"
amqp-template="rabbitTemplate"
exchange-name="delayed.exchange"
routing-key="delayed.binding"
mapped-request-headers="x-delay" />
<int-amqp:inbound-channel-adapter channel="from-delayed-rmq-queue"
queue-names="delayed.queue"
message-converter="amqpMessageConverter"
connection-factory="rabbitConnectionFactory"
concurrent-consumers="10"
prefetch-count="50" />
<int:service-activator input-channel="from-delayed-rmq-queue" method="onMessage">
<bean id="postbackServiceActivator" class="PostbackServiceActivator" />
</int:service-activator>
<rabbit:queue name="delayed.queue" />
<rabbit:direct-exchange name="delayed.exchange" delayed="true">
<rabbit:bindings>
<rabbit:binding queue="delayed.queue" key="delayed.binding" />
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
Полезные ссылки
Автор: webaff