В данной статье мы рассмотрим задачу массовой асинхронной обработки запросов с последующей синхронной и ресурсоёмкой (CPU-bound) логикой. Главная сложность в том, что асинхронный код отлично справляется с большим количеством запросов к внешним сервисам, но CPU-bound вычисления в той же среде могут существенно снизить пропускную способность. Решение — вынести тяжёлую обработку в отдельный пул процессов.
Суть задачи
-
Имеется список кортежей (url, json_data), которых может быть до 1 000 000:
request_samples = [
("https://api.example.com/products/segment_a", {"items": ["id1", "id2"]}),
("https://api.example.com/products/segment_b", {"items": ["id3", "id4"]}),
# ... до 1 000 000 подобных кортежей
]
-
Для каждого такого кортежа необходимо:
-
Отправить запрос на указанный url с параметрами json=json_data и получить base_resp.
-
Из base_resp извлечь данные для трёх сервисных запросов (к url_1, url_2, url_3).
-
Получить ответы resp1, resp2, resp3 от этих сервисов.
-
Передать resp1, resp2, resp3 в функцию business_logic(resp1, resp2, resp3), которая выполняет синхронную и CPU-bound обработку.
-
Результат business_logic добавляется к итоговому списку.
-
Цель — выполнить всю обработку максимально эффективно, не блокируя асинхронный event loop при выполнении тяжелой CPU-bound логики.
Почему ProcessPoolExecutor?
-
CPU-bound обработка: Если операция действительно нагружает CPU, то использование ThreadPoolExecutor может быть неэффективным из-за GIL (Global Interpreter Lock) в CPython.
-
ProcessPoolExecutor запускает обработку в отдельных процессах, что позволяет выполнять CPU-bound задачи параллельно на разных ядрах, минуя ограничения GIL.
Архитектура решения
-
Используем asyncio и aiohttp для асинхронной загрузки данных.
-
После получения трех ответов (resp1, resp2, resp3) для каждого запроса используем ProcessPoolExecutor и run_in_executor, чтобы выполнить business_logic в отдельном процессе.
-
Собираем результаты в единый список.
import asyncio
import aiohttp
from concurrent.futures import ProcessPoolExecutor
def business_logic(resp1, resp2, resp3):
# Синхронная, CPU-bound обработка данных.
# Здесь может быть сложная логика:
# обработка списков, вычисления, агрегации и т.д.
aggregated = {
"item_count": len(resp1["items"]) + len(resp2["items"]) + len(resp3["items"]),
"details": resp1["details"] + resp2["details"] + resp3["details"]
}
return aggregated
async def fetch(session, url, json_data):
async with session.post(url, json=json_data) as response:
return await response.json()
async def process_single_request(session, url, json_payload, url_1, url_2, url_3, executor):
# 1. Запрос к основному API
base_resp = await fetch(session, url, json_payload)
# Предположим, извлекаем из base_resp список товаров
items_for_services = base_resp.get("data", {}).get("items", [])
service_payload = {"items": items_for_services}
# 2. Запросы к трем сервисным API
resp1 = await fetch(session, url_1, service_payload)
resp2 = await fetch(session, url_2, service_payload)
resp3 = await fetch(session, url_3, service_payload)
# 3. Вызов синхронной CPU-bound логики в отдельном процессе
loop = asyncio.get_event_loop()
aggregated_result = await loop.run_in_executor(
executor, business_logic, resp1, resp2, resp3
)
return aggregated_result
async def main(request_samples, url_1, url_2, url_3, max_parallel=1000):
results = []
conn = aiohttp.TCPConnector(limit=max_parallel)
# Инициализируем пул процессов
# Число процессов можно подобрать на основе числа CPU ядер
executor = ProcessPoolExecutor(max_workers=4)
async with aiohttp.ClientSession(connector=conn) as session:
tasks = [
asyncio.create_task(
process_single_request(session, url, payload, url_1, url_2, url_3, executor)
)
for url, payload in request_samples
]
for fut in asyncio.as_completed(tasks):
result = await fut
results.append(result)
return results
if __name__ == "__main__":
# Пример исходных данных
request_samples = [
("https://api.example.com/products/segment_a", {"items": ["id1", "id2"]}),
("https://api.example.com/products/segment_b", {"items": ["id3", "id4"]}),
# ... и далее до 1 000 000
]
url_1 = "https://service-a.example.com/details"
url_2 = "https://service-b.example.com/prices"
url_3 = "https://service-c.example.com/inventory"
final_results = asyncio.run(main(request_samples, url_1, url_2, url_3))
# final_results теперь будет содержать список агрегированных результатов, по одному на каждую запись из request_samples
Итоги и преимущества
-
Асинхронный ввод-вывод обеспечивает высокую пропускную способность при работе с внешними API.
-
ProcessPoolExecutor решает проблему CPU-bound логики, позволяя эффективно параллелить тяжелые вычисления.
-
Подобный конвейер можно масштабировать, изменяя количество процессов, размер порций запросов и добавляя дополнительные оптимизации (кеширование, rate limiting и т.д.). Таким образом, предложенный подход позволяет эффективно совмещать массовую асинхронную загрузку данных с синхронными, ресурсоёмкими вычислительными задачами.
Автор: Khachatur86