- PVSM.RU - https://www.pvsm.ru -

RabbitMQ — Hello World

RabbitMQ — Hello World
RabbitMQ позволяет взаимодействовать различным программам при помощи протокола AMQP. RabbitMQ является отличным решением для построения SOA (сервис-ориентированной архитектуры) и распределением отложенных ресурсоемких задач.
Под катом перевод первого [1] из шести уроков официального сайта. Примеры на python, но его знание вовсе не обязательно. Аналогичные примеру программы можно воспроизвести практически на любом популярном ЯП. [так выглядят комментарии переводчика, т.е. меня]

Вступление

RabbitMQ ‒ это брокер сообщений. Его основная цель ‒ принимать и отдавать сообщения. Его можно представлять себе, как почтовое отделение: когда Вы бросаете письмо в ящик, Вы можете быть уверены, что рано или поздно почтальон доставит его адресату [видимо, автор ни разу не имел дела с Почтой России]. В этой аналогии RabbitMQ является одновременно и почтовым ящиком, и почтовым отделением, и почтальоном.
Наибольшее отличие RabbitMQ от почтового отделения в том, что он не имеет дела с бумажными конвертами ‒ RabbitMQ принимает, хранит и отдает бинарные данные ‒ сообщения.
В RabbitMQ, а также обмене сообщениями в целом, используется следующая терминология:

  • Producer (производитель) [далее в переводе будет использован термин «продюсер» ] ‒ программа, отправляющая сообщения. В схемах он будет представлен кругом с буквой «P»:
    RabbitMQ — Hello World
  • Queue (очередь) ‒ имя «почтового ящика». Она существует внутри RabbitMQ. Хотя сообщения проходят через RabbitMQ и приложения, хранятся они только в очередях. Очередь не имеет ограничений на количество сообщений, она может принять сколь угодно большое их количество ‒ можно считать ее бесконечным буфером. Любое количество продюсеров может отправлять сообщения в одну очередь, также любое количество клиентов может получать сообщения из одной очереди. В схемах очередь будет обозначена стеком и подписана именем:
    RabbitMQ — Hello World
  • Consumer (клиент) ‒ программа, принимающая сообщения. Обычно клиент находится в состоянии ожидания сообщений. В схемах он будет представлен кругом с буквой «C»:
    RabbitMQ — Hello World

Продюсер, клиент и брокер не обязаны находиться на одной физической машине, обычно они находятся на разных.

Hello World!

Первый пример не будет особо сложным ‒ давайте просто отправим сообщение, примем его и выведем на экран. Для этого нам потребуется две программы: одна будет отправлять сообщения, другая ‒ принимать и выводить их на экран.
Общая схема такова:
RabbitMQ — Hello World
Продюсер отправляет сообщения в очередь с именем «hello», а клиент получает сообщения из этой очереди.

Библиотека RabbitMQ

RabbitMQ использует протокол AMQP. Для использования RabbitMQ необходима библиотека, поддерживающая этот протокол. Такие библиотеки можно найти практически для каждого языка программирования. Python ‒ не исключение, для него есть несколько библиотек:

В примерах будет использована библиотека pika. Ее можно установить при помощи менеджера пакетов pip:

$ sudo pip install pika==0.9.5

Если отсутствуют pip или git-core, то сначала необходимо установить их:

  • Для Ubuntu:
    $ sudo apt-get install python-pip git-core
    

  • Для Debian:
    $ sudo apt-get install python-setuptools git-core
    $ sudo easy_install pip
    

  • Для Windows (для установки easy_install необходимо запустить MS Windows Installer для setuptools):
    > easy_install pip
    > pip install pika==0.9.5
    

Отправка сообщений

RabbitMQ — Hello World
Наша первая программа send.py будет просто отправлять одно сообщение в очередь.

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()

Мы подключились к брокеру сообщений, находящемуся на локальном хосте. Для подключения к брокеру, находящемуся на другой машине, достаточно заменить «localhost» на IP адрес этой машины.
Перед отправкой сообщения мы должны убедиться, что очередь, получающая сообщение, существует. Если отправить сообщение в несуществующую очередь, RabbitMQ его проигнорирует. Давайте создадим очередь, в которую будет отправлено сообщение, назовем ее «hello»:

channel.queue_declare(queue='hello')

Теперь все готово для отправки сообщения. Наше первое сообщение будет содержать строку «Hello World!» и будет отправлено в очередь с именем «hello».
Вообще, в RabbitMQ сообщения не отправляются непосредственно в очередь, они должны пройти через exchange (точка обмена). Но сейчас мы не будем заострять на этом внимание, точки обмена будут рассмотрены в третьей части учебника. Сейчас достаточно знать, что точку обмена по-умолчанию можно определить, указав пустую строку. Это специальная точка обмена ‒ она позволяет определять, в какую именно очередь отправлено сообщение. Имя очереди должно быть определено в параметре routing_key:

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"

Перед выходом из программы необходимо убедиться, что буфер был очищен и сообщение дошло до RabbitMQ. В этом можно быть уверенным, если использовать безопасное закрытие соединения с брокером.

connection.close()

Получение сообщений

RabbitMQ — Hello World
Наша вторая программа receive.py будет получать сообщения из очереди и выводить их на экран.
Также как и в первой программе, сначала необходимо подключиться к RabbitMQ. Для этого следует использовать тот же код, как и ранее. Следующий шаг, как и прежде ‒ убедиться, что очередь существует. Команда queue_declare не будет создавать новую очередь, если она уже существует, поэтому сколько бы раз не была вызвана эта команда, все-равно будет создана только одна очередь.

channel.queue_declare(queue='hello')

Вы можете задаться вопросом, почему мы объявляем очередь снова, ведь она уже была объявлена в первой программе. Это нужно, чтобы быть уверенным в существовании очереди, так будет, если сначала будет запущена программа send.py. Но мы не знаем, какая программа будет запущена раньше. В таких случаях лучше объявить очередь в обеих программах.

Мониторинг очередей

Если Вы хотите посмотреть, какие очереди существуют в RabbitMQ на данный момент, Вы можете сделать это с помощью команды rabbitmqctl (потребуются права суперпользователя):

$ sudo rabbitmqctl list_queues
Listing queues ...
hello    0
...done.

(для Windows ‒ без sudo)

[в нашей компании используют более удобный скрипт мониторинга:]

watch 'sudo /usr/sbin/rabbitmqctl list_queues name messages_unacknowledged messages_ready messages durable auto_delete consumers | grep -v "..." | sort | column -t;'

[скрипт выводит и обновляет каждые 2 секунды таблицу со списком очередей: имя очереди; количество сообщений в обработке; количество сообщений готовых к обработке; общее количество сообщений; устойчивость очереди к перезагрузке сервиса; является ли временной очередью; количество подключенных клиентов]

Получение сообщений из очереди более сложный процесс, чем отправка. Получение осуществляется при помощи подписки с использованием callback функции. При получении каждого сообщения библиотека Pika вызывает эту callback функцию. В нашем примере она будет выводить на экран текст сообщения.

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

Далее, нам нужно обозначить, что callback функция будет получать сообщения из очереди с именем «hello»:

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

Здесь необходимо быть уверенным в том, что очередь, на которую мы хотим подписаться, была объявлена. Мы сделали это ранее с помощью команды queue_declare.
Параметр no_ack будет рассмотрен позже [во второй части учебника].
И, наконец, запуск бесконечного процесса, который ожидает сообщения из очереди и вызывает callback функцию, когда это необходимо.

print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

Ну а теперь все вместе

Полный код send.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

Полный код receive.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

channel.start_consuming()

Теперь мы можем попробовать запустить наши программы в терминале. Сначала отправим сообщение при помощи программы send.py:

$ python send.py
[x] Sent 'Hello World!'

Выполнение этой программы будет завершаться после отправки каждого сообщения. Теперь сообщение нужно получить:

$ python receive.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'

Отлично! Мы отправили наше первое сообщение через RabbitMQ. Как Вы могли заметить, выполнение программы receive.py не завершилось. Она будет ожидать следующих сообщений, а остановить ее можно, нажав Ctrl+C.
Попробуйте запустить send.py снова в новом окне терминала.
Мы изучили, как отправлять и получать сообщения через именованные очереди. В следующей части учебника мы создадим простую очередь задач [ресурсоемких].

UPD: библиотеку, работающую с RabbitMQ, для своего любимого ЯП Вы можете найти на официальном сайте тут [5].

Автор: zTrue


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/python/13266

Ссылки в тексте:

[1] первого: http://www.rabbitmq.com/tutorials/tutorial-one-python.html

[2] py-amqplib: http://barryp.org/software/py-amqplib/

[3] txAMQP: https://launchpad.net/txamqp

[4] pika: http://github.com/pika/pika

[5] тут: http://www.rabbitmq.com/devtools.html