В очередной статье про SObjectizer продолжим следить за эволюцией простого поначалу агента, который все более и более усложняется по мере своего развития. Рассмотрим, как быть с отложенными сообщениями, в которых мы больше не заинтересованы. И воспользуемся некоторой функциональностью иерархических конечных автоматов.
В предыдущей статье мы остановились на том, что у нас появился агент email_analyzer, который можно считать более-менее надежно решающим свою задачу. Однако, он сам, последовательно, выполняет три стадии проверки email-а: сперва проверяет заголовки, затем содержимое, затем аттачи.
Скорее всего, каждая из этих операций не будет исключительно CPU-bound. Намного вероятнее, что вычленив какие-то значения из проверяемого фрагмента (например, из заголовков письма), потребуется сделать куда-то запрос для проверки допустимости этого значения. Например, запрос в БД дабы проверить, нет ли имени хоста-отправителя в черном списке. Пока будет выполняться данный запрос можно было бы выполнить еще какую-то операцию, например, разобрать содержимое текста письма на отдельные ключевые фразы, дабы их можно было проверить по какому-то словарю спам-маркеров. Или проверить, есть ли в аттачах архивы, и инициировать их проверку антивирусом. В общем, имеет смысл распараллелить операции анализа email-а.
Давайте попробуем задействовать отдельных агентов на каждую операцию. Т.е. можно написать агентов вида:
class email_headers_checker : public agent_t {
public :
struct result { check_status status_ }; /* Сообщение с результатом */
email_headers_checker( context_t ctx, ... /* Какие-то параметры */ ) {...}
virtual void so_evt_start() override {
... /* Иницирование операций по проверке заголовков */
}
... /* Какие-то детали реализации */
};
class email_body_checker : public agent_t {...};
class email_attachment_checker : public agent_t {...};
Каждый такой агент будет выполнять специфические для своей операции действия, а затем отошлет результат email_analyzer в виде сообщения. Нашему email_analyzer потребуется создать экземпляры этих агентов у себя и дождаться от них сообщений с результатами анализа:
void on_load_succeed( const load_email_succeed & msg ) {
try {
auto parsed_data = parse_email( msg.content_ );
introduce_child_coop( *this,
// Агенты-checker-ы будут работать на своем собственном
// thread-pool-диспетчере, который был создан заранее
// под специальным именем.
disp::thread_pool::create_disp_binder(
"checkers", disp::thread_pool::bind_params_t{} ),
[&]( coop_t & coop ) {
coop.make_agent< email_headers_checker >(
so_direct_mbox(), parsed_data->headers() );
coop.make_agent< email_body_checker >(
so_direct_mbox(), parsed_data->body() );
coop.make_agent< email_attach_checker >(
so_direct_mbox(), parsed_data->attachments() );
} );
}
catch( const exception & ) {...}
}
Тех, кто внимательно читал предыдущие статьи, фраза «дождаться от них сообщений» должна была бы насторожить. Ждать без ограничения времени не есть хорошо, это прямой путь получить зря болтающегося в системе и ничего не делающего агента. Поэтому при ожидании ответов от checker-ов нам имеет смысл поступить так же, как и при ожидании результата IO-операции: отослать самим себе какой-то отложенный сигнал, получив который мы поймем, что дальше ждать бессмысленно. Т.е. нам надо было бы написать что-то вроде:
// Попытка представить агента email_analyzer с двумя отложенными сигналами.
class email_analyzer : public agent_t {
// Этот сигнал потребуется для того, чтобы отслеживать отсутствие
// ответа от IO-агента в течении разумного времени.
struct io_agent_response_timeout : public signal_t {};
// Этот сигнал потребуется для того, чтобы отслеживать отсутствие
// результатов проверки отдельных частей email-а.
struct checkers_responses_timeout : public signal_t {};
...
virtual void so_evt_start() override {
... /* Отсылка запроса IO-агенту */
// И сразу же начинаем отсчет тайм-аута для ответа от IO-агента.
send_delayed< io_agent_response_timeout >( *this, 1500ms );
}
...
void on_load_succeed( const load_succeed & msg ) {
... /* Создание коопераций с агентами checker-ами */
// Сразу же начинаем отсчет тайм-аута для ответов от агентов-checker-ов.
send_delayed< checkers_responses_timeout >( *this, 750ms );
}
...
void on_checkers_responses_timeout() {
... /* Отсылка отрицательного ответа. */
}
};
Однако, пойдя по этому пути мы наступим на грабли: ожидая ответа от checker-ов мы запросто можем получить отложенный сигнал io_agent_response_timeout. Ведь его же никто не отменял. И когда это сигнал придет, мы сгенерируем отрицательный ответ из-за якобы имеющегося тайм-аута ввода-вывода, которого-то и нет. Давайте попробуем обойти эти грабли.
Зачастую разработчики, не привыкшие к асинхронному обмену сообщениями, пытаются отменить отложенный сигнал. Это можно сделать, если сохранить идентификатор таймера при обращении к send_periodic:
// Попытка представить агент email_analyzer с отменой отложенного
// сигнала io_agent_response_timeout.
class email_analyzer : public agent_t {
struct io_agent_response_timeout : public signal_t {};
...
virtual void so_evt_start() override {
... /* Отсылка запроса IO-агенту */
// Для того, чтобы получить идентификатор таймера используем
// send_periodic вместо send_delayed, но параметр period
// выставляем в 0, что делает отсылаемый сигнал отложенным,
// но не периодическим.
io_response_timer_ = send_periodic< io_agent_response_timeout >(
*this, 1500ms, 0ms );
}
...
void on_load_succeed( const load_succeed & msg ) {
// Отменяем отложенный сигнал.
io_response_timer_.reset();
... /* Создание коопераций с агентами checker-ами */
// Сразу же начинаем отсчет тайм-аута для ответов от агентов-checker-ов.
send_delayed< checkers_responses_timeout >( *this, 750ms );
}
...
// Идентификатор таймера для отложенного сигнала о тайм-ауте для IO-операции.
timer_id_t io_response_timer_;
};
К сожалению, этот простой способ не всегда работает. Проблема в том, что отложенный сигнал может быть отослан агенту email_analyzer буквально за мгновение до того, как агент email_analyzer выполнит сброс таймера для этого отложенного сигнала. Тут уж ничего не поделать – чудеса многопоточности, они такие.
Агент email_analyzer может зайти в on_load_succeed на контексте своей рабочей нити, может даже успеть войти в вызов reset() для таймера… Но тут его нить вытеснят, управление получит нить таймера SObjectizer-а, на которой произойдет отсылка отложенного сигнала. После чего управление опять получит рабочая нить агента email_analyzer() и метод reset() для таймера сделает отмену уже отосланного сигнала. Однако, сигнал уже находится в очереди сообщений агента, откуда его уже никто не выбросит – раз уж сообщение попало в очередь к агенту, то изъять его оттуда нельзя.
Самое плохое в этой ситуации то, что подобная ошибка будет возникать эпизодически. Из-за чего понять, что именно происходит и в чем именно ошибка, будет сложно. Так что нужно помнить, что отмена отложенного сообщения – это вовсе не гарантия того, что оно не будет отослано.
Итак, если просто отменять отложенное сообщение неправильно, то что же делать?
Например, можно использовать состояния агента. Когда email_analyzer ждет ответа от IO-агента, он находится в одном состоянии. Когда ответ от IO-агента приходит, агент email_analyzer переходит в другое состояние, в котором он будет ждать ответов от checker-ов. Т.к. во втором состоянии email_analyzer на сигнал io_agent_response_timeout не подписан, то этот сигнал будет просто проигнорирован.
С введением состояний в агент email_analyzer мы могли бы получить что-то вроде:
// Попытка представить агент email_analyzer с использованием
// нескольких состояний.
class email_analyzer : public agent_t {
struct io_agent_response_timeout : public signal_t {};
struct checkers_responses_timeout : public signal_t {};
// Состояние, в котором агент будет ждать результата IO-операции.
state_t st_wait_io{ this };
// Состояние, в котором агент будет ждать ответов от checker-ов.
state_t st_wait_checkers{ this };
...
virtual void so_define_agent() override {
// Подписываем агента на разные события в разных состояниях.
// Для того, чтобы это было наглядно, используем вторую способ
// подписки агентов – через методы класса state_t.
st_wait_io
.event( &email_analyzer::on_load_succeed )
.event( &email_analyzer::on_load_failed )
.event< io_agent_response_timeout >( &email_analyzer::on_io_timeout );
st_wait_checkers
.event( &email_analyzer::on_header_check_result )
.event( &email_analyzer::on_body_check_result )
.event( &email_analyzer::on_attach_check_result )
.event< checkers_responses_timeout >( &email_analyzer::on_checkers_timeout );
}
...
};
Однако, в SObjectizer можно поступить еще проще: можно назначить временной лимит на пребывание агента в конкретном состоянии. Когда этот лимит истечет, агент будет принудительно переведен в другое состояние. Т.е. мы можем написать что-то вроде:
// Попытка представить агента email_analyzer с использованием ограничения времени
// на пребывание агента в конкретном состоянии.
class email_analyzer : public agent_t {
state_t st_wait_io{ this };
state_t st_io_timeout{ this };
state_t st_wait_checkers{ this };
state_t st_checkers_timeout{ this };
...
virtual void so_define_agent() override {
st_wait_io
.event( &email_analyzer::on_load_succeed )
.event( &email_analyzer::on_load_failed )
// Ограничиваем время ожидания.
.time_limit( 1500ms, st_io_timeout );
st_wait_checkers
.event( &email_analyzer::on_header_check_result )
.event( &email_analyzer::on_body_check_result )
.event( &email_analyzer::on_attach_check_result )
.time_limit( 750ms, st_checkers_timeout );
}
};
Но просто ограничить время пребывания в некотором состоянии недостаточно. Нужно еще предпринять какие-то действия, когда это время истечет. Как это сделать?
Использовать такую вещь, как обработчик входа в состояние. Когда агент входит в конкретное состояние, SObjectizer вызывает функцию-обработчик входа в это состояние, если пользователь такую функцию назначил. Это означает, что на вход в st_io_timeout мы можем повесить обработчик, который отсылает check_result с отрицательным результатом и завершает работу агента:
st_io_timeout.on_enter( [this]{
send< check_result >( reply_to_, email_file_, check_status::check_failure );
so_deregister_agent_coop_normally();
} );
Точно такой же обработчик мы повесим и на вход в st_checkers_timeout. А т.к. действия внутри этих обработчиков будут одинаковыми, то мы можем вынести их в отдельный метод агента email_analyzer и указать этот метод в качестве обработчика входа и для состояния st_io_timeout, и для состояния st_checkers_timeout:
class email_analyzer : public agent_t {
state_t st_wait_io{ this };
state_t st_io_timeout{ this };
state_t st_wait_checkers{ this };
state_t st_checkers_timeout{ this };
...
virtual void so_define_agent() override {
...
st_io_timeout
.on_enter( &email_analyzer::on_enter_timeout_state );
...
st_checkers_timeout
.on_enter( &email_analyzer::on_enter_timeout_state );
};
...
void on_enter_timeout_state() {
send< check_result >( reply_to_, email_file_, check_status::check_failure );
so_deregister_agent_coop_normally();
}
};
Но и это еще не все. Раз уж мы затронули тему состояний агентов и их возможностей, то можно развить ее дальше и провести рефакторинг кода email_analyzer.
Нетрудно заметить, что в коде очень часто дублируется парочка действий: отсылка сообщения check_result и дерегистрация кооперации агента. Такое дублирование не есть хорошо, cледует от него избавиться.
По сути, работа агента email_analyzer сводится к тому, чтобы в итоге агент оказался в одном из двух состояний: либо все завершилось нормально и следует отослать положительный результат, после чего завершить свою работу, либо же все завершилось ошибкой, нужно отослать отрицательный результат и, опять таки, завершить работу агента. Так давайте это и выразим прямо в коде с помощью двух состояний агента: st_success и st_failure.
// Попытка представить агента email_analyzer со специальными финальными
// состояниями st_success и st_failure.
class email_analyzer : public agent_t {
state_t st_wait_io{ this };
state_t st_wait_checkers{ this };
state_t st_failure{ this };
state_t st_success{ this };
...
virtual void so_define_agent() override {
st_wait_io
.event( &email_analyzer::on_load_succeed )
.event( &email_analyzer::on_load_failed )
// Ограничиваем время ожидания.
.time_limit( 1500ms, st_failure );
st_wait_checkers
.event( &email_analyzer::on_header_check_result )
.event( &email_analyzer::on_body_check_result )
.event( &email_analyzer::on_attach_check_result )
.time_limit( 750ms, st_failure );
st_failure
.on_enter( [this]{
send< check_result >( reply_to_, email_file_, status_ );
so_deregister_agent_coop_normally();
} );
st_success
.on_enter( [this]{
send< check_result >( reply_to_, email_file_, check_status::safe );
so_deregister_agent_coop_normally();
} );
};
...
// Новый атрибут нужен для сохранения актуального отрицательного результата.
check_status status_{ check_status::check_failure };
};
Это позволит нам в коде агента просто менять состояние для завершения работы агента тем или иным образом:
void on_load_failed( const load_email_failed & ) {
st_failure.activate();
}
void on_checker_result( check_status status ) {
// На первом же неудачном результате прерываем свою работу.
if( check_status::safe != status ) {
status_ = status;
st_failure.activate();
}
else {
++checks_passed_;
if( 3 == checks_passed_ )
// Все результаты получены. Можно завершать проверку с
// положительным результатом.
st_success.activate();
}
}
Но можно пойти и еще дальше. Для состояний st_failure и st_success есть одно общее действие, которое нужно выполнить при входе в любое их этих состояний – обращение к so_deregister_agent_coop_normally(). И это не случайно, ведь оба этих состояния отвечают за завершение работы агента. А раз так, то мы можем воспользоваться вложенными состояниями. Т.е. мы введем состояние st_finishing, для которого st_failure и st_success будут подсостояниями. При входе в st_finishing будет вызываться so_deregister_agent_coop_normally(). А при входе в st_failure и st_success – будет только отсылаться соответствующее сообщение.
Т.к. состояния st_failure и st_success вложены в st_finishing, то при входе в любое из них сначала будет вызваться обработчик входа в st_finishing, а уже затем – обработчик входа в st_failure или st_success. Получится, что мы при входе в st_finishing мы дерегистрируем агента, а следом, при входе в st_failure или st_success, отсылаем сообщение check_result.
Если кто-то из читателей чувствует себя не комфортно при упоминании вложенных состояний, обработчиков входа в состояния, ограничений на время пребывания в состоянии, то имеет смысл ознакомится с одной из основополагающих статей на тему иерархических конечных автоматов: David Harel, Statecharts: A visual formalism for complex systems. Science of Computer Programming. Состояния агентов в SObjectizer реализуют изрядную часть описанных там возможностей.
В итоге всех этих преобразований агент email_analyzer примет показанный ниже вид.
// Седьмая версия агента email_analyzer, с распараллеливанием работы по проверке
// содержимого email-а и использованием вложенных состояний.
class email_analyzer : public agent_t {
state_t st_wait_io{ this };
state_t st_wait_checkers{ this };
state_t st_finishing{ this };
state_t st_failure{ initial_substate_of{ st_finishing } };
state_t st_success{ substate_of{ st_finishing } };
public :
email_analyzer( context_t ctx,
string email_file,
mbox_t reply_to )
: agent_t(ctx), email_file_(move(email_file)), reply_to_(move(reply_to))
{}
virtual void so_define_agent() override {
st_wait_io
.event( &email_analyzer::on_load_succeed )
.event( &email_analyzer::on_load_failed )
// Назначаем тайм-аут для ожидания ответа.
.time_limit( 1500ms, st_failure );
st_wait_checkers
.event( [this]( const email_headers_checker::result & msg ) {
on_checker_result( msg.status_ );
} )
.event( [this]( const email_body_checker::result & msg ) {
on_checker_result( msg.status_ );
} )
.event( [this]( const email_attach_checker::result & msg ) {
on_checker_result( msg.status_ );
} )
// Еще один тайм-аут для ответов.
.time_limit( 750ms, st_failure );
// Для состояний, которые отвечают за завершение работы,
// нужно определить только обработчики входа.
st_finishing.on_enter( [this]{ so_deregister_agent_coop_normally(); } );
st_failure.on_enter( [this]{
send< check_result >( reply_to_, email_file_, status_ );
} );
st_success.on_enter( [this]{
send< check_result >( reply_to_, email_file_, check_status::safe );
} );
}
virtual void so_evt_start() override {
// Начинаем работать в состоянии по умолчанию, поэтому
// нужно принудительно перейти в нужное состояние.
st_wait_io.activate();
// При старте сразу же отправляем запрос IO-агенту для загрузки
// содержимого email файла.
send< load_email_request >(
so_environment().create_mbox( "io_agent" ),
email_file_,
so_direct_mbox() );
}
private :
const string email_file_;
const mbox_t reply_to_;
// Храним последний отрицательный результат для того, чтобы отослать
// его при входе в состояние st_failure.
check_status status_{ check_status::check_failure };
int checks_passed_{};
void on_load_succeed( const load_email_succeed & msg ) {
// Меняем состояние т.к. переходим к следующей операции.
st_wait_checkers.activate();
try {
auto parsed_data = parse_email( msg.content_ );
introduce_child_coop( *this,
// Агенты-checker-ы будут работать на своем собственном
// thread-pool-диспетчере, который был создан заранее
// под специальным именем.
disp::thread_pool::create_disp_binder(
"checkers", disp::thread_pool::bind_params_t{} ),
[&]( coop_t & coop ) {
coop.make_agent< email_headers_checker >(
so_direct_mbox(), parsed_data->headers() );
coop.make_agent< email_body_checker >(
so_direct_mbox(), parsed_data->body() );
coop.make_agent< email_attach_checker >(
so_direct_mbox(), parsed_data->attachments() );
} );
}
catch( const exception & ) {
st_failure.activate();
}
}
void on_load_failed( const load_email_failed & ) {
st_failure.activate();
}
void on_checker_result( check_status status ) {
// На первом же неудачном результате прерываем свою работу.
if( check_status::safe != status ) {
status_ = status;
st_failure.activate();
}
else {
++checks_passed_;
if( 3 == checks_passed_ )
// Все результаты получены. Можно завершать проверку с
// положительным результатом.
st_success.activate();
}
}
};
Ну а теперь имеет смысл посмотреть на код получившегося агента email_analyzer и задать себе простой, но важный вопрос: а оно того стоило?
Очевидно, что с ответом на этот вопрос все не так однозначно. Но поговорить об этом мы попробуем уже в следующей статье. В которой затронем тему уроков, которые мы извлекли после более чем десяти лет использования SObjectizer в разработке программных систем.
Исходные коды к показанным в статье примерам можно найти в этом репозитории.
Автор: eao197