В данной статье мы рассмотрим основы асинхронного программирования в python, фокусируясь на ключевых концепциях и их практическом применении. Мы начнем с изучения генераторов и итераторов — фундаментальных механизмов, лежащих в основе асинхронности python. Затем поговорим о потоках и процессах, чтобы понять, как они соотносятся с асинхронным подходом.
Основная цель статьи — создание собственной упрощенной реализации asyncio, включая цикл событий, задачи и примитивы синхронизации. Это позволит глубже понять внутреннее устройство асинхронной разработки в python.
Содержание
Генераторы
В общих чертах
Что такое генератор? Генератор — это специальный тип итератора, возвращаемые значения которого вычисляются по мере их востребования. Такой подход называется lazy evaluation (ленивые вычисления). Он позволяет экономить ресурсы за счёт того, что не хранит в себе готовый список элементов, а возвращает их по мере итерации. Для начала давайте рассмотрим в коде, как создавать генератор, и сравним его с кортежем на предмет экономии ресурсов:
import sys
from typing import Generator # импортируем тип генератора
def integer_generator() -> Generator[int, None, None]:
for i in range(100_000):
yield i # обратите ваше внимание на данный кейворд
g = (integer_generator()) # функция возвращает генератор
t = tuple(range(100_000)) # создаём кортеж на 100'000 элементов
print(sys.getsizeof(g)) # генератор: 192 байта
print(sys.getsizeof(t)) # кортеж: 800'040 байт
ge = (i for i in range(100_000)) # Generator Expression (Выражение-генератор)
равны = sys.getsizeof(g) == sys.getsizeof(ge) # однострочник равноценен
if равны:
print("да")
Как видно из примера, мы неплохо сэкономили память, генерируя данные на ходу. А откуда их берёт range
, спросишь ты? А всё оттуда же: range
работает по тому же принципу ленивых вычислений и является, пожалуй, эталонным примером использования генератора.
Для затравки давайте также рассмотрим пример, в котором мы будем сохранять состояние генератора более явно для нашего глаза.
from typing import Generator
def generate_trip() -> Generator[str, None, None]:
yield "сели в транспорт"
yield f"отправились в {destination}"
yield "покинули транспорт"
route = generate_trip()
print(next(route))
destination = "Икстлан"
print(next(route))
print(next(route))
print(next(route)) # exception StopIterator
Как вы могли заметить, в нашем коде появилась встроенная функция next
, с помощью которой мы, казалось бы, "обращаемся", а на самом деле итерируемся по экземпляру нашего генератора. Каждый такой вызов переносит нас к следующему ключевому слову yield
, расположенному в коде нашего генератора. Более того, переменную destination
, к которой мы обращаемся во втором yield
, мы создали между вызовами next
. И пока генератор не дошёл до этого шага, интерпретатор, разумеется, не искал эту переменную.
Уже на этом этапе у вас могут появиться первые догадки о том, как реализовать асинхронность на основе генераторов... Но сперва давайте научимся обрабатывать исключение StopIteration
.
Что такое итератор
В сети уже достаточно информации об итераторах, но поскольку я хочу сделать эту статью комплексной, мы вскользь пройдёмся по основным аспектам итераторов.
Объясню по-питоньи... Итератор — это класс, реализующий методы:
-
__iter__
— возвращает экземпляр итератора. Этот метод необязательно должен быть реализован в самом классе-итераторе; он может возвращать другой класс, реализующий интерфейс итератора. -
__next__
— метод, который возвращает следующее значение из последовательности. По завершении последовательности он поднимает исключение StopIteration, сигнализирующее о том, что элементы закончились.
Давайте реализуем небольшой итератор:
class Fridge:
def __init__(self) -> None:
self.__products: tuple[str] = (
"молоко",
"яйца",
"сыр",
"творог",
"сметана",
)
self.__cursor_index: int = -1
def __iter__(self) -> "Fridge":
return self
def __next__(self) -> str:
if len(self.__products)-1 == self.__cursor_index:
self.__cursor_index = -1
raise StopIteration
self.__cursor_index += 1
return self.__products[self.__cursor_index]
fridge = Fridge()
for product in fridge:
print(product)
Я вас не осужу если вы попробуете сделать что-то вроде этого:
def __next__(self) -> str:
for product in self.__products:
return product
raise StopIteration
Вы на собственном опыте поймёте, что обычная функция, в отличие от функции-генератора, не сохраняет своё текущее состояние для следующего вызова и просто прекращает свой жизненный цикл после return
. Тогда вы, скорее всего, сделаете нечто такое:
def __next__(self) -> Generator[str, None, None]:
for product in self.__products:
yield product
raise StopIteration
Хорошая попытка... Что произойдёт уже ясно из аннотации. Кстати к аннотации генераторов мы ещё вернёмся чуть позже.
Также стоит ответить на вопрос, какую функцию мы использовали в нашем примере с Икстланом. Встроенная функция next()
принимает в качестве аргумента итератор и вызывает у него дандер-метод __next__
. Благодаря этому происходит пошаговая итерация по нашему итератору до тех пор, пока не будет встречено исключение StopIteration. Эта функция поможет сделать работу с генераторами более контролируемой.
yield и return
Давайте напишем ещё один генератор:
from typing import Generator
def system_boot() -> Generator[str, None, str]:
yield "Firmware Initialization"
yield "Bootloader"
yield "Kernel Initialization"
yield "System Initializer"
return "SUCCESS"
for stage in system_boot():
print(stage)
Запустив его, у вас должен возникнуть вопрос: почему саксесс не саксесс? Мы вроде итерировались до талого, но в итоге дошли лишь до System Initializer . Всё дело в том, что используя ключевое слово yield
, мы сохраняем контекст выполнения нашего генератора и возвращаем (или принимаем) промежуточный результат. Однако, используя return
, мы бы свернули контекст, удалили локальные переменные и указатель на байт-код.
А где же тогда результат работы нашего генератора? Он спрятался, пожалуй, не в самом очевидном месте:
stages = system_boot()
while True:
try: next(stages)
except StopIteration as err:
print(err.value) # а вот же он :3
break
Вам это может показаться уродливым, но мы ещё причешемся:
def main() -> Generator[None, None, None]:
stages = system_boot()
result = yield from stages # так лучше
print(result)
def run() -> None:
main_gen = main()
while True:
try: next(main_gen)
except StopIteration:
return
run()
Появилось какое-то yield from
, и выглядит оно изящнее, чем бесконечный цикл и отлавливание ответа через обработку исключения. Однако код оброс каким-то мусором. Этот run()
вам ничего не напоминает?
Также предлагаю вам подумать, сколько циклов мы пройдём в нашей функции run()
. Может показаться, что всего два: на первом мы доберёмся до yield from
, получив значение, а на втором перехватим исключение и завершим функцию. Однако это не так! Мы прошли его целых пять раз.
Сам yield from
служит лишь связующим звеном, которое поднимает результаты yield
выше. Он выступает простым посредником. Следовательно, для завершения итерации по всем вложенным генераторам нам нужно будет выполнить цикл столько раз, сколько необходимо для полной итерации по всем генераторам в сумме.
Выше я упомянул, что yield
может как возвращать, так и принимать промежуточные значения. В данной статье нашим генераторам не потребуется принимать значения извне, однако о этой, а также других функциональностях генераторов, мы должны знать, чтобы наконец разобраться с их аннотацией.
Аннотация
Аннотация - это лучшее что появилось в python начиная с версии 3.5. Об аннотации можете почитать PEP 484. В случае с генераторами, вы можете заметить что мы добавляем три типа в наши квадратные скобки:
from typing import Generator
Generator[YieldType, SendType, ReturnType] # но зачем?
А всё очень просто!
-
YieldType — это тип того, что возвращает наш генератор после очередного
yield
в качестве промежуточного результата. -
SendType — второй тип указывает, какие промежуточные значения может принимать и обрабатывать наш генератор. В данной статье этот аспект неактуален для нас.
-
ReturnType — это тип значения, которое возвращает наш генератор через
return
.
Если вы ранее писали асинхронный код, и быть может даже делали это не пренебрегая аннотацией, вы вероятнее всего уже встречали подобную конструкцию. Так вот, корутины и генераторы - это объекты тесно связанные! Более того, если вы откроете CPython 3.14, и отыщете там файл Include/internal/pycore_genobject.h, вы обнаружите что помимо структуры _PyGenObject там расположилась структура корутины - _PyCoroObject, благодаря чему мы интуитивно должны проследить между ними взаимосвязь, ибо без интуиции рыскать по CPython совсем невесело!
С генераторами всё!
Подведём итоги по пройденному материалу: мы познакомились с генераторами и провели параллели с итераторами. Мы узнали, как создавать генераторы, какие методы они реализуют и в каких областях демонстрируют свои сильные стороны. На основе этих знаний мы можем перейти к следующему этапу и увидеть, как наши генераторы обёрнуты в красивый синтаксис асинхронности.
Асинхронность
База
Асинхронность - это такая форма конкурентности, которая нашла своё применение в области операций ввода-вывода, таких как: сетевые запросы, обращение к базам данных, и работа с файлами.
В python асинхронность реализована через фреймворк asyncio
, который является стандартной библиотекой для работы с асинхронными операциями ввода-вывода.
Давайте рассмотрим пример из реальной жизни. Предположим, что вы — это экземпляр человека (что, безусловно, является действительностью). Вы занимаетесь домашними делами, а ваши действия по дому можно представить как методы, которые вы реализуете в качестве человека, выполняющего бытовые задачи.
Представьте, что сегодня у вас есть следующий список задач:
-
Постирать в стиральной машинке
-
Помыть посуду в посудомойке
-
Протереть пыль
Как бы мы это сделали в реальности? Мы загрузили бы вещи в стиральную машину и запустили её. Затем мы загрузили бы посудомойку посудой и также запустили бы её. После этого мы могли бы заняться протиранием пыли с поверхностей. Где тут асинхронность?
Если бы мы были не человеком, а интерпретатором Python, всё выглядело бы менее эффективно. Мы загрузили бы вещи в стиральную машину, запустили её и, вместо того чтобы переключиться на посудомойку, просто сидели бы рядом со стиралкой, ожидая её завершения. Аналогично поступили бы и с посудомойкой — дождались бы её окончания, прежде чем приступить к следующему делу.
Мы сами не замечаем, какие мы нынче асинхронные. Мы можем одновременно запускать десятки или даже сотни различных процессов, которые не требуют нашего постоянного внимания, пока мы заняты тем, что можно назвать блокирующей операцией — протиранием пыли...
Давайте напишем код:
import asyncio
import time
async def start_washing_machine() -> None:
print("загрузили машину бельём, и запустили стирку")
time.sleep(3) # машина стирает, а мы смотрим на неё
print("стирка завершена, вешаем бельё")
async def run_dishwasher() -> None:
print("загрузили машину посудой, и запустили мойку")
time.sleep(3) # машина моет, а мы смотрим на неё
print("мойка завершена, ставим посуду на просушку")
async def main() -> None:
# и создаём несколько "асинхронных задач"
asyncio.create_task(start_washing_machine())
asyncio.create_task(run_dishwasher())
asyncio.run(main()) # здесь мы запускаем нашу точку входа
Суммарное время выполнения составило 6 секунд, то есть каждое действие заняло ровно 3 секунды, как и было указано в определении каждой функции. На первый взгляд всё выглядит хорошо, но, как мы уже отметили, нет необходимости сидеть у стиральной машины и ждать, пока она закончит работу.
Чтобы оптимизировать процесс, нужно внести изменения в код: использовать ключевое слово await
, за которым следует вызов функции, возвращающей корутину. Такие функции объявляются с помощью конструкции async def
.
import asyncio
import time
async def start_washing_machine() -> None:
print("загрузили машину бельём, и запустили стирку")
await asyncio.sleep(3) # теперь стиралка сама по себе
print("стирка завершена, вешаем бельё")
async def run_dishwasher() -> None:
print("загрузили машину посудой, и запустили мойку")
await asyncio.sleep(3) # как и посудомойка
print("мойка завершена, ставим посуду на просушку")
async def main() -> None:
await asyncio.gather( # тут мы также асинхронно ждём
asyncio.create_task(start_washing_machine()),
asyncio.create_task(run_dishwasher())
) # ждём пока наши задачи не завершат своё выполнение
asyncio.run(main())
Совсем другое дело! Время ожидание сократилось до 3-х секунд благодаря тому, что мы перестали сталкерить нашу бытовую технику, и нашли чем себя ещё занять. А что там про пыль?
Как я уже говорил, пыль нам придётся протирать своими двумя, ведь робота для этих дел у нас пока нет. Поэтому мы вынуждены смириться с тем, что эта операция для нас является условно блокирующей. Мы не можем одновременно протирать пыль и вешать бельё. В контексте этой статьи не можем, но в пределах вашей квартиры вам доступны любые клининговые эксцессы.
Из этого следует важный вывод: не все функции в python имеют асинхронную версию себя, которая возвращает корутину. Например, вычисления, выполняемые на CPU, являются блокирующими операциями.
Только разница между кодом и реальностью заключается в том, как обрабатываются блокирующие операции. Представьте, что ваше тело вмещает в себя и стиральную машину, и посудомоечную машину, и ваши две руки для протирания пыли. В реальной жизни даже если вы заняты блокирующей операцией (например, протиранием пыли), вы всё равно можете дожидаться завершения работы стиральной машины. Однако в коде ситуация менее оптимистична: когда интерпретатор попадает на участок блокирующего кода, где он должен "протирать пыль", его внутренняя "стиральная машина" останавливается (в рамках этого потока, но об этом пока не будем думать). Она ждёт, пока "тело" интерпретатора завершит работу с пылью, и только после этого снова запустит барабан.
Реализация в Python
В python асинхронность работает на основе событийного цикла (event loop) в рамках которого выполняются поставленные задачи.
ОСНОВНЫЕ КОМПОНЕНТЫ:
-
Цикл событий:
-
Занимает центральное место в асинхронном коде.
-
Принимает в себя задачи, следит за ходом их выполнения.
-
Предоставляет методы для взаимодействия с задачами.
-
Задачи:
-
Объекты оборачивающие в себя специальные функции/процедуры - корутины. Каждая задача следит за промежуточными результатами работы корутины, и отвечает за её текущий статус перед циклом событий.
-
Работают конкурентно, могут синхронизироваться между собой. Работают в рамках одного потока, однако используя специальные инструменты могут быть запущены в другом потоке, оставаясь при этом в одном цикле событий.
-
Корутина:
-
Специальная функция/процедура, которая должна быть спроектирована как неблокирующая операция.
-
Объявляется с использованием специального кейворда
async def
. -
Построена по принципу генераторов.
ПРИНЦИП РАБОТЫ:
-
Инициализация цикла событий.
-
Событийный цикл просматривает пул задач, и выбирает первую готовую к работе, после чего передаёт её контроль.
-
Задачи возвращают контроль циклу событий, когда встречается кейворд
await
в обёрнутый внутрь её корутины. -
Цикл событий проходится по всем задачам, проверяет статус выполнения, передает контроль задачам, которые готовы возобновить, или начать работу.
-
Задачи возобновляют работу корутины с той точки, где преостановили выполнение ранее.
-
Когда цикл событий не находит больше задач в пуле, он завершает свою работу.
Потоки, процессы
Думаю, мы не будем углубляться в написание примеров многопоточности или мультипроцессорного кода, а просто разберём важное отличие между этими формами конкурентности. В конце этой короткой главы я дам тебе волшебную палочку, которая поможет разблокировать заблокированное.
Думаю, подсознательно вы ощущаете, что мультипроцессорность — это нечто связанное с ядрами CPU, и вы будете правы! Это очень ресурсоемкая форма конкурентности, которая позволяет распределить ваши задачи на все ядра вашего CPU. А в чем её дороговизна? В том, что для этого необходимо поддерживать индивидуальное адресное пространство для каждого ядра, а к тому же синхронизация между ядрами — тоже не самый сладкий сахар. Этот подход можно использовать в научных вычислениях, когда требуется задействовать все ресурсы вашего CPU. Можно также задействовать GPU с помощью CUDA, но эта статья посвящена другим темам.
Потоки же в свою очередь темка более запутанная, для меня уж точно... По сути, потоки — это участки машинного кода, которые выполняются в рамках одного процесса, благодаря чему их выполнение происходит внутри единого адресного пространства, за счёт чего поток является более дешёвой версией параллелизма.
Мы не будем говорить про GIL.
Совет: не пытайтесь разогнать свой квантовый компьютер, используя все возможные потоки в асинхронном режиме на всех ядрах своего третьего пня. Каждая из форм конкурентности хорошо справляется с конкретной задачей.
А теперь вкусненькое - волшебная палочка:
import asyncio
import time
async def main() -> None: # время выполнения не 100, а 10 сек.
for i in range(10):
asyncio.create_task(
asyncio.to_thread(time.sleep, (10)) # магия здесь
)
asyncio.run(main())
С помощью функции-корутины to_thread
мы можем запустить блокирующую операцию в отдельном потоке, что бывает очень полезно. Однако не стоит этим злоупотреблять, так как количество одновременно запущенных потоков ограничено на уровне операционной системы.
Синхронизация
Почему конкурентность называется конкурентностью? Всё дело в том, что в основе конкурентности лежит совместный доступ к ресурсам, в следствии чего возникает конкурентность за их эксплуатацию.
Представьте, что вы студент живущий в общаге, и в одной квартире, предположим шестнадцатой, есть восемь комнат, и всего один холодильник. Ваша задача — приготовить себе покушать, к примеру, картошку и сосиски. Сосиски вы уже купили, и сложили в холодильник, а вот картофель забыли, и пришлось повторно пойти в магазин.
И вот вы уже возвратились с магазина, обратились к холодильнику, а сосисочек то и нет!
...вы стали жертвой конкурентности за общий ресурс!
Давайте воспроизведём ситуацию в коде:
import asyncio
collapse = exit # сделаем алиас для выразительности
ACTION_TIME: float = .5 # можете отрегулировать
fridge: set[str] = { # ресурс за который конкурируют
"сосисочки",
"мазик",
"какой-то пакет, был когда заехали"
}
class Student:
def __init__(self, name: str) -> None:
self.name: str = name
self.hunger: bool = True # студент всегда голодный
async def eat(self, food: str) -> None:
if food in fridge:
fridge.remove(food)
await asyncio.sleep(ACTION_TIME) # ест
print(f"{self.name} съел {food}")
async def cook(self, foods: set[str]) -> bool:
if not foods.issubset(fridge):
return False
fridge.difference(foods)
await asyncio.sleep(ACTION_TIME*3) # готовит
fridge.add("сосисочки с картофелем")
return True
async def buy(self, foods: set[str]) -> None:
await asyncio.sleep(ACTION_TIME*3) # идет в магазин
fridge.update(foods)
purchased: str = ', '.join(map(str, foods))
print(f"{self.name} купил {purchased}")
async def you_day(you: Student) -> None:
ingredients = {"сосисочки", "картофель"}
while you.hunger:
if await you.cook(ingredients):
await you.eat("сосисочки с картофелем")
you.hunger = False
else:
await you.buy(ingredients-fridge)
collapse()
async def sanya_day(sanya: Student) -> None:
while sanya.hunger:
if not fridge:
await asyncio.sleep(ACTION_TIME) # грустит
foods = list(fridge)
for food in foods:
await sanya.eat(food)
async def main() -> None:
you = Student("Ты")
sanya = Student("Саня")
await asyncio.gather(
asyncio.create_task(you_day(you)),
asyncio.create_task(sanya_day(sanya))
)
asyncio.run(main())
Как вы можете заметить, Саня всегда оказывается проворнее вас, и опустошает ресурс без которого не может схлопнуться эта вселенная страданий.
Так быть не должно, и потому существуют инструменты для сдерживания Сани, эти инструменты и называются примитивами синхронизации.
Чтобы не растилать очередную простыню, я покажу более схематичный пример:
import asyncio
resource: int = 0
lock = asyncio.Lock() # блокирующий примитив
async def work(name: str) -> None:
global resource
async with lock: # дальше таски входят по одному
print(f"{name} принялся за работу")
await asyncio.sleep(5)
resource += 1
print(f"{name} выполнил работу")
async def main() -> None:
tasks: tuple[asyncio.Task] = (
asyncio.create_task(
work(f"Работник-{i}")
) for i in range(100)
)
await asyncio.gather(*tasks)
asyncio.run(main())
В промежуточном результате работы мы можем видеть, что задачи выполняются последовательно, несмотря на то, что они были запущены конкурентно.
Это не единственный способ синхронизации, однако в рамках этой статьи нам этого будет достаточно.
Что есть в asyncio
На примере to_thread
мы можем увидеть как бывает полезно знать функциональность стандартной библиотеки, так что предлагаю вам углубиться немного в asyncio, а здесь мы разберём функции и методы, которые мы будем реализовывать в собственном коде.
Данный фреймворк предоставляет нам широчайший спектр возможностей для создания, обработки, и синхронизации задач. Это позволяет нам полностью контролировать ход выполнения работы каждой задачи.
Пробежимся по основным методам и функциям которые нам потребуются:
1. run(main: coroutine) -> _T@run
-
Принимает корутину в качестве аргумента и запускает событийный цикл для её выполнения.
-
Возвращает результат завершения переданной корутины.
-
Эта функция обычно используется как точка входа в асинхронную программу. При анализе кода именно её стоит искать, чтобы понять, где начинается управление событийным циклом.
2. create_task(coro: coroutine) -> Task
-
Принимает корутину и оборачивает её в объект задачи (
Task
), который автоматически добавляется в текущий событийный цикл. -
Возвращает созданную задачу.
-
Важно отметить, что
create_task
не являетсяawaitable
объектом, поэтому к нему нельзя применить ключевое словоawait
. Вызов этой функции просто регистрирует задачу в событийном цикле, после чего выполнение программы продолжается дальше, пока сама задача ждёт своей очереди на выполнение.
3. gather(*coros: coroutine) -> Future
-
Принимает произвольное количество корутин или задач и выполняет их параллельно, дожидаясь завершения всех переданных аргументов.
-
Возвращает список результатов в том же порядке, в котором корутины были переданы.
-
Хотя документация указывает, что результатом должна быть коллекция типа
tuple
, на практике возвращаетсяlist
. Это может быть связано с особенностями реализации или изменениями в новейших версиях Python.
4. sleep(delay: float)
-
Приостанавливает выполнение текущей задачи на указанное количество секунд, предоставляя контроль над выполнением другим задачам в событийном цикле.
-
Принимает аргумент
delay
типаfloat
, который определяет время приостановки. -
Если передать
0
в качестве аргумента, это позволит временно передать управление другим задачам без фактической задержки. Однако, несмотря на очевидную логичность,asyncio.sleep
не имеет значения по умолчанию для параметраdelay
.
5. Lock()
-
Lock
— это примитив синхронизации, используемый для защиты участков кода, чувствительных к конкурентности. -
Объект
Lock
можно использовать в контекстеasync with
, что позволяет безопасно ограничить доступ к общим ресурсам, предотвращая конфликты между задачами. -
Например, если несколько задач пытаются одновременно изменить один и тот же ресурс,
Lock
гарантирует, что только одна задача сможет получить доступ в любой момент времени.
Разумеется это лишь меньшая часть того, что есть в asyncio
, но именно этой малой части нам достаточно чтобы реализовать наш собственный asyncio
.
Пишем свой asyncio с yield'ой и тасками
Затянутое предисловие
Вам могло показаться, что мы здесь занимаемся всем, чем угодно, но только не написанием собственной асинхронности. В свою защиту могу сказать, что первые две главы были вынужденным предисловием — даже если предположить, что вы не узнали для себя ничего нового, кроме моих писательских потуг. Вынужденными они были по той причине, что мне хотелось сконцентрировать ваше внимание именно на тех участках ваших знаний, которые помогут вам самостоятельно написать то, о чем мы здесь говорим...
Я предлагаю вам свернуть данную статью на какое-то время, а затем попытаться написать код самостоятельно. Это позволит вам получить практический опыт и лучше понять материал, что сделает дальнейшее чтение моего кода более сознательным для вас. Это полезная практика — написать подобие того, что вам вскоре придётся прочитать. Именно регулярное написание кода своими силами помогает опытным программистам читать чужой код, а чтение чужого (а часто даже своего собственного) кода, пожалуй, самая сложная задача в разработке.
...
попробовали? отлично! идём дальше!
План работ
Для нашего проекта я хотел бы использовать собственный нейминг. Например, я переименовал бы EventLoop
в Dispatcher
или Manager
, поскольку в нашем проекте цикл событий будет ВСЕГО ОДИН, без возможности создавать экземпляры циклов. Его поведение будет больше соответствовать диспетчеру задач, а скорее даже их менеджеру.
ОДНАКО! Поскольку в этой статье мы проводим аналогии, мне приходится следовать общепринятому неймингу. В рамках архитектуры нашего проекта такой выбор нейминга не является оптимальным, но придётся принять его как есть...
Что нам предстоит сделать:
-
Мы реализуем цикл событий, и, чтобы не размазывать его по отдельному модулю, сделаем это внутри одного класса. Этот класс будет состоять из статических методов и нужен лишь для того, чтобы всё писать в одном файле. Как я уже говорил, нам нет необходимости создавать экземпляры событиного цикла, поэтому нет смысла делать этот класс полноценным.
-
Мы реализуем класс
Task
, который будет оборачивать корутины. Напомню, если вы уже забыли, что в качестве корутин мы будем использовать генераторы. МЫ НЕ БУДЕМ РЕАЛИЗОВЫВАТЬ КЛАСС КОРУТИНЫ, так как это абсолютно бессмысленная трата времени, если мы не меняем синтаксис языка на уровне CPython! Однако мы найдём элегантный способ обозначения корутин, чтобы вы точно знали, где находится обычный генератор, а где наша кустарная корутина. -
Мы реализуем класс, который будет выполнять роль замка для синхронизации задач, и назовём его, разумеется,
Lock
. -
Мы напишем функцию
sleep
, которая поможет нам эмулировать активность на первых этапах работы.
Пишем...
Для начала нам нужно сделать так, чтобы наши генераторы стали похожи на корутины:
from typing import Generator, TypeVar
CORO_RETURN_TYPE = TypeVar("ReturnType")
Coro = Generator[None, None, CORO_RETURN_TYPE]
def coroutine() -> coro[bool]: # пример объявления нашей корутины
return True # возвращает тот тип, который указали в аннотации
Вы могли заметить, что в нашем алиасе используется тип генератора, который не принимает значения извне и не возвращает промежуточных значений. Это сделано в рамках нашей карикатуристики. На данном этапе нам не требуется ничего большего, кроме возможности приостанавливать и возобновлять работу корутин.
Мы написали микро, теперь напишем макро и свяжем их крепко:
from typing import Generator, TypeVar, Generic
from types import GeneratorType
CORO_RETURN_TYPE = TypeVar("ReturnType")
Coro = Generator[None, None, CORO_RETURN_TYPE]
class EventLoop:
tasks: list["Task[CORO_RETURN_TYPE]"] = []
@staticmethod
def run(entry: Coro[CORO_RETURN_TYPE]) -> CORO_RETURN_TYPE:
...
@staticmethod
def create_task(coroutine: Coro[CORO_RETURN_TYPE]) -> "Task[CORO_RETURN_TYPE]":
...
@staticmethod
def gather(*tasks: "Task[CORO_RETURN_TYPE]") -> Coro[tuple[CORO_RETURN_TYPE, ...]]:
...
class Task(Generic[CORO_RETURN_TYPE]):
def __init__(self, coroutine: Coro[CORO_RETURN_TYPE]) -> None:
...
def wake(self) -> None:
...
def wait(self) -> Coro[CORO_RETURN_TYPE]:
...
Будет хорошим тоном заложить в основу нашего проекта чётко определённую архитектуру, импортировать зависимости и в целом набросать схему. Если вы не можете сделать этого в начале, значит, вы плохо представляете, как должна работать ваша программа. Скорее всего, она будет писаться понаитию, а это, как правило, приводит к тому, что вам придётся переписывать её с нуля — и, скорее всего, не один раз... снова и снова.
Чёткое понимание того, как работает наша программа, помогает нам писать чистый и аккуратный код, больше похожий на навигацию, чем на обычный код. Это так же естественно, как то, как рисует художник: мы создаём набросок тонкими линиями, и уже сейчас можем легко читать код, не запутываясь в жирной начинке функций и процедур.
Так и что мы видим в коде?
-
Мы видим импорты, и наш алиас. Типизация
asyncio
во время первого знакомства покажется вам очень путанной. Наш код такой же. -
Мы видим наш псевдо-класс состоящий из одних лишь статических методов.
-
В классе том мы видим коллекцию, к тому же мутабельную - список состоящий из экземпляров класса
Task
. -
Мы видим что экземпляры параметризованные, и из названия понятно что это тип значения которая вернёт корутина обёрнутая в экземпляр
Task
. -
Мы видим три статических метода:
run
,create_task
,gather
:-
run
- принимает корутину, и возвращает результат того типа, который возвращает корутина. Другими словами, возрващает результат корутины. -
create_task
- принимает корутину, и возвращает экземпляр классаTask
. -
gather
- принимает в себя произвольное количество позиционных аргументов типаTask
, и возвращает кортеж с результатом их работы. Если вы увидели ту ошибку котороую я здесь допустил, попрошу вас задержаться в комментариях и предложить вариант её решения в коде!
-
-
Мы видим класс
Task
который может работать с разными типами результатов нашей корутины. -
Инициализация экземпляра класса
Task
происходит с корутиной в качестве аргумента. -
Класс
Task
имеет два метода:wake
иwait
:-
wake
- процедурный метод, о нём мы можем судить пока только по названию. Он что-то пробуждает... Не является корутиной о чём мы можем судить из аннотации, следовательно его использование будет протикать либо вне цикла событий, либо поверх него. -
wait
- корутина возвращающая результат выполнения. Похоже, что это - обёртка.
-
Мы увидели достаточно... Теперь давайте определим атрибуты класса Task
. Что должна хранить в себе задача? Думаю, задача должна хранить в себе:
-
Инструкцию работы - корутину.
-
Статус готовности.
-
Результат работы.
-
Исключение из инструкции. Ошибку, которая помешала выполнению. Идём в код:
def __init__(self, coroutine: Coro[CORO_RETURN_TYPE]) -> None:
self.coroutine: Coro[CORO_RETURN_TYPE] = coroutine
self.done: bool = False
self.result: CORO_RETURN_TYPE | None = None
self.error: BaseException | None = None
Статус выполнения может быть либо ложным, либо истинным — среднего не дано. Результат может быть, а может и не быть, как и исключения.
Отлично, задачу мы наметили. Думаю, создание экземпляров задач мы делегируем нашему событийному циклу, и это будет выполнять create_task
.
@staticmethod
def create_task(coroutine: Coro[CORO_RETURN_TYPE]) -> "Task[CORO_RETURN_TYPE]":
task: Task[CORO_RETURN_TYPE] = Task(coroutine)
EventLoop.tasks.append(task)
return task
Можно было бы разделить создание задачи, и её добавление в пул задач, но мы этого делать не будем.
Ну, а теперь, давайте уже наконец пропишем логику нашего событийного цикла:
@staticmethod
def run(entry: Coro[CORO_RETURN_TYPE]) -> CORO_RETURN_TYPE:
task: Task = EventLoop.create_task(entry) # точку входа в пул
while EventLoop.tasks:
for task in EventLoop.tasks[:]: # для безопасного удаления
task.wake() # пробуждаем... что-то...
if task.done: # удалим задачу если она завершила работу
EventLoop.tasks.remove(task)
return task.result # вернём результат нашему синхронному коду
Всё оказывается проще, чем можно было представить. По сути, мы просто проходимся по всем задачам, пробуждаем то, что в них спрятано, а после удаляем их, если они закончили работу.
Так, а что же мы пробуждаем-то? А пробуждаем мы корутину, которая обёрнута внутри нашей задачи. Не забыли про то, как работают генераторы? Напишем же тогда wake
:
def wake(self) -> None:
try:
next(self.coroutine) # делаем шаг
except StopIteration as err: # приехали
self.done = True
self.result = err.value
except BaseException as err: # любое возникшее исключение
self.done = True
self.error = err.__class__
В том случае, если в нашей корутине было поднято исключение на каком-либо шаге выполнения, мы не можем продолжать выполнение инструкций дальше и должны сохранить это исключение для его последующей обработки.
Отлично! Сейчас я предлагаю написать функцию-корутину, которая нам понадобится для первых тестов:
from datetime import datetime
def sleep(seconds: float = 0.0) -> Coro[None]:
start_time: datetime = datetime.now()
while (datetime.now() - start_time).total_seconds() < seconds:
yield
И теперь давайте взглянем как выглядит наш код на нынешнем этапе:
from typing import Generator, TypeVar, Generic
from datetime import datetime
CORO_RETURN_TYPE = TypeVar("ReturnType")
Coro = Generator[None, None, CORO_RETURN_TYPE]
class EventLoop:
tasks: list["Task[CORO_RETURN_TYPE]"] = []
@staticmethod
def run(entry: Coro[CORO_RETURN_TYPE]) -> CORO_RETURN_TYPE:
task: Task = EventLoop.create_task(entry)
while EventLoop.tasks:
for task in EventLoop.tasks[:]:
task.wake()
if task.done:
EventLoop.tasks.remove(task)
return task.result
@staticmethod
def create_task(coroutine: Coro[CORO_RETURN_TYPE]) -> "Task[CORO_RETURN_TYPE]":
task: Task[CORO_RETURN_TYPE] = Task(coroutine)
EventLoop.tasks.append(task)
return task
@staticmethod
def gather(*tasks: "Task[CORO_RETURN_TYPE]") -> Coro[tuple[CORO_RETURN_TYPE, ...]]:
...
class Task(Generic[CORO_RETURN_TYPE]):
def __init__(self, coroutine: Coro[CORO_RETURN_TYPE]) -> None:
self.coroutine: Coro[CORO_RETURN_TYPE] = coroutine
self.done: bool = False
self.result: CORO_RETURN_TYPE | None = None
self.error: BaseException | None = None
def wake(self) -> None:
try:
next(self.coroutine)
except StopIteration as err:
self.done = True
self.result = err.value
except BaseException as err:
self.done = True
self.error = err.__class__
def wait(self) -> Coro[CORO_RETURN_TYPE]:
...
def sleep(seconds: float = 0.0) -> Coro[None]:
start_time: datetime = datetime.now()
while (datetime.now() - start_time).total_seconds() < seconds:
yield
def main() -> Coro[None]:
...
EventLoop.run(main())
Предлагаю сделать первое тестирование. И убедиться что код работает асинхронно. Напишем функцию-корутину, которую отправим в обёртке задачи в наш событийный цикл:
def worker(name: str) -> Coro[None]:
print(f"{name}: приступил к работе")
yield from sleep(10) # аналогично await asyncio.sleep(10)
print(f"{name}: завершил работу")
def main() -> Coro[None]:
for i in range(100):
EventLoop.create_task(worker(f"Работник {i}"))
yield # main всё также должна возвращать генератор
EventLoop.run(main())
# Работник 0: приступил к работе
# Работник 1: приступил к работе
# ...
Асинхронность!
И давайте мы заполним наши пробелы, а после приступим к реализации синхронизации Lock
:
from typing import Generator, TypeVar, Generic
from datetime import datetime
CORO_RETURN_TYPE = TypeVar("ReturnType")
Coro = Generator[None, None, CORO_RETURN_TYPE]
class EventLoop:
tasks: list["Task[CORO_RETURN_TYPE]"] = []
@staticmethod
def run(entry: Coro[CORO_RETURN_TYPE]) -> CORO_RETURN_TYPE:
task: Task = EventLoop.create_task(entry)
while EventLoop.tasks:
for task in EventLoop.tasks[:]:
task.wake()
if task.done:
EventLoop.tasks.remove(task)
return task.result
@staticmethod
def create_task(coroutine: Coro[CORO_RETURN_TYPE]) -> "Task[CORO_RETURN_TYPE]":
task: Task[CORO_RETURN_TYPE] = Task(coroutine)
EventLoop.tasks.append(task)
return task
@staticmethod
def gather(*tasks: "Task[CORO_RETURN_TYPE]") -> Coro[tuple[CORO_RETURN_TYPE, ...]]:
while any(not task.done for task in tasks):
yield
return tuple(task.result for task in tasks)
class Task(Generic[CORO_RETURN_TYPE]):
def __init__(self, coroutine: Coro[CORO_RETURN_TYPE]) -> None:
self.coroutine: Coro[CORO_RETURN_TYPE] = coroutine
self.done: bool = False
self.result: CORO_RETURN_TYPE | None = None
self.error: BaseException | None = None
def wake(self) -> None:
try:
next(self.coroutine)
except StopIteration as err:
self.done = True
self.result = err.value
except BaseException as err:
self.done = True
self.error = err.__class__
def wait(self) -> Coro[CORO_RETURN_TYPE]:
while not self.done:
yield
return self.result
def sleep(seconds: float = 0.0) -> Coro[None]:
start_time: datetime = datetime.now()
while (datetime.now() - start_time).total_seconds() < seconds:
yield
Теперь у нас есть методы и функции, позволяющие нам дожидаться выполнения конкретных задач или групп задач. А теперь — Lock
:
class Lock:
def __init__(self) -> None:
self.lock: bool = False
def acquire(self) -> Coro[None]:
while self.lock:
yield
self.lock = True
def release(self) -> None:
self.lock = False
Минималистично... У нас нет контекстного менеджера по той же причине, по которой мы создавали алиас для корутин. Язык не предоставляет нам синтаксического сахара для реализации нашей собственной асинхронности, и asyncio здесь монополист, который обладает следующими возможностями: __aenter__
, __aexit__
, __aiter__
, __anext__
и даже __await__
.
Нам же это всё недоступно...
Давайте теперь перепишем наш пример для проверки замка:
def worker(name: str, lock: Lock) -> Coro[None]:
yield from lock.acquire()
print(f"{name}: приступил к работе")
yield from sleep(1)
print(f"{name}: завершил работу")
lock.release()
def main() -> Coro[None]:
lock = Lock()
for i in range(100):
EventLoop.create_task(worker(f"Работник {i}", lock))
yield
EventLoop.run(main())
# Работник 0: приступил к работе
# Работник 0: завершил работу
# ...
А теперь я предлагаю взглянуть на наш MVP целиком. Напоминаю, мы работаем в рамках одного файла, чтобы не бегать по модулям:
from typing import Generator, TypeVar, Generic
from types import GeneratorType
from datetime import datetime
CORO_RETURN_TYPE = TypeVar("ReturnType")
Coro = Generator[None, None, CORO_RETURN_TYPE]
class EventLoop:
tasks: list["Task[CORO_RETURN_TYPE]"] = []
@staticmethod
def run(entry: Coro[CORO_RETURN_TYPE]) -> CORO_RETURN_TYPE:
task: Task = EventLoop.create_task(entry)
while EventLoop.tasks:
for task in EventLoop.tasks[:]:
task.wake()
if task.done:
EventLoop.tasks.remove(task)
return task.result
@staticmethod
def create_task(coroutine: Coro[CORO_RETURN_TYPE]) -> "Task[CORO_RETURN_TYPE]":
task: Task[CORO_RETURN_TYPE] = Task(coroutine)
EventLoop.tasks.append(task)
return task
@staticmethod
def gather(*tasks: "Task[CORO_RETURN_TYPE]") -> Coro[tuple[CORO_RETURN_TYPE, ...]]:
while any(not task.done for task in tasks):
yield
return tuple(task.result for task in tasks)
class Task(Generic[CORO_RETURN_TYPE]):
def __init__(self, coroutine: Coro[CORO_RETURN_TYPE]) -> None:
if not isinstance(coroutine, GeneratorType):
raise TypeError(f"{coroutine} is not coroutine")
self.coroutine: Coro[CORO_RETURN_TYPE] = coroutine
self.done: bool = False
self.result: CORO_RETURN_TYPE | None = None
self.error: BaseException | None = None
def wake(self) -> None:
try:
next(self.coroutine)
except StopIteration as err:
self.done = True
self.result = err.value
except BaseException as err:
self.done = True
self.error = err.__class__
def wait(self) -> Coro[CORO_RETURN_TYPE]:
while not self.done:
yield
return self.result
class Lock:
def __init__(self) -> None:
self.lock: bool = False
def acquire(self) -> Coro[None]:
while self.lock:
yield
self.lock = True
def release(self) -> None:
self.lock = False
def sleep(seconds: float = 0.0) -> Coro[None]:
start_time: datetime = datetime.now()
while (datetime.now() - start_time).total_seconds() < seconds:
yield
Также мы добавили проверку типа при инициализации задачи, чтобы вы не забывали передавать туда генератор!
В вашей функции-корутине должен быть хотя бы один yield
, потому как мы не можем использовать async def
в нашей реализации для явного объявления корутины!
Заключение
В этой статье мы рассмотрели различные формы конкурентности и разобрали одну из них на примере python — асинхронность. Узнали, при чём здесь генераторы, что они из себя представляют, а также реализовали собственную лабораторную версию asyncio.
Подобная тема достаточно обширна, и статья уже подошла к некоторым лимитам. Поэтому такую тему следовало бы разбить на несколько частей, чтобы реализовать механизмы работы с файловыми дескрипторами и создать свой примитивный aiohttp
или aiofiles
.
Проект будет сохранён под названием gaio
(Generator Asynchronous Input Output) как модуль и, возможно, со временем будет расширяться в обучающих целях.
Буду благодарен за конструктив в комментариях, желательно с примерами и ссылками.
Репозиторий gaio
:
Автор: tamerlanlarsanov