Продолжаю серию перевода уроков с официального сайта. Примеры будут на php, но их можно реализовать на большинстве популярных ЯП.
В предыдущей статье мы разработали систему логирования. Нам удалось отправлять сообщения нескольким получателям. В этой статье модернизируем нашу программу — будем отправлять получателю только часть сообщений. Например, мы сможем сохранять на диске только сообщения с критическими ошибками (экономия места на диске), а в консоли будем отображать все сообщения.
Bindings
В предыдущей статье мы создавали связи(bindings). Напомним код:
$channel->queue_bind($queue_name, 'logs');
Binding — это связь между точкой доступа и очередью. Это можно интерпретировать как: очередь хочет получить сообщения из точки доступа.
Binding может принимать параметр routing_key. Чтобы не путаться с параметром $channel::basic_publish (тоже содержит параметр routing_key), назовем его binding_key. Рассмотрим создание связи с ключом binding_key:
$binding_key = 'black';
$channel->queue_bind($queue_name, $exchange_name, $binding_key);
Значение этого ключа зависит от типа точки доступа. Точка доступа с типом fanout просто проигнорирует его.
Точка доступа Direct
Наша система логирования в предыдущей статье отправляла всем подписчиками все сообщения. Мы хотим расширить нашу систему, чтобы фильтровать сообщения по степени важности. Для примера мы сделаем так, чтобы скрипт, записывающий логи на диск не тратил своё место на собщения с типом warning или info.
Ранее мы использовали точку доступа с типом fanout, которая не дает нам полной гибкости — она подходит только для простой трансляции.
Вместо это мы будем использовать тип direct. Его алгоритм очень прост — сообщения идут в ту очередь, binding_key которой совпадает с routing key сообщения.
Рассмотрим схему на картинке:
На схеме изображена точка доступа X и две связанные с ней очереди. Первая очередь связана с binding key = orange, а вторая очередь имеет две связи. Одна с ключом binding key = black, а вторая с ключом — green.
Сообщения с routing key = orange будут направляться в очередь Q1, а сообщения с ключом black или green направятся в очередь Q2. Все остальные сообщения будут удалены.
Составные связи (Multiple bindings)
Вполне допустимо связывать несколько очередей с одинаковым ключом binding key. В этом примере мы связываем точку доступа X и очередь Q1 с тем же ключом black, что и у очереди Q2. В этом примере direct ведет себя также как и fanout: отсылает сообщения во все связанные очереди. Сообщения с ключом black попадет в обе очереди Q1 и Q2.
Отправка логов
Построим алгоритм отправки сообщений. Вместо fanout для точки доступа будем использовать тип direct. Routing key будет совпадать с названием типа лога. Допустим что скрипт отправки лога будет знать тип лога.
Для начала создадим точку доступа:
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
Теперь отправим сообщение:
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->basic_publish($msg, 'direct_logs', $severity);
Лог будет иметь 3 типа: 'info', 'warning', 'error'.
Подписка
Отправка сообщений будет таким же как и в примере предыдущей статьи, с одним условием — нужно создать для каждого типа лога свою связь binding.
foreach($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}
Итого получаем
Код скрипта продюсера emit_log_direct.php:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPConnection;
use PhpAmqpLibMessageAMQPMessage;
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$severity = $argv[1];
if(empty($severity)) $severity = "info";
$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'direct_logs', $severity);
echo " [x] Sent ",$severity,':',$data," n";
$channel->close();
$connection->close();
?>
Код скрипта подписчика receive_logs_direct.php:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPConnection;
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$severities = array_slice($argv, 1);
if(empty($severities )) {
file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]n");
exit(1);
}
foreach($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}
echo ' [*] Waiting for logs. To exit press CTRL+C', "n";
$callback = function($msg){
echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
Если вы хотите сохранить в файл только логи с типом error и warning, наберите в консоли:
$ php receive_logs_direct.php warning error > logs_from_rabbit.log
Если вы хотите отобразить все логи на экране, наберите в консоли
$ php receive_logs_direct.php info warning error
[*] Waiting for logs. To exit press CTRL+C
Или чтобы вытащить только логи error:
$ php emit_log_direct.php error "Run. Run. Or it will explode."
[x] Sent 'error':'Run. Run. Or it will explode.'
(исходники (emit_log_direct.php source) и (receive_logs_direct.php source))
В следующей статье будет рассмотрена прослушка сообщений соответствующих какому-либо шаблону
Автор: dlux66