Попробуем подключить Celery/RabbitMQ к нашему проекту. В качестве основы возьмем проект с Flask'ом. Celery займется вычислением случайного числа.
Установка проекта
Клонируем проект на свой компьютер:
git clone https://github.com/nomhoi/tic-tac-toe-part6.git
Запускаем контейнеры:
cd tic-tac-toe-part6
docker-compose up -d
Выполняем сборку веб-приложения:
cd front
npm install
npm run-script build
Открываем броузер по адресу http://localhost.
Docker контейнеры
Сервис nginx остался без изменений. В сервис flask добавили установку пакета Celery в файл requirements.txt и смонтировали папку с исходником проекта Celery:
volumes:
- ./flask:/code
- ./celery/app/proj:/code/proj
Добавились новые сервисы celery и rabbit.
celery:
container_name: celery
build:
context: celery/
dockerfile: Dockerfile
volumes:
- ./celery/app:/app
depends_on:
- rabbit
networks:
- backend
rabbit:
container_name: rabbit
hostname: rabbit
image: rabbitmq:3.7.15-alpine
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=CT2gNABH8eJ9yVh
ports:
- "5672:5672"
networks:
- backend
Сервис celery
Сервис celery выполнен на базе этого туториала. Кто не знаком с Celery, имеет смысл тут-же пройтись по этому туториалу:
$ docker exec -it celery python
Python 3.7.3 (default, May 11 2019, 02:00:41)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from proj.tasks import add
>>> add.delay(2, 2)
<AsyncResult: 43662174-657f-4dd3-ab1a-22f5950c8794>
>>>
Как видим, наш проект в Celery оформлен в виде пакета proj. В задачи Celery в файл tasks.py добавлена наша задача getnumber:
@app.task
def getnumber():
return randrange(9)
Сервис flask
Напомню, что в этот сервис мы добавили пакет Celery и смонтировали папку с проектом proj. Исходный код этого проекта теперь присутствует в двух сервисах flask и celery.
from flask import Flask, jsonify
from proj.tasks import getnumber
from proj.celery import app as celery
app = Flask(__name__)
@app.route('/number')
def number():
task = getnumber.delay()
return task.id
@app.route('/result/<task_id>')
def result(task_id):
task = getnumber.AsyncResult(task_id)
result = task.get(timeout = 3)
response = {
'state': task.state,
'number': result,
}
return jsonify(response)
В обработчике number мы вызываем задачу getnumber, которая выполняется в воркере celery и возвращаем идентификатор задачи. В обработчике result мы получаем результат выполненной задачи по идентификатору и возвращаем ответ в JSON формате фронтенду.
Фронтенд
В диспетчере нашей игры по нажатию кнопки Get random number сначала отправляем запрос бэкенду по адресу /number и получаем от него идентификатор задачи Celery. После этого в функции getResult периодически отправляем запрос бэкенду на получение результата по адресу /result/<task_id>.
async function getResult(task_id) {
var i = 1;
var timerId = setTimeout(async function go() {
console.log("Result request: " + i);
console.log("Task Id: " + task_id)
const res = await fetch(`result/` + task_id);
const response = await res.text();
if (res.ok) {
let result = JSON.parse(response);
console.log(result)
if (result.state === 'SUCCESS') {
let i = parseInt(result.number);
if ($status === 1 || $history.state.squares[i]) {
promise_number = result.number + ' - busy';
return;
}
promise_number = i;
history.push(new Command($history.state, i));
return;
}
}
if (i < 5)
setTimeout(go, 500);
i++;
}, 500);
}
Изменили вывод результатов запросов к бэкенду:
{#await promise}
<p>...подождите</p>
{:then taskid}
<p>Task Id: {taskid}</p>
{:catch error}
<p style="color: red">{error.message}</p>
{/await}
{#await promise_number}
<p>...подождите</p>
{:then number}
<p>Number: {number}</p>
{:catch error}
<p style="color: red">{error.message}</p>
{/await}
Домашнее задание
На самом деле сейчас результат приходит сразу после первого запроса. Попробуйте нашего интеллектуального агента живущего в celery сделать немного задумчивым, чтобы не сразу выдавал ответ.
Время от времени начинает приходить ошибка от flask'a "500 (INTERNAL SERVER ERROR)", это в celery поднимается исключение "celery.exceptions.TimeoutError: The operation timed out.". Помогает только перезагрузка сервисов. Пока не копал в чем дело, пожалуйста, посмотрите.
В getResult обрабатывается ответ только с состоянием SUCCESS, в остальных случаях выполняется повторный запрос. Можно добавить обработку ответов с FAILURE и PENDING. Формирование ответа в обработчике result также может зависеть от состояния задачи.
Вместо брокера сообщений RabbitMQ можно попробовать подключить Redis.
Можно попробовать выполнять запросы из приложения через брокеры сообщений.
А также попробовать выполнить это на базе примера с Boost.Beast.
Репозиторий на GitHub
https://github.com/nomhoi/tic-tac-toe-part6
Автор: Владимир Номхоев