Всем привет! Хотелось бы поделиться опытом использования python asyncio. За полтора года использования в продакшене накопился некоторый опыт, общие приемы, облегчающие жизнь. Естественно, были и грабли, о которых также стоит упомянуть, ибо это поможет сэкономить кучу времени тем, кто только начинает использовать в своих приложениях asyncio. Кому интересно — прошу под кат.
Немного истории
Asyncio появился в Pyhton версии 3.4, в 3.5 был добавлен более приятный глазу async/await синтаксис. Asyncio предоставляет из коробки Event loop, Future, Task, Coroutine, I/O multiplexing, Synchronization primitives. Это, конечно, не мало, но для полноценной разработки недостаточно. Для этого есть сторонние библиотеки. Отличная подборка есть вот тут. У себя в компании мы используем asyncio вместе с набором сторонних библиотек для написания микросервисов. По своей природе наши сервисы больше ориентированы на I/O нежели на CPU, так что для нас asyncio отлично подходит.
Собственно факты
Это не учебник по asyncio. Я не буду объяснять, почему асинхронный ввод/вывод это хорошо, или почему бы не использовать потоки. Не будет рассказов о корутинах, генераторах, event loop'ах и т.д. Также тут не будет никаких бенчмарков и сравнений с другими языками. Поехали!
Debug
Во-первых, PYTHONASYNCIODEBUG. Это переменная окружения, которая включает дебаг режим. Например, можно увидеть сообщения о том, что вы объявили функцию как корутину, но вызываете как обычную функцию(актуально для python3.4). Также необходимо настроить asyncio logger на уровень дебаг и еще разрешить вывод ResourseWarning. Можно увидеть много интересного: сообщения о том, что вы забыли закрыть транспорт или сам event loop(читай — забыли освободить ресурсы). Сравните запуск следующего кода с параметром интерпретатора -Wdefault и переменной окружения PYTHONASYNCIODEBUG=1 и без них (здесь и далее в примерах кода я буду опускать некоторые несущественные части такие как import или обработка исключений):
@asyncio.coroutine
def test():
pass
loop = asyncio.get_event_loop()
test()
Правильное завершение
Кстати об освобождении ресурсов. Event loop надо уметь правильно остановить, дождавшись корректного заверешения все тасок, закрытия соединений и т.д. И если с использованием run_until_complete() особых проблем нет, то с run_forever() все немного сложнее. Метод close() у event loop'а можно вызвать, только если он уже остановлен — т.е. после метода stop(). Лучше всего это сделать с помощью сигналов:
def handler(loop):
loop.remove_signal_handler(signal.SIGTERM)
loop.stop()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGTERM, handler, loop)
try:
loop.run_forever()
finally:
loop.close()
Далее в примерах кода я все же буду концентрироваться на самой сути, а не на правильном завершении программы.
Запуск блокирующего кода
Естественно, не для всего есть асинхронные библиотеки. Некоторый код так и остается блокирующим, и его надо как-то запускать, чтобы он не блокировал наш event loop. Для этого есть хороший метод run_in_executor(), который запускает то, что вы ему передали в одном из потоков встроенного пула, не блокируя основной поток с event loop'ом. Все бы хорошо, но с этим есть 2 проблемы. Во-первых, размер стандартного пула всего 5. Во-вторых, в asyncio синхронный dns resolver, который запускается именно таким образом во встроенном пуле. Значит, за пул всего в 5 потоков будут конкурировать ваши синхронные операции, плюс все кому надо сделать getaddrinfo(). Выход — использовать свой пул. Всегда:
def blocking_function():
time.sleep(42)
pool = ThreadPoolExecutor(max_workers=multiprocessing.cpu_count())
loop = asyncio.get_event_loop()
loop.run_in_executor(pool, blocking_function())
loop.close()
Коварные Future
У Future есть одна очень интересная особенность: если в ней произойдет исключение — вы об этом ничего не узнаете, если только явно не спросите об этом у самой future. В документации есть хороший пример на эту тему. Вы увидите, что было исключение, только когда gc будет удалять объект future. Отсюда следует простое правило — всегда проверяете результат вашей future. Даже если по вашей задумке код внутри future должен просто крутиться в бесконечном цикле, и, казалось бы, негде проверять результат — все равно надо обработать исключения, например так:
async def handle_exception():
try:
await bug()
except Exception:
print('TADA!')
async def bug():
raise Exception()
loop = asyncio.get_event_loop()
loop.create_task(handle_exception())
loop.run_forever()
loop.close()
await и __init__()
Невозможно. Магический метод __init__() не может содержать асинхронный код. Есть два пути. Или сделать у класса еще один метод, например, initialize(), который уже будет корутиной. Он будет содержать весь асинхронный код для инициализации, и его надо будет вызывать после создания объекта. Выглядит ужасно. Поэтому принято использовать функции-фабрики. Поясню на примере:
class Foo:
def __init__(self, reader, writer, loop, *args, **kwargs):
self._reader = reader
self._writer = writer
self._loop = loop
async def create_foo(loop):
reader, writer = await asyncio.open_connection('127.0.0.1', 8888, loop=loop)
return Foo(reader, writer, loop)
loop = asyncio.get_event_loop()
foo = loop.run_until_complete(create_foo(loop))
print(foo)
loop.close()
Wake up, Neo
Скажем, у вас есть таска, которая крутится в event loop'е и периодически сбрасывает какой-нибудь буфер. Можно написать такой код:
async def flush_task():
while True:
# flushing...
await asyncio.sleep(FLUSH_TIMEOUT)
Сделать create_task() — и все вроде бы хорошо, кроме одного: что делать, если по завершении вам необходимо принудительно сбросить содержимое буфера? Как заставить таску «проснусться»? Тут на помощь приходят примитивы синхронизации:
class Foo:
def __init__(self, loop, *args, **kwargs):
self._loop = loop
self._waiter = asyncio.Event()
self._flush_future = self._loop.create_task(self.flush_task())
async def flush_task(self):
while True:
try:
await asyncio.wait_for(self._waiter.wait(), timeout=FLUSH_TIMEOUT, loop=self._loop)
except asyncio.TimeoutError:
pass
# flushing ...
self._waiter.clear()
def force_flush():
self._waiter.set()
loop = asyncio.get_event_loop()
foo = Foo(loop)
loop.run_forever()
loop.close()
Тестирования
Тестировать асинхронный код можно и нужно. И делать это так же просто, как и в случае синхронного кода:
class TestCase(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)
def tearDown(self):
self.loop.close()
def test_001(self):
async def func():
self.assertEqual(42, 42)
self.loop.run_until_complete(func())
Тесты отлично изолированы, т.к. в каждом новом тесте используется свой event loop. А можно пойти дальше и использовать pytest, где есть удобные декораторы.
Источники вдохновения
Прежде всего — личный опыт. Многое из перечисленного было осознано в результате «ловли граблей», а затем изучения документации и исходников asyncio. Также отличными примерами послужили исходники популярных библиотек, таких как aiohttp, aioredis, aiopg.
Спасибо всем, кто дочитал статью до конца. Удачи с asyncio!
Автор: Amelius0712