В первой статье речь шла о том, что такое SObjectizer. Во второй статье мы начали рассказывать как могут выглядеть агенты, почему, как и куда они эволюционируют. Сегодня мы продолжим этот рассказ, ещё более усложняя реализацию демонстрационных агентов. Заодно проверим надежность асинхронного обмена сообщениями.
В прошлый раз мы остановились на том, что операцию чтения содержимого файла с email-ом следует отдать на откуп отдельному IO-агенту. Давайте сделаем это и посмотрим, что получится.
Во-первых, нам потребуется набор сообщений, которыми между собой будут обмениваться IO-агент и email_analyzer:
// Запрос на загрузку содержимого файла.
struct load_email_request
{
// Имя файла для загрузки.
string email_file_;
// Куда нужно прислать результат.
mbox_t reply_to_;
};
// Успешный результат загрузки файла.
struct load_email_succeed
{
// Содержимое файла.
string content_;
};
// Неудачный результат загрузки файла.
struct load_email_failed
{
// Описание причины неудачи.
string what_;
};
Во-вторых, нам нужно определить, куда именно агент email_analyzer будет отсылать сообщение-запрос load_email_request. Мы могли бы пойти уже привычным путем: при регистрации IO-агента сохранить его direct_mbox, затем этот mbox передать параметром в конструктор агента analyzer_manager, затем параметром в конструктор каждого из агентов email_analyzer… В принципе, если бы нам нужно было бы иметь несколько разных IO-агентов, то так и следовало бы сделать. Но в нашей задачке вполне достаточно одного IO-агента. Что позволяет нам продемонстрировать именованные mbox-ы.
Именованный mbox создается обращением к so_5::environment_t::create_mbox(name). Если вызывать create_mbox несколько раз с одним и тем же именем, то возвращаться будет всегда один и тот же mbox, созданный при первом вызове create_mbox с этим именем.
IO-агент создает себе именованный mbox и подписывается на него. Агенты email_analyzer-ы получают этот же mbox когда им нужно отослать сообщение load_email_request. Тем самым мы избавляемся от необходимости «протаскивать» mbox IO-агента через analyzer_manager.
Теперь, когда мы определились с интерфейсом взаимодействия IO-агента и email_manager-а, мы можем сделать новый вариант агента email_analyzer:
// Пятая версия. С передачей IO-операции специальному IO-агенту.
class email_analyzer : public agent_t {
public :
email_analyzer( context_t ctx,
string email_file,
mbox_t reply_to )
: agent_t(ctx), email_file_(move(email_file)), reply_to_(move(reply_to))
{}
// Агент усложнился, у него появилось несколько обработчиков событий.
// Поэтому подписки агента лучше определять в специально предназначенном
// для этого виртуальном методе.
virtual void so_define_agent() override {
// Нам нужно получить два сообщения от IO-агента. Каждое
// из эти сообщений будет обрабатываться своим событием.
so_subscribe_self()
.event( &email_analyzer::on_load_succeed )
.event( &email_analyzer::on_load_failed );
}
virtual void so_evt_start() override {
// При старте сразу же отправляем запрос IO-агенту для загрузки
// содержимого email файла.
send< load_email_request >(
// mbox IO-агента будет получен по имени.
so_environment().create_mbox( "io_agent" ),
email_file_,
// Ответ должен прийти на наш собственный mbox.
so_direct_mbox() );
}
private :
const string email_file_;
const mbox_t reply_to_;
void on_load_succeed( const load_email_succeed & msg ) {
try {
// Стадии обработки обозначаем лишь схематично.
auto parsed_data = parse_email( msg.content_ );
auto status = check_headers( parsed_data->headers() );
if( check_status::safe == status )
status = check_body( parsed_data->body() );
if( check_status::safe == status )
status = check_attachments( parsed_data->attachments() );
send< check_result >( reply_to_, email_file_, status );
}
catch( const exception & ) {
// В случае какой-либо ошибки отсылаем статус о невозможности
// проверки файла с email-ом по техническим причинам.
send< check_result >(
reply_to_, email_file_, check_status::check_failure );
}
// Больше мы не нужны, поэтому дерегистрируем кооперацию,
// в которой находимся.
so_deregister_agent_coop_normally();
}
void on_load_failed( const load_email_failed & ) {
// Загрузить файл не удалось. Возвращаем инициатору запроса
// отрицательный результат и завершаем свою работу.
send< check_result >(
reply_to_, email_file_, check_status::check_failure );
so_deregister_agent_coop_normally();
}
};
Теперь агенты email_analyzer делегируют IO-операции другому агенту, который знает, как делать это эффективно. Соответственно, агенты email_analyzer-ы на своих рабочих нитях будут заниматься либо раздачей заданий IO-агенту, либо же обработкой ответов email_analyzer-ов. Это дает нам возможность изменить взгляд на то, сколько агентов email_analyzer мы можем создавать и сколько рабочих нитей им нужно.
Когда каждый агент email_analyzer сам выполнял синхронную IO-операцию нам нужно было иметь столько рабочих нитей в пуле, сколько параллельных IO-операций мы хотели разрешить. При этом не было смысла создавать намного больше агентов email_analyzer, чем количество рабочих нитей в пуле. Если в пуле 16 нитей, а мы позволяем одновременно существовать 32-м агентам, то это приведет к тому, что половина этих агентов будет просто ждать, когда же для них освободится какая-нибудь из рабочих нитей.
Теперь, после выноса IO-операций на другой рабочий контекст, можно, во-первых, сократить количество рабочих нитей в пуле. Агенты email_analyzer в своих событиях будут выполнять, в основном, нагружающие процессор операции. Поэтому нет смысла создавать больше рабочих потоков, чем есть доступных вычислительных ядер. Значит, если у нас 4-х ядерный процессор, то нам потребуется не 16 нитей в пуле, а не более 4-х.
Во-вторых, если IO-операции занимают больше времени, чем обработка содержимого email, то мы получаем возможность создать больше агентов email_analyzer, чем нитей в пуле. Просто большинство из этих агентов будут ждать результата своей IO-операции. Хотя, если время загрузки email-а сравнимо или меньше времени анализа его содержимого, то этот пункт потеряет свою актуальность и мы сможем создавать всего на 1-2-3 агента email_analyzer больше, чем количество нитей в пуле. Все эти настройки легко делаются в одном месте – в агенте analyzer_manager. Достаточно поменять буквально пару констант в его коде и увидеть, как изменения сказываются на производительности нашего решения. Однако, тюнинг производительности – это отдельная большая тема, углубляться в которую сейчас преждевременно...
Итак, у нас появилась очередная версия агента email_analyzer, которая устраняет проблемы предыдущих версий. Можем ли мы считать ее приемлемой?
Нет.
Проблема в том, что получившуюся реализацию нельзя считать надёжной.
Эта реализация рассчитана на оптимистичный сценарий, в котором отсылаемые нами сообщения никогда не теряются и всегда доходят до адресата, а дойдя до адресата, всегда обрабатываются. После чего мы всегда получаем нужный нам ответ.
Суровая правда жизни, однако, состоит в том, что когда система строится на асинхронном обмене сообщениями между отдельными акторами/агентами, то этот самый асинхронный обмен нельзя считать абсолютно надёжной штукой. Сообщения могут теряться. И это нормально.
Потеря сообщений может происходить по разным причинам. Например, агент-получатель еще просто не успел подписаться на сообщение. Или агента-получателя вообще в данный момент нет. Либо он есть, но у него сработал механизм защиты от перегрузки (подробнее об этом в одной из последующих статей). Либо агент есть и сообщение до него даже дошло, но агент находится в состоянии, в котором это сообщение не обрабатывается. Либо агент есть, сообщение до него дошло, он даже начал его обрабатывать, но в процессе обработки случилась какая-то прикладная ошибка и сбойнувший агент не отослал ничего в ответ.
В общем, общение между агентами посредством асинхронных сообщений – это как взаимодействие хостов через UDP протокол. В большинстве случаев датаграммы доходят до получателей. Но иногда теряются по дороге или даже при обработке.
Вышесказанное означает, что load_email_request может не дойти до IO-агента. Или, до агента email_analyzer могут не дойти ответные сообщения load_email_successed/load_email_failed. И что у нас получится в этом случае?
Мы получим агента email_analyzer, который присутствует в системе, но ничего не делает. Не работает. Не собирается умирать. И не дает стартовать какому-то другому агенту email_analyzer. Если нам не повезет, то мы можем столкнуться с ситуацией, когда все созданные analyzer_manager-ом агенты email_analyzer-ы превратятся в этаких ничего не делающих полутрупов. После чего analyzer_manager будет просто накапливать заявки в своей очереди, а потом выбрасывать их оттуда после истечения тайм-аута. Но никакой полезной работы выполняться не будет.
Как выйти из этой ситуации?
Например, за счёт контроля тайм-аутов. Мы можем либо ввести контроль времени выполнения IO-операции агентом email_analyzer (т.е., если нет ответа слишком долго, то считать, что IO-операция завершилась неудачно). Либо же ввести контроль времени выполнения всей операции анализа email-а в агенте analyzer_manager. Либо сделать и то, и другое.
Для простоты ограничимся отсчётом тайм-аута IO-операции в агенте email_analyzer:
// Шестая версия. С контролем тайм-аута для ответа IO-агента.
class email_analyzer : public agent_t {
// Этот сигнал потребуется для того, чтобы отслеживать отсутствие
// ответа от IO-агента в течении разумного времени.
struct io_agent_response_timeout : public signal_t {};
public :
email_analyzer( context_t ctx,
string email_file,
mbox_t reply_to )
: agent_t(ctx), email_file_(move(email_file)), reply_to_(move(reply_to))
{}
virtual void so_define_agent() override {
so_subscribe_self()
.event( &email_analyzer::on_load_succeed )
.event( &email_analyzer::on_load_failed )
// Добавляем еще обработку тайм-аута на ответ IO-агента.
.event< io_agent_response_timeout >( &email_analyzer::on_io_timeout );
}
virtual void so_evt_start() override {
// При старте сразу же отправляем запрос IO-агенту для загрузки
// содержимого email файла.
send< load_email_request >(
so_environment().create_mbox( "io_agent" ),
email_file_,
so_direct_mbox() );
// И сразу же начинам отсчет тайм-аута для ответа от IO-агента.
send_delayed< io_agent_response_timeout >( *this, 1500ms );
}
private :
const string email_file_;
const mbox_t reply_to_;
void on_load_succeed( const load_email_succeed & msg ) {
try {
auto parsed_data = parse_email( msg.content_ );
auto status = check_headers( parsed_data->headers() );
if( check_status::safe == status )
status = check_body( parsed_data->body() );
if( check_status::safe == status )
status = check_attachments( parsed_data->attachments() );
send< check_result >( reply_to_, email_file_, status );
}
catch( const exception & ) {
send< check_result >(
reply_to_, email_file_, check_status::check_failure );
}
so_deregister_agent_coop_normally();
}
void on_load_failed( const load_email_failed & ) {
send< check_result >(
reply_to_, email_file_, check_status::check_failure );
so_deregister_agent_coop_normally();
}
void on_io_timeout() {
// Ведем себя точно так же, как и при ошибке ввода-вывода.
send< check_result >(
reply_to_, email_file_, check_status::check_failure );
so_deregister_agent_coop_normally();
}
};
Вот этот вариант email_analyzer можно считать уже вполне приемлемым. В его коде напрашивается рефакторинг с вынесением парочки операций (send и so_deregister_agent_coop_normally) в отдельный вспомогательный метод. Но это не было сделано специально, дабы код каждой последующей версии агента email_analyzer минимально отличался от кода предыдущей версии.
И как раз если сравнить две показанные выше версии агента email_analyzer, то станет заметна одна особенность, которую очень ценят программисты, давно использующие SObjectizer в повседневной работе: простота и понятность процедуры расширения агентов. Потребовалось агенту реагировать на еще какое-то событие? Значит нужно добавить еще одну подписку и еще один обработчик события. А поскольку подписки, как правило, делаются в одних и тех же местах, то сразу понятно, куда именно идти и что именно править.
SObjectizer не накладывает каких-то ограничений на то, где и как агент пописывает свои события, но следование простому соглашению – подписки делаются в so_define_agent(), либо, в совсем простых случаях, для final-классов агентов, в конструкторе – что сильно упрощает жизнь. Заглядываешь в код чужого агента или даже в код своего агента, но написанного несколько лет назад, и сразу знаешь, что именно нужно смотреть, чтобы разобраться в поведении агента. Удобно, хотя для понимания этого удобства нужно, пожалуй, написать и отладить не одного реального агента, и даже не двух...
Однако, вернемся к теме надёжности агентов, которая была затронута выше и из-за которой появилась очередная, шестая по счету, версия агента email_analyzer: механизм асинхронного обмена сообщениями между агентами не надежен и с этим нужно как то жить.
Здесь нужно сделать важную ремарку: неправильно говорить, что механизм доставки сообщений в SObjectizer ну совсем уж «дырявый» и позволяет себе терять любые сообщения когда ему вздумается.
Сообщения в SObjectizer просто так не теряются, у каждой потери есть своя причина. Если агент отсылает сообщение самому себе, и функция send завершилась успешно, то сообщение до агента дойдет. Если только сам разработчик не предпримет явным образом каких-то действий, предписывающих SObjectizer-у выбросить это сообщение в определённом случае (например, разработчик не подписывает агента на сообщение в каком-то из состояний или задействует limit_then_drop, для защиты от перегрузки).
Итак, если разработчик сам не разрешает SObjectizer-у в определенных ситуациях выбрасывать определенные сообщения, то сообщение, которое агент отправил самому себе, до агента должно дойти. Поэтому в показанном выше коде мы совершенно спокойно отсылали сами себе отложенные сообщения не опасаясь того, что эти сообщения будут потеряны где-то по дороге.
Однако, когда сообщение отсылается другому агенту, то ситуация несколько меняется. Бывают случаи, когда мы уверены в успешности доставки. Например, если мы сами реализовали агента-получателя, да ещё и включили его в ту же кооперацию, в которой живет агент-отправитель.
Но если агент-получатель написан не нами, создается и уничтожается в составе чужой кооперации, если его поведение мы не контролируем, если мы не знаем, как именно агент защищается от перегрузок, как он себя ведет в той или иной ситуации, то уверенность у нас такая же, как при отсылке датаграммы по UDP-протоколу: если всё нормально, то скорее всего датаграмма до отправителя дойдет, а мы затем получим ответ. Если всё нормально. А вот если нет?
Мы подошли к интересному моменту: разработка софта на акторах/агентах из-за относительной ненадёжности асинхронного обмена сообщениями может выглядеть более трудоёмкой, чем при использовании подходов на основе синхронного взаимодействия объектов в программе.
Временами так оно и есть. Но зато в итоге, по нашему мнению, софт получается более надёжным, т.к. в коде поневоле приходится обрабатывать множество нештатных ситуаций, связанных как с потерей сообщений, так и с вариациями времен доставки и обработки сообщений.
Предположим, что email_analyzer-ы обращаются к io_agent посредством синхронного запроса, а не асинхронного сообщения, а о сбоях при выполнении IO-операции io_agent информирует посредством выбрасывания исключений. Долгое время всё будет работать нормально: email_analyzer синхронно запрашивает io_agent и получает в ответ либо содержимое email-а, либо исключение. Но в один прекрасный момент где-то внутри io_agent проявляется скрытый баг, и синхронный вызов просто подвисает. Ни ответа, ни исключения, просто зависание. Соответственно, подвисает сначала один email_analyzer, затем еще один, затем еще один и т.д. В итоге подвисшим оказывается все приложение.
Тогда как при асинхронном обмене сообщениями агент io_agent может повиснуть где-то у себя в потрохах. Но это не скажется на агентах email_analyzer, которые имеют возможность легко отследить истечение тайм-аута запроса и отослать отрицательный результат. Т.е., даже при сбоях в одной части приложения другие части приложения смогут продолжать свою работу, пусть эта работа будет состоять в генерировании потока отрицательных ответов. Ведь сам факт этого потока может стать важным симптомом и подсказать наблюдателю, что в приложении что-то пошло не так.
Кстати говоря, на тему наблюдения за работой написанного на агентах приложения.
За годы работы с SObjectizer у нас сложилось убеждение, что возможность увидеть что происходит внутри построенного на акторах/агентах приложения очень важна. В принципе, это было показано даже в данной статье. Если взять пятую версию email_analyzer без контроля тайм-аутов и попробовать ее запустить, то можно увидеть, как обработка запросов замедляется до тех пор, пока не останавливается совсем. Но как именно понять, в чем дело?
Хорошую подсказку могла бы дать информация о том, сколько агентов email_analyzer создано в данный момент и чем занимается каждый из них. Для этого нужна возможность мониторинга происходящего внутри приложения. Как раз то, за что так ценят Erlang и его платформу: там можно подключиться к работающей Erlang VM, посмотреть список Erlang-овых процессов, их параметры и т.д. Но в Erlang это возможно за счет того, что Erlang-овое приложение работает под управлением Erlang VM.
В случае с нативным C++приложением ситуация сложнее. В SObjectizer были добавлены средства для мониторинга происходящего внутри SObjectizer Environment (хотя эти средства пока обеспечивают лишь самый базовый функционал). Так, с их помощью в процессе работы нашего демонстрационного приложения можно получить следующую информацию:
mbox_repository/named_mbox.count -> 1 coop_repository/coop.reg.count -> 20 coop_repository/coop.dereg.count -> 0 coop_repository/agent.count -> 20 coop_repository/coop.final.dereg.count -> 0 timer_thread/single_shot.count -> 0 timer_thread/periodic.count -> 1 disp/ot/DEFAULT/agent.count -> 3 disp/ot/DEFAULT/wt-0/demands.count -> 8 disp/tp/analyzers/threads.count -> 4 disp/tp/analyzers/agent.count -> 16 disp/tp/analyzers/cq/__so5_au...109__/agent.count -> 1 disp/tp/analyzers/cq/__so5_au...109__/demands.count -> 0 disp/tp/analyzers/cq/__so5_au...124__/agent.count -> 1 disp/tp/analyzers/cq/__so5_au...124__/demands.count -> 0 ... disp/tp/analyzers/cq/__so5_au..._94__/agent.count -> 1 disp/tp/analyzers/cq/__so5_au..._94__/demands.count -> 0 disp/ot/req_initiator/agent.count -> 1 disp/ot/req_initiator/wt-0/demands.count -> 0
Этот выхлоп мониторинговой информации позволяет понять, что есть диспетчер с пулом потоков с именем «analyzers», в котором работает 4 рабочих потока. Именно на этом диспетчере в примере работают агенты email_analyzer. К диспетчеру привязаны 16 агентов, каждый из которых составляет отдельную кооперацию. И у этих агентов нет заявок. Т.е., агенты есть, а работы для них нет. И это уже повод разобраться почему так произошло.
Очевидно, что далеко не всегда низкоуровневая информация, которой располагает SObjectizer Environment, будет полезна прикладному программисту. Скажем, в обсуждаемом примере гораздо больше пользы разработчику мог бы дать счётчик количества агентов email_analyzer и размер длины списка заявок в агенте analyzer_manager. Но это прикладные данные, SObjectizer не имеет о них никакого понятия. Поэтому при разработке приложения на агентах программисту нужно позаботится о том, чтобы извне приложения была доступна информация, максимально полезная для оценки работоспособности и жизнеспособности приложения. Хотя это уже большая тема для отдельного разговора.
Пожалуй, на этом очередную статью можно закончить. В следующей статье попробуем показать, что можно сделать, если попробовать распараллелить операции анализа email-а ещё больше. Скажем, если выполнять параллельно операции анализа заголовков, тела письма и аттачей. И во что тогда превратится код агента email_analyzer.
Исходные коды к показанным в статье примерам можно найти в этом репозитории.
Автор: eao197