Всем привет. Сегодня мы рассмотрим, как можно связать фреймворк gRPC в C++ и библиотеку Qt. В статье приведен код, обобщающий использование всех четырех режимов взаимодействия в gRPC. Помимо этого, приведен код, позволяющий использовать gRPC через сигналы и слоты Qt. Статья может быть интересна в первую очередь Qt разработчикам, заинтересованных в использовании gRPC. Тем не менее, обобщение четырех режимов работы gRPC написано на C++ без использования Qt, что позволит адаптировать код разработчикам, не связанных с Qt. Всех заинтересовавшихся прошу под кат.
Предыстория
Около полугода назад на мне висело два проекта, использующих клиентскую и серверную части gRPC. Оба проекта падали в продакшене. Эти проекты писали разработчики, которые уже уволились. Радовало только то, что я принимал активное участие в написании кода сервера и клиента gRPC. Но это было около года назад. Поэтому, как водится, пришлось разбираться со всем с нуля.
Код сервера gRPC писался с расчетом на то, что будет в дальнейшем генерироваться по .proto файлу. Код был написан неплохо. Однако, сервер обладал одним большим недостатком: к нему мог подключиться только один клиент.
Клиент gRPC был написан просто ужасно.
С кодом клиента и сервера gRPC я разобрался только спустя несколько дней. И понял, что возьми я какой-нибудь проект на пару недель, с сервером и клиентом gRPC пришлось бы разбираться заново.
Именно тогда я решил, что самое время написать и отладить клиент и сервер gRPC так, чтобы:
-
Можно было спокойно спать по ночам;
-
Не нужно было вспоминать, как это работает каждый раз, когда нужно написать клиента или сервер gRPC;
-
Можно было использовать написанных клиента и сервера gRPC в других проектах.
При написании кода я руководствовался следующими требованиями:
-
И клиент и сервер gRPC могут работать с использованием сигналов и слотов библиотеки Qt естественным образом;
-
Код клиента и сервера gRPC не нужно исправлять при изменении .proto файла;
-
Клиент gRPC должен уметь сообщить клиентскому коду состояние соединения с сервером.
Структура статьи следующая. Сначала будет краткий обзор результатов работы с клиентским кодом и небольшими пояснениями к нему. К конце обзора ссылка на репозиторий. Дальше будут общие вещи по архитектуре. Затем описание кода сервера и клиента (то, что под капотом) и заключение.
Краткий обзор
В качестве .proto файла был использован простейший pingproto.proto файл, в котором определены RPC всех видов взаимодействия:
syntax = "proto3";
package pingpong;
service ping
{
rpc SayHello (PingRequest) returns (PingReply) {}
rpc GladToSeeMe(PingRequest) returns (stream PingReply){}
rpc GladToSeeYou(stream PingRequest) returns (PingReply){}
rpc BothGladToSee(stream PingRequest) returns (stream PingReply){}
}
message PingRequest
{
string name = 1;
string message = 2;
}
message PingReply
{
string message = 1;
}
Файл pingpong.proto с точностью до имен повторяет файл helloworld.proto из статьи об асинхронных режимах работы gRPC в C++.
В итоге написанный сервер можно использовать примерно так:
class A: public QObject
{
Q_OBJECT;
QpingServerService pingservice;
public:
A()
{
bool is_ok;
is_ok = connect(&pingservice, SIGNAL(SayHelloRequest(SayHelloCallData*)), this, SLOT(onSayHello(SayHelloCallData*))); assert(is_ok);
is_ok = connect(&pingservice, SIGNAL(GladToSeeMeRequest(GladToSeeMeCallData*)), this, SLOT(onGladToSeeMe(GladToSeeMeCallData*))); assert(is_ok);
is_ok = connect(&pingservice, SIGNAL(GladToSeeYouRequest(GladToSeeYouCallData*)), this, SLOT(onGladToSeeYou(GladToSeeYouCallData*))); assert(is_ok);
is_ok = connect(&pingservice, SIGNAL(BothGladToSeeRequest(BothGladToSeeCallData*)), this, SLOT(onBothGladToSee(BothGladToSeeCallData*))); assert(is_ok);
}
public slots:
void onSayHello(SayHelloCallData* cd)
{
std::cout << "[" << cd->peer() << "][11]: request: " << cd->request.name() << std::endl;
cd->reply.set_message("hello " + cd->request.name());
cd->Finish();
}
//etc.
};
Когда клиент вызывает RPC, сервер gRPC уведомляет об этом клиентский код (в данном случае класс А) при помощи соответствующего сигнала.
Клиент gRPC можно использовать так:
class B : public QObject
{
Q_OBJECT
QpingClientService pingPongSrv;
public:
B()
{
bool c = false;
c = connect(&pingPongSrv, SIGNAL(SayHelloResponse(SayHelloCallData*)), this, SLOT(onSayHelloResponse(SayHelloCallData*))); assert(c);
c = connect(&pingPongSrv, SIGNAL(GladToSeeMeResponse(GladToSeeMeCallData*)), this, SLOT(onGladToSeeMeResponse(GladToSeeMeCallData*))); assert(c);
c = connect(&pingPongSrv, SIGNAL(GladToSeeYouResponse(GladToSeeYouCallData*)), this, SLOT(onGladToSeeYouResponse(GladToSeeYouCallData*))); assert(c);
c = connect(&pingPongSrv, SIGNAL(BothGladToSeeResponse(BothGladToSeeCallData*)), this, SLOT(onBothGladToSeeResponse(BothGladToSeeCallData*))); assert(c);
c = connect(&pingPongSrv, SIGNAL(channelStateChanged(int, int)), this, SLOT(onPingPongStateChanged(int, int))); assert(c);
}
void usage()
{
//Unary
PingRequest request;
request.set_name("user");
request.set_message("user");
pingPongSrv.SayHello(request);
//Server streaming
PingRequest request2;
request2.set_name("user");
pingPongSrv.GladToSeeMe(request2);
//etc.
}
public slots:
void SayHelloResponse(SayHelloCallData* response)
{
std::cout << "[11]: reply: " << response->reply.message() << std::endl;
if (response->CouldBeDeleted())
delete response;
}
//etc.
};
Клиент gRPC позволяет вызывать RPC напрямую, и подписаться на ответ сервера с помощью соответствующих сигналов.
Клиент gRPC также имеет сигнал:
channelStateChanged(int, int);
который сообщает о прошлом и текущем состояниях подключения к серверу. Весь код с примерами использования находится в репозитории qgrpc.
Как это работает
Принцип включения клиента и сервера gRPC в проект изображен на рисунке.
В .pro файле проекта указываются .proto файлы, на основе которых будет работать gRPC. В файле grpc.pri прописаны команды для генерации gRPC и QgRPC файлов. Компилятор protoc генерирует gRPC файлы [protofile].grpc.pb.h и [protofile].grpc.pb.cc. [protofile] — это имя .proto файла, переданного на вход компилятора.
Генерацией QgRPC файлов [protofile].qgrpc.[config].h занимается скрипт genQGrpc.py. [config] — это либо «server», либо «client».
Генерируемые QgRPC файлы содержат обертку Qt вокруг gRPC классов и вызовов с соответствующими сигналами. В предыдущих примерах, классы QpingServerService и QpingClientService объявлены соответственно в сгенерированных файлах pingpong.qgrpc.server.h и pingpong.qgrpc.client.h. Сгенерированные QgRPC файлы добавляются в обработку к moc'у.
В сгенерированных QgRPC файлах происходит включение файлов QGrpc[config].h, в которых и происходит вся основная работа. Подробнее об этом рассказано ниже.
Чтобы подключить всю эту конструкцию в проект, в .pro файле проекта нужно подключить файл grpc.pri и указать три переменные. Переменная GRPC определяет .proto файлы, которые будут переданы на входы компилятора protoc и скрипта genQGrpc.py. Переменная QGRPC_CONFIG определяет значение конфигурации сгенерированных QgRPC файлов и может содержать значения «server» или «client». Также можно определить опциональную переменную GRPC_VERSION для указания версии gRPC.
Подробнее обо всем сказанном читайте файл grpc.pri и .pro файлы примеров.
Архитектура сервера
Диаграмма классов сервера приведена на рисунке.
Толстыми стрелочками показана иерархия наследования классов, а тонкими — принадлежность членов и методов классам. В общем случае, для службы генерируется класс Q[servicename]ServerService, где servicename — имя службы, объявленное в .proto файле. RPCCallData — это управляющие структуры, сгенерированные для каждой RPC в службе. В конструкторе класса QpingServerService происходит инициализация базового класса QGrpcServerService асинхронной службой gRPC pingpong::ping::AsyncService. Для запуска службы нужно вызвать метод Start() с адресом и портом, на которых будет работать служба. В функции Start() реализована стандартная процедура запуска службы.
В конце функции Start() вызывается вызывается чисто виртуальная функция makeRequests(), которая реализована в сгенерированном классе QpingServerService:
void makeRequests()
{
needAnotherCallData< SayHello_RPCtypes, SayHelloCallData >();
needAnotherCallData< GladToSeeMe_RPCtypes, GladToSeeMeCallData >();
needAnotherCallData< GladToSeeYou_RPCtypes, GladToSeeYouCallData >();
needAnotherCallData< BothGladToSee_RPCtypes, BothGladToSeeCallData >();
}
Второй шаблонный параметр функции needAnotherCallData — это сгенерированные структуры RPCCallData. Эти же структуры являются параметрами сигналов в сгенерированном классе Qt службы.
Сгенерированные структуры RPCCallData наследуются от класса ServerCallData. В свою очередь, класс ServerCallData наследуется от респондера ServerResponder. Таким образом, создание объекта сгеренированных структур приводит к созданию объекта респондера.
Конструктор класса ServerCallData принимает два параметра: signal_func и request_func. signal_func — это сгенерированный сигнал, который вызывается после получения тэга из очереди. request_func — это функция, которая должна быть вызвана при создании нового респондера. Например, в данном случае это может быть функция RequestSayHello(). Вызов request_func происходит именно в функции needAnotherCallData(). Это сделано для того, чтобы управление респондерами (создание и удаление) происходило в службе.
Код функции needAnotherCallData() состоит из создания объекта респондера и вызова функции, связывающей респондер с вызовом RPC:
template<class RPCCallData, class RPCTypes>
void needAnotherCallData()
{
RPCCallData* cd = new RPCCallData();
//...
RequestRPC<RPCTypes::kind, ...>
(service_, cd->request_func_, cd->responder, ..., (void*)cd);
}
Функции RequestRPC() — это шаблонные функции для четырех видов взаимодействия. В итоге, вызов RequestRPC() сводится к вызову:
service_->(cd->request_func_)(...,cd->responder, (void*)cd);
где service_ — это служба gRPC. В данном случае, это pingpong::ping::AsyncService.
Для синхронной или асинхронной проверки очереди событий необходимо вызвать функции CheckCQ() или AsyncCheckCQ() соответственно. Код функции CheckCQ() сводится к вызовам функции синхронного получения тэга из очереди и обработки этого тэга:
virtual void CheckCQ() override
{
void* tag; bool ok;
server_cq_->Next(&tag, &ok);
//tagActions_ call
if (!tag)
return;
AbstractCallData* cd = (AbstractCallData*)tag;
if (!started_.load())
{
destroyCallData(cd);
return;
}
cd->cqReaction(this, ok);
}
После получения тэга из очереди идут проверки валидности тэга и старта сервера. Если сервер выключен, то тэг уже не нужен — его можно удалить. После этого вызывается функция cqReaction(), определенная в класса ServerCallData:
void cqReaction(const QGrpcServerService* service_, bool ok)
{
if (!first_time_reaction_)
{
first_time_reaction_ = true;
service_->needAnotherCallData<RPC, RPCCallData>();
}
auto genRpcCallData = dynamic_cast<RPCCallData*>(this);
void* tag = static_cast<void*>(genRpcCallData);
if (this->CouldBeDeleted())
{
service_->destroyCallData(this);
return;
}
if (!this->processEvent(tag, ok)) return;
//call generated service signal with generated call data argument
service_->(*signal_func_)(genRpcCallData);
}
Флаг first_time_reaction_ говорит о том, что нужно создать новый респондер для вызванной RPC. Функции CouldBeDeleted() и ProcessEvent() унаследованы от класса респондера ServerResponder соответствующего вида. Функция CouldBeDeleted() возвращает признак того, что объект респондера может быть удален. Функция processEvent() обрабатывает тэг и флаг ok. Так, например, для респондера вида Client Streaming функция выглядит следующим образом:
bool processEvent(void* tag, bool ok)
{
this->tag_ = tag;
read_mode_ = ok;
return true;
}
Функция ProcessEvent() вне зависимости от вида респондера всегда возвращает true. Возвращаемое значение этой функции оставлено для возможного расширения функциональности и, теоретически, для устранения ошибок.
После обработки события следует вызов:
service_->(*signal_func_)(genRpcCallData);
Переменная service_ — это экземпляр сгенерированной службы, в нашем случае QpingServerService. Переменная signal_func_ — это сигнал службы, соответствующий конкретной RPC. Например, SayHelloRequest(). Переменная genRpcCallData — это объект респондера соответствующего вида. С точки зрения вызывающего кода, переменная genRpcCallData — это объект одной из сгенерированных структур RPCCallData.
Архитектура клиента
По возможности, имена классов и функций клиента совпадают с именами классов и функций сервера. Диаграмма классов клиента приведена на рисунке.
Толстыми стрелочками показана иерархия наследования классов, а тонкими — принадлежность членов и методов классам. В общем случае, для службы генерируется класс Q[servicename]СlientService, где servicename — имя службы, объявленное в .proto файле. RPCCallData — это управляющие структуры, сгенерированные для каждой RPC в службе. Для вызова RPC, сгенерированный класс предоставляет функций, имена которых в точности соответствуют RPC, объявленным в .proto файле. В нашем примере, в .proto файле RPC SayHello() объявлена как:
rpc SayHello (PingRequest) returns (PingReply) {}
В сгенерированном классе QpingClientService соответствующая RPC функция выглядит так:
void SayHello(PingRequest request)
{
if(!connected()) return;
SayHelloCallData* call = new SayHelloCallData;
call->request = request;
call->responder = stub_->AsyncSayHello(&call->context, request, &cq_);
call->responder->Finish(&call->reply, &call->status, (void*)call);
}
Сгенерированные структуры RPCCallData, как и в случае сервера, наследуются в конечном счете от класса ClientResponder. Поэтому создание объекта сгенерированной структуры приводит к созданию респондера. После создания респондера происходит вызов RPC и связывание респондера с событием получения ответа от сервера. С точки зрения клиентского кода, вызов RPC выглядит следующим образом:
void ToSayHello()
{
PingRequest request;
request.set_name("user");
request.set_message("user");
pingPongSrv.SayHello(request);
}
В отличии от сгенерированного класса сервера QpingServerService, класс QpingClientService наследуется от двух шаблонных классов: ConnectivityFeatures и MonitorFeatures.
Класс ConnectivityFeatures отвечает за состояние подключения клиента с сервером и предоставляет в использование три функции: grpc_connect(), grpc_disconnect(), grpc_reconnect(). Функция grpc_disconnect() просто удаляет все структуры данных, отвечающие за взаимодействие с сервером. Вызов функции grpc_connect сводится к вызовы функции grpc_connect_(), которая создает управляющие структуры данных:
void grpc_connect_()
{
channel_ = grpc::CreateChannel(target_, creds_);
stub_ = GRPCService::NewStub(channel_);
channelFeatures_ = std::make_unique<ChannelFeatures>(channel_);
channelFeatures_->checkChannelState();
}
Класс ChannelFeatures отслеживает состояние канала связи channel_ с сервером. Класс ConnectivityFeatures инкапсулирует объект класса ChannelFeatures и с помощью этого объекта реализует абстрактные функции channelState(), checkChannelState() и connected(). Функция channelState() выдает последнее наблюдаемое (last observed) состояние канала связи с сервером. Функция checkChannelState(), фактически, выдает текущее состояние канала. Функция connected() возвращает признак подключения клиента к серверу.
Класс MonitorFeatures отвечает за получение и обработку событий с сервера и предоставляет в использование функцию CheckCQ():
bool CheckCQ()
{
auto service_ = dynamic_cast< SERVICE* >(this);
//connection state
auto old_state = conn_->channelState();
auto new_state = conn_->checkChannelState();
if (old_state != new_state)
service->*channelStateChangedSignal_(old_state, new_state);
//end of connection state
void* tag;
bool ok = false;
grpc::CompletionQueue::NextStatus st;
st = cq_.AsyncNext(&tag, &ok, deadlineFromMSec(100));
if ((st == grpc::CompletionQueue::SHUTDOWN) || (st == grpc::CompletionQueue::TIMEOUT))
return false;
(AbstractCallData< SERVICE >*)(tag)->cqActions(service_, ok);
return true;
}
Структура кода такая же, как в случае сервера. В отличии от сервера, в клиенте добавляется блок кода, отвечающий за обработку текущего состояния. Если состояние канала связи изменилось, вызывается сигнал channelStateChangedSignal_(). Во всех сгенерированных службах это сигнал:
void channelStateChanged(int, int);
Также, в отличии от сервера, здесь используется функция AsyncNext() вместо Next(). Это было сделано по нескольким причинам. Во-первых, при использовании AsyncNext() клиентский код имеет возможность узнать об изменении состояния канала связи. Во-вторых, при использовании AsyncNext() имеется возможность вызывать различные RPC в клиентском коде любое количество раз. Использование функции Next() в данном случае приведет к блокированию потока до получения события из очереди и, как следствие, к потере двух описанных возможностей.
После получения события из очереди, как и в случае с сервером, вызывается функция cqReaction(), определенная в классе ClientCallData:
void cqActions(RPC::Service* service, bool ok)
{
auto response = dynamic_cast<RPCCallData*>(this);
void* tag = static_cast<void*>(response);
if (!this->processEvent(tag, ok)) return;
service->*func_( response );
}
Как и в случае сервера, функция processEvent() обрабатывает тэг и флаг ok и всегда возвращает true. Как и в случае сервера, после обработки события следует вызов сигнала сгенерированной службы. Однако, здесь есть два существенных отличия от одноименной серверной функции. Первое отличие состоит в том, что в этой функции не происходит создание респондеров. Cоздание респондеров, как было показано выше, происходит при вызове RPC. Второе отличие состоит в том, что в этой функции не происходит удаление респондеров. Отсутствие удаления респондеров сделано по двум причинам. Во-первых, клиентский код может использовать указатели на сгенерированные структуры RPCCallData для своих целей. Удаление содержимого по этому указателю, скрытое от клиентского кода, может привести к неприятным последствиям. Во-вторых, удаление респондера приведет к тому, что сигнал с данными сгенерирован не будет. Следовательно, клиентский код не будет получать последнее сообщение сервера. Среди нескольких альтернатив решения обозначенных проблем, было выбрано решение переложить удаление респондера (сгенерированных структур) на клиентский код. Таким образом, функции-обработчики сигналов (слоты) должны содержать следующий код:
void ResponseHandler(RPCCallData* response)
{
if (response->CouldBeDeleted())
delete response;
//process response
}
Отсутствие удаления респондера в клиентском коде приведет не только к утечке памяти, но и возможным проблемам с каналом связи. Обработчики сигналов всех видов взаимодействия RPC реализованы в коде примеров.
Заключение
В заключении обратим внимание два момента. Первый момент связан с вызовом функций CheckCQ() клиента и сервера. Работают они, как было показано выше, по одному принципу: если в очереди есть событие, «эмитится» сигнал с соответствующей сгенерированной структурой RPCCallData. Можно вызывать эту функцию вручную и проверять (в случае клиента) наличие события. Но изначально была идея перенести всю сетевую часть, связанную с gRPC в другой поток. Именно для этих целей были написаны вспомогательные классы QGrpcSrvMonitor для сервера gRPC и QGrpcCliServer для клиента gRPC. Оба класса работают по одному принципу: создают отдельный поток, помещают в этот поток сгенерированную службу и периодически вызывают функцию CheckCQ() этой службы. Таким образом, при использовании обоих вспомогательных классов отпадает необходимость вызова функций CheckCQ() в клиентском коде. Cигналы сгенерированной службы, в этом случае, «приходят» из другого потока. Примеры клиента и сервера реализованы с использованием этих вспомогательных классов.
Второй момент касается той большей части разработчиков, кто в работе не использует библиотеку Qt. Классы и макросы Qt в QgRPC используются только в двух местах: в генерируемых файлах служб, и в файлах, содержащих вспомогательные классы: QGrpcServerMonitor.h и QGrpcClientMonitor.h. Остальные файлы с библиотекой Qt никак не связаны. Планировалось добавить сборку с использованием cmake, и сделать заглушки некоторым директивам Qt. В частности, классу QObject и макросу Q_OBJECT. Но до этого просто не дошли руки. Поэтому, любые предложения приветствуются.
На этом все. Всем спасибо!
Ссылки
Автор: JackBoned