Предисловие
Мой сайт, которым я занимаюсь в качестве хобби, предназначен для хранения интересных домашних страниц и персональных сайтов. Эта тема стала интересовать меня в самом начале моего пути в программировании, в тот момент меня восхищало нахождение больших профессионалов, которые пишут о себе, своих увлечениях и проектах. Привычка открывать их для себя осталась и сейчас: почти на каждом коммерческом и не очень сайте я продолжаю заглядывать в футер в поисках ссылок на авторов.
Реализация идеи
Первая версия была просто html-страницей на моём персональном сайте, где я складывал ссылки с подписями в ul-список. Набрав за какое-то время страниц 20, я начал думать, что это не очень эффективно и решил попробовать автоматизировать процесс. На stackoverflow я замечал, что многие указывают сайты в своих профилях, поэтому я написал парсер на php, который просто шел по профилям, начиная с первого (адреса на SO и по сей день такого вида: `/users/1`), извлекал ссылки из нужного тега и складывал в SQLite.
Это можно назвать второй версией: коллекция из десятка тысяч урлов в SQLite табличке, которая заменила статический список в html. По этому списку я сделал простой поиск. Т.к. были только урлы, то и поиск был просто по ним.
На этом этапе я забросил проект и вернулся к нему, спустя долгое время. На этом этапе опыт моей работы составлял уже больше трёх лет и я чувствовал, что могу сделать что-то посерьёзнее. К тому же было большое желание осваивать относительно новые для себя технологии.
Современная версия
Проект развернут в докере, база переведена на mongoDb, и с относительно недавних пор, добавлен редис, который сначала был просто для кэширования. В качестве основы используется один из микрофреймворков PHP.
Проблема
Новые сайты добавляются консольной командой, которая синхронно делает следующее:
- Скачивает контент по URL
- Выставляет флаг о том, доступен ли был HTTPS
- Сохраняет сущность веб-сайта
- Исходный HTML и заголовки сохраняет в историю «индексирования»
- Парсит контент, извлекает Title и Description
- Данные сохраняет в отдельную коллекцию
Этого было достаточно, чтобы просто хранить сайты и отображать их в списке:
Но идея всё автоматически индексировать, категоризировать и ранжировать, держа всё в актуальном состоянии в эту парадигму укладывалась слабо. Даже простое добавление web-метода для добавления страниц потребовало дублирования кода и блокировок для избежания потенциального DDoS.
Вообще, конечно, всё можно делать и синхронно, а в web-методе производить просто сохранение УРЛа для того, чтобы монструозный демон выполнял все задачи для УРЛов из списка. Но всё равно даже тут напрашивается слово «очередь». А если очередь внедрить, то можно все задачи разделить и выполнять по крайней мере асинхронно.
Решение
Внедрить очереди и сделать event-driven систему обработки всех задач. И как раз давно хотелось попробовать Redis Streams.
Использование Redis streams в PHP
Т.к. фреймворк у меня не из тройки гигантов Symfony, Laravel, Yii, то и библиотеку хотелось бы найти независимую. Но, как оказалось (при первом рассмотрении) — отдельных серьёзных библиотек найти невозможно. Всё, что связано с очередями, либо является проектиком из 3 коммитов пятилетней давности, либо привязано к фреймворку.
Я наслышан о Symfony, как о поставщике отдельных полезных компонентов, к тому же некоторые я уже использую. А также от Laravel кое-что тоже можно использовать, например их ORM, без присутствия самого фреймворка.
symfony/messenger
Первый же кандидат сразу же показался идеальным и безо всяких сомнений я его установил. Но нагуглить примеры использования вне Symfony оказалось сложнее. Как собрать из кучи классов с универсальными, ни о чём не говорящими названиями, шину для передачи сообщений, да еще и на Redis?
Документация на официальном сайте была достаточно подробной, но инициализация была описана только для Symfony с помощью их любимого YML и других магических методов для не симфониста. Интереса в самом процессе установки у меня не было, особенно в новогодние каникулы. Но пришлось заниматься этим и неожиданно долго.
Попытка разобраться с инстанциированием системы по исходникам Symfony задача тоже не самая тривиальная для сжатых сроков:
Поковырявшись в этом всём и попытавшись что-то сделать руками, я пришел к выводу, что занимаюсь какими-то костылями и решил попробовать что-нибудь еще.
illuminate/queue
Оказалось, что эта библиотека намертво привязана к инфраструктуре Laravel и куче других зависимостей, поэтому много времени я на нее не тратил: поставил, посмотрел, увидел зависимости и удалил.
yiisoft/yii2-queue
Ну тут сразу предполагалось из названия, опять же жёсткая привязка к Yii2. Этой библиотекой мне приходилось пользоваться и она была неплохой, но о том, что она полностью зависит от Yii2 я не думал.
Остальные
Всё другое, что я находил на гитхабе — ненадежные устаревшие и заброшенные проектики без звезд, форков и большого количества коммитов.
Возврат к symfony/messenger, технические подробности
Пришлось разобраться с этой библиотекой и, потратив еще какой-то время, я смог. Оказалось, что всё достаточно лаконично и просто. Для инстанциирования шины я сделал небольшую фабрику, т.к. шин у меня предполагалось несколько и с разными обработчиками.
Всего несколько шагов:
- Создаем обработчики сообщений, которые должны быть просто callable
- Заворачиваем их в HandlerDescriptor (класс из библиотеки)
- Эти «Дескрипторы» заворачиваем в инстанс HandlersLocator
- Добавляем HandlersLocator в инстанс MessageBus
- Передаем в SendersLocator набор `SenderInterface`, в моём случае инстансы классов `RedisTransport`, которые конфигурируются очевидным образом
- Добавляем SendersLocator в инстанс MessageBus
MessageBus имеет метод `->dispatch()`, который ищет соответствующие обработчики в HandlersLocator и передает сообщение им, пользуясь соответствующими `SenderInterface` для отправки через шину (Redis streams).
В конфигурации контейнера (в данном случае php-di) вся эта связка может быть законфигурирована так:
CONTAINER_REDIS_TRANSPORT_SECRET => function (ContainerInterface $c) {
return new RedisTransport(
$c->get(CONTAINER_REDIS_STREAM_CONNECTION_SECRET),
$c->get(CONTAINER_SERIALIZER))
;
},
CONTAINER_REDIS_TRANSPORT_LOG => function (ContainerInterface $c) {
return new RedisTransport(
$c->get(CONTAINER_REDIS_STREAM_CONNECTION_LOG),
$c->get(CONTAINER_SERIALIZER))
;
},
CONTAINER_REDIS_STREAM_RECEIVER_SECRET => function (ContainerInterface $c) {
return new RedisReceiver(
$c->get(CONTAINER_REDIS_STREAM_CONNECTION_SECRET),
$c->get(CONTAINER_SERIALIZER)
);
},
CONTAINER_REDIS_STREAM_RECEIVER_LOG => function (ContainerInterface $c) {
return new RedisReceiver(
$c->get(CONTAINER_REDIS_STREAM_CONNECTION_LOG),
$c->get(CONTAINER_SERIALIZER)
);
},
CONTAINER_REDIS_STREAM_BUS => function (ContainerInterface $c) {
$sendersLocator = new SendersLocator([
AppMessagesSecretJsonMessages::class => [CONTAINER_REDIS_TRANSPORT_SECRET],
AppMessagesDaemonLogMessage::class => [CONTAINER_REDIS_TRANSPORT_LOG],
], $c);
$middleware[] = new SendMessageMiddleware($sendersLocator);
return new MessageBus($middleware);
},
CONTAINER_REDIS_STREAM_CONNECTION_SECRET => function (ContainerInterface $c) {
$host = 'bu-02-redis';
$port = 6379;
$dsn = "redis://$host:$port";
$options = [
'stream' => 'secret',
'group' => 'default',
'consumer' => 'default',
];
return Connection::fromDsn($dsn, $options);
},
CONTAINER_REDIS_STREAM_CONNECTION_LOG => function (ContainerInterface $c) {
$host = 'bu-02-redis';
$port = 6379;
$dsn = "redis://$host:$port";
$options = [
'stream' => 'log',
'group' => 'default',
'consumer' => 'default',
];
return Connection::fromDsn($dsn, $options);
},
Тут видно, что в SendersLocator для двух разных сообщений мы присвоили разный «транспорт», каждый из которых имеет свой коннект на соответствующие стримы.
Я сделал отдельный демо-проект, демонстрирущий приложение из трёх демонов, общающихся между собой с помощью такой шины: https://github.com/backend-university/products/tree/master/products/02-redis-streams-bus.
Но покажу как может быть устроен консьюмер:
use AppMessagesDaemonLogMessage;
use SymfonyComponentMessengerHandlerHandlerDescriptor;
use SymfonyComponentMessengerHandlerHandlersLocator;
use SymfonyComponentMessengerMessageBus;
use SymfonyComponentMessengerMiddlewareHandleMessageMiddleware;
use SymfonyComponentMessengerMiddlewareSendMessageMiddleware;
use SymfonyComponentMessengerTransportSenderSendersLocator;
require_once __DIR__ . '/../vendor/autoload.php';
/** @var PsrContainerContainerInterface $container */
$container = require_once('config/container.php');
$handlers = [
DaemonLogMessage::class => [
new HandlerDescriptor(
function (DaemonLogMessage $m) {
error_log('DaemonLogHandler: message handled: / ' . $m->getMessage());
},
['from_transport' => CONTAINER_REDIS_TRANSPORT_LOG]
)
],
];
$middleware = [];
$middleware[] = new HandleMessageMiddleware(new HandlersLocator($handlers));
$sendersLocator = new SendersLocator(['*' => [CONTAINER_REDIS_TRANSPORT_LOG]], $container);
$middleware[] = new SendMessageMiddleware($sendersLocator);
$bus = new MessageBus($middleware);
$receivers = [
CONTAINER_REDIS_TRANSPORT_LOG => $container->get(CONTAINER_REDIS_STREAM_RECEIVER_LOG),
];
$w = new SymfonyComponentMessengerWorker($receivers, $bus, $container->get(CONTAINER_EVENT_DISPATCHER));
$w->run();
Использование этой инфраструктуры в приложении
Реализовав шину в своём бэкенде, я выделил отдельные ступени из старой синхронной команды и сделал отдельные хэндлеры, каждый из которых занимается своим делом.
Пайплайн добавления нового сайта в базу данных получился таким:
И сразу после этого мне стало гораздо проще добавлять новый функционал, например, извлечение и парсинг Rss. Т.к. этот процесс также требует исходный контент, то хэндлер-извлекатель ссылки на rss также как и WebsiteIndexHistoryPersistor подписывается на сообщение «Content/HtmlContent», обрабатывает его и передает нужное сообщение по своему пайплайну дальше.
В конечном итоге получилось несколько демонов, каждый из которых держит подключения только к нужным ресурсам. Например демон crawlers содержит в себе все обработчики, которые требуют похода в интернет за контентом, а демон persister держит подключение к базе данных.
Теперь вместо селектов из базы данных, нужные id после вставки persister’ом просто передаются через шину всем заинтересованным обработчикам.
Автор: Борис