RabbitMQ позволяет взаимодействовать различным программам при помощи протокола AMQP. RabbitMQ является отличным решением для построения SOA (сервис-ориентированной архитектуры) и распределением отложенных ресурсоемких задач.
Под катом перевод первого из шести уроков официального сайта. Примеры на python, но его знание вовсе не обязательно. Аналогичные примеру программы можно воспроизвести практически на любом популярном ЯП. [так выглядят комментарии переводчика, т.е. меня]
Вступление
RabbitMQ ‒ это брокер сообщений. Его основная цель ‒ принимать и отдавать сообщения. Его можно представлять себе, как почтовое отделение: когда Вы бросаете письмо в ящик, Вы можете быть уверены, что рано или поздно почтальон доставит его адресату [видимо, автор ни разу не имел дела с Почтой России]. В этой аналогии RabbitMQ является одновременно и почтовым ящиком, и почтовым отделением, и почтальоном.
Наибольшее отличие RabbitMQ от почтового отделения в том, что он не имеет дела с бумажными конвертами ‒ RabbitMQ принимает, хранит и отдает бинарные данные ‒ сообщения.
В RabbitMQ, а также обмене сообщениями в целом, используется следующая терминология:
- Producer (производитель) [далее в переводе будет использован термин «продюсер» ] ‒ программа, отправляющая сообщения. В схемах он будет представлен кругом с буквой «P»:
- Queue (очередь) ‒ имя «почтового ящика». Она существует внутри RabbitMQ. Хотя сообщения проходят через RabbitMQ и приложения, хранятся они только в очередях. Очередь не имеет ограничений на количество сообщений, она может принять сколь угодно большое их количество ‒ можно считать ее бесконечным буфером. Любое количество продюсеров может отправлять сообщения в одну очередь, также любое количество клиентов может получать сообщения из одной очереди. В схемах очередь будет обозначена стеком и подписана именем:
- Consumer (клиент) ‒ программа, принимающая сообщения. Обычно клиент находится в состоянии ожидания сообщений. В схемах он будет представлен кругом с буквой «C»:
Продюсер, клиент и брокер не обязаны находиться на одной физической машине, обычно они находятся на разных.
Hello World!
Первый пример не будет особо сложным ‒ давайте просто отправим сообщение, примем его и выведем на экран. Для этого нам потребуется две программы: одна будет отправлять сообщения, другая ‒ принимать и выводить их на экран.
Общая схема такова:
Продюсер отправляет сообщения в очередь с именем «hello», а клиент получает сообщения из этой очереди.
Библиотека RabbitMQ
RabbitMQ использует протокол AMQP. Для использования RabbitMQ необходима библиотека, поддерживающая этот протокол. Такие библиотеки можно найти практически для каждого языка программирования. Python ‒ не исключение, для него есть несколько библиотек:
В примерах будет использована библиотека pika. Ее можно установить при помощи менеджера пакетов pip:
$ sudo pip install pika==0.9.5
Если отсутствуют pip или git-core, то сначала необходимо установить их:
- Для Ubuntu:
$ sudo apt-get install python-pip git-core
- Для Debian:
$ sudo apt-get install python-setuptools git-core $ sudo easy_install pip
- Для Windows (для установки easy_install необходимо запустить MS Windows Installer для setuptools):
> easy_install pip > pip install pika==0.9.5
Отправка сообщений
Наша первая программа send.py будет просто отправлять одно сообщение в очередь.
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
Мы подключились к брокеру сообщений, находящемуся на локальном хосте. Для подключения к брокеру, находящемуся на другой машине, достаточно заменить «localhost» на IP адрес этой машины.
Перед отправкой сообщения мы должны убедиться, что очередь, получающая сообщение, существует. Если отправить сообщение в несуществующую очередь, RabbitMQ его проигнорирует. Давайте создадим очередь, в которую будет отправлено сообщение, назовем ее «hello»:
channel.queue_declare(queue='hello')
Теперь все готово для отправки сообщения. Наше первое сообщение будет содержать строку «Hello World!» и будет отправлено в очередь с именем «hello».
Вообще, в RabbitMQ сообщения не отправляются непосредственно в очередь, они должны пройти через exchange (точка обмена). Но сейчас мы не будем заострять на этом внимание, точки обмена будут рассмотрены в третьей части учебника. Сейчас достаточно знать, что точку обмена по-умолчанию можно определить, указав пустую строку. Это специальная точка обмена ‒ она позволяет определять, в какую именно очередь отправлено сообщение. Имя очереди должно быть определено в параметре routing_key:
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print " [x] Sent 'Hello World!'"
Перед выходом из программы необходимо убедиться, что буфер был очищен и сообщение дошло до RabbitMQ. В этом можно быть уверенным, если использовать безопасное закрытие соединения с брокером.
connection.close()
Получение сообщений
Наша вторая программа receive.py будет получать сообщения из очереди и выводить их на экран.
Также как и в первой программе, сначала необходимо подключиться к RabbitMQ. Для этого следует использовать тот же код, как и ранее. Следующий шаг, как и прежде ‒ убедиться, что очередь существует. Команда queue_declare не будет создавать новую очередь, если она уже существует, поэтому сколько бы раз не была вызвана эта команда, все-равно будет создана только одна очередь.
channel.queue_declare(queue='hello')
Вы можете задаться вопросом, почему мы объявляем очередь снова, ведь она уже была объявлена в первой программе. Это нужно, чтобы быть уверенным в существовании очереди, так будет, если сначала будет запущена программа send.py. Но мы не знаем, какая программа будет запущена раньше. В таких случаях лучше объявить очередь в обеих программах.
Мониторинг очередей
Если Вы хотите посмотреть, какие очереди существуют в RabbitMQ на данный момент, Вы можете сделать это с помощью команды rabbitmqctl (потребуются права суперпользователя):
$ sudo rabbitmqctl list_queues
Listing queues ...
hello 0
...done.
(для Windows ‒ без sudo)
[в нашей компании используют более удобный скрипт мониторинга:]
watch 'sudo /usr/sbin/rabbitmqctl list_queues name messages_unacknowledged messages_ready messages durable auto_delete consumers | grep -v "..." | sort | column -t;'
[скрипт выводит и обновляет каждые 2 секунды таблицу со списком очередей: имя очереди; количество сообщений в обработке; количество сообщений готовых к обработке; общее количество сообщений; устойчивость очереди к перезагрузке сервиса; является ли временной очередью; количество подключенных клиентов]
Получение сообщений из очереди более сложный процесс, чем отправка. Получение осуществляется при помощи подписки с использованием callback функции. При получении каждого сообщения библиотека Pika вызывает эту callback функцию. В нашем примере она будет выводить на экран текст сообщения.
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
Далее, нам нужно обозначить, что callback функция будет получать сообщения из очереди с именем «hello»:
channel.basic_consume(callback,
queue='hello',
no_ack=True)
Здесь необходимо быть уверенным в том, что очередь, на которую мы хотим подписаться, была объявлена. Мы сделали это ранее с помощью команды queue_declare.
Параметр no_ack будет рассмотрен позже [во второй части учебника].
И, наконец, запуск бесконечного процесса, который ожидает сообщения из очереди и вызывает callback функцию, когда это необходимо.
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()
Ну а теперь все вместе
Полный код send.py:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
Полный код receive.py:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
channel.start_consuming()
Теперь мы можем попробовать запустить наши программы в терминале. Сначала отправим сообщение при помощи программы send.py:
$ python send.py
[x] Sent 'Hello World!'
Выполнение этой программы будет завершаться после отправки каждого сообщения. Теперь сообщение нужно получить:
$ python receive.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'
Отлично! Мы отправили наше первое сообщение через RabbitMQ. Как Вы могли заметить, выполнение программы receive.py не завершилось. Она будет ожидать следующих сообщений, а остановить ее можно, нажав Ctrl+C.
Попробуйте запустить send.py снова в новом окне терминала.
Мы изучили, как отправлять и получать сообщения через именованные очереди. В следующей части учебника мы создадим простую очередь задач [ресурсоемких].
Автор: zTrue