Хочу поделиться простым рецептом, как можно эффективно выполнять большое число http-запросов и других задач ввода-вывода из обычного Питона. Самое правильное, что можно было бы сделать — использовать асинхронные фреймворки вроде Торнадо или gevent. Но иногда этот вариант не подходит, потому что встроить event loop в уже существующий проект проблематично.
В моем случае уже существовало Django-приложение, из которого примерно раз в месяц нужно было выгрузить немного очень мелких файлов на AWS s3. Шло время, количество файлов стало приближаться к 50 тысячам, и выгружать их по очереди стало утомительным. Как известно, s3 не поддерживает множественное обновление за один PUT-запрос, а установленная опытным путем максимальная скорость запросов с сервера ec2 в том же датацентре не превышает 17 в секунду (что очень не мало, кстати). Таким образом, время обновления для 50 тысяч файлов стало приближаться к одному часу.
Питонисты с детства знают, что от использования потоков (тредов операционной системы) нет никакого толка из-за глобального лока интерпретатора. Но немногие догадываются, что как и любой лок, этот время от времени освобождается. В частности, это происходит при операциях ввода-вывода, в том числе и сетевых. А значит, потоки можно использовать для распараллеливания http-запросов — пока один поток ожидает следующего ответа, другой спокойно обрабатывает результат предыдущего или готовит следующий.
Получается, всего-то нужен пул потоков, который будет выполнять запросы. К счастью, такой пул уже написан. Начиная с версии 3.2 для унификации всей асинхронной работы в Питоне появилась библиотека concurrent.futures
. Для второй версии Питона есть бекпорт под именем futures. Код до безобразия прост:
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(concurrency) as executor:
for _ in executor.map(upload, queryset):
pass
Здесь concurrency
— число рабочих потоков, upload
— функция, выполняющую саму задачу, queryset
— итератор объектов, которые по одному будут передаваться в задачу. Уже этот код при concurrency в 150 смог пропихнуть на сервера Амазона ≈450 запросов в секунду.
Тут необходимо замечание относительно задач: они должны быть потокобезопасны. Т.е. несколько паралельно выполняющихся задач не должны иметь общих ресурсов, либо должны ими правильно управлять. Глобальный лок интерпретатора тут плохой помощник — он не гарантирует, что выполнение потока не прервется в самом неподходящем месте. Если вы пользуетесь только urllib3, requests или boto, волноваться не о чем, они уже потокобезопасны. Про другие библиотеки нужно уточнять. Также потоконебезопасным может оказаться ваш собственный код.
Шло время, количество файлов стало приближаться к 200 тысячам. Как думаете, сколько памяти могут занимать 200 тысяч Django-моделей? А 200 тысяч фьючерсов? А 200 тысяч поставленных задач? Все вместе около гигабайта. Стало понятно, что посылать в экзекутор все сразу — не выход. Но почему бы не добавлять новые задачи по окончании предыдущих? В самом начале добавляем количество задач, равное количеству потоков, ведем учет сколько задач поставлено, сколько выполнено. Сами фьючерсы не храним, наружу не отдаем. Получается очень классная функция, которую можно использовать повторно (осторожно, это не окончательный вариант):
from concurrent.futures import ThreadPoolExecutor, Future
def task_queue(task, iterator, concurrency=10):
def submit():
try:
obj = next(iterator)
except StopIteration:
return
stats['delayed'] += 1
future = executor.submit(task, obj)
future.add_done_callback(upload_done)
def upload_done(future):
submit()
stats['delayed'] -= 1
stats['done'] += 1
executor = ThreadPoolExecutor(concurrency)
stats = {'done': 0, 'delayed': 0}
for _ in range(concurrency):
submit()
return stats
В ней всего три действия: функция submit
, которая выбирает следующий объект из итератора и создает для него задачу, upload_done
, которая вызывается по окончании выполнения задачи и ставит следующую, и цикл, в котором ставятся первые задачи. Пробуем запустить:
stats = task_queue(upload, queryset.iterator(), concurrency=5)
while True:
print 'rdone {done}, in work: {delayed} '.format(**stats),
sys.stdout.flush()
if stats['delayed'] == 0:
break
time.sleep(0.2)
Отлично, работает! Тут уже используется метод iterator
кверисета. Кажется, что его можно было бы использовать и в первом примере с функцией executor.map
, но executor.map
выбирает сразу весь итератор и делает его бесполезным. Тут же объекты действительно выбираются по одному на каждый работающий поток.
Правда, есть проблема: стоит увеличить кол-во потоков, как начинают сыпаться исключения «ValueError: generator already executing». Код использует один и тот же генератор из всех потоков, поэтому рано или поздно два потока пытаются выбрать значения одновременно (на самом деле это может произойти когда потоков всего два, но с меньшей вероятностью). Это же касается и счетчиков, рано или поздно два процесса одновременно считают одно значение, потом оба прибавят единицу и оба запишут «исходное число + 1», а не «исходное число + 2». Поэтому всю работу с разделяемыми объектами нужно обернуть в локи.
Есть и другие проблемы. Нет обработки ошибок, которые могут произойти во время выполнения задачи. Если прервать выполнение с помощью ctrl+c, в основном потоке будет выброшено исключение, а остальные продолжат выполнение до самого конца, поэтому нужен механизм принудительного завершения очереди. У экзекутора как раз есть метод shutdown для этих целей и можно было бы отдавать экзекутор наружу, чтобы останавливать его, когда пользователь нажимает ctrl+c. Но есть вариант получше: можно создать фьючерс, который будет резолвится по окончании всех работ и подчищать экзекутор, если кто-то извне его отменит. Вот версия, в которой учтены все эти ошибки:
def task_queue(task, iterator, concurrency=10, on_fail=lambda _: None):
def submit():
try:
obj = next(iterator)
except StopIteration:
return
if result.cancelled():
return
stats['delayed'] += 1
future = executor.submit(task, obj)
future.obj = obj
future.add_done_callback(upload_done)
def upload_done(future):
with io_lock:
submit()
stats['delayed'] -= 1
stats['done'] += 1
if future.exception():
on_fail(future.exception(), future.obj)
if stats['delayed'] == 0:
result.set_result(stats)
def cleanup(_):
with io_lock:
executor.shutdown(wait=False)
io_lock = threading.RLock()
executor = ThreadPoolExecutor(concurrency)
result = Future()
result.stats = stats = {'done': 0, 'delayed': 0}
result.add_done_callback(cleanup)
with io_lock:
for _ in range(concurrency):
submit()
return result
Тут нужно использовать reentrant лок, потому что есть определенная вероятность, что очень короткая задача успеет выполнится до навешивания обработчика в add_done_callback
, и тогда обработчик будет выполнен немедленно в том же потоке и попытается еще раз захватить лок. Получится дедлок. Reentrant лок позволит тому же потоку, что захватил его в первый раз, спокойно зайти еще раз, но не даст себя захватить из другого потока, пока первый поток не освободит его столько же раз, сколько захватывал. Немного меняется и код, который использует эту очередь задач:
from concurrent.futures import ThreadPoolExecutor, Future, TimeoutError
result = task_queue(upload, queryset.iterator(), concurrency=5)
try:
while not result.done():
try:
result.result(.2)
except TimeoutError:
pass
print 'rdone {done}, in work: {delayed} '.format(**result.stats),
sys.stdout.flush()
except KeyboardInterrupt:
result.cancel()
raise
Больше не нужно тупо засыпать каждые 200 миллисекунд, можно засыпать по умному, ожидая завершения очереди. А в случае прерывания останавливать очередь.
Смеркалось. Шло время, количество файлов стало приближаться к 1,5 миллионам. Несмотря на то, что все выглядело так, как будто все работает с фиксированным потреблением памяти (кол-во тредов, фьючерсов и Django-моделей на протяжении всего выполнения не должно меняться), потребление памяти все равно росло. Оказалось, что queryset.iterator()
работает немного не так, как ожидалось. Объекты действительно создаются только тогда, когда явно выбираются из итератора, а вот сырой ответ базы данных все равно выгребается драйвером сразу. Получается около 500 мегабайт на миллион строк. Решение этой проблемы довольно очевидно: нужно делать запросы не на все объекты сразу, а разделять порции. При этом следует избегать выборки со смещением, потому что запрос вида LIMIT 100 OFFSET 200000 на самом деле означает, что СУБД нужно пробежаться по 200100 записям. Вместо смещения следует использовать выборку по полю с индексом.
def real_queryset_iterator(qs, pk='pk', chunk_size=5000):
qs = qs.order_by(pk)
chunk = list(qs[:chunk_size])
while chunk:
for item in chunk:
yield item
last_pk = getattr(chunk[-1], pk)
chunk = list(qs.filter(**{pk + '__gt': last_pk})[:chunk_size])
Здесь pk — скорее pagination key, нежели primary. Впрочем, зачастую primary хорошо подходит на эту роль. Такой итератор действительно расходует фиксированное количество памяти и работает не медленнее выборки за один раз. Но если увеличить кол-во потоков, возникает еще одна проблема. В Джанге соединения с базой данных являются локальными для потоков, поэтому, когда очередной поток делает запрос, создается новое соединение. Рано или поздно количество соединений доходит до критического числа и возникает исключение, подобное этому:
OperationalError: FATAL: remaining connection slots are reserved for non-replication superuser connections
Правильным решением было бы использовать для всех потоков одно и то же соединение, т.к. мы уже ограничили возможность одновременно делать запросы из разных потоков. Стандартных средств для этого в Джанге нет, но это можно сделать с помощью хака, заменив объект threading.local
на обычный объект:
from django.db import connections, DEFAULT_DB_ALIAS
connections._connections = type('empty', (object,), {})()
connections[DEFAULT_DB_ALIAS].allow_thread_sharing = True
Но надо понимать, что это убъет потокобезопасность базы данных во всем остальном приложении, поэтому такой вариант годится только для команд, запускаемых из консоли. Более гуманный вариант — закрывать соединение после каждого запроса, или после каждого элемента, что дает не сильно большой оверхэд.
def close_connection_iterator(iterator, db=DEFAULT_DB_ALIAS):
for item in iterator:
connections[db].close()
yield item
result = task_queue(
upload,
close_connection_iterator(real_queryset_iterator(queryset)),
concurrency=150
)
Есть и третье решение: использовать отдельный поток, который будет общаться с базой данных, отдавая объекты в остальные потоки. Этот вариант ничего не ломает в остальном приложении и не привносит накладных расходов на постоянное переоткрытие соединений. Но его реализация довольно сложна и тянет не меньше чем на отдельную статью.
Возможно, пройдет еще время, кол-во файлов возрастет до 10 миллионов и появятся новые проблемы. Но пока кажется, что основная проблема будет в том, что такое обновление займет около восьми часов и будет стоит $50 только за PUT-запросы по текущим ценам Амазона.
Некоторые тезисы из прочитанного:
- Потоки для ввода-вывода на Питоне работают хорошо, но надо позаботиться об изоляции.
- Запускать десятки и сотни тысяч задач нужно очень аккуратно, следя за потреблением памяти.
queryset.iterator()
в Джанговской ORM работает не совсем так, как ожидается.
Хелперы task_queue
и real_queryset_iterator
на гитхабе:
https://gist.github.com/homm/b8caf60c11997da69b1e
Автор: homm