EagleMQ – это новый высокопроизводительный менеджер очередей. Основные решаемые задачи это эффективное распределение сообщений между процессами, межпроцессорная коммуникация и уведомления реального времени.
Для решения основных задач имеется 3 главных примитива: очереди (queue), маршруты (route) и каналы (channel).
Очереди
Очереди являются самым главным примитивом. Очереди позволяют хранить сообщения и выдавать их клиенту в том же порядке в котором они пришли (принцип LILO).
Каждая очередь перед использованием должна быть декларирована. Если очередь не декларирована то при работе с ней не будет доступен весь набор команд. Декларирование позволяет отслеживать использование очереди клиентами и автоматически ее удалять если она никем не используется и при создании очереди был указан флаг AUTODELETE.
Сообщения могут иметь гарантированную доставку. Для применения гарантированной доставки нужно при извлечении сообщения из очереди командой .queue_pop указать timeout возврата отличным от нуля (в милисекундах). Затем при получении сообщения подтвердить доставку командой .queue_confirm (по уникальному идентификатору сообщения). При неподтверждении доставки в указанное время сообщение будет возвращено в очередь как самое старое и выдано следующему клиенту как можно скорее.
Также сообщения могут быть с ограниченым сроком хранения (expiration time) и будут автоматически удалены в случае истечения срока. Срок хранения является атрибутом посылаемого в очередь сообщения.
Очереди имеют поддержку асинхронной доставки поступающих сообщений клиентам. Для использования асинхронной доставки нужно подписаться на очередь командой .queue_subscribe с указанием в флагах режима подписки. Очереди имеют 2 режима подписки: сообщение и уведомление.
В режиме сообщения клиентам посылается событие в котором содержится название очереди, в которую поступило сообщение и само сообщение. Сообщение в очередь в таком случае не попадает т. к. уже послано клиентам. Если очередь создана с флагом ROUND_ROBIN тогда сообщение будет послано только одному клиенту каждый раз, а не всем. Для распределения посылки сообщений клиентам используется алгоритм round-robin.
В режиме уведомления всем клиентам подписанным на очередь с использованием такого режима посылается сообщение в котором содержится только название очереди. Данное событие будет послано всем подписчикам очереди с таким режимом, даже если сообщение было кому-то отправлено и не попало в очередь.
Отписаться от асинхронной доставки сообщений можно командой .queue_unsubscribe.
При создании очереди указывается само название очереди, максимальное количество сообщений которое можно хранить, максимальный размер сообщения и флаги.
Максимальное количество сообщений позволяет прекратить поступление новых сообщений в очередь при достижения лимита.
Максимальный размер сообщения позволяет запретить отправку сообщений с большим размером. Данный параметр не может иметь значение больше чем 2147483647.
Очередь поддерживает 4 флага: AUTODELETE, FORCE_PUSH, ROUND_ROBIN, DURABLE.
Флаг AUTODELETE используется для автоматического удаления очереди если она ни кем не декларирована и не имеет подписчиков.
Флаг FORCE_PUSH позволяет отправлять в очередь сообщения при достижении лимита максимального количества сообщений. Это достигается путем удаления более старого сообщения чтобы сохранить новое.
Флаг ROUND_ROBIN позволяет посылать сообщение только одному подписчику каждый раз(см. описание выше).
Флаг DURABLE указывает что очередь является персистентной и может быть сохранена вместе со всеми сообщениями в хранилище.
Список команд для работы с очередями:
- .queue_create(name, max_msg, max_msg_size, flags)
- .queue_declare(name)
- .queue_exist(name)
- .queue_list
- .queue_rename(from, to)
- .queue_size(name)
- .queue_push(name, message)
- .queue_get(name)
- .queue_pop(name, timeout)
- .queue_confirm(name, tag)
- .queue_subscribe(name, flags)
- .queue_unsubscribe(name)
- .queue_purge(name)
- .queue_delete(name)
Маршруты
Маршруты являются впомогательным примитивом для использования очередей. Маршруты позволяют связывать очереди с определенным ключом и организовывать быструю и эффективную доставку сообщений.
Для связывания очереди с ключом используется команда .route_bind. За посылку сообщения в маршрут отвечает команда .route_push.
Если при посылке сообщения в маршрут по ключу было связано несколько очередей, то копирование сообщения не производится (references counter) что позволяет ускорить выполнение команды и снизить потребление памяти сервером.
Маршрут поддерживает 3 флага: AUTODELETE, ROUND_ROBIN, DURABLE.
Флаг AUTODELETE используется для автоматического удаления маршрута если он не имеет связей с очередями по ключу.
Флаг ROUND_ROBIN позволяет при получении сообщения маршрутом посылать сообщение только одной очереди связанной по ключу. Для распределения сообщений по очередям используется алгоритм round-robin.
Флаг DURABLE указывает что маршрут является персистентным и может быть сохранен вместе со всеми ключами в хранилище.
Список команд для работы с маршрутами:
- .route_create(name, flags)
- .route_exist(name)
- .route_list
- .route_keys(name)
- .route_rename(from, to)
- .route_bind(name, queue, key)
- .route_unbind(name, queue, key)
- .route_push(name, key, message)
- .route_delete(name)
Каналы
Каналы это примитив для удобной доставки сообщений клиентам в реальном времени.
Принцип работы заключается в том, что клиент может подписаться на определенный topic командой .channel_subscribe. При отправке сообщения в канал другим клиентом командой .channel_publish на данный topic клиенту который подписался на этот topic будет послано событие с сообщением.
Также можно подписать на группу topic'ов по шаблону командой .channel_psubscribe. Форматом шаблона является glob-style pattern.
Канал поддерживает 3 флага: AUTODELETE, ROUND_ROBIN, DURABLE.
Флаг AUTODELETE используется для автоматического удаления канала если он не имеет подписчиков.
Флаг ROUND_ROBIN позволяет посылать сообщение только одному из множества подписчиков. Для распределения сообщений между подписчиками используется алгоритм round-robin.
Флаг DURABLE указывает что канал является персистентным и может быть сохранен в хранилище.
Список команд для работы с каналами:
- .channel_create(name, flags)
- .channel_exist(name)
- .channel_list
- .channel_rename(from, to)
- .channel_publish(name, topic, message)
- .channel_subscribe(name, topic)
- .channel_psubscribe(name, pattern)
- .channel_unsubscribe(name, topic)
- .channel_punsubscribe(name, pattern)
- .channel_delete(name)
Пользователи
Каждый клиент EagleMQ должен быть аутентифицирован чтобы иметь права на выполнение команд. Без аутентификации доступна только одна команда — .disconnect.
EagleMQ может иметь множество пользователей и каждый пользователь может иметь свой набор прав.
Пользовательские права имеют большую гибкость в настройке. Можно отключить любую команду (кроме .disconnect) и следовательно любой примитив.
Это может быть полезно во множестве случаев. Например вы используете сервер как очередь задач и у вас имеется множество клиентов которые создают задачи, а также множество клиентов которые их обрабатывают. В таком случае клиентам которые создают задачи рекомендуется разрешить использование только команды .queue_push. Клиентам обработчикам будет достаточно прав только на команду .queue_pop. Это поможет сохранить данные и четко задать клиенту роль в системе.
Также это может быть полезно если вы поставили работать сервер во внешней сети (например публичная real-time рассылка предупреждений о землетресении) и позволяете любому пользователю подключится на прямую (в таком случае пользователю достаточно прав на .channel_subscribe и .channel_psubscribe).
Список команд для работы с пользователями:
- .user_create(name, password, perm)
- .user_list
- .user_rename(from, to)
- .user_set_perm(name, perm)
- .user_delete(name)
Всего в EagleMQ на текущий момент имеется 44 команды. Список всех команд есть в документации.
Производительность
EagleMQ имеет достаточно высокую производительность. Строгие комплексные измерения и сравнительные тесты еще не проводились. Это связано во многом с тем, что некоторые подобные системы имеют возможность тонкой настройки под задачу (например RabbitMQ) и мой опыт в этих системах не позволяет оценить объективно.
В отличии от остальных систем EagleMQ почти не нуждается в дополнительном конфигурировании влияющем на производительность. Также очень мала вероятность ошибиться при выборе примитива или использовать его неправильно.
Для тестирования использовалась тестовая утилита benchmark из состава libemq.
Суть тестирования заключается в том, чтобы отправить сообщения в очередь командой .queue_push. Приложение использует 50 потоков. Для соединения с сервером использовались локальные TCP сокеты.
Intel® Core(TM) i5-2450M CPU @ 2.50GHz
Clients | Total Messages | Message Size | Time, seconds | Req/Sec |
---|---|---|---|---|
50 | 1 000 000 | 1000 | 9.20 | 108283.71 |
50 | 1 000 000 | 100 | 8.07 | 123931.09 |
50 | 1 000 000 | 10 | 8.05 | 124300.80 |
50 | 100 000 | 1000 | 0.95 | 105596.62 |
50 | 100 000 | 100 | 0.85 | 117096.02 |
50 | 100 000 | 10 | 0.81 | 123304.56 |
Intel® Core(TM) i5-3470 CPU @ 3.20GHz
Clients | Total Messages | Message Size | Time, seconds | Req/Sec |
---|---|---|---|---|
50 | 1 000 000 | 1000 | 5.19 | 192566.92 |
50 | 1 000 000 | 100 | 4.47 | 223613.59 |
50 | 1 000 000 | 10 | 4.40 | 227169.47 |
50 | 100 000 | 1000 | 0.53 | 189393.94 |
50 | 100 000 | 100 | 0.46 | 219298.25 |
50 | 100 000 | 10 | 0.45 | 220750.55 |
Персистентность
Каждый примитив EagleMQ обладает персистентностью. Обычно для поддержки этой функциональности примитив должен быть создан с флагом DURABLE. Также в конфигурационном файле должен быть указан путь к хранилищу и задан интервал сохранения.
Все данные постоянно хранятся в оперативной памяти.
Принцип работы механизма сохранения состоит в клонировании процесса (fork) по заданому интервалу и записи всех необходимых данных в файл. Для сохранения данных по требованию клиента имеется команда .save.
Все сохраняемые данные по возможности сжимаются библиотекой liblzf.
Протокол
EagleMQ имеет свой собственный бинарный протокол.
Бинарные протоколы почти всегда используются в подобных системах (AMQP, ZMQ) и помогают достигнуть высокую производительность при обработке запроса.
Имеются конечно же и минусы:
- Бинарный протокол не является human-readable.
- Могут быть сложности при реализации клиентских библиотек на скриптовых языках программирования.
- Необходимость использовать совместимый клиент т.к. бинарный протокол может измениться.
Протокол EagleMQ может изменяться только в главных (major) релизах.
Roadmap
- Улучшение поддержки packet aggregation для возможности посылки нескольких команд за один раз. Сейчас такая поддержка уже имеется, но с потерей пакетов.
- Документирование бинарного протокола.
- Написание CLI для управления EagleMQ с помощью командной строки.
- Доработка и сопровождение сайта www.eaglemq.com.
- Поддержка little и big engian архитектур.
- Покрытие всей функциональности тестами.
Все желающие могут задать мне свой вопрос в коментариях или по почте. Буду рад ответить.
Репозиторий проекта: GitHub
Автор: yakushstanislav