Разработка многопоточных программ на C++ — это не просто. Разработка больших многопоточных программ на C++ — это очень не просто. Но, как это обычно бывает в C++, жизнь сильно упрощается, если удается подобрать или сделать «заточенный» под конкретную задачу инструмент. Четырнадцать лет назад выбирать было особенно не из чего, поэтому мы сами для себя сделали такой «заточенный» инструмент и назвали его SObjectizer. Опыт повседневного использования SObjectizer-а в коммерческом софтостроении пока не позволяет жалеть о содеянном. А раз так, то почему бы не попробовать рассказать о том, что это, для чего это и почему у нас получилось именно так, а не иначе…
Что это?
SObjectizer — это небольшой OpenSource инструмент, свободно распространяющийся под 3-пунктной BSD-лицензией. Основная идея, лежащая в основе SObjectizer, — это построение приложения из мелких сущностей-агентов, которые взаимодействуют между собой через обмен сообщениями. SObjectizer при этом берет на себя ответственность за:
- доставку сообщений агентам-получателям внутри одного процесса;
- управление рабочими нитями, на которых агенты обрабатывают адресованные им сообщения;
- механизм таймеров (в виде отложенных и периодических сообщений);
- возможности настройки параметров работы перечисленных выше механизмов.
Можно сказать, что SObjectizer является одной из реализаций Actor Model. Однако, главное отличие SObjectizer от других подобных разработок, — это сочетание элементов из Actor Model с элементами из других моделей, в частности, Publish-Subscribe и CSP.
В «классической» Actor Model каждый актор и есть непосредственный адресат в операции send. Т.е. если мы хотим отослать сообщение какому-то актору, мы должны иметь ссылку на актора-получателя или идентификатор этого актора. Операция send просто добавляет сообщение в очередь сообщений актора-получателя.
В SObjectizer операция send получает ссылку не на актора, а на такую штуку, как mbox (message box, почтовый ящик). Mbox можно рассматривать как некий прокси, скрывающий реализацию процедуры доставки сообщения до получателей. Таких реализаций может быть несколько, и они зависят от типа mbox-а. Если это multi-producer/single-consumer mbox, то, как и в «классической» Actor Model, сообщение будет доставлено единственному получателю, владельцу mbox-а. А вот если это multi-producer/multi-consumer mbox, то сообщение будет доставлено всем получателям, которые подписались на данный mbox.
Т.е. операция send в SObjectizer больше похожа на операцию publish из модели Publish-Subscribe, нежели на send из Actor Model. Следствием чего является наличие такой полезной на практике возможности, как широковещательная рассылка сообщений.
Механизм доставки сообщений в SObjectizer похож на модель Publish-Subscribe еще и процедурой подписки. Если агент хочет получать сообщения типа A, то он должен подписаться на сообщения типа A из соответствующего mbox-а. Если хочет получать сообщения типа B — должен подписаться на сообщения типа B. И т.д. При этом тип сообщения играет ту же самую роль, как и название топика в модели Publish-Subscribe. Ну и как в модели Publish-Subscribe, где получатель может подписаться на любое количество топиков, агент в SObjectizer может быть подписан на любое количество типов сообщений из разных mbox-ов:
class example : public so_5::agent_t {
...
public :
virtual void so_define_agent() override {
// Подписка разных обработчиков событий на разные сообщения
// из одного mbox-а.
// Тип сообщения автоматически выводится из типа аргумента
// обработчика события.
so_subscribe(some_mbox)
.event( &example::first_handler )
.event( &example::second_handler )
// Обработчик может быть задан и в виде лямбда-функции.
.event( []( const third_message & msg ){...} );
// Подписка одного и того же события на сообщения
// из разных mbox-ов.
so_subscribe(another_mbox).event( &example::first_handler );
so_subscribe(yet_another_mbox).event( &example::first_handler );
...
}
...
private :
void first_handler( const first_message & msg ) {...}
void second_handler( const second_message & msg ) {...}
};
Следующим важным отличием SObjectizer от других реализаций «классической» Actor Model является то, что в SObjectizer у агента нет своей очереди сообщений. Очередь сообщений в SObjectizer принадлежит рабочему контексту, на котором обслуживается агент. А рабочий контекст определяется диспетчером, к которому привязан агент.
Диспетчер — это одно из краеугольных понятий в SObjectizer. Диспетчеры определяют где и когда агенты будут обрабатывать свои сообщения.
Самый простой диспетчер владеет всего одной рабочей нитью. Все привязанные к такому диспетчеру агенты работают на этой общей нити. Эта нить владеет одной единственной очередью сообщений, и сообщения для всех агентов, привязанных к диспетчеру, помещаются в эту единственную очередь. Рабочая нить берет сообщение из очереди, вызывает обработчик сообщения у соответствующего агента-получателя, после чего переходит к следующему сообщению и т.д.
Есть и другие типы диспетчеров. Например, диспетчеры с пулами рабочих потоков, диспетчеры с поддержкой приоритетов агентов и разными политиками обработки этих приоритетов и т.д. Во всех случаях рабочий контекст и очередь сообщений для агента назначается диспетчером, к которому привязан агент.
Следующей отличительной чертой SObjectizer является наличие такого понятия, как "кооперация агентов". Кооперация — это группа агентов, которая совместно выполняет какую-то прикладную задачу. И эти агенты должны начинать и завершать свою работу единовременно. Т.е. если какой-то агент не может стартовать, то не стартуют и все остальные агенты кооперации. Если какой-то агент не может продолжать работу, то не могут продолжать свою работу и все остальные агенты кооперации.
Кооперации появились в SObjectizer потому, что в изрядном количестве случаев для выполнения более-менее сложных действий приходится создавать не одного, а сразу нескольких взаимосвязанных агентов. Если создавать их по одному, то нужно решить, например:
- кто стартует первым и в каком порядке он создает остальных;
- как уже стартовавшие агенты определят, что все остальные агенты уже созданы и можно начинать свою работу;
- что делать, если при старте очередного агента возникает какая-то проблема.
В случае с кооперациями все проще: взаимосвязанные агенты создаются все сразу, включаются в кооперацию и кооперация регистрируется в SObjectizer. А уже сам SObjectizer следит, чтобы регистрация кооперации выполнялась транзакционно (т.е. чтобы все агенты кооперации стартовали, либо чтобы не стартовал ни один):
// Создаем и регистрируем кооперацию из 3-х агентов:
// - первый обслуживает TCP-подключение к AMQP-брокеру и реализует AMQP-протокол;
// - второй выполняет взаимодействие с СУБД.
// - третий выполняет обработку прикладных сообщений от брокера
// (используя при этом функциональность первых двух агентов);
so_5::environment_t & env = ...;
// Сам экземпляр кооперации, в котором будут храниться агенты.
// У каждой кооперации должно быть уникальное имя, которое, как в данном
// случае, может быть сгенерировано самим SObjectizer-ом.
auto coop = env.create_coop( so_5::autoname );
// Наполняем кооперацию...
coop.make_agent< amqp_client >(...);
coop.make_agent< db_worker >(...);
coop.make_agent< message_processor >(...);
// Регистрируем кооперацию.
// Дальнейшей ее судьбой будет заниматься сам SObjectizer.
env.register_coop( std::move(coop) );
Отчасти кооперации решают ту же проблему, что и система супервизоров в Erlang: входящие в кооперацию агенты как бы находятся под контролем супервизора all-for-one. Т.е. сбой одного из агентов приводит к дерегистрации всех остальных агентов кооперации.
Следующей важной чертой SObjectizer является то, что агенты в SObjectizer — это конечные автоматы. Агент может иметь произвольное количество состояний, одно из которых в конкретный момент времени является текущим состоянием. Реакция агента на внешнее воздействие зависит как от входящего сообщения, так и от текущего состояния. Агент может обрабатывать одно и то же сообщение по-разному в разных состояниях, для чего он подписывает разные обработчики сообщений в каждом из состояний.
Агенты в SObjectizer могут представлять из себя довольно сложные конечные автоматы: поддерживается вложенность состояний, временные ограничения на пребывания агента в состоянии, состояния с deep- и shallow-history, а также обработчики входа и выхода в состояние.
class device_handler : public so_5::agent_t {
// Перечень состояний, в которых может находиться агент.
// Есть два состояния верхнего уровня...
state_t st_idle{ this }, st_activated{ this },
// ... и три состояния, которые являются подсостояниями
// состояния st_activated.
st_cmd_sent{ initial_substate_of{ st_activated } },
st_cmd_accepted{ substate_of{ st_activated } },
st_failure{ substate_of{ st_activated } };
...
public :
virtual void so_define_agent() override {
// Начинаем работать в состоянии st_idle.
st_idle.activate();
// При получении команды в состоянии st_idle просто меняем
// состояние и делегируем обработку команды состоянию st_activated.
st_idle.transfer_to_state< command >( st_activated );
// В состоянии st_activated реагируем всего на одно сообщение:
// по приходу turn_off возвращаемся в состояние st_idle.
// При этом реакция на сообщение turn_off наследуется
// всеми дочерними состояниями.
st_activated
.event( [this](const turn_off & msg) {
turn_device_off();
st_idle.activate();
} );
// В состояние st_cmd_sent "проваливаемся" сразу после входа
// в состояние st_activated, т.к. st_cmd_sent является начальным
// подсостоянием.
st_cmd_sent
.event( [this](const command & msg) {
send_command_to_device(msg);
// Проверим, что произошло с устройством через 150ms.
send_delated<check_status>(*this, 150ms);
} )
.event( [this](const check_status &) {
if(command_accepted())
st_cmd_accepted.activate();
else
st_failure.activate();
} );
...
// В состоянии st_failure находимся не более 50ms,
// после чего возвращаемся в st_idle.
// При входе в это состояние принудительно сбрасываем
// настройки устройства.
st_failure
.on_enter( [this]{ reset_device(); } )
.time_limit( 50ms, st_idle );
}
...
};
Из CSP-модели SObjectizer позаимствовал такую штуку, как каналы, которые в SObjectizer называются message chains. CSP-ные каналы были добавлены в SObjectizer как инструмент для решения одной специфической проблемы: взаимодействие между агентами строится через обмен сообщениями, поэтому очень просто дать какую-то команду агенту или передать какую-то информацию агенту из любой части приложения — достаточно отсылать сообщение посредством send. Однако, как агенты могут воздействовать на не-SObjectizer частью приложения?
Эту проблему решают message chains (mchains). Message chain может выглядеть совсем как mbox: отсылать сообщения в mchain нужно посредством все того же send-а. А вот извлекаются сообщения из mchain функциями receive и select, для работы с которыми не требуется создавать SObjectizer-овских агентов.
Работа с message chain в SObjectizer похожа на работу с каналами в языке Go. Хотя есть и серьезные различия:
- mchains в SObjectizer могут одновременно хранить сообщения любых типов, тогда как Go-шные каналы типизированы;
- в Go-шной конструкции select можно использовать как send-, так и receive-операции. Тогда как в SObjectizer-овском select-е допускаются только receive-операции (по крайней мере в версиях до 5.5.17 включительно);
- mchains в SObjectizer могут иметь, а могут и не иметь ограничений на размер очереди сообщений. Тогда как в Go размер канала ограничен всегда. Для mchain-а с ограниченным размером SObjectizer заставляет выбрать подходящее поведение для попытки поместить новое сообщение в полный mchain (например, подождать какое-то время и выбросить самое старое сообщение из mchain или ничего не ждать и сразу породить исключение).
Message chains — это относительно недавнее добавление в SObjectizer. Однако, штука оказалась весьма полезной и довольно неожиданным следствием ее добавления стало то, что некоторые многопоточные приложения на SObjectizer стало возможно разрабатывать даже без применения агентов:
void parallel_sum_demo()
{
using namespace std;
using namespace so_5;
// Тип сообщения, которое отошлет каждая рабочая нить в конце своей работы.
struct consumer_result
{
thread::id m_id;
size_t m_values_received;
uint64_t m_sum;
};
wrapped_env_t sobj;
// Канал для отсылки сообщений рабочим нитям.
auto values_ch = create_mchain( sobj,
// Канал имеет ограничение на размер, поэтому назначаем
// паузу в 5м на попытку добавить сообщение в полный канал.
chrono::minutes{5},
// Не более 300 сообщений в канале.
300u,
// Память под внутреннюю очередь канала выделяется заранее.
mchain_props::memory_usage_t::preallocated,
// Если место в канале не появилось даже после 5м ожидания,
// то просто прерываем все приложение.
mchain_props::overflow_reaction_t::abort_app );
// Простой канал для ответных сообщений от рабочих потоков.
auto results_ch = create_mchain( sobj );
// Рабочие потоки.
vector< thread > workers;
for( size_t i = 0; i != thread::hardware_concurrency(); ++i )
workers.emplace_back( thread{ [&values_ch, &results_ch] {
// Последовательно читаем значения из входного
// канала, подсчитываем количество значений и
// их сумму.
size_t received = 0u;
uint64_t sum = 0u;
receive( from( values_ch ), [&sum, &received]( unsigned int v ) {
++received;
sum += v;
} );
// Отсылаем результирующие значения назад.
send< consumer_result >( results_ch,
this_thread::get_id(), received, sum );
} } );
// Отсылаем несколько значений во входной канал. Эти значения будут
// распределятся между рабочими потоками, висящими на чтении данных
// из входного канала.
for( unsigned int i = 0; i != 10000; ++i )
send< unsigned int >( values_ch, i );
// Закрываем входной канал и даем возможность дочитать его содержимое
// до самого конца.
close_retain_content( values_ch );
// Получаем результирующие значения от всех рабочих нитей.
receive(
// Мы точно знаем, сколько значений должно быть прочитано.
from( results_ch ).handle_n( workers.size() ),
[]( const consumer_result & r ) {
cout << "Thread: " << r.m_id
<< ", values: " << r.m_values_received
<< ", sum: " << r.m_sum
<< endl;
} );
for_each( begin(workers), end(workers), []( thread & t ) { t.join(); } );
}
Зачем это?
Наверняка у читателей, которые никогда раньше не использовали Actor Model и Publish-Subscribe, уже возник вопрос: «И что, все вышеперечисленное действительно упрощает разработку многопоточных приложений на C++?»
Да. Упрощает. Проверенно на людях. Многократно.
Понятное дело, упрощает не для всех приложений. Ведь многопоточность — это инструмент, который используется в двух очень разных направлениях. Первое направление, называемое parallel computing, использует потоки для загрузки всех имеющихся вычислительных ресурсов и сокращения общего времени расчета вычислительных задач. Например, ускорение перекодирования видео за счет загрузки всех вычислительных ядер, при этом каждое ядро выполняет одну и ту же задачу, но на своем наборе данных. Это не то направление, для которого создавался SObjectizer. Для упрощения решения такого класса задач предназначены другие инструменты: OpenMP, Intel Threading Building Blocks, HPX и т.д.
Второе направление, называемое concurrent computing, использует многопоточность для обеспечения параллельного выполнения множества (почти) независимых активностей. Например, почтовый клиент в одном потоке может отправлять исходящую почту, во втором — загружать входящую, в третьем — редактировать новое письмо, в четвертом — выполнять фоновую проверку орфографии в новом письме, в пятом — проводить полнотекстовый поиск по почтовому архиву и т.д.
SObjectizer создавался как раз для направления concurrent computing, а перечисленные выше возможности SObjectizer позволяют уменьшить объем головной боли у разработчика.
Прежде всего за счет выстраивания взаимодействия между агентами через асинхронный обмен сообщениями.
Взаимодействие независимых потоков через очереди сообщений гораздо проще, чем через ручную работу с разделяемыми данными, защищенными семафорами или мутексами. Причем тем проще, чем больше рабочих потоков в приложении и чем чаще и разнообразнее их взаимодействие.
Запутаться в мутексах и условных переменных несложно даже на десятке рабочих потоков. А уж когда счет идет на сотни рабочих потоков, то ручная возня с низкоуровневыми примитивами синхронизации вообще оказывается за гранью возможностей даже опытных разработчиков. Тогда как сотня нитей, взаимодействующих через очереди сообщений, как показала практика, совершенно не проблема.
Так что главное, что дает разработчику SObjectizer (как и любая другая реализация Actor Model) — это возможность представления независимых активностей внутри приложения в виде агентов, общающихся с окружающим миром только через сообщения.
Следующий ключевой момент — это связывание агентов с подходящими рабочими контекстами.
Здравый смыл подсказывает (и практика это подтверждает), что выдать всем агентам по собственной рабочей нити не есть хорошо. Приложению может потребоваться десять тысяч независимых агентов. Или сто тысяч. Или даже миллион. Очевидно, что наличие такого количества рабочих нитей в системе ни к чему хорошему не приведет. Даже если ОС и будет способна создать их (смотря какая ОС и на каком оборудовании), то накладные расходы на обеспечение их работы все равно окажутся слишком большими, чтобы построенное таким образом приложение работало с приемлемой производительностью и отзывчивостью.
Противоположность, когда все агенты привязываются к одной общей нити или к одному единственному пулу нитей, также не является идеальным решением для всех случаев. Например, в приложении может оказаться десяток агентов, которым приходится работать со сторонним синхронным API (делать запросы к БД, общаться с подключенным к компьютеру устройствами, выполнять тяжелые вычислительные операции и т.д.). Каждый такой агент способен затормозить работу всех остальных агентов, которые окажутся с ним на одной рабочей нити. Несколько таких агентов запросто могут затормозить все приложение, если оно использует в работе единственный пул рабочих потоков: просто каждый из агентов займет один из потоков пула…
Как раз для решения этих проблем в SObjectizer есть диспетчеры и такая архиважная операция, как привязка агентов к соответствующим диспетчерам. Все вместе это дает должную свободу и гибкость разработчику, при этом избавляя разработчика от забот по управлению этими потоками.
Программист может создать столько диспетчеров, сколько ему нужно, и так распределить своих агентов между этими диспетчеры, как ему представляется правильным. Например, в приложении могут быть:
- один диспетчер типа one_thread, на котором некий агент работает AMQP-клиентом;
- один диспетчер типа thread_pool, на котором работают агенты, отвечающие за обработку сообщений из AMQP-шных топиков;
- один диспетчер типа active_obj, к которому привязываются агенты для взаимодействия с СУБД;
- еще один диспетчер типа active_obj, на котором будут работать агенты, общающиеся с подключенными к компьютеру HSM-ами;
- и еще один thread_pool-диспетчер для агентов, которые следят и управляют всей описанной выше кухней.
Еще одной штукой, которую сами пользователи SObjectizer отмечают как одну из важнейших, является поддержка отложенных и периодических сообщений. Т.е. работа с таймерами.
Очень часто бывает нужно выполнить какое-то действие через N миллисекунд. А затем через M миллисекунд проверить наличие результата. И, если результата нет, выждать K миллисекунд и повторить все заново. Ничего сложного: есть send_delayed, которая делает отложенную на указанное время отсылку сообщения.
Зачастую агенты работают на тактовой основе. Скажем, раз в секунду агент просыпается, выполняет пачку накопившихся за последнюю секунду операций, после чего засыпает до наступления очередного такта. Опять ничего сложного: есть send_periodic, которая повторяет доставку одного и того же сообщения с заданным темпом.
Почему SObjectizer именно такой?
SObjectizer никогда не был экспериментальным проектом, он всегда применялся для упрощения повседневной работы с C++. Каждая новая версия SObjectizer сразу шла в работу, SObjectizer постоянно использовался в разработке коммерческих проектов (в частности, в нескольких business-critical проектах компании Интервэйл, но не только). Это накладывало свой отпечаток на его развитие.
Работы над последним вариантом SObjectizer (мы его называем SObjectizer-5), начались в 2010-ом, когда стандарт C++11 еще не был принят, какие-то вещи C++11 кое-где уже поддерживались, а каких-то пришлось ждать более пяти лет.
В таких условиях не все получалось сделать удобно и лаконично с первого раза. Местами не хватало опыта использования C++11. Очень часто нас ограничивали возможности компиляторов, с которыми приходилось иметь дело. Можно сказать, что движение вперед шло методом проб и ошибок.
При этом нам требовалось еще и заботиться о совместимости: когда SObjectizer лежит в основе business-critical приложений, нельзя просто выбросить какой-то кусок из SObjectizer или каким-то кардинальным образом поменять часть его API. Поэтому даже если время показывало, что где-то мы ошиблись и что-то можно делать проще и удобнее, то возможности «взять и переписать» не было. Мы двигались и двигаемся эволюционным путем, постепенно добавляя новые возможности, но не выбрасывая в одночасье старые куски. В качестве небольшой иллюстрации: какого-либо серьезного нарушения обратной совместимости не было с момента выхода версии 5.5.0 осенью 2014-го года, хотя с тех пор состоялось уже около 20 релизов в рамках развития версии 5.5.
SObjectizer приобрел свои уникальные черты в результате многолетнего использования SObjectizer в реальных проектах. К сожалению, эта уникальность «вылазит боком» при попытках рассказать о SObjectizer широкой публике. Слишком уж SObjectizer не похож на Erlang и другие проекты, созданные по образу и подобию Erlang-а (например, C++ Actor Framework или Akka).
Вот, скажем, есть у нас возможность запустить несколько независимых экземпляров SObjectizer-а в одном приложении. Возможность весьма экзотичная. Но добавлена она была потому, что на практике иногда такое бывает необходимо. Для поддержки этой возможности в SObjectizer появилось такое понятие, как SObjectizer Environment. И этот SObjectizer Environment потребовалось «протягивать» через изрядную часть API SObjectizer-а, что не могло не сказаться на лаконичности кода.
А вот в C++ Actor Framework такой возможности изначально не было. API акторов в CAF выглядел гораздо проще, а примеры кода — короче. Из-за чего мы часто встречаемся с утверждениями, что CAF воспринимается проще и понятнее, чем SObjectizer.
Ирония, однако, в том, что со временем разработчики CAF-а так же пришли к выводу, что им нужно иметь нечто вроде SObjectizer Environment (они это называют actor_system). И в следующей версии CAF ожидается добавление этой штуки. С очередной поломкой совместимости между версиями CAF-а. В этих поломках, кстати говоря, CAF так же сильно опережает SObjectizer.
Еще одна вещь, которая проистекает из опыта использования SObjectizer, которая нам кажется правильной и естественной, но которая вызывает нездоровую реакцию публики: отсутствие поддержки в SObjectizer-5 встроенных средств для распределенности. Мы часто слышим что-то вроде «Ну как же так? Вот в Erlang-е есть, в Akka — есть, в CAF — есть, а у вас нет?!!»
Нет. По очень простой причине: в SObjectizer-4 такая поддержка была, но со временем выяснилось, что не бывает транспорта, который бы идеально подходил под разные условия. Если узлы распределенного приложения гоняют друг-другу большие куски видеофайлов — это одно. Если обмениваются сотнями тысяч мелких пакетов — совсем другое. Если C++ приложение должно общаться с Java приложениями — это третье. И т.д.
Поэтому мы решили не добавлять в SObjectizer-5 универсальный транспорт, который мог бы оказаться весьма посредственным в каждом из реальных сценариев использования, а задействовать те коммуникационные возможности, которые нужны под задачу. Где-то это AMQP, где-то MQTT, где-то REST. Просто все это реализуется сторонними инструментами. Что в итоге обходится проще, дешевле и эффективнее.
Коль скоро речь зашла о других подобных разработках, то можно сказать, что живых и развивающихся проектов подобного рода для C++ буквально раз-два и обчелся:
- прежде всего относительно молодой и еще не стабилизировавшийся CAF, который является настолько близкой реализацией Erlang-а средствами C++, насколько это возможно;
- старый, промышленного качества проект QP, сильно заточенный под разработку встраиваемых систем;
- минималистичная и тривиальная реализация Actor Model в коммерческой библиотеке Just::Thread Pro Actor Edition.
У каждого из этих проектов есть свои сильные и слабые стороны. Но, вообще говоря, на их фоне SObjectizer со своими возможностями, кросс-платформенностью, производительностью, количеством тестов, примеров и объемом документации выглядит отнюдь не бедным родственником. Что, опять же, является следствием того, что SObjectizer всегда был инструментом для решения практических задач.
Послесловие
В этой небольшой статье мы попытались дать краткие ответы на самые распространенные вопросы, с которыми мы сталкиваемся, пытаясь рассказывать о SObjectizer-е широкой публике: «Что это такое?», «Зачем это?» и «Почему оно именно такое?» Для сохранения приемлемого объема статьи мы контурно обозначили лишь самые значимые моменты и яркие черты SObjectizer-а. Если же углубляться в детали, то написать можно еще с десяток-другой статьей подобного объема. И если читателям будет интересно, то мы с удовольствием расскажем еще больше о SObjectizer-е, о принципах его работы, об опыте разработки софта с его помощью, о перспективах дальнейшего развития и т.д, и т.п.
Автор: eao197