В работе веб-сервиса, да и вообще многих других систем, часто встречается необходимость выполнения различных фоновых задач. Для этого пишут скрипты — воркеры — которые берут список имеющихся задач и начинают их выполнять — с какой-то скоростью и в какой-то последовательности.
Понятное дело, хорошо, когда все задачи выполняются быстро и без проволочек.
Для ускорения выполнения задач желательно решить две проблемы:
- Научить воркер не ждать выполнения каждого отдельного этапа задачи (асинхронность)
- Научить воркер выполнять одновременно несколько задач (многопоточность) (disclaimer: на самом деле термин «многопоточность» тут используется в значении «многопроцессность»)
В этой статье мы рассмотрим вариант реализации воркера, который будет одновременно асинхронным и многопоточным.
Модуль AnyEvent
Для программирования в асинхронном режиме в Перле есть отличный модуль AnyEvent.
На всякий случай следует сказать, что на самом деле AnyEvent является оберткой над другими низкоуровневыми асинхронными модулями. Как DBI является оберткой и универсальным интерфейсом к разным базам данных, так и AnyEvent является оберткой и универсальным интерфейсом к различным реализациям асинхронных движков.
Для AnyEvent имеется огромное количество всевозможных расширений, в том числе есть и расширение для написания многопоточных приложений — модуль AnyEvent::Fork::Pool.
Модуль AnyEvent::Fork::Pool предоставляет простой способ создания пула воркеров, которые будут обрабатывать задачи в асинхронном многопоточном режиме.
Скрипт
Рассмотрим скрипт anyevent_pool.pl:
#!/usr/bin/perl
use strict;
use warnings;
use AnyEvent::Fork::Pool;
use Worker;
# Модуль воркера
my $mod = 'Worker';
# Функция воркера
my $sub = 'work';
# Определить количество ядер в системе
my $cpus = AnyEvent::Fork::Pool::ncpu 1;
# Создать пул воркеров
my $pool = AnyEvent::Fork
->new
->require ($mod)
->AnyEvent::Fork::Pool::run(
"${mod}::$sub", # Модуль::Функция - рабочая функция воркера
init => "${mod}::init", # Модуль::init - функция инициализации воркера
max => $cpus, # Количество воркеров в пуле
idle => 0, # Количество воркеров при простое
load => 1, # Размер очереди воркера
);
# Поставить пулу задачи
for my $str (qw{q2 rtr4 ui3 asdg5}) {
$pool->($str, sub {
print "result: @_n";
});
};
AnyEvent->condvar->recv;
Несмотря на небольшой объем, этот скрипт представляет собой полноценное асинхронное многопоточное приложение.
Разберем его по частям.
Переменные
# Модуль воркера
my $mod = 'Worker';
# Функция воркера
my $sub = 'work';
Эти переменные задают связку между пулом и тем кодом, который будет выполять конкретные фоновые задачи. Пул — он один на всех, а задачи могут быть разные. Эти переменные указывают пулу, какой именно код (какую функцию из какого модуля) вы хотите запустить для выполнения конкретной задачи.
Например, у вас может быть модуль Text для обработки текста, а в модуле функции length и trim. И еще у вас может быть модуль Image, в котором могут быть функции resize и crop. Пулу совершенно без разницы, что делают ваши функции и как они устроены. Вам нужно просто сказать пулу, в каком модуле они находятся и как они называются, и пул их выполнит.
Важно! Модуль воркера не нужно подключать в скрипте через «use Worker». Пул сам автоматически подгрузит модуль воркера, вам нужно только правильно указать название модуля в переменной.
Количество ядер
# Определить количество ядер в системе
my $cpus = AnyEvent::Fork::Pool::ncpu 1;
Для многопоточного выполнения задач желательно знать, сколько в системе имеется ядер. Желательно, чтобы количество потоков, которые вы будете запускать, равнялось количеству ядер. Если потоков будет меньше — некоторые ядра будут простаивать зря, если потоков будет больше — некоторые потоки будут вставать в очередь и вместо ускорения получатся потери на диспетчеризацию.
Если по каким-то причинам количество ядер не удалось определить, то будет использоваться значение, указанное вручную. В данном случае это 1.
Пул
# Создать пул воркеров
my $pool = AnyEvent::Fork
->new
->require ($mod)
->AnyEvent::Fork::Pool::run(
"${mod}::$sub", # Модуль::Функция - рабочая функция воркера
init => "${mod}::init", # Модуль::init - функция инициализации воркера
max => $cpus, # Количество воркеров в пуле
idle => 0, # Количество воркеров при простое
load => 1, # Размер очереди воркера
);
Пояснения к параметрам:
- Рабочая функция воркера всегда должна указываться первым параметром. Это та самая функция того самого модуля, которую мы указали в двух первых «настроечных» переменных $mod и $sub. Это единственный обязательный параметр.
- init — Если в вашем воркере есть необходимость инициализации, то в этом параметре можно указать название инициализирующей функции. В даном случае название функции указано как «init», поскольку это обычное название для такой функции, но, в принципе, можно указать любое другое название.
- max — Этот параметр задает количество потоков, которые будет запускать пул. Именно тут следует указать ранее определенное количество ядер в системе (но если хотите — можете указать любое число, если знаете, что делаете).
- idle — тут указывается количество воркеров, которые будут ждать «на низком старте». Чем больше это число (но не больше параметра max) — тем быстрее пул среагирует на новую поступившую задачу, но тем больше будет бесполезно ждущих (и жрущих ресурсы) процессов.
- load — Сколько задач будет отдано каждому воркеру не дожидаясь исполнения предыдущих. Значение сильно зависит от ситуации — в каких-то случаях лучше меньше, в каких-то лучше больше. При прочих равных большее значение должно повышать эффективность работы пула (оптом — дешевле).
Также имеются и другие параметры, которые я здесь не рассматриваю. Они сильно специфичны и требуются редко. С полным списком параметров можно ознакомиться в документации модуля.
Постановка задач пулу
# Поставить пулу задачи
for my $str (qw{q2 rtr4 ui3 asdg5}) {
$pool->($str, sub {
print "result: @_n";
});
};
Пулу можно передать произвольное количество параметров, но последним параметром должен быть коллбэк. Коллбэк — это анонимная функция, которая будет вызвана после того, как воркер выполнит задачу. В эту функцию будут переданы результаты работы воркера.
Другими словами — эта функция является получателем результатов работы функции $sub. Все, что выдаст функция $sub, будет передано в качестве аргументов в функцию-коллбэк. Условно эту связь можно записать примерно так — «callback($sub)».
В нашем случае функция-коллбэк просто печатает все, что она получает.
Переменная же $str — это, собственно, и есть та самая задача, которую должен выполнить воркер. В нашем случае это просто одна строка (точнее — 4 строки, запускаемых в цикле). Строки тут не имеют никакого глубокого смысла, я просто позвал кота походить по клавиатуре.
В зависимости от ситуации вместо строки может быть всё, что угодно — имя файла, идентификатор записи в базе, математическое выражение, ссылка на сложную структуру с данными… короче говоря — всё, что угодно. Пулу без разницы, что это будет, он не обрабатывает это значение. Пул просто передает это значение воркеру, а вот тот уже должен знать, что с этим делать.
Запуск движка
AnyEvent->condvar->recv;
Эта строка говорит модулю AnyEvent, что нужно запустить в работу событийный движок и далее работать бесконечно.
В этом месте скрипт зациклится. Приведенный пример не имеет способа остановки и выхода из бесконечного цикла обработки задач. Вопрос условного выхода из цикла AnyEvent является более общим, а я здесь хочу рассмотреть только частный случай использования пула. Про условный выход из цикла можно почитать тут.
Сам воркер
Теперь возникает вопрос — а где же, собственно, сам воркер? Где код, исполняющий непосредственно работу?
Этот код вынесен в отдельный модуль, который мы указали в переменной $mod.
Вот код модуля Worker:
package Worker;
use strict;
use warnings;
my $file;
sub init {
open $file, '>>', 'file.txt';
my $q = select($file);
$|=1;
select($q);
return;
}
sub work {
my ($str) = @_;
for (1..length($str)) {
print $file "$$ $strn";
sleep 1;
};
return $str, length($str);
}
1;
Как видите, в модуле две функции — init и work.
Функция init инициализирует воркер. В нашем случае функция открывает лог-файл, в который далее будут выводиться результаты работы рабочей функции work. Как уже говорилось выше — функция init является необязательной, в нашем случае я сделал ее просто для наглядности.
Функция work — это главная функция. Это та самая рабочая функция, которая была задана в переменной $sub. Именно в этой функции выполняется вся работа, связанная с выполнением конкретной задачи.
В нашем случае функция выполняет простейшую работу — вычисляет длину строки. Для более наглядной демонстрации работы воркера я добавил в функцию цикл с секундной задержкой, который выводит строку в лог столько раз, сколько в строке букв.
Обратите внимание — функция возвращает два значения — саму строку и ее длину. Именно эти два значения будут переданы в коллбэк, заданный на этапе постановки задач пулу (а в коллбэке, как говорилось выше, эти значения будут просто напечатаны).
Вот, собственно, и весь код.
Запускаем пул
Теперь запустим наш пул и посмотрим, что получится:
Тут мы видим результаты работы пула. Можно заметить, что порядок вывода результатов отличается от порядка строк, заданного в цикле в скрипте. Причина понятна — у строк разная длина, поэтому воркеры обрабатывают строки с разной скоростью. Чем проще задача — тем она быстрее выполняется.
Теперь посмотрим не просто на результаты, но и на процесс работы воркеров. Для этого во втором окне запустим tail для лог-файла:
Обратите внимание — результаты работы выводятся вперемешку, так-как задачи выполняются одновременно. Слева видны идентификаторы процессов — видим, что задействованы 4 процесса. У меня в системе 4 ядра, поэтому одновременно выполняются все 4 задачи.
И, наконец, посмотрим на таблицу процессов:
Так выглядит дерево процессов нашего пула.
Первым в списке идет скрипт, далее менеджер пулов (да-да, пулов может быть несколько штук), потом менеджер пула, и, наконец, воркеры.
Если не полениться и сравнить идентификаторы процессов, то можно увидеть, что идентификаторы воркеров совпадают с идентификаторами в лог-файле.
Литература
- AnyEvent
- AnyEvent::Fork::Pool
- Статья про AnyEvent (русский язык)
Автор: ivanych