- PVSM.RU - https://www.pvsm.ru -
Привет! Меня зовут Александр, я руководитель бэкенд-разработки в KTS [1].
Сегодня я покажу, как написать своего Телеграм-бота на основе asyncio и aiohttp. Мы не будем использовать ни aiogram, ни любые другие библиотеки, а напишем бота с нуля, чтобы немного познакомиться с асинхронным программированием, корутинами и некоторыми примитивами синхронизации.
Что будет в статье:
Echo-бот [3]
Архитектура бота [4]
Реализация схемы:
poller.py [5]
worker.py [6]
Class Bot [7]
Остановка бота [8]
Заключение [9]
Бот — сущность в мессенджере.
Они есть практически везде: в Телеграме, Инстаграме, ВК. Это программа, которая на основе определенных алгоритмов отвечает на сообщения пользователей. Задачи и направления самые разные: бот может просто присылать сообщения «С добрым утром!» А может, например, управлять участниками внутри чата.
Перед началом работы с ботом его нужно создать и познакомиться с Telegram API.
Шаг 1. Сначала создаем бота с помощью BotFather внутри Телеграма:
https://core.telegram.org/bots#6-botfather [10]
После создания вы получите сообщение:
Done! Congratulations on your new bot...
Use this token to access the HTTP API:
<token>
...
В сообщении будет токен, который нужно использовать для запросов в Телеграм.
Шаг 2. Выполните первый запрос к Telegram API по инструкции: https://core.telegram.org/bots/api#authorizing-your-bot [11]
В ответ вы должны получить что-то подобное:
{"ok":true,"result":{"id":2065163148,"is_bot":true,"first_name":"Metaclass","username":"metaclassbot","can_join_groups":true,"can_read_all_group_messages":false,"supports_inline_queries":false}}
Шаг 3. Далее нужно научиться получать уведомления из Телеграма. Существует 2 способа:
webhook [12] — инициатором запроса выступает Телеграм.
Когда пользователь пишет боту, Телеграм делает запрос на URL, который вы установите с помощью метода setwebhook. У этого метода есть недостатки: его трудно отлаживать, так как должен быть публичный адрес бота. Также на каждое сообщение выполняется HTTP-запрос, и при 1000+ сообщений в секунду серверы не справятся.
long polling [13] — инициатором является ваше приложение.
Оно обращается к Telegram API и получает уведомления или ожидает, если уведомлений нет — отсюда название long.
Мы для получения уведомлений будем использовать long polling. Описание метода [13].
Выполните в браузере такой запрос:https://api.telegram.org/bot<token>/getUpdates
Он завершится моментально и вернет:{"ok":true,"result":[]}
Теперь выполните другой запрос:https://api.telegram.org/bot<token>/getUpdates?timeout=30
Он будет висеть 30 секунд и, если ничего не написать боту, вернет: {"ok":true,"result":[]}
Если написать боту хоть что-то, он моментально вернет Update [14].
Шаг 4. Теперь давайте научимся отправлять сообщение пользователю в ответ. Сначала отправим сообщение боту, для чего используем метод sendMessage [15]. Метод принимает два обязательных параметра:
chat_id
— поле, которое пришло в объекте Update. Может быть как id персонального чата, так и id группового чата.
text
Выполните в браузере запрос. chat_id
нужно получить из предыдущего запроса: https://api.telegram.org/bot<token>/sendMessage?chat_id=85364161&text=hello.
В результате бот напишет вам сообщение hello.
Для работы с методами Telegram API будем использовать класс TgClient [16].
Для начала давайте получим сообщения из Telegram. Для этого нужно вызвать метод get_updates
класса TgClient
:
import asyncio
import os
from clients.tg import TgClient
async def run_echo():
c = TgClient(os.getenv("BOT_TOKEN"))
print(await c.get_updates(offset=0, timeout=5))
if __name__ == "__main__":
asyncio.run(run_echo())
Обратите внимание, что код получает токен бота из переменной окружения, поэтому перед запуском нужно установить значение переменной [17] BOT_TOKEN
.
При запуске кода может быть два исхода:
Если боту отправляли сообщение не более 24 часов назад, метод get_updates
вернет все последние сообщения. Чтобы было проще, далее по тексту я буду использовать термин «новые сообщения», а не «новые объекты update [14]».
{'ok': True, 'result': [{'update_id': 503972397, 'message': {'message_id': 335, 'from': {'id': 85364161, 'is_bot': False, 'first_name': 'Alexander', 'last_name': 'Opryshko', 'username': 'alexopryshko', 'language_code': 'en'}, 'chat': {'id': 85364161, 'first_name': 'Alexander', 'last_name': 'Opryshko', 'username': 'alexopryshko', 'type': 'private'}, 'date': 1644871206, 'text': '1'}}]}
Если боту не писали вообще, или писали давно, он зависнет на 5 секунд (timeout=5) и вернет пустой список:
{'ok': True, 'result': []}
при этом, если боту написать во время ожидания, он моментально вернет результат, как и в первом случае.
Чтобы получать сообщения из Телеграма постоянно, в цикле нужно вызывать get_updates
, так как он завершается сразу после написания боту нового сообщения.
async def run_echo():
c = TgClient(os.getenv("BOT_TOKEN"))
while True:
print(await c.get_updates(offset=0, timeout=60))
Но если запустить такой код, мы будем получать последние сообщения из Телеграма в бесконечном цикле. А чтобы получать только новые, нужно использовать параметр offset
.
async def run_echo():
c = TgClient(os.getenv("BOT_TOKEN"))
offset = 0
while True:
res = await c.get_updates(offset=offset, timeout=60)
for item in res["result"]:
offset = item["update_id"] + 1
print(item)
Правило Телеграма: после получения новой пачки сообщений нужно взять из ответа параметр update_id
и следующий запрос выполнять с offset
на единицу больше, чем последнее сообщение, пришедшее из get_updates
.
Теперь можно завершить написание полноценного echo-бота:
async def run_echo():
c = TgClient(os.getenv("BOT_TOKEN"))
offset = 0
while True:
res = await c.get_updates_in_objects(offset=offset, timeout=60)
for item in res.result:
offset = item.update_id + 1
await c.send_message(item.message.chat.id, item.message.text)
В функции заменили get_updates
на get_updates_in_objects
, потому что гораздо удобнее оперировать объектами, чем словарями.
Текущая реализация имеет большой недостаток — бот не работает параллельно: после получения обновления он сразу начинает выполнять бизнес-логику бота. В нашем случае он отправляет echo-сообщение, и в это время новые сообщения от бота получаться не будут. Получается, другие пользователи простаивают. Нужно как-то организовать параллельную обработку пользователей и обеспечить возможность масштабирования.
Поэтому мы пойдем другим путем.
Введем сущность poller. Он будет получать сообщения из Телеграма и ставить их в очередь, никакую бизнес-логику он не реализует. Он должен быть в единственном экземпляре.
Введем сущность worker. Он будет выполнять все рабочие задачи. worker берет задачу из очереди и каким-то образом выполняет ее.
Сущностей worker может быть много:
Такая схема лучше изначальной по двум причинам:
при возрастании нагрузки мы можем соответственно увеличить количество worker;
сообщения от пользователей обрабатываются параллельно.
import asyncio
from clients.tg import TgClient
class Poller:
def __init__(self, token: str, queue: asyncio.Queue):
self.tg_client = TgClient(token)
self.queue = queue
async def _worker(self):
offset = 0
while True:
res = await self.tg_client.get_updates_in_objects(offset=offset, timeout=60)
for u in res.result:
offset = u.update_id + 1
print(u)
self.queue.put_nowait(u)
async def start(self):
asyncio.create_task(self._worker())
poller в точности повторяет логику echo-бота, за исключением отправки echo-сообщения:
работает бесконечно;
получает уведомления из Телеграма;
кладет сообщения в очередь.
Логика получения уведомлений описана в методе _worker
, для запуска получения уведомлений нужно запустить именно его. Но просто вызвать await self._worker()
не получится, потому что мы заблокируем основной поток выполнения, а нам еще нужно запустить worker, который будет вычитывать сообщения из очереди. Поэтому нужно запустить фоновую задачу с помощью asyncio.create_task. [18]
Теперь рассмотрим, как запустить poller.
import asyncio
import os
from bot.poller import Poller
async def start():
q = asyncio.Queue()
poller = Poller(os.getenv("BOT_TOKEN"), q)
await poller.start()
def run():
loop = asyncio.get_event_loop()
try:
print('bot has been started')
loop.create_task(start())
loop.run_forever()
except KeyboardInterrupt:
pass
if __name__ == '__main__':
run()
Так как бот должен работать бесконечно, то необходимо организовать бесконечный цикл. В echo-боте мы просто оставили while True
, но в текущей реализации так сделать будет неудобно, поэтому лучше использовать метод запуска run_forever
, предварительно положив все необходимые задачи в event loop с помощью метода create_task.
Документация по методу run_forever [19]
Документация по методу create_task [20]
Обратите внимание:
1. В коде poller используется
asyncio.create_task
, а при его запуске используетсяloop.create_task
.
Отличие заключается в том, что мы явно указали, какойevent loop
нужно использовать в синхронной функцииdef run
. В асинхронных функцияхasync def loop
явно можно не указывать, потому что Python сам знает текущий цикл событий и прикрепляет задачу к нему. Функция, из которой запускаетсяcreate_task,
тоже запущена в этомloop
2. Если в запущенной в фоне корутине
create_task
происходит исключение, мы можем не увидеть его сразу, только после остановкиevent loop
. Из-за этого могут возникнуть сложности в нахождении ошибок. Почитать подробнее в этой статье [21].
import asyncio
from clients.tg import TgClient
from clients.tg.dcs import UpdateObj
class Worker:
def __init__(self, token: str, queue: asyncio.Queue, concurrent_workers: int):
self.tg_client = TgClient(token)
self.queue = queue
self.concurrent_workers = concurrent_workers
async def handle_update(self, upd: UpdateObj):
print("before", upd)
await asyncio.sleep(1)
print("after", upd)
async def _worker(self):
while True:
upd = await self.queue.get()
await self.handle_update(upd)
async def start(self):
for _ in range(self.concurrent_workers):
asyncio.create_task(self._worker())
У worker есть несколько кардинальных отличий от poller:
для обеспечения параллельной обработки входящих сообщений запускается несколько _worker
, а количество параллельных воркеров регулируется параметром concurrent_workers
;
новые сообщения приходят не из Telegram, а из очереди, которую предварительно заполнил poller;
внутри handle_update
— который запускается при появлении нового сообщения в очереди — реализуется бизнес-логика обработки сообщения, т.е. бизнес-логика бота.
Теперь добавим запуск worker в корутину async def start.
Обратите внимание, что очередь должна быть общая между poller и worker:
async def start():
q = asyncio.Queue()
poller = Poller(os.getenv("BOT_TOKEN"), q)
await poller.start()
worker = Worker(os.getenv("BOT_TOKEN"), q, 2)
await worker.start()
На этом минимальная реализация бота готова, но есть нюансы, которые стоит улучшить:
в корутине start
мы оперируем внутренними компонентами бота: очередью, poller, worker. Было бы хорошо иметь сущность Bot с одним методом “start”;
при остановке бота он завершается моментально, не дожидаясь выполнения запущенной логики бота. Поэтому может быть такое, что мы прервем пользовательский сценарий на середине.
Всю работу с компонентами бота вынесем в отдельный класс Bot:
import asyncio
from bot.poller import Poller
from bot.worker import Worker
class Bot:
def __init__(self, token: str, n: int):
self.queue = asyncio.Queue()
self.poller = Poller(token, self.queue)
self.worker = Worker(token, self.queue, n)
async def start(self):
await self.poller.start()
await self.worker.start()
И перепишем функцию run
:
import asyncio
import os
from bot.base import Bot
def run():
loop = asyncio.get_event_loop()
bot = Bot(os.getenv("BOT_TOKEN"), 2)
try:
print('bot has been started')
loop.create_task(bot.start())
loop.run_forever()
except KeyboardInterrupt:
pass
if __name__ == '__main__':
run()
Код запуска стал чище, теперь не нужно думать про внутренние компоненты бота. Достаточно запустить bot.start(), и бот начнет функционировать
Сейчас остановка происходит при нажатии Ctrl + C. Возникает исключение KeyboardInterrupt
, которое мы ловим и молча завершаем работу бота:
try:
print('bot has been started')
loop.create_task(bot.start())
loop.run_forever()
except KeyboardInterrupt:
pass
Почему нужно делать красивое завершение (graceful shutdown):
1. Бизнес-логика бота может прерваться посередине, и для пользователя это будет выглядеть багом.
Пример: пользователь отправил файл боту, бот отправил сообщение, что файл загружается, и в этот момент его остановили. Бот будет загружать файл вечно, а пользователь останется в недоумении.2. Если в боте есть подключения к другим компонентам, например, к базе данных или очереди, их нужно корректно завершать.
Поэтому введем функцию stop
в Bot и каждый внутренний компонент. Она будет отвечать за корректное завершение. Начнем с poller.
Чтобы отменить запущенную задачу, нужно вызвать у нее метод cancel
. https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel [22]
class Poller:
def __init__(self, token: str, queue: asyncio.Queue):
self.tg_client = TgClient(token)
self.queue = queue
self._task: Optional[Task] = None
async def _worker(self): ...
async def start(self):
self._task = asyncio.create_task(self._worker())
async def stop(self):
self._task.cancel()
Для этого введем переменную _task,
в которую сохраним объект созданной задачи, а в момент остановки poller вызовем cancel
.
В случае с worker нельзя просто взять и вызвать cancel
у всех запущенных задач:
нужно обработать все задачи, которые poller положил в очередь, иначе обновления из Телеграма просто потеряются;
нельзя прерывать обработку конкретной задачи, т.е. у запущенной задачи cancel вызвать нельзя.
Получается, нужно дождаться выполнения всех задач, которые находятся в очереди. Для такой задачи есть метод join
.
https://docs.python.org/3/library/asyncio-queue.html#asyncio.Queue.join [23]
class Worker:
def __init__(self, token: str, queue: asyncio.Queue, concurrent_workers: int):
...
self._tasks: List[asyncio.Task] = []
async def handle_update(self, upd: UpdateObj):
...
async def _worker(self):
while True:
try:
upd = await self.queue.get()
await self.handle_update(upd)
finally:
self.queue.task_done()
async def start(self):
self._tasks = [asyncio.create_task(self._worker()) for _ in range(self.concurrent_workers)]
async def stop(self):
await self.queue.join()
for t in self._tasks:
t.cancel()
Давайте разберемся, что мы сделали:
в методе start
сохранили все запущенные задачи в self._tasks
;
внутри метода stop
перед вызовом метода cancel
у всех задач дождались, когда все задачи из очереди будут выполнены с помощью await self.queue.join()
;
с помощью self.queue.task_done()
помечаем «выполненными» задачи внутри метода _worker
.
В итоге получается:
остановили poller, новые задачи не добавляются в очередь.
внутри worker ждем, пока выполнятся все задачи (self.queue.join())
, и только после этого вызываем отмену воркеров.
так как все задачи завершились, а новые не поступают — poller остановлен — то можно вызывать cancel
у задач и не бояться прервать бизнес-логику бота.
Теперь добавим в Bot метод stop
:
async def stop(self):
await self.poller.stop()
await self.worker.stop()
И после возникновения исключения KeyboardInterrupt
запустим остановку бота с помощью loop.run_until_complete.
https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_until_complete [24]
try:
print('bot has been started')
loop.create_task(bot.start())
loop.run_forever()
except KeyboardInterrupt:
print("nstopping", datetime.datetime.now())
loop.run_until_complete(bot.stop())
print('bot has been stopped', datetime.datetime.now())
Чтобы проверить, что мы все сделали правильно, можно добавить asyncio.sleep(10)
в метод handle_update
, отправить боту сообщение и попробовать завершить бота раньше, чем через 10 секунд. В итоге должна получиться подобная картина:
Цель этой статьи — показать на примере, как работать с базовым асинхронным программированием и примитивами синхронизации asyncio, а задача написания бота хорошо подходит для этих целей. Исходный код [25].
Написать такого бота — одно из домашних заданий курса «Асинхронное программирование» в нашей школе Metaclass [26].
11-го апреля у нас стартует третий поток курса. Если хотите разобраться в этой теме, записывайтесь по ссылке выше.
Чтобы быть в курсе новых потоков, вступайте в чат школы в Телеграме: https://t.me/kts_dev [27]
Автор:
alexopryshko
Источник [28]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/python/373024
Ссылки в тексте:
[1] KTS: https://kts.studio/
[2] Основы ботоведения: #1
[3] Echo-бот: #2
[4] Архитектура бота: #3
[5] poller.py: #4
[6] worker.py: #5
[7] Class Bot: #6
[8] Остановка бота: #7
[9] Заключение: #8
[10] https://core.telegram.org/bots#6-botfather: https://core.telegram.org/bots#6-botfather
[11] https://core.telegram.org/bots/api#authorizing-your-bot: https://core.telegram.org/bots/api#authorizing-your-bot
[12] webhook: https://core.telegram.org/bots/api#setwebhook
[13] long polling: https://core.telegram.org/bots/api#getupdates
[14] Update: https://core.telegram.org/bots/api#update
[15] sendMessage: https://core.telegram.org/bots/api#sendmessage
[16] TgClient: https://github.com/ktsstudio/webinar-tgbot/blob/4483de35056d1628357c4f64749ca622850b2e38/clients/tg/api.py#L8
[17] установить значение переменной: https://www.twilio.com/blog/2017/01/how-to-set-environment-variables.html
[18] запустить фоновую задачу с помощью asyncio.create_task.: https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task)
[19] Документация по методу run_forever: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_forever
[20] Документация по методу create_task: https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
[21] статье: https://quantlane.com/blog/ensure-asyncio-task-exceptions-get-logged/
[22] https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel: https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel
[23] https://docs.python.org/3/library/asyncio-queue.html#asyncio.Queue.join: https://docs.python.org/3/library/asyncio-queue.html#asyncio.Queue.join
[24] https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_until_complete: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_until_complete
[25] Исходный код: https://github.com/ktsstudio/webinar-tgbot
[26] «Асинхронное программирование» в нашей школе Metaclass: http://metaclass.kts.studio/aiohttp?utm_source=habr
[27] https://t.me/kts_dev: https://www.youtube.com/redirect?event=video_description&redir_token=QUFFLUhqbFhhcUNTQVlQMm4xZDA2XzJJRVpXM2NTaGJBd3xBQ3Jtc0trY0Z1MGduZmlGY1NOQ3RfUmlCTUNuTWJ4eHFrT1NWWGthUWJxNDhia01QLVU5MXRvNzc1SkNDRWx2d2FwMVlJY0VhTnpLNHAzWGowZmpzREJKOGlnY015WFJtV0RhYVpSaE43VGExal9yUmhTcm5HQQ&q=https%3A%2F%2Ft.me%2Fkts_dev
[28] Источник: https://habr.com/ru/post/598575/?utm_source=habrahabr&utm_medium=rss&utm_campaign=598575
Нажмите здесь для печати.