На Хабре имеется серия переводов официального руководства по RabbitMQ (1, 2, 3, 4, 5). К сожалению, в официальном руководстве не рассматривается вопрос организации отложенный сообщений, а я считаю этот вопрос весьма важным. Поэтому я решал сам написать такую статью.
Примеры кода будут на Перле, но никаких специфических для Перла деталей в коде не будет, поэтому примеры могут быть сравнительно легко адаптированы для любого другого языка.
Постановка задачи
Иногда нужно выполнить какую-либо задачу не «вот-прям-сию-секунду», а спустя некоторое время.
Например, у нас есть скрипт, который время от времени обращается к какому-нибудь API, и, если ответ не изменился, «зысыпает» на некоторое время, потом «просыпается» и снова проверяет.
Или, к примеру, мы сохранили временный файл и нам нужно завести таймер, чтобы удалить файл по истечении указанного времени.
В подобных случаях нам нужен механизм, позволяющий создать в RabbitMQ отложенное сообщение (если, конечно, мы хотим делать это средствами RabbitMQ).
К сожалению, а самом RabbitMQ нет готового механизма для публикации отложенных сообщений. Сообщения, публикуемые отправителями в RabbitMQ, доставляются получателям мгновенно. Конечно, получатель может быть не подключен к RabbitMQ, в этом случае сообщение будет доставлено после подключения, но если получатель подключен — доставка сообщения производится сразу.
Нельзя просто так опубликовать сообщение и сказать ему: «Полежи пока незаметно в уголочке, а через 10 минут вылезай и доставляйся получателю».
Поэтому возникает задача — как, с помощью RabbitMQ, организовать отложенные сообщения?
Решение
Для этого придется сделать обходной маневр. Ключевая идея такая — если сообщение, отправленное в очередь, немедленно доставляется слушающему эту очередь получателю, значит, нужно отправить это сообщение в другую очередь!
В общем и целом схема работы будет такая:
- Создаем обменник, в который будут отправляться отложенные сообщения
- Создаем очередь, в которой будут храниться отложенные сообщения
- Делаем переплет между очередью и обменником
- Настраиваем очередь так, чтобы сообщения, полежав в ней некоторое заданное время, отправлялись в обычный обменник, для немедленной доставки получателю
Получатель
Рассмотрим скрипт consumer_dlx.pl:
#!/usr/bin/perl
use strict;
use warnings;
use Net::AMQP::RabbitMQ;
my $mq = Net::AMQP::RabbitMQ->new();
my $user = 'guest';
my $password = 'guest';
my $exchange = 'myexchange';
my $queue = 'myqueue';
my $routing_key = 'mykey';
# Подключение
$mq->connect("localhost", {user => $user, password => $password});
# Канал
$mq->channel_open(1);
# Обменник
$mq->exchange_declare(1, $exchange, {exchange_type => 'direct'});
# Очередь
$mq->queue_declare(1, $queue);
# Переплет
$mq->queue_bind(1, $queue, $exchange, $routing_key);
# Подписка
$mq->consume(1, $queue);
# Второй комплект очередь-переплет-подписка
$mq->queue_declare(1, $queue.'2');
$mq->queue_bind(1, $queue.'2', $exchange, $routing_key.'2');
$mq->consume(1, $queue.'2');
# Получение сообщений (бесконечный цикл)
while ( my $msg = $mq->recv() ) {
print "$msg->{body} ($msg->{routing_key})n";
}
Я не буду заострять внимание на каждой строке этого скрипта, так как тут нет ничего нового для человека, прочитавшего вышеупомянутые статьи из руководства. Это вполне обычный получатель сообщений, нет даже никакой специфики, связанной с рассматриваемой темой — отложенными сообщениями. Получатель нужен лишь для демонстрации, вся соль будет в отправителе.
Отмечу лишь один момент:
Обратите внимание на то, что получатель создает и слушает две разные очереди, переплетенные с обменником двумя разными routing_key. В принципе, достаточно и одной очереди, но с двумя будет нагляднее, плюс это поможет далее продемонстрировать одну полезную возможность.
Отправитель
Теперь рассмотрим скрипт producer_dlx.pl:
#!/usr/bin/perl
use strict;
use warnings;
use Net::AMQP::RabbitMQ;
my $mq = Net::AMQP::RabbitMQ->new();
my $user = 'guest';
my $password = 'guest';
my $exchange = 'myexchange';
my $exchange_dlx = 'myexchange.dlx';
my $queue_dlx = 'myqueue.dlx';
my $message = $ARGV[0] || 'mymessage';
my $routing_key = $ARGV[1] || 'mykey';
my $expiration = $ARGV[2] || 0;
# Подключение
$mq->connect("localhost", {user => $user, password => $password});
# Канал
$mq->channel_open(1);
# Обменник
$mq->exchange_declare(1, $exchange, {exchange_type => 'direct'});
# Обменник dlx
$mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'fanout'});
# Очередь dlx
$mq->queue_declare(1, $queue_dlx, {}, {'x-dead-letter-exchange' => $exchange});
# Переплет
$mq->queue_bind(1, $queue_dlx, $exchange_dlx, $routing_key);
# Публикуем сообщение
$mq->publish(1, $routing_key , $message, {exchange => $exchange_dlx}, {expiration => $expiration});
Разберем отдельные участки кода.
# Обменник
$mq->exchange_declare(1, $exchange, {exchange_type => 'direct'});
Это тот же самый обменник, который используется в получателе. Наш отправитель не отправляет сообщения напрямую в этот обменник, но создать обменник все равно нужно, так как он все-таки будет далее использоваться, хоть и косвенно.
# Обменник dlx
$mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'fanout'});
Это обменник, в который мы будем отправлять отложенные сообщения.
Обратите внимание на тип создаваемого обменника — 'fanout', в отличие от первого обменника, имеющего типа 'direct'. Далее я объясню, почему именно 'fanout'.
# Очередь dlx
$mq->queue_declare(1, $queue_dlx, {}, {'x-dead-letter-exchange' => $exchange});
Здесь мы создаем очередь, в которую будет помещаться отложенные сообщения.
Аргумент 'x-dead-letter-exchange' — это гвоздь, на котором держится весь механизм отложенный сообщений. Если для очереди указан этот аргумент, то сообщения, у которых истекло время хранения, будут автоматически перемещаться из этой очереди в тот обменник, который был указан в этом аргументе.
Соответственно, в качестве обменника нужно указать обычный обменник, с которым работает получатель.
На всякий случай, пометка для тех, что не знаком с Перлом: конструкция {}
в третьем параметре означает, что в этом месте нужно передать ссылку на хеш с опциями, но, поскольку в данном конкретном случае никаких опций не требуется, то передается ссылка на пустой хеш.
# Публикуем сообщение
$mq->publish(1, $routing_key , $message, {exchange => $exchange_dlx}, {expiration => $expiration});
Отправляем сообщение в обменник для отложенных сообщений.
Здесь важен параметр 'expiration'. Этот параметр задает время хранения сообщения в миллисекундах. По истечению этого времени сообщение будет удалено из очереди. Но, как говорилось выше, если для очереди задан аргумент 'x-dead-letter-exchange', то одновременно с удалением из очереди сообщение будет отправлено в указанный в аргументе обменник, а тот в свою очередь, отправит сообщение в переплетенную с ним обычную очередь для немедленной доставки.
Тонкий момент с routing_key
Как вы помните, в получателе мы создали один обменник типа 'direct' и две переплетенные с ним по разным ключам очереди. Такая схема может применяться для отправки сообщений на одну тему двум разным получателям, например, отправка лога в файл или на консоль, в зависимости от ситуации. За то, в какую очередь отправлять сообщение, отвечает ключ routing_key.
А теперь представьте, что два сообщения с двумя разными ключами нужно сделать отложенными. Мы отправим их в обменник для отложенных сообщений, обменник должен будет решить, в какую очередь их отправлять. Но у них разные routing_key, поэтому обменник, если бы он был типа 'direct', не мог бы отправить их в одну и ту же очередь.
Именно поэтому обменник для отложенных сообщений мы делаем типа 'fanout' — этот тип обменника игнорирует routing_key и отправляет сообщения во все очереди, которые с ним переплетены. В нашем случае переплетена только одна очередь — очередь для отложенных сообщений. Соответственно, все сообщения, с любыми routing_key, отправленные в обменник для отложенных сообщений, пойдут в эту очередь.
Внимательный читатель в этом месте должен спросить: «А с каким routing_key сообщения будут отправлены в обычный обменник после истечения их срока хранения в очереди отложенных сообщений?»
Они будут отправлены с тем же routing_key, который у них и был. Значение routing_key не меняется, если ничего для этого специально не делать (но при желании можно настроить очередь на изменение routing_key).
Запуск
Сначала нужно запустить consumer_dlx.pl, потом можно запускать producer_dlx.pl с разными параметрами.
Параметры: [сообщение] [ключ mykey или mykey2] [задержка в миллисекундах].
На статичной картинке не видно, но после запуска producer_dlx.pl с указанием задержки происходит эта самая задержка, а потом consumer_dlx.pl выводит сообщение (в скобках выводится ключ).
WARNING
Как мне тут верно подсказал пользователь Tsyganov_Ivan, имеется проблема с сообщениями, имеющими разный expired. Дело в том, что сообщения «выходят» из очереди строго последовательно (на то она и очередь). Из-за этого возможна ситуация, когда сообщение с большим expired, идущее впереди, «запрёт» выход из очереди сообщениям, имеющим маленький expired, даже если этот маленький expired уже истек.
Поэтому, если вдруг у вас есть необходимость для разных очередей указывать разные 'expired', то вместо одной общей отложенной очереди сделайте нескольких индивидуальных отложенных очередей — на каждую обычную очередь своя отложенная.
Автор: ivanych