Для организации обработки потока задач используются очереди. Они нужны для накопления и распределения задач по исполнителям. Также очереди могут обеспечивать дополнительные требования к обработке задач: гарантия доставки, гарантия однократного исполнения, приоритезация и т. д.
Как правило, используются готовые системы очередей сообщений (MQ — message queue), но иногда нужно организовать ad hoc очередь или какую-нибудь специализированную (например, очередь с приоритетом и отложенным перезапуском не обработанных из-за исключений задач). О создании таких очередей и пойдёт речь ниже.
Ограничения применимости
Предлагаемые решения предназначены для обработки потока однотипных задач. Они не подходят для организации pub/sub или обмена сообщениями между слабо связанными системами и компонентами.
Очередь поверх реляционной БД хорошо работает при малых и средних нагрузках (сотни тысяч задач в сутки, десятки-сотни исполнителей), но для больших потоков лучше использовать специализированное решение.
Суть метода в пяти словах
select ... for update skip locked
Базовая очередь
Для простоты здесь и далее в таблице будут храниться только уникальные идентификаторы задач. Добавление какой-нибудь полезной нагрузки не должно составить труда.
Таблица для простейшей очереди содержит саму задачу и её статус:
create table task
(
id bigint not null primary key,
status integer not null default 0 -- 0 - новая, 1 - в работе, 2 - выполнена
);
create index task__status__idx on task (status);
Добавление задачи:
insert into task (id) values ($1) on conflict (id) do nothing;
Получение следующей задачи:
with next_task as (
select id from task
where status = 0
limit 1
for update skip locked
)
update task
set
status = 1
from next_task
where task.id = next_task.id
returning task.id;
Завершение задачи:
update task
set
status = 2
where id = $1;
Очередь с приоритетами
В простом случае id
задачи является её приоритетом. Меняется только запрос на получение следующей задачи — добавляется условие сортировки order by id
с требуемым порядком обработки задач. Также нужно создать составной индекс по (status, id)
.
Либо для приоритета добавляется отдельный столбец:
create table task
(
id bigint not null primary key,
priority integer not null,
status integer not null default 0 -- 0 - новая, 1 - в работе, 2 - выполнена
);
create index task__status__priority__idx on task (status, priority);
insert into task (id, priority) values ($1, $2) on conflict (id) do nothing;
with next_task as (
select id from task
where status = 0
order by priority
limit 1
for update skip locked
)
update task
set
status = 1
from next_task
where task.id = next_task.id
returning task.id;
Выделенный столбец позволяет менять приоритет задачи "на лету".
Очередь с повтором "упавших" задач
В процессе выполнения задачи может произойти ошибка или исключительная ситуация. В таких случаях задачу нужно поставить в очередь повторно. Иногда ещё требуется отложить и время её повторного исполнения на некоторое время, например, если исключение связано с временной недоступностью стороннего сервиса.
create table task
(
id bigint not null primary key,
status integer not null default 0, -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет)
attempt integer not null default 0,
delayed_to timestamp null,
error_text text null
);
create index task__status__delayed_to__idx on task (status, delayed_to);
Как видно, расширился список статусов и добавились новые столбцы:
attempt
— номер попытки; нужен для принятия решения о необходимости повтора (ограничение количества попыток) и для выбора задержки перед повтором (например, каждая следующая попытка откладывается на10 * attempt
минут);delayed_to
— время следующей попытки выполнения задачи;error_text
— текст ошибки.
Текст ошибки нужен для группировки по типам ошибки.
Пример. Система мониторинга сообщает, что в очереди скопились тысячи задач со статусом "ошибка". Выполняем запрос:
select error_text, count(*) from task where status = 3 group by 1 order by 2 desc;
За подробностями идём в логи исполнителей. Исправляем ситуацию, вызвавшую ошибки (если это возможно). При необходимости ускоряем перезапуск задач установкой статуса в 0 или сдвигом времени следующей попытки.
with next_task as (
select id from task
where status = 0
limit 1
for update skip locked
)
update task
set
status = 1,
attempt = attempt + 1,
delayed_to = null,
error_text = null
from next_task
where task.id = next_task.id
returning task.id;
with next_task as (
select id from task
where status = 3
and delayed_to < localtimestamp
limit 1
for update skip locked
)
update task
set
status = 1,
attempt = attempt + 1,
delayed_to = null,
error_text = null
from next_task
where task.id = next_task.id
returning task.id;
update task
set
status = 2,
delayed_to = null,
error_text = null
where id = $1;
update task
set
status = 3,
delayed_to = localtimestamp + make_interval(mins => 5 * attempt),
error_text = $2
where id = $1;
update task
set
status = 4,
delayed_to = null,
error_text = $2
where id = $1;
Запрос получения следующей задачи разделён на два, чтобы СУБД могла построить эффективный план запроса для очереди с приоритетом. Условие отбора с or
может очень плохо сочетаться с сортировкой order by
.
Сбор метрик
Добавляем такие атрибуты:
- время создания задачи;
- время изменения задачи;
- время начала и завершения выполнения задачи.
create table task
(
id bigint not null primary key,
status integer not null default 0, -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет)
attempt integer not null default 0,
begin_time timestamp null,
end_time timestamp null,
delayed_to timestamp null,
error_text text null,
created timestamp not null default localtimestamp,
updated timestamp not null default localtimestamp
);
create index task__status__delayed_to__idx on task (status, delayed_to);
create index task__updated__idx on task (updated);
Учитываем добавленные столбцы во всех запросах.
with next_task as (
select id from task
where status = 0
limit 1
for update skip locked
)
update task
set
status = 1,
attempt = attempt + 1,
begin_time = localtimestamp,
end_time = null,
delayed_to = null,
error_text = null,
updated = localtimestamp
from next_task
where task.id = next_task.id
returning task.id;
with next_task as (
select id from task
where status = 3
and delayed_to < localtimestamp
limit 1
for update skip locked
)
update task
set
status = 1,
attempt = attempt + 1,
begin_time = localtimestamp,
end_time = null,
delayed_to = null,
error_text = null,
updated = localtimestamp
from next_task
where task.id = next_task.id
returning task.id;
update task
set
status = 2,
end_time = localtimestamp,
delayed_to = null,
error_text = null,
updated = localtimestamp
where id = $1;
update task
set
status = 3,
end_time = localtimestamp,
delayed_to = localtimestamp + make_interval(mins => 5 * attempt),
error_text = $2,
updated = localtimestamp
where id = $1;
update task
set
status = 4,
end_time = localtimestamp,
delayed_to = null,
error_text = $2,
updated = localtimestamp
where id = $1;
Примеры, для чего это может быть нужно
Поиск и перезапуск повисших задач:
update task
set
status = 3,
end_time = localtimestamp,
delayed_to = localtimestamp,
error_text = 'hanged',
updated = localtimestamp
where status = 1
and updated < localtimestamp - interval '1 hour';
Удаление старых задач:
delete from task
where updated < localtimestamp - interval '30 days';
Статистика по выполнению задач:
select
date_trunc('hour', end_time),
count(*),
sum(end_time - begin_time),
avg(end_time - begin_time)
from task
where status = 2
and end_time >= '2019-12-16'
group by 1
order by 1;
Повторный запуск ранее выполненных задач
Например, обновился документ, нужно его переиндексировать для полнотекстового поиска.
create table task
(
id bigint not null primary key,
task_updated_at timestamp not null default localtimstamp,
status integer not null default 0, -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет)
begin_time timestamp null,
end_time timestamp null,
delayed_to timestamp null,
error_text text null,
created timestamp not null default localtimestamp,
updated timestamp not null default localtimestamp
);
Здесь для времени обновления задачи добавлен столбец task_updated_at
, но можно было бы использовать поле created
.
Добавление или обновление (перезапуск) задачи:
insert into task (id, task_updated_at) values ($1, $2)
on conflict (id) do update
set
task_updated_at = excluded.task_updated_at,
status = case when status = 1 then 1 else 0 end,
delayed_to = null,
error_text = null,
updated = localtimestamp
where task_updated_at < excluded.task_updated_at;
Что здесь происходит. Задача становится "новой", если она сейчас не исполняется.
В запросе завершения задачи также будет проверка, была ли она изменена во время исполнения.
Запросы на получение следующей задачи такие же, как в очереди со сбором метрик.
Успешное завершение задачи:
update task
set
status = case when begin_time >= updated then 2 else 0 end,
end_time = localtimestamp,
delayed_to = null,
error_text = null,
updated = localtimestamp
where id = $1;
Завершение задачи с ошибкой: в зависимости от задачи. Можно сделать безусловное откладывание перезапуска, можно при обновлении ставить статус "новая".
Pipeline
Задача проходит несколько стадий. Можно для каждой стадии сделать отдельную очередь. А можно в таблицу добавить соответствующий столбец.
Пример на основе базовой очереди, чтобы не загромождать код. Все ранее описанные модификации без проблем применимы и к этой очереди.
create table task
(
id bigint not null primary key,
stage integer not null default 0,
status integer not null default 0
);
create index task__stage__status__idx on task (stage, status);
Получение следующей задачи на заданной стадии:
with next_task as (
select id from task
where stage = $1
and status = 0
limit 1
for update skip locked
)
update task
set
status = 1
from next_task
where task.id = next_task.id
returning task.id;
Завершение задачи с переходом на указанную стадию:
update task
set
stage = $2,
status = 2
where id = $1;
Или переход на следующую по порядку стадию:
update task
set
stage = stage + 1,
status = 2
where id = $1;
Задачи по расписанию
Это вариация очереди с повтором.
У каждой задачи может быть своё расписание (в простейшем варианте — периодичность запуска).
create table task
(
id bigint not null primary key,
period integer not null, -- периодичность запуска в секундах
status integer not null default 0, -- 0 - новая, 1 - в работе
next_run_time timestamp not null default localtimestamp
);
create index task__status__next_run_time__idx on task (status, next_run_time);
Добавление задачи:
insert into task (id, period, next_run_time) values ($1, $2, $3);
Получение следующей задачи:
with next_task as (
select id from task
where status = 0
and next_run_time <= localtimestamp
limit 1
for update skip locked
)
update task
set
status = 1
from next_task
where task.id = next_task.id
returning task.id;
Завершение задачи и планирование следующего запуска:
update task
set
status = 0,
next_run_time = next_run_time + make_interval(secs => period)
where id = $1
Вместо заключения
В создании специализированной очереди задач средствами РСУБД нет ничего сложного.
"Самопальная" очередь будет отвечать даже самым диким практически любым требованиям бизнеса/предметной области.
Ну и не следует забывать, что как и любая другая БД, очередь требует вдумчивого тюнинга сервера, запросов и индексов.
Автор: yakov-bakhmatov