- PVSM.RU - https://www.pvsm.ru -
Привет! Умение работать с брокерами сообщений — едва ли не базовый навык для бэкенд‑разработчиков уровня Junior и Middle. В этом базовом материале по RabbitMQ посмотрим, как развернуть решение на собственном сервере, а также установим и настроим панель управления. Обычно эту задачу выполняет DevOps‑инженер, но ничего сложного нет и в самостоятельном администрировании. Главное — научиться качественно и грамотно обрабатывать сообщения, чтобы они не терялись, не считывались повторно, не оставались в брокере. Никаких ошибок ни в коем случае нельзя допускать, этим моментам тоже уделим внимание.
Текст подготовил Артём Шумейко — внештатный райтер, амбассадор Selectel и автор YouTube-канала [1] о разработке.
Используйте навигацию, если не хотите читать текст полностью:
→ Немного общей информации [2]
→ Подготовка [3]
→ Поставщик сообщений [4]
→ Потребитель сообщений [5]
→ Развертывание инструментов на сервере [6]
Брокеры сообщений необходимы для управления и упрощения обмена данными между различными системами и приложениями в распределенных архитектурах. Они выполняют несколько ключевых функций:
При работе с брокерами сообщений используется определенная терминология. Продюсеры порождают и записывают сообщения, консьюмеры — их считывают и обрабатывают. Подобная технология используются в больших системах и, как не сложно догадаться, продюсеров и консьюмеров обычно много.
Схема работы брокеров сообщений.
Мы рассмотрим упрощенный пример: создадим один продюсер и один консьюмер — очень легковесные и без лишнего кода. Написаны они могут быть на любом языке программирования. Мы для наших примеров возьмем Python.
Перед началом работы заглянем в панель управления, которая называется RabbitMQ Management. Интерфейс может быть не самый современный, но хорошо отображает все необходимые данные и события.
Панель управления RabbitMQ.
Все поля мы видим пустыми, так как сейчас с брокером сообщений программы не взаимодействуют: нет соединений, нет потока данных, нет очередей.
RabbitMQ обладает мощными встроенными инструментами, такими как регулярные выражения. Для поддержание простоты изложения сегодня постараемся их не трогать. Отметим лишь Exchanges — компоненты, которые получают сообщения от продюсеров и направляют их в очередь на основе правил маршрутизации. Есть несколько типов Exchanges в RabbitMQ:
Создадим два файла: producer.py и consumer.py — и поднимем виртуальное окружение. Нам понадобится pika — самая популярная библиотека для RabbitMQ. Для асинхронного взаимодействия используется iopika, но эту тему мы прибережем на будущее.
Работа с брокером начинается с подключения к нему. Укажем параметры соединения. Используем localhost. Имя пользователя по умолчанию и пароль совпадают — guest, порт — 5762.
from pika import ConnectionParameters
connection_params = ConnectionParameters(
host = "localhost",
port = 5672,
)
При подключении к брокеру в Python удобно использовать его контекстный менеджер with, который всегда выполнит предусмотренные действия при выходе из окна контекста — например, при переходе к следующей части кода или возникновении исключительной ситуации. В данном случае контекстный менеджер корректно закроет соединение с RabbitMQ.
Класс BlockingConnections используется для работы с блокирующими сообщениями. Перед началом работы с соединением нужно создать канал — все созданные каналы можно будет увидеть на соответствующей вкладке в панели управления.
def main():
with BlockingConnection(connection_params) as conn:
with conn.channel() as ch:
Все необходимые подготовительные мероприятия выполнены. Можно наконец переходить к логике работы. Прежде всего нужно создать очередь и задать ей произвольное имя. Назовем ее незамысловато — messages:
ch.queue_declare(queue="messages")
Опубликуем сообщение с помощью метода basic_publish, который содержит несколько параметров.
Первый из них — это exchange. Если оставить кавычки пустыми, будет использоваться direct exchange — прямой обмен, который задан по умолчанию.
Direct exchange на панели управления.
В routing_key передается название очереди для отправки сообщений. В данном случае — это messages. Параметр body содержит текст сообщения. Получившийся код для отправки выглядит следующим образом:
ch.basic_publish(
exchange="",
routing_key="",
body="Hello RabbitMQ!"
)
Обратите внимание на вызов функции print. Мы хотим иметь где-то записанное подтверждение того, что сообщение отправлено. На практике, разумеется, для таких целей правильнее использовать логирование — мы же хотим сейчас максимально упростить код для понимания главного.
Модуль готов, его можно попробовать использовать для отправки сообщений.
from pika import ConnectionParameters
connection_params = ConnectionParameters(
host = "localhost",
port = 5672,
)
def main():
with BlockingConnection(connection_params) as conn:
with conn.channel() as ch:
ch.queue_declare(queue="messages")
ch.basic_publish(
exchange="",
routing_key="",
body="Hello RabbitMQ!"
)
print("Message sent")
Запускаем получившийся файл в терминале:
python3 producer.py
Message sent
Если зайти в панель управления, то видно, что активных подключений нет. Действительно, все произошло очень быстро: наш файл запустился, отработал и закрыл соединение. То же самое можно сказать и о каналах — в панели управления и здесь пусто.
Ну что-то все-таки происходило. Активность можно увидеть на вкладке Overview.
Вкладка Overview на панели управления.
На графиках выше видно, что происходило отправление, затем наступила «тишина», при этом в очереди находится одно непрочитанное сообщение.
Запустим наш скрипт еще несколько раз. Данные на панели управления обновляются каждые пять секунд:
Вкладка Overview отражает активности.
Создадим консьюмер, который будет потреблять все сообщения, поступающие в брокер.
Работа продюсера и консьюмера в чем‑то схожа. Мы можем скопировать целиком файл producer.py и вставить его в consumer.py, лишь подправив несколько строчек — вместо метода basic_publish определяем basic_consume. Название очереди остается прежним — messages.
Некоторые отличия в логике работы продюсера и консьюмера все-таки есть. В первом случае, когда посылаем сообщение в брокер, мы делаем это явно: понимаем, в какой момент это требуется, видим строчку кода, которая это осуществляет. С чтением сообщений так не получится — мы не можем знать, когда оно придет, заранее.
Программировать самостоятельно периодический опрос какой‑то структуры неразумно — такой подход загромождает код и ставит эффективность реализации в зависимость от специфических знаний разработчика. Библиотека pika подразумевает иной подход — создание функции обратного вызова, которую внутренние механизмы будут пробуждать в момент прихода новых сообщений. В документации для нее принято название callback, мы будем использовать более понятное — process_message.
def main():
with BlockingConnection(connection_params) as conn:
with conn.channel() as ch:
ch.queue_declare(queue="messages")
ch.basic_consume(
queue="messages",
on_message_callback=process_message,
)
print("Жду сообщений")
ch.start_consuming()
В коде выше вызов функции start_consuming — и есть ожидание от внутренних механизмов сигнала о необходимости извлечения сообщения из очереди.
Когда консьюмер забирает сообщения из брокера, он руководствуется какой-то логикой: работает с базой данных, уведомляет пользователя, инициирует банковскую транзакцию, обращается к какому-то внешнему API… Наша функция обратного вызова скорее демонстрационная — мы просто выведем все полученные аргументы:
def process_message(*args):
for arg in args:
print(arg, "nn")
Полностью код консьюмера выглядит так:
from pika import ConnectionParameters
connection_params = ConnectionParameters(
host = "localhost",
port = 5672,
)
def process_message(*args):
for arg in args:
print(arg, "nn")
def main():
with BlockingConnection(connection_params) as conn:
with conn.channel() as ch:
ch.queue_declare(queue="messages")
ch.basic_consume(
queue="messages",
on_message_callback=process_message,
)
print("Жду сообщений")
ch.start_consuming()
Запустим созданный консьюмер и посмотрим, что произойдет:
python3 consumer.py
Приведем лишь часть вывода — он слишком объемный:
Часть вывода консьюмера: видны все передаваемые параметры.
Мы получили распечатку данных по всем трем сообщениям, которые до этого передавали в RabbitMQ. На вкладке Overview в панели управления тоже произошли изменения:
Панель управления RabbitMQ.
Обратите внимание: консьюмер по-прежнему запущен, в этом можно убедиться, взглянув на вкладку соединений в панели управления. Если придет какое-то новое сообщение, оно появится в выводе. Чтобы продолжить совершенствовать наш консьюмер, его надо остановить принудительно. Сейчас это можно сделать, нажав предусмотренную комбинацию клавиш в терминале.
Вкладка соединений в панели управления.
Что за параметры приходят в функцию обратного вызова?
BlockingChannel — канал, с которым мы работаем:
with conn_channel() as ch:
BasicDeliver — метод, в котором отражены параметры, указанные ранее: exchange, routing key и другие. BasicProperties в нашем случае пуст. Наконец, самый последний параметр — текст сообщения. Таким образом, обработчик можно переписать, явно указав название аргументов:
def process_message(ch, method, properties, body):
print(f"Получено сообщение: {body.decode()}")
Обратите внимание, что приходит именно байтовая строка — ее нужно преобразовывать в текстовую.
Если запустить наш консьюмер опять, вывод будет другим (поскольку мы поменяли содержимое функции process_message). Однако снова извлекаются три сообщения!
python3 consumer.py
Жду сообщений
Получено сообщение: Hello RabbitMQ!
Получено сообщение: Hello RabbitMQ!
Получено сообщение: Hello RabbitMQ!
Сообщения по-прежнему находятся в RabbitMQ. Очень важно указывать брокеру, что они обработаны, и разрешить их удаление.
Здесь проявляется одно из отличий RabbitMQ от Kafka, которая будет хранить сообщения всегда и дает возможность работать с их историей. В RabbitMQ такая функциональность тоже осуществима, но для этого придется основательно поработать с настройками.
Удалением сообщений в RabbitMQ можно управлять двумя способами.
Первый из них — установить соответствующий флаг в функции basic_consume, который по умолчанию имеет значение False:
auto_ack=True,
Однако тогда удаление сообщения произойдет до начала его обработки. Такой подход не всегда может устроить разработчика. Представим, что сообщение несет критически важную информацию и должно быть обработано корректным образом в любом случае. Появляется риск, что этого не произойдет в случае непредвиденной ошибки.
Второй способ — явно вызывать в функции обратного вызова метод basic_ack. Так сообщение удалится только после того, как будет благополучно обработано:
ch.basic_ack(delivery_tag=method.delivery_tag)
В протоколе AMQP delivery_tag — уникальный идентификатор доставки сообщения.
Обычно используется два сервиса: RabbitMQ и RabbitMQ Management — ядро и средство графического представления. Разместим их на облачном сервере [7]. Как это сделать, подробно описано в нашей докуменотации [8].
Для экспериментов лучше выбрать последнюю Ubuntu. Сгенерировать SSH‑ключ для безопасного подключения поможет наша статья [9]. Оперативной памяти будет достаточно 512 МБ. Особо мощный процессор не требуется, можно ограничиться одним ядром. Для хранения информации на жестком диске достаточно 5 ГБ. По ценам на конец 2024 года выйдет около 25 ₽ в сутки.
Для дальнейших действий понадобится терминал. Можно использовать консоль в панели управления Selectel. Кому-то покажется удобным работать с привычным терминалом своего компьютера. В таком случае копируем сначала IP‑адрес нашего созданного сервера и подключаемся к нему:
ssh root@87.228.12.94
Затем копируем и вставляем пароль, который задается в панели управления сервером:
Фрагмент панели управления сервером в Selectel.
Переходим к документации Docker, разделу установки Ubuntu [10]. Все необходимые команды нужно будет скопировать и перенести в терминал. Сначала подключаем репозиторий Docker:
Документация Docker: подключение репозитария.
Затем устанавливаем все необходимые Docker‑файлы. Для этого также воспользуемся копированием или онлайн‑документацией Docker [10].
Документация Docker: установка.
Не забудьте проверить корректность установки Docker способом, указанным на той же странице документации. Нам также понадобится Docker Compose, который запускает контейнеры в самом Docker.
Обратите внимание на порт 5672 — именно он используется RabbitMQ. Создадим файл docker-compose.yml следующего содержания в любой подходящей директории:
services:
rabbitmq:
image: rabbitmq:3.10.7-management
hostname: rabbitmq
ports:
- 15672:15672
- 5672:5672
Для этого воспользуемся терминалом.
touch docker-compose.yml
nano docker-compose.yml
Запустим приложения, описанные в YAML‑файле:
docker compose up --build
Теперь можно войти в систему управления RabbitMQ. Для этого вводим в в адресную строку браузера IP-адрес нашего сервера и указываем порт, который использует RabbitMQ. Не забываем имя пользователя и пароль по умолчанию, которые мы не изменяли: guest, guest. Появляется уже знакомая нам панель управления RabbitMQ:
Панель управления RabbitMQ на сервере.
Мы не создавали специального пользователя, не меняли пароли по умолчанию — стенд нужен только для демонстрации. На практике лучше получше позаботиться о безопасности и работать не из под root.
Наконец, подключимся к нашей удаленной очереди. Для этого в коде наших продюсера и консьермера меняем значение host на IP-адрес нашего сервера:
connection_params = ConnectionParameters(
host="87.228.12.94",
port=5672,
)
Вот так, буквально за несколько минут мы подняли свой сервер в облаке Selectel [7], установили и настроили инструментарий Docker, запустили RabbitMQ, подключились к его мониторингу через веб-браузер.
Автор: El_Gato_Grande
Источник [11]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/python/408879
Ссылки в тексте:
[1] YouTube-канала: https://www.youtube.com/@artemshumeiko
[2] Немного общей информации: #1
[3] Подготовка: #2
[4] Поставщик сообщений: #3
[5] Потребитель сообщений: #4
[6] Развертывание инструментов на сервере: #5
[7] на облачном сервере: https://selectel.ru/services/cloud/servers/?utm_source=habr.com&utm_medium=referral&utm_campaign=cloud_article_rabbitmq_230125_content
[8] в нашей докуменотации: https://docs.selectel.ru/cloud/servers/create/create-server/
[9] наша статья: https://selectel.ru/blog/tutorials/how-to-generate-ssh/
[10] разделу установки Ubuntu: https://docs.docker.com/engine/install/ubuntu/
[11] Источник: https://habr.com/ru/companies/selectel/articles/875880/?utm_source=habrahabr&utm_medium=rss&utm_campaign=875880
Нажмите здесь для печати.