Всем доброго времени суток, друзья.
Сегодня захотелось поговорить о том, как можно работать с RabbitMQ в Symfony и совсем чуть-чуть о некоторых подводных комнях. В конце я напишу парочку интересных моментов о кролике (рус. перевод «rabbit») для тех, кто совсем в танке.
Я не буду рассказывать про сам RabbitMQ, поэтому если вы пока и этого не знаете, почитайте следующие переводы:
habrahabr.ru/post/149694
habrahabr.ru/post/150134
habrahabr.ru/post/200870
habrahabr.ru/post/201096
habrahabr.ru/post/201178
Не бойтесь примеров на перле или пайтоне — это не страшно, все достаточно понятно из исходного кода.
+ Все достаточно подробно описано, когда я читал это в свое время, достаточно было интерпретировать код мысленно, чтобы понять как что и зачем.
Если вы уже знаете, что такое консумер и почему в нем нужно делать $em->clear() + gc_collect_cycles, а после закрывать соединение с базой данных, то, скорее всего, вы ничего нового для себя не узнаете. Статья скорее для тех, кто не хочет разбираться с AMQP протоколом, но которым нужно прямо сейчас применять очереди и выбор почему-то бездумно пал на RabbitMQ, а не тот же легковестный beanstalkd.
Если же у вас микросервисная архитектура и вы ждете, что я расскажу вам как сварить коммуникацию между компонентами через AMQP, как красиво делать RPC, то я сам чего-то подобного очень давно жду на хабре…
Перед нами задача: отправлять сообщения на Email в очереди, используя RabbitMQ, а так же обеспечить отказоустойчивость: если почтовый сервер ответил таймаутом или что-то ещё сломалось — нужно попробовать выполнить задачу через 30 секунд ещё раз.
Итак, устанавливаем наш бандл:
github.com/php-amqplib/RabbitMqBundle
Я слишком ленив, чтобы описывать вам, как нужно копировать composer require команду и строку в AppKernel.
Я очень надеюсь, что вы сами это сделали и готовы приступать к конфигурированию нашего бандла.
echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install rabbitmq-server
sudo rabbitmq-plugins enable rabbitmq_management
Теперь вы можете открыть ваш localhost:15672 под учеткой: guest guest и увидеть много прикольных вещей, в которых скоро вы будете разбираться и чувствовать себя мужиком.
Теперь устанавливаем сам бандл:
composer require php-amqplib/rabbitmq-bundle
И регистрируем его в нашем приложении:
// app/AppKernel.php
public function registerBundles()
{
$bundles = array(
new OldSoundRabbitMqBundleOldSoundRabbitMqBundle(),
);
}
Вот и всё.
Конфигурация бандла для нас:
old_sound_rabbit_mq:
connections:
default:
host: 'localhost'
port: 5672
user: 'guest'
password: 'guest'
vhost: '/'
lazy: false
connection_timeout: 3
read_write_timeout: 3
keepalive: false
heartbeat: 0
use_socket: true
producers:
send_email:
connection: default
exchange_options: { name: 'notification.v1.send_email', type: direct }
consumers:
send_email:
connection: default
exchange_options: { name: 'notification.v1.send_email', type: direct }
queue_options: { name: 'notification.v1.send_email' }
callback: app.consumer.mail_sender
Здесь огромное внимание следует обратить на producers и consumers. Если очень коротко и просто: producer — это то, что отправляет сообщения через RabbitMQ в consumer, а consumer в свою очередь — та вещь, которая получает и обрабатывает эти сообщения. Здесь же exchange_options — опции для обменника (вы же прочитали статьи про rabbitmq, которые были в начале статьи?), queue_options — опции для очереди (аналогично). Так же стоит обратить внимание на callback в consumer — здесь указывается ID сервиса, который расширяет ConsumerInterface (execute метод с аргументом сообщения).
Т.к. пока что у вас его нету, при запуске приложения или компиляции контейнера мы получим какое-то DI исключение, что сервис не найден, но мы его запрашиваем. Поэтому давайте создавать наш сервис:
#app/config/services.yml
services:
app.consumer.mail_sender:
class: AppBundleConsumerMailSenderConsumer
И сам класс:
namespace AppBundleConsumer;
use OldSoundRabbitMqBundleRabbitMqConsumerInterface;
use PhpAmqpLibMessageAMQPMessage;
/**
* Class NotificationConsumer
*/
class MailSenderConsumer implements ConsumerInterface
{
/**
* @var AMQPMessage $msg
* @return void
*/
public function execute(AMQPMessage $msg)
{
echo 'Ну тут типа сообщение пытаюсь отправить: '.$msg->getBody().PHP_EOL;
echo 'Отправлено успешно!...';
}
}
Ну вы же не обиделись, что я не включил в статью как работать со SwiftMailer? :) Нам важно, чтобы сюда асинхронно доставлялась строка через очередь сообщений, то, как мы будем обрабатывать эту строку — дело наше. Почта — всего лишь пример кейса.
Как же нам передать строку в наш консьюмер? Для этого давайте создадим тестовую команду:
namespace AppBundleCommand;
use SymfonyBundleFrameworkBundleCommandContainerAwareCommand;
use SymfonyComponentConsoleInputInputInterface;
use SymfonyComponentConsoleOutputOutputInterface;
class TestConsumerCommand extends ContainerAwareCommand
{
/**
* {@inheritdoc}
*/
protected function configure()
{
$this
->setName('app:test-consumer')
->setDescription('Hello PhpStorm');
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish('Сообщенька для отправки на мыло...');
}
Снова извиняюсь, что не сделал для вас контроллер с красивой формочкой — я слишком ленив для этого. Да и слишком уж излишне. А я экономный лентяй и люблю рисовать, мечтаю немного в сторону теорий и архитектуры приложений. Отвлеклись.
Теперь запускаем наш consumer и приказываем ему ждать сообщения из RabbitMQ:
bin/console rabbitmq:consumer send_email -vvv
И отправим ему сообщение из нашей тестовой команды:
bin/console app:test-consumer
И вот сейчас, в процессе rabbitmq:consumer, мы можем увидеть наше сообщение! И что псевдо отправка завершилась успехом.
А теперь давайте посмотрим, как можно реализовать отложенную обработку сообщений в случае ошибок. Я не буду использовать плагин RabbitMQ для отложенных сообщений. Мы будем достигать этого путем создания новой очереди, в которой укажем время жизни сообщений 30сек и установим настройку: после смерти — перекладываться в основную очередь.
Достаточно лишь добавить новый producer:
producers:
send_email:
connection: default
exchange_options: { name: 'notification.v1.send_email', type: direct }
delayed_send_email:
connection: default
exchange_options:
name: 'notification.v1.send_email_delayed_30000'
type: direct
queue_options:
name: 'notification.v1.send_email_delayed_30000'
arguments:
x-message-ttl: ['I', 30000]
x-dead-letter-exchange: ['S', 'notification.v1.send_email']
Теперь давайте изменим логику консумера:
namespace AppBundleConsumer;
use OldSoundRabbitMqBundleRabbitMqConsumerInterface;
use OldSoundRabbitMqBundleRabbitMqProducerInterface;
use PhpAmqpLibMessageAMQPMessage;
/**
* Class NotificationConsumer
*/
class MailSenderConsumer implements ConsumerInterface
{
private $delayedProducer;
/**
* MailSenderConsumer constructor.
* @param ProducerInterface $delayedProducer
*/
public function __construct(ProducerInterface $delayedProducer)
{
$this->delayedProducer = $delayedProducer;
}
/**
* @var AMQPMessage $msg
* @return void
*/
public function execute(AMQPMessage $msg)
{
$body = $msg->getBody();
echo 'Ну тут типа сообщение отправляю '.$body.' ...'.PHP_EOL;
try {
if ($body == 'bad') {
throw new Exception();
}
echo 'Успешно отправлено...'.PHP_EOL;
} catch (Exception $exception) {
echo 'ERROR'.PHP_EOL;
$this->delayedProducer->publish($body);
}
}
}
А вообще для вывода полезно использовать LoggerInterface — и красиво и масштабируется.
Но нам же лень и мы не хотим создавать дополнительные «думки», верно? Просто знайте.
Теперь мы должны прокинуть producer для отложенной очереди:
#app/config/services.yml
services:
app.consumer.mail_sender:
class: AppBundleConsumerMailSenderConsumer
arguments: ['@old_sound_rabbit_mq.delayed_send_email_producer']
И изменим команду:
namespace AppBundleCommand;
use SymfonyBundleFrameworkBundleCommandContainerAwareCommand;
use SymfonyComponentConsoleInputInputInterface;
use SymfonyComponentConsoleOutputOutputInterface;
class TestConsumerCommand extends ContainerAwareCommand
{
/**
* {@inheritdoc}
*/
protected function configure()
{
$this
->setName('app:test-consumer')
->setDescription('Hello PhpStorm');
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish('Ура, сообщенька...');
$this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish('bad');
}
}
Теперь вместе с нормальным сообщением она будет отправлять и плохое сообщение.
Если мы запустим, то увидим следующий вывод:
Ну тут типа сообщение отправляю Ура, сообщенька...
Успешно отправлено...
Ну тут типа сообщение отправляю bad...
ERROR
Спустя 30 секунд еще раз появится сообщение об обработке:
Ну тут типа сообщение отправляю bad...
ERROR
И так бесконечно. Логику максимальных попыток и т.п. продумывайте сами. Далее я дам пару советов для вашего прода и некоторых фич.
Теперь советы для вашего прода:
1)Не отходя от темы с максимальными попытками обработки: знайте на все 102% все возможные исключения контекста с которым вы работаете! Умейте представлять, когда повторная обработка требуется, а когда нет, иначе — привет мусорке из логов и отсутствия понимания что происходит. В случае, если битая задача будет крутится в RabbitMQ, с реальными данными, нормальными задачами, вы вряд ли сможете выкинуть сломанные задачи без костылей, не обновляя код консьюмера и не перезапуская его. Поэтому продумывайте это сразу. В данном случае правильным было бы ловить только лишь SMTPTimeOutException какой-нибудь.
Так же с такой моделью важно понимать, что: на 1 очередь — одна «глобальная ответственность смены состояния чего либо». Не стоит давать слишком много рискованных задач своему воркеру. Если рассмотреть вариант с 1С, то проблема может быть в следующем: допустим при успешном или неуспешном изменениидобавлении товара в 1С мы записываем в базу данных что-нибудь, например, дату последней удачной синхронизации или неудачной. Т.е. тут обновляются сразу 2 базы данных: бд 1С и бд вашего приложения. Допустим в 1С все успешно создалось, далее идет обновление в базе данных поля «дата последней удачной синхронизации» — хоп, вылезла ошибочка, опять же, сервер бд не отвечает — задача откладывается на «потом» и повторяется, пока база данных не начнет отвечать. И при этом каждый раз «подзадача» связанная с созданием сущности в 1С будет успешно выполняться, каждый раз при неудачной попытке записи в базу данных сайта, что неправильно.
2)Прочитайте про durable, раз уж мы с вами используем RabbitMQ. P.S: это заводится как truefalse флаг «durable» в конфиге бандла, конкретно — в exchange_options и queue_options
3)Всю свою жизнь закрывайте соединение к базе данных после выполнения работы программы. А так же запускайте очистку EM и после сборщик мусора для чистки ссылок. Т.е. в конце концов наш консьюмер должен выглядеть как-то так:
class MailSenderConsumer implements ConsumerInterface
{
private $delayedProducer;
private $entityManager;
/**
* MailSenderConsumer constructor.
* @param ProducerInterface $delayedProducer
* @param EntityManagerInterface $entityManager
*/
public function __construct(ProducerInterface $delayedProducer, EntityManagerInterface $entityManager)
{
$this->delayedProducer = $delayedProducer;
$this->entityManager = $entityManager;
gc_enable();
}
/**
* @var AMQPMessage $msg
* @return void
*/
public function execute(AMQPMessage $msg)
{
$body = $msg->getBody();
echo 'Ну тут типа сообщение отправляю '.$body.' ...'.PHP_EOL;
try {
if ($body == 'bad') {
throw new Exception();
}
echo 'Успешно отправлено...'.PHP_EOL;
} catch (Exception $exception) {
echo 'ERROR'.PHP_EOL;
$this->delayedProducer->publish($body);
}
$this->entityManager->clear();
$this->entityManager->getConnection()->close();
gc_collect_cycles();
}
}
Консьюмер работает как демон, поэтому постоянно копить в нем ссылки и держать соединение с бд — это плохо. В случае с MySQL вы получите MySQL server has gone away.
4)Много думайте, почему ваша модель отложенных сообщений может неожиданно убить ваш бизнес. Например у нас есть механизм, который при изменении товара в админке заливает эти изменения через очередь в 1С. Теперь представим ситуацию: администратор меняет товар -> создается задача #1 на попытку изменить те же данные в базе 1С. Сервер 1С не отвечает, поэтому задачка просто перекладывается постоянно, пока все не заработает. За это время администратор решил еще кое-что подправить в том же товаре, что он и делает. Регистрируется задача #2.
А теперь представьте ситуацию, когда поочередно выполняются и откладываются задачи #1 и #2.
Что если 1С заработает к моменту выполнения задачи #2? Задача выполнится и зальёт последние изменения. Далее пойдет в ход задача #1 и затрет собой стабильные изменения :)
Выход: отправляем timestamp в качестве version, и, если задача «из прошлого» — выкидываем её.
5)Идешь в асинхронность — прочитай про многие архитектурные проблемы, а также race condition, несогласованность консумеров на разных машинах и прочее.
6)Пишите версии вашим очередям… Ух как помогает на реальном проде. В принципе мы так и сделали в этом примере.
7)Возможно тебе не нужен RabbitMQ и целый AMQP протокол. Посмотри в сторону beanstalkd.
8)Запускайте консумеры и всякое прочее демоническое на php через supervisor и подключите полное логирование падения процессов в нём. У него так же есть web интерфейс для управления всем этим делом, что так же очень удобно. Проблемы будут всегда.
Автор: php_freelancer