Хочу продолжить серию перевода уроков с официального сайта. Примеры будут на php, но их можно реализовать на большинстве популярных ЯП.
Публикация/Подписка
В предыдущей статье было рассмотрено создание рабочей очереди сообщений. Было сделано допущение, что каждое сообщение будет направлено одному обработчику(worker). В этой статье усложним задачу – отправим сообщение нескольким подписчикам. Этот паттерн известен как "publish/subscribe" (публикация/подписка).
Чтобы понять этот шаблон, создадим простую систему логирования. Она будет состоять из двух программ – первая будет создавать логи, вторая считывать и печатать их.
В нашей систему логирования каждая программа подписчик будет получать каждое сообщение. Благодаря этому, мы сможем запустить одного подписчика на сохранение логов на диск, а потом в любое время сможем создать другого подписчика для отображения логов на экран.
По существу, каждое сообщение будет транслироваться каждому подписчику.
Точки обмена(exchanges)
В предыдущих статьях для отправки и принятия сообщений мы работали с очередью. Теперь рассмотрим расширенную модель отправки сообщений Rabbit.
Напомним термины предыдущей статьи:
- Producer (поставщик) ‒ программа, отправляющая сообщения
- Queue (очередь) – буффер, хранящий сообщение
- Consumer (подписчик) ‒ программа, принимающая сообщения.
Основная идея в модели отправки сообщений Rabbit – Поставщик(producer) никогда не отправляет сообщения напрямую в очередь. Фактически, довольно часто поставщик не знает, дошло ли его сообщение до конкретной очереди.
Вместо этого поставщик отправляет сообщение в точку доступа. В точке доступа нет ничего сложного. Точка доступа выполняет две функции:
— получает сообщения от поставщика
— отправляет эти сообщения в очередь.
Точка доступа точно знает, что делать с поступившими сообщениями. Отправить сообщение в конкретную очередь, либо в несколько очередей, либо не отправлять никому и удалить его. Эти правила описываются в типе точки доступа (exchange type).
Существуют несколько типов: direct, topic, headers и fanout. Мы остановимся на последнем типе fanout. Создадим точку с доступа с этим типом и назовем её – logs:
$channel->exchange_declare('logs', 'fanout', false, false, false);
Тип fanout – очень прост. Он копирует все сообщения которые поступают к нему во все очереди, которые ему доступны. Это то что нам нужно для нашей системы логирования.
Просмотр списка точек доступа:
Чтобы посмотреть все точки доступа на сервере, необходимо выполнить команду rabbitmqctl:
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
logs fanout
...done.
Мы видим список точек доступа с наименованием amq.* и точку доступа без имени, которая используется по умолчанию (она не подходит для выполнения нашей задачи).
Наименование точек доступа.
В предыдущих статьях мы ничего не знали о точках доступа, но всё-таки могли отправлять письма в очередь. Это было возможно, потому что использовали точку доступа по умолчанию, которая идентифицируется пустой строкой “”.
Вспомним как раньше мы отправляли письма:
$channel->basic_publish($msg, '', 'hello');
Здесь используется точка доступа по умолчанию или безымянная точка доступа: сообщение направляется в очередь, идентифицированную через ключ “routing_key”. Ключ “routing_key” передается через третий параметр функции basic_publish.
Теперь мы можем отправить сообщение в нашу именованную точку доступа.
$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');
Временные очереди:
Всё это время мы использовали наименование очередей (“hello“ или “task_queue”). Возможность давать наименования помогает указать обработчикам (workers) определенную очередь, а также делить очередь между продюсерами и подписчиками.
Но наша система логирования требует, чтобы в очередь поступали все сообщения, а не только часть. Также мы хотим, чтобы сообщения были актуальными, а не старыми. Для этого нам понадобиться 2 вещи:
Каждый раз когда мы соединяемся с Rabbit, мы создаем новую очередь, или даем создать серверу случайное наименование.
Каждый раз когда подписчик отключается от Rabbit, мы удаляем очередь.
В php-amqplib клиенте, когда мы обращаемся к очереди без наименовании, мы создаем временную очередь и автоматически сгенерированным наименованием:
list($queue_name, ,) = $channel->queue_declare("");
Метод вернет автоматически сгенерированное имя очереди. Она может быть такой – ‘amq.gen-JzTY20BRgKO-HjmUJj0wLg.’.
Когда заявленное соединение оборвется, очередь автоматически удалиться.
Переплеты(Bindings)
Итак, у нас есть точка доступа с типом fanout и очередь. Сейчас нам нужно сказать точке доступа, чтобы она отправила сообщение в очередь. Отношение между точкой доступа и очередью называется bindings.
$channel->queue_bind($queue_name, 'logs');
С этого момента, сообщения для нашей очереди проходят через точку доступа
Посмотреть список binding-ов можно используя команду rabbitmqctl list_bindings
Отправка во все очереди:
Программа продюсер, которая создает сообщения, не изменилась с предыдущей статьи. Единственное важное отличие – теперь мы направляем сообщения в нашу именованную точку доступа ‘logs’, вместо точки доступа по умолчанию. Нам нужно было указать имя очереди при отправки сообщения. Но для точки доступа с типом fanout в этом нет необходимости. Рассмотрим код скрипта emit_log.php:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPConnection;
use PhpAmqpLibMessageAMQPMessage;
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "info: Hello World!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'logs');
echo " [x] Sent ", $data, "n";
$channel->close();
$connection->close();
?>
Как вы видите, после установки соединения мы создаем точку доступа. Этот шаг необходим, так как использование несуществующей точки доступа – запрещено.
Сообщение в точке доступа будут потеряны, так как ни одна очередь не связана с точкой доступа. Но это хорошо для нас: пока нет ни одного подписчика нашей точки доступа, все сообщения могут безопасно удалятся.
Код подписчика receive_logs.php:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPConnection;
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, 'logs');
echo ' [*] Waiting for logs. To exit press CTRL+C', "n";
$callback = function($msg){
echo ' [x] ', $msg->body, "n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
Если вы хотите сохранить логи в файл, вам потребуется открыть консоль и набрать:
$ php receive_logs.php > logs_from_rabbit.log
Если вы хотите отобразить логи на экран, откройте еще одно окно и наберите:
$ php receive_logs.php
Ну и конечно запуск продюсера сообщений:
$ php emit_log.php
С помощью команды rabbitmqctl list_bindings мы можем удостовериться, что код правильно создал очередь и связал её с точкой доступа. С двумя открытыми программами receive_logs.php у вас должно получиться следующее:
$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
...done.
Здесь отображено, что данные точки доступа logs отправляются в две очереди, имена которых созданы автоматически. Это именно то, чего мы добивались.
В следующей статье будет описано, как прослушать только часть сообщений.
Автор: dlux66