В этой статье я предлагаю читателю совершить со мной в меру увлекательное путешествие в недра asyncio, чтобы разобраться, как в ней реализовано асинхронное выполнение кода. Мы оседлаем коллбэки и промчимся по циклу событий сквозь пару ключевых абстракций прямо в корутину. Если на вашей карте питона еще нет этих достопримечательностей, добро пожаловать под кат.
Для затравки — краткая справка о раскинувшейся перед нами местности
asyncio — библиотека асинхронного ввода/вывода которая, согласно pep3153, была создана, чтобы предоставить стандартизованную базу для создания асинхронных фреймворков. pep3156 так же приписывает ей необходимость обеспечить предельно простую интеграцию в уже существовавшие асинхронные фреймворки (Twisted, Tornado, Gevent). Как мы можем сейчас наблюдать, эти цели были успешно достигнуты — появился новый фреймворк на основе asyncio: aiohttp, в Tornado AsyncioMainLoop является циклом событий по умолчанию с версии 5.0, в Twisted asyncioreactor доступен с версии 16.5.0, а для Gevent есть сторонняя библиотека aiogevent.
asyncio — это гибридная библиотека, использующая одновременно два подхода к реализации асинхронного выполнения кода: классический на коллбэка и относительно новый (по крайней мере для питона) на корутинах. В её основе лежат три основные абстракции, являющиеся аналогами абстракций, существующих в сторонних фреймворках:
- Pluggable Event Loop
Подключаемый цикл событий. Подключаемый значит, что он может быть в две строчки кода заменен на другой, реализующий такой же интерфейс. Сейчас есть реализации на cython поверх libuv (uvloop)и на языке Rust (asyncio-tokio). - Future
Результат операции, который будет доступен в будущем. Нужен, чтобы получать в корутинах результат выполнения коллбэков. - Task
Специальный подкласс класса Future для запуска корутин на цикле событий.
Поехали!
Цикл событий — основная составляющая библиотеки, по дорогам, пролегающим через него, данные доставляются в любые её компоненты. Он большой и сложный, поэтому сначала рассмотрим его урезанный вариант.
# ~/inside_asyncio/base_loop.py
import collections
import random
class Loop:
def __init__(self):
# Очередь для хранения коллбэков
self.ready = collections.deque()
def call_soon(self, callback, *args):
# складывает кортэж из коллбэка и его аргументов в очередь
self.ready.append((callback, args))
def run_until_complete(self, callback, *args):
# Этот метод выполняет всё работу по запуску коллбэков
self.call_soon(callback, *args)
# Перекресток вех дорог - основной цикл
# он крутится пока очередь не опустеет
while self.ready:
ntodo = len(self.ready)
# внутренний цикл итерируется столько раз
# сколько было коллбэков в очереди на момент его запуска
for _ in range(ntodo):
# на каждой интерации достаёт из очереди
# один коллбэк и его параметры и запускает
callback, args = self.ready.popleft()
callback(*args)
def callback(loop):
print('Рассказчик')
loop.call_soon(print, 'Читатель')
loop = Loop()
loop.run_until_complete(c, loop)
Оседлав наш маленький коллбэк, мы отправляемся в путь через call_soon, попадаем в очередь и после краткого ожидания будем выведены на экран.
Эпизод про плохие коллбэки
Стоит упомянуть, что коллбэки это опасные лошадки — если они сбросят вас посреди дороги, интерпретатор питона не сможет помочь понять, где это произошло. Если не верите, покатайтесь той же дорогой на коллбэке maybe_print, приходящем к финишу примерно в половине случаев.
# ~/inside_asyncio/base_loop.py
def maybe_print(msg):
if random.randint(0, 1):
raise Exception(msg)
else:
print(msg)
def starting_point(loop): # Место посадки
print('Рассказчик')
loop.call_soon(maybe_print, 'Читатель')
def main(loop):
loop.call_soon(starting_point, loop)
loop.call_soon(starting_point, loop)
loop = Loop()
loop.run_until_complete(main, loop)
Ниже показан полный трейсбэк запуска предыдущего примера. Из-за того, что функция maybe_print была запущена циклом событий, а не напрямую из starting_point, трейсбэк заканчивается именно на нём, в методе run_until_complete. По такому трейсбэку невозможно определить, где в коде находится starting_point, что значительно усложнит отладку, если starting_point будут находится в нескольких местах кодовой базы.
$: python3 base_loop.py
>> Рассказчик # Доехал первый раз
>> Читатель # Доехал первый раз
>> Рассказчик # Доехал второй раз
>> Traceback (most recent call last):
>> File "base_loop.py", line 42, in <module>
>> loop.run_until_complete(main, loop)
>> File "base_loop.py", line 17, in run_until_complete
>> callback(*args)
>> File "base_loop.py", line 29, in maybe_print
>> raise Exception(msg)
>> Exception: Читаель # не доехал второй раз
Непрерывный стек вызовов нужен не только для вывода полного трейсбэка, но и для реализации других возможностей языка. Например, на нём основана обработка исключений. Пример ниже не заработает, потому что к моменту запуска starting_point, функция main уже будет выполнена:
# ~/inside_asyncio/base_loop.py
def main(loop):
try:
loop.call_soon(starting_point, loop)
loop.call_soon(starting_point, loop)
except:
pass
Loop().run_until_complete(main, loop)
Следующий пример тоже не заработает. Менеджер контекста в функции main откроет и закроет файл ещё до того, как будет запущена его обработка.
# ~/inside_asyncio/base_loop.py
def main(loop):
with open('file.txt', 'rb') as f:
loop.call_soon(process_file, f)
Loop().run_until_complete(main, loop)
# тут аналогия с путешествием достигла моего лимита, дальше без неё =(
Отсутствие непрерывного стека вызовов ограничивает использование привычных возможностей языка. Для частичного обхода этого недостатка в asyncio пришлось добавить много дополнительного кода, не относящeгося напрямую к решаемой ей задаче. Этот код, по большей части, отсутствует в примерах — они и без него довольно сложны.
Из цикла событий во внешний мир и обратно
Цикл событий сообщается с внешним миром через операционную систему посредством событий. Код, который умеет с ней работать, предоставляется модулем стандартной библиотеки под названием selectors. Он позволяет сказать операционной системе, что мы ждем какого-то события, а потом спросить, произошло ли оно. В примере ниже ожидаемым событием будет доступность сокета на чтение.
# ~/inside_asyncio/event_loop.py
import selectors
import socket
import collections
from future import Future
from handle import Handle
from task import Task
class EventLoop:
def __init__(self):
self.ready = collections.deque()
# Добавляем селектор
self.selector = selectors.DefaultSelector()
def add_reader(self, sock, callback):
# Регистрируем ожидание доступности сокета на чтение
# параметры:
# сокет,
# константа содержащая битовую маску доступности сокета на чтение
# кортеж с данными которые мы хотим ассоциировать с этим событием
self.selector.register(
sock, socket.EVENT_READ, (self._accept_conn, sock, callback)
)
def _accept_conn(self, sock, callback):
# принимаем входящее соединение
conn, addr = sock.accept()
conn.setblocking(False)
# регистрируем ожидание данных на сокете
self.selector.register(
conn, socket.EVENT_READ, (callback, conn)
)
def run_until_complete(self, callback, *args):
self.call_soon(callback, *args)
# основной цикл крутится пока очередь не пустая или мы ожидаем каких-то событий
while self.ready or self.selector.get_map():
ntodo = len(self._ready)
for _ in range(ntodo):
callback, args = self.ready.popleft()
callback(*args)
# второй подцикл итерируется по наступившим событиям
for key, events in self.selector.select(timeout=0):
# достает коллбэк и аргументы из кортежа с ассоциированными данными
callback, *args = key.data
# добавляет их в очередь на выполнение
self.call_soon(callback, *args)
def call_soon(self, callback, *args):
self.ready.append((callback, args))
def print_data(conn):
print(conn.recv(1000))
def main(loop):
# создаём сокет
sock = socket.socket()
# привязываем к локалхосту на 8086 порту
sock.bind(('localhost', 8086))
sock.listen(100)
sock.setblocking(False)
# добавляем коллбэк для чтения данных
loop.add_reader(sock, print_data)
loop = EventLoop()
# запускаем цикл событий
loop.run_until_complete(main, loop)
Гонец из внешнего мира оставляет своё сообщение или посылку в селекторе, а селектор передаёт её получателю. Теперь стало возможным читать из сокета, используя цикл событий. Если запустить этот код и подключиться с помощью netcat, то он будет добросовестно выводить всё, что в него будет отправлено.
$: nc localhost 8086 $: python3 event_loop.py
"Hi there!" b'"Hi there!"n'
"Hello!" b'"Hello!"n'
"Answer me, please!" b'"Answer me, please!"n'
В начале статьи говорилось, что asyncio — гибридная библиотека, в которой корутины работают поверх коллбэков. Для реализации этой функциональности используются две оставшиеся основные абстракции: Task и Future. Далее будет показан код этих абстракций, а затем, как, используя их цикл событий, выполняются корутины.
Future
Ниже представлен код класса Future. Он нужен для того, чтобы в корутине можно было дождаться завершения выполнения коллбэка и получить его результат.
# ~/inside_asyncio/future.py
import sys
from asyncio import events, CancelledError
class Future:
# хранит состояние коллбэка результат выполнения которого представляет
_state = 'PENDING' # FINISHED, CANCELLED
# стек вызовов до того места где был создан экземпляр Future
# нужен чтобы в случае возникновения исключения вывести понятный трейсбэк
_source_traceback = None
# список коллбэков которые должны быть вызваны когда изменится состояние ожидаемого коллбэка
_callbacks = []
# исключение если оно было возбуждено во время выполения ожидаемого коллбэка
_exception = None
# цикл событий чтобы знать где запускать коллбэки на смену состояния
_loop = None
# результат выполнения ожидаемого коллбэка
_result = None
def __init__(self, loop):
self._loop = loop
self._source_traceback = events.extract_stack(sys._getframe(1))
def add_done_callback(self, callback):
# добавляет коллбэки на смену состояния в список
self._callbacks.append(callback)
def _schedule_callbacks(self):
# запускает коллбэки на смену состояния на цикле событий
for callback in self._callbacks:
self._loop.call_soon(callback, self)
self._callbacks[:] = []
# Один из следующих трёх методов должен быть вызван для изменения состояния Future
# когда ожидаемый коллбэк каким-либо образом завершит свое выполнение
def set_exception(self, exception):
# в случае возникновения исключения сохраняем его
self._exception = exception
# меняем состояние
self._state = 'FINISHED'
# запускаем коллбэки на смену состояния
self._schedule_callbacks()
def set_result(self, result):
# если ожидаемый коллбэк завершился успешно сохраняем результат выполнения
self._result = result
self._state = 'FINISHED'
self._schedule_callbacks()
def cancel(self):
# в случае отмены просто меняем состояние
self._state = 'CANCELLED'
self._schedule_callbacks()
def result(self):
# метод для получения результат
# возбуждает исключение если ожидание завершения коллбэка было отменено
if self._state == 'CANCELLED':
raise CancelledError
# или оно возникло во время выполнения ожидаемого коллбэка
if self._exception is not None:
raise self._exception
# иначе возвращает результат
return self._result
def __await__(self):
# магический метод, вызывается ключевым словом await
# если находимся в состоянии ожидания йилдим себя
if self._state == 'PENDING':
yield self
# иначе пытаемся вернуть результат
return self.result()
Task
Это специальный подкласс класса Future. Он нужен для запуска корутины на коллбэчном цикле событий.
# ~/inside_asyncio/task.py
from asyncio import futures
from future import Future
class Task(Future):
def __init__(self, coro, *, loop=None):
super().__init__(loop=loop)
# сохраняет выполняемую корутину
self._coro = coro
def _step(self, exc=None):
# метод вызываемы циклом событий, нужен чтобы крутить корутину
try:
if exc is None:
# если не получено исключение отправляем в корутину None
# что заставляет её прокрутится на один шаг
result = self._coro.send(None)
else:
# если получено исключение возбуждаем его в корутине
self._coro.throw(exc)
except StopIteration:
result = None
except Exception as exc:
self.set_exception(exc)
else:
# если получили Future из корутины добавляем ей метод
# wakeup как коллбэк на смену состояния
if isinstance(result, Future):
result.add_done_callback(self._wakeup)
# иначе шедулим вызов метода step циклом событий еще раз
elif result is None:
self._loop.call_soon(self._step)
def _wakeup(self, future):
# метод с помощью которого Future возвращает поток выполнения в ожидающую её Task
# с исключением
try:
future.result()
except Exception as exc:
self._step(exc)
# или без в зависимости от успешности завершения Future
else:
self._step()
Цикл событий, умеющий работать с Future
# ~/inside_asyncio/future_event_loop.py
import selectors
from selectors import EVENT_READ, EVENT_WRITE
import socket
import collections
from future import Future
from task import Task
class EventLoop:
def __init__(self):
self._ready = collections.deque()
self.selector = selectors.DefaultSelector()
def run_until_complete(self, callback, *args):
self.call_soon(callback, *args)
while self._ready or self.selector.get_map():
ntodo = len(self._ready)
for _ in range(ntodo):
callback, args = self._ready.popleft()
callback(*args)
for key, events in self.selector.select(timeout=0):
callback, *args = key.data
self.call_soon(callback, *args)
def call_soon(self, callback, *args):
self._ready.append((callback, args))
# Два метода умеющих работать с Future
def sock_accept(self, sock, fut=None):
# метод принимающий входящее соединение на сокете
# создаёт Future если не получил её
fut = fut if fut else Future(loop=self)
try:
# пытается принять входящее соединение
conn, address = sock.accept()
conn.setblocking(False)
except (BlockingIOError, InterruptedError):
# если входящего соединия нет
# регистрирует сам себя на ожидание
# передав свежесозданную Future в качестве параметра
self.selector.register(
sock, EVENT_READ, (self.sock_accept, sock, fut)
)
except Exception as exc:
fut.set_exception(exc)
self.selector.unregister(sock)
else:
# если соединение установлено
# вызывает метод Future для сохранения результата
fut.set_result((conn, address))
self.selector.unregister(sock)
return fut
def sock_recv(self, sock, n, fut=None):
# метод для получения данных из сокета
# практичесски идентичен предыдущему за тем исключением,
# что пытается не принять соединение, а получить данные из сокета
fut = fut if fut else Future(loop=self)
try:
data = sock.recv(n)
except (BlockingIOError, InterruptedError):
self.selector.register(
sock, EVENT_READ, (self.sock_recv, sock, n, fut)
)
except Exception as exc:
fut.set_exception(exc)
self.selector.unregister(sock)
else:
fut.set_result(data)
self.selector.unregister(sock)
return fut
async def main(loop):
sock = socket.socket()
sock.bind(('localhost', 8080))
sock.listen(100)
sock.setblocking(False)
# ожидаем входящего соединения
conn, addr = await loop.sock_accept(sock)
# получаем из него данные
result = await loop.sock_recv(conn, 1000)
print(result)
loop = EventLoop()
# заворачиваем корутину в Task
task = Task(coro=main(loop), loop=loop)
# шедулим метод степ для запуска на цикле событий
loop.run_until_complete(task._step)
Двинемся дальше
Теперь проследим за тем, как корутина main будет выполняться:
__________________________________________________________________
class EventLoop:
def run_until_complete(self, callback, *args):
# task._step добавляется в очередь
self.call_soon(callback, *args)
while self._ready or self.selector.get_map():
ntodo = len(self._ready)
for _ in range(ntodo):
callback, args = self._ready.popleft()
# и практически сразу вызывается
callback(*args) # task._step()
___________________________________________________________________
clsss Task:
def _step(self, exc=None):
try:
if exc is None:
# отправляет None в корутину
result = self._coro.send(None)
else:
___________________________________________________________________
async def main(loop):
# корутина прокручивается на один шаг
# создаётся сокет
sock = socket.socket()
sock.bind(('localhost', 8080))
sock.listen(100)
sock.setblocking(False)
# вызывается метод цикла событий для ожидания входящего соединения
conn, addr = await loop.sock_accept(sock)
result = await loop.sock_recv(conn, 1000)
print(result)
___________________________________________________________________
class EventLoop:
def sock_accept(self, sock, fut=None):
# создаёт экземпляр Future
fut = fut if fut else Future(loop=self)
try:
# пытается принять входящее соединение
conn, address = sock.accept()
conn.setblocking(False)
except (BlockingIOError, InterruptedError):
# так как соединения нет
# регистрирует сам себя на ожидание
# передав свежесозданную Future в качестве параметра
self.selector.register(
sock, EVENT_READ, (self.sock_accept, sock, fut)
)
except Exception as exc:
--------------------------------------------
self.selector.unregister(sock)
# возвращает Future в корутину
return fut
___________________________________________________________________
async def main(loop):
sock = socket.socket()
sock.bind(('localhost', 8080))
sock.listen(100)
sock.setblocking(False)
# ключевое слово await вызывает метод __await__ полученной Future
conn, addr = await loop.sock_accept(sock)
result = await loop.sock_recv(conn, 1000)
print(result)
___________________________________________________________________
class Future:
def __await__(self):
# так как Future находится в состоянии ожидания она йилдит саму себя
if self._state == 'PENDING':
yield self
return self.result()
___________________________________________________________________
class Task(Future):
def _step(self, exc=None):
try:
if exc is None:
# результат йилда пробрасывается напрямую в то место откуда ворутину пришел None
result = self._coro.send(None) # result = fut
--------------------------------
else:
# получили Future из корутины добавляем ей метод
# wakeup как коллбэк на смену состояния
if isinstance(result, Future):
result.add_done_callback(self._wakeup)
elif result is None:
self._loop.call_soon(self._step)
# тут выполнение данной корутины останавливается - крутящие её эксземпляры Task и Future
# ждут входящего соединения
# если бы в очереди были другие коллбэки цикл событий бы переключился на их выполнение
___________________________________________________________________
class EventLoop:
def run_until_complete(self, callback, *args):
self.call_soon(callback, *args)
while self._ready or self.selector.get_map():
ntodo = len(self._ready)
for _ in range(ntodo):
callback, args = self._ready.popleft()
callback(*args)
for key, events in self.selector.select(timeout=0):
# пришло входящее соединение
callback, *args = key.data
self.call_soon(callback, *args) # loop.sock_accept(sock, fut)
___________________________________________________________________
class EventLoop:
def sock_accept(self, sock, fut=None):
fut = fut if fut else Future(loop=self)
try:
# принимаем входящее соединение
conn, address = sock.accept()
conn.setblocking(False)
except (BlockingIOError, InterruptedError):
--------------------------------
else:
# устанавливаем результат Future
fut.set_result((conn, address))
self.selector.unregister(sock)
return fut
___________________________________________________________________
class Future:
def set_result(self, result):
# устанавливает результат
self._result = result
# меняет состояние
self._state = 'FINISHED'
# вызывает коллбэки на смену состояния
self._schedule_callbacks()
def _schedule_callbacks(self):
for callback in self._callbacks:
# у нас только один коллбэк на смену состояния task.wakeup
self._loop.call_soon(callback, self) # (task.wakeup, fut)
self._callbacks[:] = []
___________________________________________________________________
class EventLoop:
def run_until_complete(self, callback, *args):
self.call_soon(callback, *args)
while self._ready or self.selector.get_map():
ntodo = len(self._ready)
for _ in range(ntodo):
callback, args = self._ready.popleft()
# на следующей итерации главного цикла
# будет вызван метод task.wakeup
callback(*args) # task.wakeup(fut)
___________________________________________________________________
class Task(Future):
def _wakeup(self, future):
try:
future.result()
except Exception as exc:
self._step(exc)
else:
# так как Future завершилась успешно он вызовет метод task._step
self._step()
def _step(self, exc=None):
try:
if exc is None:
# который отправит в корутину ещё один None
result = self._coro.send(None)
else:
___________________________________________________________________
async def main(loop):
sock = socket.socket()
sock.bind(('localhost', 8080))
sock.listen(100)
sock.setblocking(False)
# ключевое слово await вызывает метод __awai__ второй раз
conn, addr = await loop.sock_accept(sock)
result = await loop.sock_recv(conn, 1000)
print(result)
___________________________________________________________________
class Future:
def __await__(self):
if self._state == 'PENDING':
yield self
# так как Future завершена возвращаем результат
return self.result()
___________________________________________________________________
async def main(loop):
sock = socket.socket()
sock.bind(('localhost', 8080))
sock.listen(100)
sock.setblocking(False)
# результат возвращенный из Future помещается в переменные conn и addr
conn, addr = await loop.sock_accept(sock)
result = await loop.sock_recv(conn, 1000)
print(result)
Вот таким нехитрым способом asyncio выполняет корутины.
Итоги
Цель создания asyncio была успешно достигнута. Она не только решила проблему совместимости, но и вызвала огромный рост интереса к конкурентному программированию в сообществе. Новые статьи и библиотеки начали появляться, словно грибы после дождя. Кроме того, asyncio повлияла и на сам язык: в него были добавлены нативные корутины и новые ключевые слова async/await. В предыдущий раз новое ключевое слово добавлялось в далеком 2003 году, это было ключевое слово yield.
Один из целей создания asyncio было обеспечить предельно простую интеграцию в уже существовавшие асинхронные фреймворки (Twisted, Tornado, Gevent). Из этой цели логически вытекает выбор инструментов: если бы не было требования совместимости, возможно, корутинам была бы отдана главная роль. Из-за того, что при программировании на коллбэках невозможно сохранить непрерывный стэк вызовов, на границе между ними и корутинами пришлось создать дополнительную систему, обеспечивающую поддержку опирающихся на него возможностей языка.
Теперь главный вопрос. Зачем всё это знать простому пользователю библиотеки, который следует рекомендациям из документации и использует лишь корутины и высокоуровневый API?
Вот кусок документации класса StreamWriter
Его экземпляр возвращается функцией asyncio.open_connection и является async/await API поверх API на коллбэках. И эти коллбэки из него торчат. Функции write и writelines синхронные, они пытаются писать в сокет, а если не получается, то сбрасывают данные в нижележащий буфер и добавляют коллбэки на запись. Корутина drain нужна для того, чтобы обеспечить возможность дождаться, пока количество данных в буфере не опустится до заданного значения.
Если забыть вызвать drain между вызовами write, то внутренний буфер может разрастись до неприличных размеров. Однако, если помнить об этом, то остается пара неприятных моментов. Первый: если коллбэк на запись «сломается», то корутина, использующая этот API никак об этом не узнает и, соответственно, не сможет обработать. Второй: если корутина «сломается», то коллбэк на запись никак об этом не узнает и продолжит писать данные из буфера.
Таким образом, даже используя только корутины, будьте готовы к тому, что коллбэки напомнят о себе.
О том, как работать с базами данных из асинхронного кода, вы можете прочитать в этой статье нашего корпоративного блога Antida software.
Автор: nosterx