Когда-то в SObjectizer-4 «из коробки» была доступна возможность построения распределенных приложений. Но не всегда это работало так хорошо, как хотелось бы. В итоге в SObjectizer-5 от поддержки распределенности в самом ядре SObjectizer-а мы отказались (подробнее этот вопрос рассматривается здесь). Отказались в пользу того, чтобы под конкретную задачу можно было выбрать конкретный транспорт с учетом особенностей этой самой задачи. Написав для этого соответствующую обвязку, которая будет скрывать от программиста детали работы выбранного транспорта.
В данной статье мы попробуем рассказать об одной такой обвязке вокруг MQTT и libmosquttio, посредством которой была реализована возможность взаимодействия частей распределенного приложения.
Что и зачем
Пару лет назад мы помогали одной команде с разработкой прототипа распределенной измерительной системы. Прототип заработал и, насколько нам известно, работает в демонстрационном режиме до сих пор. К сожалению, у команды возникли сложности с поиском дальнейшего финансирования и проект заморозили. Из-за «подвешенного» статуса проекта мы сможем рассказать только то, что сможем рассказать, без имен, явок и паролей подробностей.
Итак, была идея создания распределенной системы сбора некоторой измерительной информации. Планировалось иметь центральный сервер, куда стекается и где обрабатывается вся собранная информация. А также некоторое множество (от нескольких сотен до нескольких тысяч) «измерительных станций».
Первоначально центральный сервер раздает команды на сбор информации на измерительные станции. А уже дальше станции сами в любой момент могут отдавать информацию центральному серверу, при этом центральный сервер в любой момент может выдавать новые команды измерительным станциями.
Для реализации начинки измерительных станций решили использовать C++. Как из-за экономии ресурсов этих самых станций, так и из-за необходимости местами работать с системным API.
В реализации же центрального сервера можно было использовать использовать Java и/или что-то более удобное и безопасное, нежели C++. Хотя в рамках прототипа на C++ был написан демонстрационный «сервер» на C++, который поддерживал несколько сценариев сбора информации от измерительных станций, но без обработки этой информации.
MQTT в качестве транспорта
Итак, есть измерительная станция, на которой крутится написанный на C++ процесс, который должен взаимодействовать с центральным сервером: получать от сервера команды, отдавать серверу результаты выполнения команд, контролировать наличие связи с сервером, переподключаться к серверу при разрывах связи и т.д.
Возникал естественный вопрос: какой именно транспорт будет использоваться для общения центрального сервера с измерительными станциями?
Нужно сказать, что ответ на этот вопросы искали не очень долго. Остановились на протоколе MQTT, который:
- имеет официальную спецификацию и существуют реализации этой спецификации для разных языков программирования;
- очень простой и легковесный. Т.е. даже если придется для каких-то совсем уж специфических условий написать свою реализацию этого протокола, то это не составит большого труда;
- позволяет легко обмениваться информацией в обоих направлениях;
- предполагает использование простой, но мощной и удобной модели Publish/Subscribe.
В общем, по нашему мнению, раз уж MQTT для такого рода задач создавался, то грех его в подобных условиях не использовать.
Но MQTT — это транспорт. Сами данные было решено обрамлять в JSON. Благо, JSON достаточно простой, понятный и легковесный. Опять же, для любого мало-мальски востребованного языка программирования есть библиотеки, позволяющие работать с JSON-ом.
Таким образом мы столкнулись с тем, что в своем C++ коде мы должны подружить SObjectizer с MQTT и JSON-ом. Вот о том, как мы это сделали и поговорим дальше.
Что у нас получилось?
Мы сделали небольшую надстройку над SObjectizr-ом и libmosquitto под названием mosquitto_transport. Эта надстройка берет на себя задачи взаимодействия с MQTT-брокером и предоставляет разработчику небольшой API для публикации и подписки сообщения.
Библиотека mosquitto_transport написана на C++14, работает под Linux, поддерживает лишь часть возможностей MQTT (в частности, используется только QoS=0). В общем, в ней реализован тот минимум, который потребовался для прототипа.
Как выглядит использование mosquitto_transport?
Для использования mosquitto_transport разработчику нужно сделать несколько обязательных шагов, которые мы подробнее рассмотрим ниже.
Библиотека mosquitto_transport размещает все свое содержимое в пространстве имен mosquitto_transport. Но такое длинное имя использовать в примерах неудобно. Поэтому далее местами будет использоваться синоним mosqtt, который вводится следующим образом: namespace mosqtt = mosquitto_transport;
Шаг первый. Инициализация mosquitto_transport и libmosquitto
В качестве базы у mosquitto_transport лежит libmosquitto, а libmosquitto нуждается в явной инициализации в начале работы программы и явной деинициализации при завершении программы. Поэтому для работы с mosquitto_transport нужно создать где-то в программе объект типа lib_initializer_t. Этот объект в своем конструкторе проинициализирует libmosquitto, а в деструкторе — выполнит деинициализацию. Ссылку на lib_initializer_t нужно будет затем передать в конструктор агента a_transport_manager_t.
Обычно в программе это выглядит следующим образом:
int main() {
mosqtt::lib_initializer_t mosqtt_lib;
...
}
Шаг второй. Запуск агента a_transport_manager_t и получение экземпляра instance_t
Весь поток входящих и исходящих сообщений обслуживает специальный агент a_transport_manager_t. Разработчик должен сам создать агента этого типа у себя в программе и зарегистрировать этого агента в составе какой-нибудь кооперации.
Кроме собственно создания и регистрации агента a_transport_manager_t, нужно сделать еще одну важную вещь: взять у агента экземпляр объекта специального типа instance_t. Именно этот instance_t будет нужен для публикации и получения сообщений.
Если в приложении работают всего несколько агентов, то их всех можно поместить в одну кооперацию с агентом a_transport_manager_t:
int main() {
...
mosqtt::lib_initializer_t mosqtt_lib;
...
so_5::launch([&](so_5::environment_t & env) {
// Сразу регистрируем кооперацию, в которую войдут все
// агенты приложения.
env.introduce_coop([&](so_5::coop_t & coop) {
// Создаем транспортного агента.
auto transport = coop.make_agent<mosqtt::a_transport_manager_t>(...);
// Теперь можем создать и остальных агентов.
coop.make_agent<first_app_agent>(..., transport->instance(), ...);
coop.make_agent<second_app_agent>(..., transport->instance(), ...);
coop.make_agent<third_app_agent>(..., transport->instance(), ...);
});
});
}
Но в более сложных случаях, когда прикладных агентов больше и/или прикладные агенты могут возникать и исчезать по ходу работы приложения, имеет смысл создать отдельную кооперацию с a_transport_manager_t и привязать этого агента к собственному диспетчеру. В этом случае может быть разумным выделить создание a_transport_manager_t в отдельную вспомогательную функцию:
mosqtt::instance_t make_transport(
so_5::environment_t & env,
mosqtt::lib_initializer_t & mosqtt_lib,
... /* Какие-то дополнительные параметры */) {
mosqtt::instance_t instance;
env.introduce_coop(
// Транспортный агент будет работать на собственной рабочей нити.
so_5::disp::one_thread::create_private_disp(env)->binder(),
[&](so_5::coop_t & coop) {
auto transport = coop.make_agent<mosqtt::a_transport_manager_t>(
std::ref{mosqtt_lib},
... /* Дополнительные параметры */);
instance = transport->instance();
});
return instance;
}
Что позволяет затем использовать make_transport, например, следующим образом:
int main() {
...
mosqtt::lib_initializer_t mosqtt_lib;
...
so_5::launch([&](so_5::environment_t & env) {
// Запускаем MQTT-транспорт.
auto mqtt = make_transport(env, mosqtt_lib, ...);
// Регистрируем первую кооперацию с прикладными агентами приложения.
env.introduce_coop([&](so_5::coop_t & coop) {
coop.make_agent<first_app_agent>(..., mqtt, ...);
coop.make_agent<second_app_agent>(..., mqtt, ...);
coop.make_agent<third_app_agent>(..., mqtt, ...);
});
// Регистрируем вторую кооперацию с прикладными агентами приложения.
env.introduce_coop(...);
... // И так далее.
});
}
Примечательной особенностью агента a_transport_manager_t является то, что ряд обработчиков событий у него помечены как thread-safe. Что делает возможным привязку этого агента к диспетчеру типа adv_thread_pool. В этом случае агент сможет часть своих событий обрабатывать параллельно сразу на нескольких рабочих нитях. Хотя нам на практике эта функциональность не потребовалась.
Шаг третий. Реализация сериализации/десериализации прикладных сообщений
Самая важная часть при использовании mosquitto_transport — это реализация формата представления передаваемых сообщений. Например, если у нас есть сообщение вида:
struct sensor_data_t {
data_category_t category_;
timestamp_t when_;
int value_;
...
};
то для передачи его MQTT-брокеру и далее подписчикам следует преобразовать сообщение в какой-то формат. Например, в XML, JSON или ProtoBuf. Для того, чтобы это происходило автоматически во время работы с mosquitto_transport разработчик должен реализовать сериализацию/десериализацию. Подробнее этот вопрос рассматривается ниже, пока же покажем основную идею.
Во-первых, разработчик должен определиться с тем, как данные будут представлены. Например, это может быть текстовое JSON-представление. Или же бинарное ProtoBuf представление. Может быть и то, и другое — mosquitto_transport этому не препятствует, но разработчик сам должен следить за тем, какой формат сообщений в каких топиках у него используется.
Во-вторых, для каждого своего формата разработчик должен определить тип-тег. Например, если мы хотим использовать только JSON, то мы определяем тип-тег следующего вида:
struct json_encoding {};
Этот тип-тег потребуется для параметризации шаблонов и выбора перегруженных функций.
В-третьих, разработчик должен для каждого своего форма определить в пространстве имен mosquitto_transport частичную специализацию для шаблонных классов encoder_t и decoder_t.
Чтобы получилось что-то вроде:
// Это наш тип-тег, который определяет формат представления данных.
struct my_encoding_tag {};
// Реализация сериализации/десериализации данных через специализацию
// типов внутри пространства имен mosquitto_transport.
namespace mosquitto_transport {
// Реализация сериализации.
template<typename Msg>
struct encoder_t<my_encoding_tag, Msg> {
static std::string encode(const Msg & what) {
... // Тут какой-то код по сериализации.
}
};
// Реализация десериализации.
template<typename Msg>
struct decoder_t<my_encoding_tag, Msg> {
static Msg decode(const std::string & payload) {
... // Тут какой-то код по десериализации.
}
};
} /* namespace mosquitto_transport */
Далее операции публикации сообщений и подписки на сообщения нужно будет указывать тип-тег в качестве маркера, благодаря которому mosquitto_transport будет понимать, как именно нужно сериализовать и десериализовать сообщения.
Последующие шаги. Публикация и получение сообщений
После того как три вышеозначенных обязательных шага выполнены, можно выполнять операции публикации сообщений и подписки на сообщения.
Публикация выглядит следующим образом:
mosqtt::topic_publisher_t<Tag_Type>::publish(
// Объект instance, полученный от a_transport_manager_t.
instance,
// Имя топика на MQTT-брокере, в который будет выполняться
// публикация сообщения.
"some/topic/name",
// Объект, чье содержимое будет опубликовано.
some_message_instance);
Такой операций мы говорим, что хотим опубликовать сообщение в конкретный топик и что это сообщение должно быть сериализовано соответствующим образом (формат определяет шаблонный параметр Tag_Type). Например, мы могли бы написать так:
mosqtt::topic_publisher_t<my_encoding_tag>::publish(
instance,
"/sensor/1023/updates",
sensor_data_t{
data_category_t::recent_value, current_timestamp, value});
В этом случае mosquitto_transport сериализовал бы данные из сообщения типа sensor_data_t в формат, определенный для my_encoding_tag, и опубликовал бы получившееся сообщение в топик "/sensor/1023/updates".
Для того, чтобы получать сообщения, которые кто-то публикует в интересующие нас топики нужно сделать как бы «двойную подписку»: нужно вызвать специальный метод subscribe() и внутри этого метода подписаться на сообщения из специального mbox-а. Например, если мы хотим получать все сообщения из топиков "/sensor/+/updates", то мы можем сделать вот такой вызов subscribe() внутри какого-то из своих агентов:
class my_agent_t : public so_5::agent_t {
...
void so_define_agent() override {
mosqtt::topic_subscriber_t<my_encoding_tag>::subscribe(
// Объект instance, полученный от a_transport_manager_t.
instance_,
// Топик, на который хотим подписаться.
"sensor/+/updates",
// Лямбда, внутри которой будет происходить подписка на
// входящие сообщения.
[this](so_5::mbox_t mbox) {
// Сама подписка для того, чтобы агент мог получать
// входящие сообщения.
so_subscribe(mbox).event(&my_agent_t::on_sensor_data);
});
}
...
void on_sensor_data(
mhood_t<mosqtt::incoming_message_t<my_encodig_tag>> cmd) {
...
}
};
Двойная подписка здесь нужна потому, что у нас, по сути, есть два разных действия:
- во-первых, нам нужно заставить transport_manager-а подписаться на конкретный топик на MQTT-брокере. Без этой подписки брокер просто не будет доставлять нам новые входящие сообщения. Собственно, именно это и делает метод subscribe();
- во-вторых, когда MQTT-брокер доставляет нам сообщение из подписанного нами топика, то нужно доставить это сообщение до агента-получателя. Для этого должен использоваться механизм подписок SObjectizer-а. Как раз поэтому внутри subscribe() создается новый mbox, который будет связан с конкретной подпиской на MQTT-брокере. Когда брокер отдаст нам новое сообщение, то это сообщение будет отослано в mbox. Соответственно, сообщение дойдет до агента, который подписался на этот mbox.
Простейший Hello, World
Для демонстрации mosquitto_transport «в деле» сделаем типичный хелло-ворлдовский пример. В пример входят два приложения:
- demo_server_app. Подключается к локальному MQTT-шному брокеру и подписывается на сообщения из топиков "/client/+/hello". Когда сообщение приходит, отвечает ответным сообщением в топик "/client/:ID/replies";
- demo_client_app. Подключается к локальному MQTT-шному брокеру, подписывается на топик "/client/:ID/replies" и публикует сообщение в топик "/client/:ID/hello". Завершает свою работу как только в "/client/:ID/replies" приходит ответ.
В качестве ":ID" в именах топиков будет использоваться целочисленный идентификатор, который demo_client_app генерирует случайным образом.
Сообщения передаются в JSON-формате.
Исходные тексты примера можно найти в этом репозитории. Мы же пройдемся по основным моментам реализации примера.
Определение сообщений для взаимодействия «клиента» и «сервера»
В заголовочном файле demo/messages.hpp сделаны определения структур данных, которые выступают в качестве сообщений для взаимодействия «клиента» и «сервера». Клиент отсылает серверу сообщение типа client_hello_t, а сервер в ответ присылает сообщение типа server_hello_t. Конечно же, в виде MQTT-сообщений летают сериализованные в JSON представления этих сообщений. Но в программе работа идет именно с типами client_hello_t/server_hello_t.
Для того, чтобы сообщения типов client_hello_t/server_hello_t сериализовались и десериализовались должным образом, мы определяем тип-тег с именем json_format_t. И делаем частичную специализацию encoder_t и decoder_t в соответствии с тем, как мы об этом говорили выше. Для работы с JSON-ом мы используем небольшую надстройку над RapidJSON под названием json_dto. Поэтому внутри наших типов client_hello_t/server_hello_t есть шаблонные методы json_io — это как раз специфика json_dto.
Итак, вот что находится в messages.hpp:
#pragma once
#include <json_dto/pub.hpp>
#include <mosquitto_transport/pub.hpp>
namespace demo {
// Тип-тег, который будет указывать в каком формате должны быть
// сериализованы сообщения.
struct json_format_t {};
// Удобные имена для использования.
using publisher_t = mosquitto_transport::topic_publisher_t<json_format_t>;
using subscriber_t = mosquitto_transport::topic_subscriber_t<json_format_t>;
// Сообщение о появлении нового клиента.
struct client_hello_t {
int id_;
std::string greeting_;
std::string reply_topic_;
template<typename Json_Io>
void json_io(Json_Io & io) {
using namespace json_dto;
io & mandatory("id", id_)
& mandatory("greeting", greeting_)
& mandatory("reply_topic", reply_topic_);
}
};
// Ответное сообщение от сервера.
struct server_hello_t {
std::string greeting_;
template<typename Json_Io>
void json_io(Json_Io & io) {
io & json_dto::mandatory("greeting", greeting_);
}
};
} /* namespace demo */
// Средства для сериализации-десериализации сообщений.
namespace mosquitto_transport {
template<typename Msg>
struct encoder_t<demo::json_format_t, Msg> {
static std::string encode(const Msg & msg) {
return json_dto::to_json(msg);
}
};
template<typename Msg>
struct decoder_t<demo::json_format_t, Msg> {
static Msg decode(const std::string & json) {
return json_dto::from_json<Msg>(json);
}
};
} /* namespace mosquitto_transport */
Отдельного пояснения, возможно, заслуживают определения имен publisher_t и subscriber_t. Они нужны для того, чтобы в прикладном коде можно было писать, например:
publisher_t::publish(...)
вместо:
mosqtt::topic_publisher_t<json_format_t>::publish(...)
Общий утилитарный код
Еще один заголовочный файл, demo/common.hpp, содержит вспомогательные функции, который нужны как клиенту, так и серверу.
Функция make_loggers(), которую мы рассматривать не будем, создает объекты для логгирования. Нам нужно два таких объекта. Первый будет использоваться mosquitto_transport-ом. Второй — демонстрационным приложением. В примере для логгеров уровень детализации выставляется в spdlog::level::trace. Т.е. логироваться будет все, что позволяет наблюдать практически за всем, что происходит внутри приложения.
А вот функция run_transport_manager(), код которой мы посмотрим, служит как раз для создания и запуска агента a_transport_manager_t. Возвращает run_transport_manager() экземпляр типа mosquitto_transport::instance_t, который затем потребуется для публикации и подписки:
auto run_transport_manager(
so_5::environment_t & env,
const std::string & client_id,
std::shared_ptr<spdlog::logger> logger) {
mosquitto_transport::instance_t mqtt;
env.introduce_coop([&](so_5::coop_t & coop) {
auto lib = coop.take_under_control(
std::make_unique<mosquitto_transport::lib_initializer_t>());
auto tm = coop.make_agent<mosquitto_transport::a_transport_manager_t>(
std::ref(*lib),
mosquitto_transport::connection_params_t{
client_id, "localhost", 1883u, 60u},
std::move(logger));
mqtt = tm->instance();
});
return mqtt;
}
Здесь создается новая кооперация, которая содержит единственного агента — a_transport_manager_t. Этому агенту в конструктор передаются параметры подключения к MQTT-брокеру (поскольку это хелло-ворловский пример, то достаточно MQTT-брокера на localhost-е) и логгер, который следует использовать. Но самое любопытное здесь — это создание объекта lib_initializer_t. Он создается динамически, а ответственность за его удаление перекладывается на кооперацию с агентом a_transport_manager_t. Тем самым у нас получается, что libmosquitto будет проинициализирована непосредственно перед созданием агента a_transport_manager_t, а деинициализированна она будет уже после того, как агент a_transport_manager_t перестанет существовать.
Основная часть demo_server_app
В demo_server_app всю прикладную логику выполняет агент listener_t, реализованный следующим образом:
class listener_t final : public so_5::agent_t {
public:
listener_t(
context_t ctx,
mosqtt::instance_t mqtt,
std::shared_ptr<spdlog::logger> logger)
: so_5::agent_t{std::move(ctx)}
, mqtt_{std::move(mqtt)}
, logger_{std::move(logger)}
{}
virtual void so_define_agent() override {
demo::subscriber_t::subscribe(mqtt_, "/client/+/hello",
[&](const so_5::mbox_t & mbox) {
so_subscribe(mbox).event(&listener_t::on_hello);
});
}
private:
mosqtt::instance_t mqtt_;
std::shared_ptr<spdlog::logger> logger_;
void on_hello(mhood_t<demo::subscriber_t::msg_type> cmd) {
logger_->trace("message received from topic: {}, payload={}",
cmd->topic_name(),
cmd->payload());
const auto msg = cmd->decode<demo::client_hello_t>();
logger_->info("hello received. client={}, greeting={}",
msg.id_, msg.greeting_);
demo::publisher_t::publish(
mqtt_,
msg.reply_topic_,
demo::server_hello_t{"World, hello!"});
}
};
Это, можно сказать, типичный маленький SObjectizer-овский агент. В своем перегруженном методе so_define_agent() выполняется подписка на сообщения из топиков, удовлетворяющих маске "/client/+/hello". Когда такие сообщения от брокера будут приходить, то для их обработки будет вызываться метод on_hello.
В метод on_hello() передается специальный объект, который содержит имя топика (в который сообщение было опубликовано) и само сообщение именно в том виде, в котором оно пришло от MQTT-брокера. Т.е. в JSON-представлении. Но мы хотим получить это сообщение в виде объекта типа client_hello_t. Для этого вызывается метод decode(), в котором и происходит десериализация тела сообщения из JSON-а в client_hello_t.
Ну а в конце on_hello(), публикуется ответное сообщение. Имя топика для публикации ответа берется из пришедшего client_hello_t.
Основная часть demo_client_app
В demo_client_app так же основную работу выполняет единственный агент типа client_t. Но этот агент несколько посложнее, чем listener_t в demo_server_app. Вот код client_t:
class client_t final : public so_5::agent_t {
public:
client_t(
context_t ctx,
int id,
mosqtt::instance_t mqtt,
std::shared_ptr<spdlog::logger> logger)
: so_5::agent_t{std::move(ctx)}
, id_{id}
, mqtt_{std::move(mqtt)}
, logger_{std::move(logger)} {
}
virtual void so_define_agent() override {
// Подписываемся на уведомления о наличии подключения к брокеру.
so_subscribe(mqtt_.mbox()).event(&client_t::on_broker_connected);
demo::subscriber_t::subscribe(mqtt_, make_topic_name("/replies"),
[&](const so_5::mbox_t & mbox) {
so_subscribe(mbox).event(&client_t::on_reply);
});
}
private:
const int id_;
mosqtt::instance_t mqtt_;
std::shared_ptr<spdlog::logger> logger_;
std::string make_topic_name(const char * suffix) const {
return std::string("/client/") + std::to_string(id_) + suffix;
}
void on_broker_connected(mhood_t<mosqtt::broker_connected_t>) {
// После того, как подключились к брокеру можем сообщить о
// своем существовании.
demo::publisher_t::publish(
mqtt_,
make_topic_name("/hello"),
demo::client_hello_t{
id_,
"Hello, World",
make_topic_name("/replies")});
}
void on_reply(mhood_t<demo::subscriber_t::msg_type> cmd) {
logger_->trace("message received from topic: {}, payload={}",
cmd->topic_name(),
cmd->payload());
const auto msg = cmd->decode<demo::server_hello_t>();
logger_->info("hello received. greeting={}", msg.greeting_);
// Теперь мы свою работу можем завершить.
logger_->warn("finishing");
so_environment().stop();
}
};
Здесь агент client_t обрабатывает два события. С событием, на которое подписывается обработчик on_reply(), все должно быть понятно — это обработка сообщения, которое приходит от MQTT-брокера.
Пояснить следует обработчик on_broker_connected. Дело в том, что когда клиент стартует, нужно выбрать момент, когда можно публиковать начальное сообщение в топике "/client/:ID/hello". Поскольку если агент client_t сделает это сразу после своего старта, то сообщение никуда не уйдет и будет потеряно, т.к. еще нет подключения к MQTT-брокеру. Поэтому агенту client_t нужно дождаться, пока это подключение будет установлено.
Взаимодействием с MQTT-брокером занимается a_transport_manager_t. Когда связь с брокером появляется (т.е. когда не только установили TCP-соединение с брокером, но и когда обменялись с брокером сообщениями CONNECT/CONNACT) агент a_transport_manager_t отсылает в специальный mbox сигнал broker_connected_t. На этот сигнал подписывается client_t и когда сигнал приходит, client_t публикует сообщение client_hello_t.
Результаты работы приложения
Запустив demo_server_app можно увидеть приблизительно следующую картину:
Здесь видно, как demo_server_app подключился к брокеру, как сделал подписку, как получил входящее сообщение и как ответил на него.
Запустив demo_client_app можно увидеть картинку вот такого плана:
Здесь видно, как demo_client_app подключился к брокеру, как сделал подписку, как отослал свое приветственное сообщение и как получил сообщение от demo_server_app в ответ.
Пару слов о принятых проектных решениях
Выше мы показали как выглядит на данный момент получившаяся у нас библиотека mosquitto_transport. Теперь можно попробовать сказать несколько слов о ряде проектных решений, которые были положены в основу реализации.
Небольшой дисклаймер: mosquitto_transport сама по себе является прототипом
Когда у нас появилась задача предоставить возможность SObjectizer-овским агентам общаться с внешним миром через MQTT, то мы быстро убедились, что использовать такие широко распространенные библиотеки, как libmosquitto и Paho не удобно. С примитивными C++ными обертками над ними так же все было печально.
Главная проблема, в частности, с libmosquitto, заключалась в том, что в libmosquitto предполагается делать обработку вообще всех входящих сообщений в одном и том же коллбэке. Тогда как нам нужно, чтобы в приложении было несколько независимых агентов, каждый из которых должен получать только те сообщения, в которых он заинтересован.
Соответственно, нужно было сделать какой-то слой, который бы располагался между прикладными агентами и libmosquitto. Чтобы этой слой скрывал от агентов всю требуху работы с libmosquitto-вскими вызовами и коллбэками. Но каким должен быть этот слой?
Готового ответа у нас, естественно, не было. В mosquitto_transport мы попытались найти этот ответ. Получилось более-менее нормально, особенно с учетом того, что сил в mosquitto_transport было вложено, откровенно говоря, совсем немного. Больше всего работы потребовала поддержка wildcard-ов в именах топиков при подписке. Ну и написание понятного README, чтобы затем самим не забыть, как пользоваться mosquitto_transport.
Наверняка можно было бы сделать лучше. Кроме того, в существующей реализации остались серьезные белые пятна. Например, поддержка уровней QoS отличных от 0. Или вопросы производительности, которые в рамках разрабатываемого прототипа были несущественными.
Так что сами мы рассматриваем текущую реализацию mosquitto_transport как вполне себе работающий и отлаженный прототип. Который можно брать и пробовать. Но который, наверняка, придется доработать напильником для каких-то других условий.
Всю основную работу делает a_transport_manager_t и одна вспомогательная рабочая нить
В mosquitto_transport практически всю работу выполняет агент a_transport_manager_t. Он создает экземпляр mosquitto, он реализует все нужные коллбэки, он запускает основной цикл событий libmosquitto на контексте отдельной рабочей нити посредством mosquitto_loop_start(), он же затем останавливает этот цикл через mosquitto_loop_stop.
Но в работе a_transport_manager_t есть несколько фокусов, на которых можно заострить внимание.
Делегирование работы с одной рабочей нити на другую
Основной цикл обработки событий libmosquitto выполняется на контексте отдельной рабочей нити. Это означает, что и коллбэки, вроде mosquitto_log_callback, mosquitto_connect_callback, mosquitto_subscribe_callback и т.д., вызываются на контексте этой самой отдельной нити. Но их этих коллбэков нужно получать доступ к данным агента a_transport_manager_t. Который, в свою очередь, работает на другой рабочей нити (или даже на нескольких других нитях в случае adv_thread_pool-диспетчера).
Мы не стали защищать данные агента a_transport_manager_t какими-то примитивами синхронизации (вроде std::mutex-а). Вместо этого мы из коллбэка отсылаем агенту a_transport_manager_t сообщение. Которое уже будет обработано на основном рабочем контексте агента. Из-за чего при обработке сообщения нам вообще не нужно думать о синхронизации доступа к данным.
Вот, например, как это выглядит в коде. Реализация mosquitto_message_callback:
void
a_transport_manager_t::on_message_callback(
mosquitto *,
void * this_object,
const mosquitto_message * msg )
{
auto tm = reinterpret_cast< a_transport_manager_t * >(this_object);
tm->m_logger->trace( "on_message, topic={}, payloadlen={}, qos={}"
", retain={}",
msg->topic, msg->payloadlen, msg->qos, msg->retain );
so_5::send< message_received_t >( tm->m_self_mbox, *msg );
}
И обработчик сообщения message_received_t:
void
a_transport_manager_t::on_message_received(
const message_received_t & cmd )
{
auto subscribers = m_delivery_map.match( cmd.m_topic );
if( !subscribers.empty() )
{
for( auto * s : subscribers )
s->deliver_message( cmd.m_topic, cmd.m_payload );
}
else
m_logger->warn( "message for unregistered topic, topic={}, "
"payloadlen={}",
cmd.m_topic, cmd.m_payload.size() );
}
Асинхронная работа с подписками
Когда какой-то агент делает вызов subscribe(), то подписка на соответствующий MQTT-шный топик выполняется не синхронно внутри subscribe(), а асинхронно. Вначале агенту a_transport_manager_t отсылается сообщение subscribe_topic_t. Затем, агент a_transport_manager_t обрабатывает это сообщение и пытается создать подписку на MQTT-брокере. Что может быть в данный момент и невозможно, если сейчас связи с брокером нет.
Соответственно, вызов subscribe() вовсе не означает, что при выходе из него подписка уже создана и агент сразу же начнет получать опубликованные в соответствующем топике сообщения. Обычно это не проблема, но помнить об этом факте следует.
Иногда агенту желательно знать, существует ли подписка на топик или же подписки по каким-то причинам нет. Например, агент хочет сперва подписаться на некий топик и лишь затем начать публиковать свои сообщения. В этом случае агент может подписаться на специальные сообщения subscription_available_t, subscription_unavailable_t и subscription_failed_t. Скажем, вот так может выглядеть код по подписке на топик с получением уведомления о том, что подписка была выполнена успешно:
mosqtt::topic_subscriber_t<my_encoding_tag>::subscribe(
mqtt,
"some/topic/name",
[&](const so_5::mbox_t & mbox) {
// Из mbox будут прилетать как опубликованные в топике сообщения...
so_subscribe(mbox).event(
[&](mhood_t<mosqtt::topic_subscriber_t<my_encoding_tag>::msg_type> cmd) {
const auto msg = cmd->decode<my_type>();
...
});
// ...но и сообщения subscription_available_t.
so_subscribe(mbox).event(
[&](mhood_t<mosqtt::subscription_available_t> cmd) {
...
});
});
Автоматическая отписка от топиков
Подписки на MQTT-шные топики создаются через вызовы subscribe(). А вот удаляются подписки автоматически. Происходит это за счет небольшой уличной магии, спрятанной в SObjectizer-овских mbox-ах (которые можно создавать под собственные, возможно, весьма специфические задачи).
Дело в том, что метод subscribe() создает специальный служебный mbox и именно этот mbox затем отдается в лямбда-функцию, в которой программист подписывает своего агента. Этот служебный mbox точно знает, сколько подписок сделано. Когда все подписки уничтожаются, например, при дерегистрации агента или при ручном удалении подписок агента, mbox понимает, что подписок больше нет и просить a_transport_manager_t удалить подписку на MQTT-шный топик.
Метод so_define_agent() как классический пример настройки агента с несколькими состояниями
Обычно мы сталкиваемся с двумя противоположными реакциями людей, которые знакомятся с SObjectizer-ом. Первые говорят о том, что у нас все как-то сложно, агентов нужно от чего-то наследовать, нужно знать про какие-то so_define_agent(), so_evt_start() и т.д. А еще у агентов непонятно зачем есть какие-то непонятные состояния.
Вторые, напротив, говорят о том, что состояния у агентов — это классно. А наличие методов so_define_agent(), so_evt_start() и so_evt_finishe(), как раз таки упрощает и написание агентов и, особенно, разбирательства с чужими агентами: мол сразу знаешь куда смотреть и где что искать.
Естественно, что вторая точка зрения нам гораздо ближе. И, думается, содержимое a_transport_manager_t является хорошим тому подтверждением. В частности, метод so_define_agent() сразу дает представление о том, что, как и когда этот агент обрабатывает:
void
a_transport_manager_t::so_define_agent()
{
st_working
.event( m_self_mbox, &a_transport_manager_t::on_subscribe_topic )
.event( m_self_mbox, &a_transport_manager_t::on_unsubscribe_topic )
.event( m_self_mbox, &a_transport_manager_t::on_message_received,
so_5::thread_safe );
st_disconnected
.on_enter( [this] {
// Everyone should be informed that connection lost.
so_5::send< broker_disconnected_t >( m_self_mbox );
} )
.event< connected_t >(
m_self_mbox, &a_transport_manager_t::on_connected );
st_connected
.on_enter( [this] {
// Everyone should be informed that connection established.
so_5::send< broker_connected_t >( m_self_mbox );
// All registered subscriptions must be restored.
restore_subscriptions_on_reconnect();
} )
.on_exit( [this] {
// All subscriptions are lost.
drop_subscription_statuses();
// No more pending subscriptions.
m_pending_subscriptions.clear();
} )
.event< disconnected_t >(
m_self_mbox, &a_transport_manager_t::on_disconnected )
.event( m_self_mbox, &a_transport_manager_t::on_subscription_result )
.event( m_self_mbox, &a_transport_manager_t::on_publish_message,
so_5::thread_safe )
.event( &a_transport_manager_t::on_pending_subscriptions_timer );
}
Причем агент a_transport_manager_t использует иерархию состояний, пусть и очень простую:
state_t st_working{ this, "working" };
state_t st_disconnected{ initial_substate_of{ st_working }, "disconnected" };
state_t st_connected{ substate_of{ st_working }, "connected" };
При этом нельзя не отметить, что a_transport_manager_t далеко не самый сложный SObjectizer-овский агент, который нам приходилось реализовывать. Чем больше и сложнее агент, тем выигрышнее оказывается наличие специализированных методов so_define_agent(), so_evt_start() и so_evt_finish().
Более подробно про сериализацию/десериализацию сообщений
Выше мы показали, как пользователь должен настроить сериализацию и десериализацию своих сообщений. Но не объяснили почему это делается именно так. Хотя это весьма принципиальный вопрос. Можно даже назвать его «краеугольным камнем». Поэтому нужно посвятить несколько слов этому аспекту.
Во-первых, несмотря на то, что mosquitto_transport мы делали для конкретного проекта, мы хотели сделать так, чтобы mosquitto_transport можно было бы переиспользовать и для других задач. Поэтому нужно было сделать так, чтобы mosquitto_transport не был привязан к конкретному формату данных. Чтобы можно было передавать данные хоть в JSON, хоть в XML, хоть в ProtoBuf, хоть в каком-то другом представлении. Возможно, даже используя несколько разных представлений в рамках одной программы.
Во-вторых, нам нужно было определиться с тем, кто будет тратить ресурсы на сериализацию/десериализацию сообщений: агент a_transport_manager_t или же прикладные агенты пользователя. Тратить на эти операции ресурсы a_transport_manager_t не хотелось, т.к. это прямой путь к тому, чтобы a_transport_manager_t стал узким местом. Поэтому мы пошли по второму пути: сериализация и десериализация выполняется на контексте пользователя. Причем сериализация выполняется автоматически внутри topic_publisher_t::publish. А вот десериализация выполняется вручную — пользователь сам должен указать, какой тип он хочет получить на выходе при вызове метода incoming_message_t::decode(). Вот как в этом примере:
void on_reply(mhood_t<demo::subscriber_t::msg_type> cmd) {
...
const auto msg = cmd->decode<demo::server_hello_t>();
В-третьих, есть еще момент ошибок, которые могут возникать при сериализации/десериализации сообщений. Например, пришло сообщение в формате XML, а мы попытались распарсить его как JSON. Получим ошибку и… И возникает вопрос: как об этой ошибке информировать и как эту ошибку обрабатывать?
Это еще одна причина, почему сериализация/десериализация вынесена на контекст пользователя. Если при вызове publish() или decode() возникнет связанное с ошибкой сериализации/десериализации исключение, то пользователь сможет это исключение обработать любым удобным для него способом.
Вот по совокупности всех этих факторов и была выбрана описанная ранее схема: для каждого формата данных пользователь должен определить специальный тип-тег, для которого нужно сделать частичную специализацию типов encoder_t и decoder_t. Такая схема позволяет иметь сразу несколько форматов для сообщений в одном приложении. Плюс все разрешается статически во время компиляции, поэтому если для какого-то типа или формата сериализация/десериализация не реализована, то это будет обнаружено в compile-time.
Общие впечатления от использования mosquitto_transport
Забавно, но каких-то особых впечатлений от использования mosquitto_transport-а не осталось. Оно просто работает.
Из описанного выше видно, что больше всего кода при работе с mosquitto_transport приходится делать для двух вещей:
- во-первых, создание a_transport_manager_t в начале работы приложения;
- во-вторых, написание кода по сериализации/десериализации сообщений;
Но, как оказалось, на практике ни один из этих моментов проблем не вызвал. Код для создания a_transport_manager_t довелось написать всего несколько раз: в паре-тройке тестов, ну и в основном приложении, для которого все это предназначалось. Причем, по сравнению с другим кодом, который нужен для инициализации приложения (разбор аргументов командной строки, чтение конфигурации, создание и настройка логгеров и т.д., и т.п.) несколько строчек для запуска a_transport_manager_t — это пусть и не капля в море, но все равно совсем немного.
Благодаря json_dto, кода для сериализации/десериализации сообщение приходится писать довольно мало. Ну и его все равно придется писать в том или ином виде, по крайней мере до тех пор, пока в C++ не завезут рефлексию.
Так что какой-то особой трудоемкости в использовании mosquitto_transport при реализации десятка агентов, общающихся через MQTT с внешним миром и использующих в своей работы порядка трех десятков сообщений, мы не заметили.
Вот что оставило гораздо более яркие впечатление, так это работа с libmosquitto и изучение поведения MQTT-шного брокера в каких-то ситуациях через изучения его исходного кода. Можно только пожалеть, что на глаза не попалось ничего более достойного, чем libmosquitto и Paho. Но это уже совсем другая история…
Заключение
Вот, пожалуй, и все, что мы хотели рассказать о своем эксперименте по реализации распределенности для SObjectizer-приложений на базе MQTT и libmosquitto. Если какие-то интересующие вас моменты остались «за кадром», то задавайте вопросы — постараемся раскрыть интересующие вас моменты в комментариях.
В заключение, уже традиционно, предлагаем попробовать SObjectizer (github) в деле и поделиться своими впечатлениями: практически все конструктивные отзывы используются нами для совершенствования SObjectizer-а.
Но сегодня хочется затронуть еще такой аспект. Наличие распределенности в ядре SObjectizer — местами хорошо, но местами сильно не очень (как показывает опыт SObjectizer-4). Отсутствие распределенности в ядре — опять местами хорошо, но местами не очень. Такое ощущение, что получается как в известной мудрости: чтобы ты не выбрал, все равно пожалеешь. Например, один из распространенных вопросов в адрес SObjectizer-5: «А что у вас с распределенностью?»
Уже начинает надоедать объяснять одно и тоже по сотому разу, посему закрадывается мысль: «А не проще ли сделать для SO-5 механизм для поддержки распределенности?» В связи с чем просим поделиться в комментариях своими соображениями на эту тему.
Нужна ли вам распределенность в SO-5? Если да, то для каких задач? Нужно ли вам передавать множество мелких сообщений? Или же вы передаете большие BLOB-ы? Нужна ли вам интероперабильность с другими языками? Или же вас устраивает протокол/формат, который поддерживается только одним фреймворком? Нужны ли вам подтверждения о доставке отдельных сообщений, автоматические перепосылки и т.д.? Какие-то средства интроспекции, мониторинга и сбора статистики по трафику?
В общем, чем больше хотелок будет озвучено, тем проще нам будет определиться с тем, делать ли поддержку прозрачной распределенности и, если делать, то в каком виде. А может быть и не проще. Но, в любом случае, будет какая-то информация для размышлений.
Автор: eao197