Подкинули задачу сделать микросервис, который получает данные от RabbitMQ, обрабатывает, и отправляет данные дальше по этапу в RabbitMQ. После отправки задания, я посмотрел на то что поучилось. Оказалось, что этот набор компонентов можно использовать для быстрого прототипирования pipeline архитектуры
Используемые компоненты:
Для примера буду делать микросервис для выдачи рейтинга игроков. От ядра системы в микросервис приходят следующие сообщения:
- player_registered(id,name);
- player_renamed(id,name);
- player_won(id, points).
Сервис раз в минуту должен отсылать сообщение с содержимым рейтинга.Рейтинг сортируется по набранным очкам за календарную неделю.
REACT-CPP
REACT-CPP — это обертка над libev на C++11. Эта библиотека нужна для организации цикла обработка событий(event loop).
Т.к. кроме работы с сокетом потребуются таймеры и обработчики unix сигналов.
class Application
{
public:
Application();
~Application();
using IntervalWatcherPtr = std::shared_ptr<React::IntervalWatcher>;
void run();
void shutdown();
//...
private:
bool onMinute();
//...
private:
React::MainLoop m_loop;
IntervalWatcherPtr m_minuteTimer;
//...
};
void Application::run()
{
m_minuteTimer = m_loop.onInterval(5.0, 60.0, std::bind(&Application::onMinute, this));
m_loop.onSignal(SIGTERM, [this]() -> bool
{
shutdown();
return false;
});
m_loop.onSignal(SIGUSR1, [this]()->bool{
cleanRating();
return true;
});
//...
m_loop.run();
}
bool Application::onMinute()
{
calculateRating();
sendRating();
return true;
}
Тут создаю таймер который стартует через 5 секунд и который будет вызывать обработчик каждые 60 секунд.
Любой приличный демон/сервис должен иметь обработчик SIGTERM, что бы из вне попросить его корректно завершится.
Что касается обработчика SIGUSR1 тут можно самостоятельно вычислять начало/конец недели через Boost.Date_Time, но мне тупо лень, когда в GNU/Linux есть cron+pkill.
AMQP-CPP
С тех пор как опубликовал RabbitMQ tutorials на C++ AMQP-CPP обзавелась реализацией обработчика на libev и libuv.
Подключение и обработка сообщения:
void Application::createChannel(AMQP::TcpConnection &connection)
{
m_channel = std::make_unique<AMQP::TcpChannel>(&connection);
m_channel->declareQueue(m_cfg.source().name, AMQP::durable)
.onSuccess([&](const std::string &name, uint32_t messagecount, uint32_t consumercount)
{
LOG(INFO) << "Declared queue "
<< name
<< ", message count: "
<< messagecount;
m_channel->consume(m_cfg.source().name)
.onReceived([&](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
onMessage(message, deliveryTag, redelivered);
})
.onError([](const char *message)
{
LOG(ERROR) << "Error consume:" << message;
APP->shutdown();
});
})
.onError([&](const char *message)
{
LOG(ERROR) << "Error declare queue:" << message;
shutdown();
});
}
void Application::onMessage(const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
parseMessage(message);
m_channel->ack(deliveryTag);
}
Публикация сообщения:
AMQP::Envelope env(s.GetString());
m_channel->publish("", m_cfg.destination().name, env);
LevelDB
Может потребоваться локальное хранилище данных. Взял LelevDB, я о нем писал в Использование LevelDB. Сделал лишь небольшую RAII обертку:
class DataBase
{
public:
DataBase();
bool open(const std::string &path2base, bool compression = true);
bool put(const std::string &key, const ByteArray &value, bool sync = false);
ByteArray get(const std::string &key);
Snapshot snapshot();
Iterator iterator();
private:
std::shared_ptr<leveldb::DB> m_backend;
};
class Snapshot
{
public:
Snapshot();
~Snapshot();
ByteArray get(const std::string &key);
Iterator iterator();
private:
Snapshot(const std::weak_ptr<leveldb::DB> &backend, const leveldb::Snapshot *snapshot);
private:
friend class DataBase;
std::weak_ptr<leveldb::DB> m_backend;
const leveldb::Snapshot *m_shapshot;
};
class Iterator
{
public:
Iterator(std::unique_ptr<leveldb::Iterator> rawIterator);
Iterator(Iterator &&iter);
/*!
* Create empty iterator
*/
Iterator() = default;
~Iterator();
bool isValid() const noexcept;
void next();
void prev();
std::string key();
ByteArray value();
/*!
* Seek to first
*/
void toFirst();
/*!
* Seek to last
*/
void toLast();
Iterator(const Iterator &) = delete;
Iterator &operator=(const Iterator &) = delete;
private:
std::unique_ptr<leveldb::Iterator> m_iterator;
};
LevelDB используется для сохранения/востановления состояния.
void Application::loadFromLocalStorage()
{
auto snapshot = m_localStorage->snapshot();
auto iter = snapshot.iterator();
iter.toFirst();
while (iter.isValid()) {
auto player = new Player(iter.value());
m_id2player[player->id] = player;
m_players.push_back(player);
iter.next();
}
}
void Application::updatePlayerInBD(const Player *player)
{
if (!m_localStorage->put(std::to_string(player->id), player->serialize())) {
LOG(ERROR) << "[" << player->id << ", "
<< player->name
<< "] is not updated in the database";
}
}
Логика сервиса
Данные приходят в формате JSON.
Разбирает json используя RapidJSON, ищу подходящий метод, вызываю нужный обработчик:
void Application::parseMessage(const AMQP::Message &message)
{
/*
* Схемка имеет вид
* {
* "method":"player_registered",
* "params":{
* ...
* }
* }
*/
rapidjson::Document doc;
doc.Parse(message.body(), message.bodySize());
const std::string method = doc["method"].GetString();
auto iter = m_handlers.find(method);
if (iter != m_handlers.end()) {
iter->second(*this, doc["params"]);
}
else {
LOG(WARNING) << "Unknown method:" << method;
}
}
Сами методы простые:
void Application::onPlayerRegistered(const JValue ¶ms)
{
auto obj = params.GetObject();
const uint64_t playerId = obj["id"].GetUint64();
if (!isRegistred(playerId)) {
auto player = new Player;
player->id = playerId;
player->name = obj["name"].GetString();
m_players.push_back(player);
m_id2player[playerId] = player;
updatePlayerInBD(player);
}
}
void Application::onPlayerRenamed(const JValue ¶ms)
{
auto obj = params.GetObject();
const uint64_t playerId = obj["id"].GetUint64();
if (isRegistred(playerId)) {
auto player = m_id2player[playerId];
player->name = obj["name"].GetString();
updatePlayerInBD(player);
}
else {
LOG(WARNING) << "Renaming an unknown user[" << playerId << "]";
}
}
void Application::onPlayerWon(const JValue ¶ms)
{
auto obj = params.GetObject();
const uint64_t playerId = obj["id"].GetUint64();
if (isRegistred(playerId)) {
auto player = m_id2player[playerId];
player->points += obj["points"].GetInt64();
updatePlayerInBD(player);
}
else {
LOG(WARNING) << "Unknown player[" << playerId << "]";
}
}
Раз в минуту сортируем игроков и отправляем рейтинг:
bool Application::onMinute()
{
calculateRating();
sendRating();
return true;
}
void Application::calculateRating()
{
std::sort(m_players.begin(), m_players.end(), [](const Player *a, const Player *b)
{
return a->points > b->points;
});
}
void Application::sendRating()
{
using namespace rapidjson;
StringBuffer s;
Writer<StringBuffer> writer(s);
writer.StartArray();
const size_t count = std::min(m_players.size(), size_t(10));
for (size_t i = 0;
i < count;
++i) {
writer.StartObject();
writer.Key("id");
writer.Uint64(m_players[i]->id);
writer.Key("name");
writer.String(m_players[i]->name.c_str());
writer.Key("points");
writer.Int64(m_players[i]->points);
writer.EndObject();
}
writer.EndArray();
AMQP::Envelope env(s.GetString());
m_channel->publish("", m_cfg.destination().name, env);
}
Весь код доступен на GitHub'e. Исходники библиотек поставляются вместе с сервисом и собираются автоматически на GNU/Linux с gcc.
Подведем итоги, что имеем:
- event loop с таймерами, обработчиками сигналов и всеми остальными плюшками libev;
- работа с RabbitMQ;
- встроенное key-value хранилище;
- поддержка json.
Автор: RPG18