Одним вечером, после очередного расстраивающего дня, наполненного попытками наладить баланс в своей игре, я решил, что мне срочно требуется отдых. Переключусь на другой проект, быстренько его сделаю, верну на место скатившуюся за время разработки игры самоооценку и с новыми силами возьму игру штурмом! Главное выбрать проект nice and relaxing… Написать свой месседжер? Ха! How hard can it be?
Код можно посмотреть здесь.
Краткая предыстория
До начала работы над мессенджером почти год корпел над мультиплеерной онлайн Line Tower Wars игрой. Программирование шло хорошо, всё остальное (баланс и визуал в особенности) — не очень. Внезапно оказалось, что сделать игру и сделать увлекательную игру (увлекательную для кого-то помимо самого себя) — две разные вещи. После года мытарств мне нужно было отвлечься, поэтому я решил попробовать свои силы в чём-то другом. Выбор пал на мобильную разработку, а именно, Flutter. Слышал множество хороших вещей про Flutter, да и дарт после недолгих экспериментов мне понравился. Решил написать свой собственный мессенджер. Во-первых, хорошая практика по реализации и клиента, и сервера. Во-вторых, будет что-то весомое положить в портфолио для поиска работы, я как раз нахожусь в процессе.
Запланированный функционал
- Личные и групповые чаты
- Отправка текста, изображений и видео
- Аудио и видео-звонки
- Подтверждение получения и прочтения (галочки из Вотсапа)
- «Печатает...»
- Уведомления
- Поиск по QR-коду и геолокации
Забегая вперёд, могу с гордостью (и с облегчением) сказать, что почти всё запланированное было реализовано, а что ещё не реализовано — будет реализовано в ближайшее время.
Выбор языка
С выбором языка долго не думал. Сначала был соблазн использовать дарт и для клиента, и для сервера, но более детальная инспекция показала, что доступных драйверов для дарт не очень много, а те что есть не внушают особого доверия. Хотя не поручусь говорить о текущем моменте, возможно ситуация улучшилась. Так что выбор мой пал на C#, с которым я работал в Unity.
Архитектура
Начал с продумывания архитектуры. Конечно, учитывая что моим мессенджером скорее всего будут пользоваться 3 с половиной человека, можно было бы не заморачиваться с архитектурой вообще. Берёшь и делаешь как в бесчисленных туториалах. Вот нода, вот монго, вот вебсокеты. Готово. И Firebase где-то тут. Но так не интересно. Я решил делать мессенджер, способный легко горизонтально скейлиться, будто ожидаю миллионы одновременных клиентов. Однако так как опыта в этой сфере у меня не было никакого, пришлось всё познавать на практике методом ошибок и снова ошибок.
Я не утвержаю, что такая архитектура супер крута и надёжна, но она жизнеспособна и в теории должна выдерживать большие нагрузки и скейлиться горизонтально, но я не очень понимаю, как проверить. И я надеюсь, что не упустил какой-то очевидный момент, который известен всем, кроме меня.
Ниже будет подробное описание отдельных компонентов.
Frontend Server
Ещё до того как я взялся делать игру, меня увлекла концепция асинхронного однопоточного сервера. Эффективно и без потенциальных race'ов — о чем ещё можно просить. С целью разобраться, как такие сервера устроены, я стал копаться в модуле asyncio
языка python. Увиденное решение показалось мне очень изящным. Если кратко, то решение на псевдокоде выглядит так.
// Есть сокет, из которого мы ожидаем получить байты, но мы не знаем
// пришли ли они уже или ещё нет. Вместо того чтобы сразу вызывать socket.Receive
// и потенциально блокировать весь поток, делаем:
var bytesReceived = Completer<object>();
selector.Register(
socket,
SocketEvent.Receive,
() => bytesReceived.Complete(null)
);
await bytesReceived.Future;
int n = socket.Receive(...); // точно не заблокирует
// selector - это простая обертка над poll. Он периодически опрашивает
// все зарегистрированные сокеты на предмет нужного события (Receive в
// данном случае), и, когда сокет становится готов, вызывает коллбек.
// Коллбек завершает completer, что приводит к возобновлению данного метода,
// и мы можем спокойно читать данные из сокета, зная, что байты там точно есть.
// Если не все байты получены, то мы просто повторяем те же шаги.
С помощью такой нехитрой техники мы можем обслуживать большое число сокетов одним потоком. Мы никогда не блокируем поток в ожидании пока байты будут получены или отправлены. Поток всегда занят полезной работой. Concurrency, одним словом.
Frontend сервера реализованы именно так. Они все однопоточные и асинхронные. Поэтому для максимальной производительности нужно запускать столько серверов на одной машине сколько у неё имеется ядер (4 на картинке).
Frontend сервер читает сообщение от клиента и, основываясь на коде сообщения, отправляет его в один из топиков кафки.
Зачем такие сложности? Зачем делить топик на партишн? Партишн служит в качестве единицы параллелизации. Несколько потребителей (consumer) могут подписаться на один и тот же топик (образуя группу consumer'ов), и тогда кафка (по умолчанию) распределит все партишн равномерно между ними. Если, скажем, у нас топик с двумя партишн, на который подписано 2 клиента, кафка распределит каждому клиенту по одному партишн. Если партишн 3 — одному из клиентов достанется 2. Кафка умеет детектить добавление новых партишн и новых клиентов и автоматически перераспределять партишн в случае необходимости.
Frontend сервер отправляет сообщение в кафку без ключа (когда нет ключа, кафка просто отправляет сообщения в партишн по очереди). Из топика сообщение вытаскивает один из соответствующих backend серверов. Сервер обрабатывает сообщение и… что дальше? А что дальше зависит от типа сообщения.
В самом банальном случае происходит цикл запрос-ответ. Например, на запрос о регистрации нам нужно просто дать клиенту ответ (Success
, EmailAlreadyInUse
, и тп). Но на сообщение, содержащем приглашение в существующий чат новых членов (Васю, Эмиля и Юлю), нам нужно ответить сразу тремя разными типами сообщений. Первый тип — нужно уведомить приглашающего об исходе операции (вдруг произошла серверная ошибка). Второй тип — нужно уведомить всех текущих членов чата, что в чате теперь такие-то новые члены. Третий — отправить приглашения Васе, Эмилю и Юле.
Окей, звучит не очень сложно, но для того чтобы отправить сообщение какому-либо клиенту нам нужно: 1) узнать с каким frontend сервером этот клиент соединён (ведь мы не выбираем с каким конкретно сервером клиент соединятся, за нас решает балансировщик); 2) передать сообщение от backend сервера нужному frontend серверу; 3) собственно, отправить сообщение клиенту.
Для реализации пунктов 1 и 2 я решил использовать отдельный топик («frontend servers» топик). Разделение authentication, session и call топиков на партишн служит как механизм параллелизации. Видим что session сервера сильно загружены? Просто добавляем парочку новых партишн и session серверов, и кафка сделает перераспределение нагрузки за нас, разгружая имеющиеся session сервера. Разделение же «frontend servers» топика на партишн служит как механизм маршрутизации.
Каждому frontend серверу соответствует один партишн «frontend servers» топика (с таким же индексом, что и сам сервер). То есть серверу 0 — партишн 0 и тд. Кафка даёт возможность подписаться не только на определённый топик, но и на определённый партишн определённого топика. Все frontend сервера на стартапе подписываются на соответствующий партишн. Таким образом backend сервер получает возможность отправить сообщение конкретному frontend серверу, отправив сообщение в определённый партишн.
Окей, теперь, когда клиент присоединяется, нужно просто сохранять где-то пару UserId — Frontend Server Index. При дисконнекте — удалять. Для этих целей подойдёт любое из многих in-memory key-value бд. Я выбрал редис.
Как всё выглядит на практике. Первым делом после установки соединения клиент Андрей отправляет серверу сообщение Join
. Frontend сервер получает сообщение и пересылает его в топик session, предварительно добавляя заголовок «Frontend Server»: {index}. Один из backend session серверов получит сообщение, прочитает токен авторизации, определит что это за юзер присоединился, прочитает добавленный frontend сервером индекс и сделает запись UserId — Index в редис. С этого момента клиент считается онлайн, и теперь мы знаем через какой frontend сервер (и, соответственно, через какой партишн «frontend servers» топика) мы можем до него «достучаться», когда другие клиенты будут отправлять Андрею сообщения.
* На самом деле процесс чуть сложнее чем я описал. Можете ознакомиться в исходном коде.
Псевдокод frontend сервера
// Frontend Server 6
while (true) {
// Consume from "Frontend Servers" topic, partition 6
var messageToClient = consumer.Consume();
if (message != null) {
relayMessageToClient(messageToClient);
}
var callbacks = selector.Poll();
while (callbacks.TryDequeue(out callback)) {
callback();
}
long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
while (!callAtQueue.IsEmpty && callAtQueue.PeekPriority() <= now) {
callAtQueue.Dequeue()();
}
while (messagesToRelayToBackendServers.TryDequeue(out messageFromClient)) {
// choose topic
producer.Produce(topic, messageFromClient);
}
}
Здесь есть несколько трюков.
1) relayMessageToClient
. Будет ошибкой просто взять нужный сокет и сразу начать отправлять в него сообщение, потому что, возможно, мы уже отправляем клиенту какое-то другое сообщение. Если мы начнем посылать байты, не проверив не занят ли сокет в данный момент, сообщения будут перемешаны. Как и во многих других местах, где требуется упорядоченная обработка данных, трюк заключается в использовании очереди, а именно, очереди из Completer'ов (TaskCompletionSource
в C#).
void async relayMessageToClient(message) {
// find client
await client.ReadyToSend();
await sendMessage(client, message);
client.CompleteSend();
}
class Client {
// ...
sendMessageQueue = new LinkedList<Completer<object>>();
async Future ReadyToSend() {
var sendMessage = Completer<object>();
if (sendMessageQueue.IsEmpty) {
sendMessageQueue.AddLast(sendMessage);
} else {
var prevSendMessage = sendMessageQueue.Last;
sendMessageQueue.AddLast(sendMessage);
await prevSendMessage.Future;
}
}
void CompleteSend() {
var sendMessage = sendMessageQueue.RemoveFirst();
sendMessage.Complete(null);
}
}
Если очередь не пуста, значит, в данный момент сокет уже занят. Создаём новый completer
, добавляем его в очередь и await
'им предыдущий completer
. Таким образом, когда предыдущее сообщение будет отправлено, CompleteSend
завершит completer
, что приведет к тому, что сервер начнёт отправлять следующее сообщение. Такая очередь так же позволяет гладко propagate исключения. Допустим, во время отправки клиенту какого-то сообщения произошла ошибка. В таком случае нам нужно завершить с исключением отправку не только этого сообщения, но и всех сообщений, которые в данный момент ожидают своего часа в очереди (висят на await
'ах). Если мы этого не сделаем, то они так и продолжат висеть, и мы получим утечку памяти. Для краткости, код, который занимается этим, здесь не приведён.
2) selector.Poll
. Собственно, даже не трюк, а просто попытка сгладить недостатки реализации метода Socket.Select
(selector
— просто обертка над этим методом). В зависимости от ОС этот метод под капотом использует либо select
, либо poll
. Но важно здесь не это. Важно то, как этот метод работает со списками, которые мы подаём ему на вход (список сокетов на чтение, на запись, на проверку ошибки). Этот метод берёт списки, опрашивает сокеты и оставляет в списках только те сокеты, которые готовы выполнить требуемую операцию. Все остальные сокеты выкидываются из списков. «Выкидывание» происходит через RemoveAt
(то есть все последуюшие элементы сдвигаются, что неэффективно). Плюс к этому, так как нам нужно опрашивать все зарегистрированные сокеты каждую итерацию цикла, такое «очищение» вообще приносит вред, приходится каждый раз заново наполнять списки. Мы можем обойти все эти проблемы, используя кастомный List
, метод RemoveAt
которого не удаляет элемент из списка, а просто помечает его как удалённый. Класс ListForPolling
и есть моя реализация такого списка. ListForPolling
работает только с методом Socket.Select
и не годится ни для чего другого.
3) callAtQueue
. В большинстве случаев frontend сервер, переслав клиентское сообщение backend серверу, ожидает ответ (подтверждение, что операция прошла успешно, или ошибка, если что-то пошло не так). Если он не дожидается ответа в течение какого-то конфигурируемого промежутка времени, он отправляет клиенту ошибку, чтобы тот не ждал ответа, который никогда не придёт. callAtQueue
— это priority queue. Сразу после того, как сервер отправляет сообщение в кафку, он делает примерно следущее:
long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
callAtQueue.Enqueue(callback, now + config.WaitForReplyMSec);
В коллбэке ожидание ответа отменяется и начинается отправка серверной ошибки. Если же ответ от backend сервера получен, коллбэк ничего не делает.
Использовать await Task.WhenAny(answerReceivedTask, Task.Delay(x))
нет возможности, так как код после Task.Delay
выполняется на потоке из пула.
Вот, собственно, всё, что касается frontend серверов. Здесь требуется небольшая поправка. На самом деле, сервер не полностью однопоточный. Конечно, кафка под капотом использует потоки, но я имею в виду код приложения. Дело в том, что отправка сообщения в топик кафки (produce) может и не преуспеть. Кафка в случае провала сам повторяет отправку определённое конфигурируемое количество раз, но, если и повторные отправления проваливаются, кафка бросает это дело как безнадёжное. Проверить, было ли сообщение успешно отправлено или нет, можно в deliveryHandler
, который мы передаём в метод Produce
. Кафка вызывает этот хэндлер в I/O потоке producer'а (поток, который занимается отправкой сообщений). Мы должны удостовериться, что сообщение отправлено успешно, и, если нет, отменить ожидание ответа от backend сервера (ответ не придёт, потому что запрос не был отправлен) и отправить клиенту ошибку. То есть нам никак не избежать взаимодействия с другим потоком.
* При написании статьи я вдруг осознал, что мы можем не передавать deliveryHandler
в метод Produce
или просто игнорировать все ошибки кафки (клиенту всё равно будет отправлена ошибка по таймауту, который я описал ранее) — тогда весь наш код будет однопоточным. Теперь думаю, как лучше сделать.
basicGet
механизм, который делает именно то, что мне нужно, мне было невдомёк в то время. Поэтому я перешёл на кафку. Если бы я знал про basicGet
, скорее всего остался бы на рэббите, но о переходе на кафку не жалею. Кафка легче кластеризуется и в теории обладает большей пропускной способностью.
Backend Server
По сравнению с frontend сервером, интересных моментов здесь практически нет. Все backend сервера работают одинаково. На стартапе сервер подписывается на топик (authentication, session или call в зависимости от роли), и кафка назначает ему один или более партишн. Сервер получает сообщение из кафки, обрабатывает и обычно посылает в ответ одно или более сообщений. Почти реальный код:
void Run() {
long lastCommitTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
while (true) {
var consumeResult = consumer.Consume(
TimeSpan.FromMilliseconds(config.Consumer.PollTimeoutMSec)
);
if (consumeResult != null) {
var workUnit = new WorkUnit() {
ConsumeResult = consumeResult,
};
LinkedList<WorkUnit> workUnits;
if (partitionToWorkUnits.ContainsKey(consumeResult.Partition)) {
workUnits = partitionToWorkUnits[consumeResult.Partition];
} else {
workUnits = partitionToWorkUnits[consumeResult.Partition] =
new LinkedList<WorkUnit>();
}
workUnits.AddLast(workUnit);
handleWorkUnit(workUnit);
}
if (
DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - lastCommitTime >=
config.Consumer.CommitIntervalMSec
) {
commitOffsets();
lastCommitTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
}
}
}
TopicPartitionOffset
. Когда мы читаем (consume) сообщение из кафки, мы получаем ConsumeResult
, который, помимо самого сообщения, также содержит TopicPartitionOffset
. Зачем нам нужна эта информация?
Кафка гарантирует at least once delivery, что означает, что сообщения не будут потеряны и будут доставлены в большинстве случаев один раз (иногда возможна повторная доставка сообщения). Это достигается за счёт того, что кафка для каждого партишн каждого топика хранит последний подтвержённый (commited) оффсет. Скажем, один consumer вытащил из назначенного ему партишн сообщение с оффсетом 16, обработал его, закоммитил 16й оффсет, вытащил следующее сообщение, но во время обработки вдруг умер, не сделав коммит. Кафка назначит его партишн какому-то другому consumer'у из той же группы consumer'ов и начнёт доставлять ему сообщения из данного партишн, начиная с оффсета 16 + 1 (последний подтверждённый оффсет + 1). Таким образом сообщение 17 не будет потеряно. Кафка может либо коммитить оффсеты автоматически каждые N миллисекунд, либо полностью передать контроль над коммитами пользователю.
Я отключил авто-коммит и занимаюсь коммитами самостоятельно. Это необходимо так как handleWorkUnit
, где собственно и осуществляется обработка сообщения, — это async void
метод, поэтому нет никаких гарантий, что сообщение 5 будет обработано раньше сообщения 6. Кафка хранит только один commited оффсет (а не набор оффсетов), соответственно, перед тем как коммитить оффсет 6, нам нужно убедиться, что все предыдущие сообщения тоже были обработаны. Помимо этого, один backend сервер может потреблять сообщения из нескольких партишн одновременно, и, значит, должен следить за тем чтобы коммитить правильный оффсет в соответствующий партишн. Для этого мы используем hash map вида partition: work units. Вот как выглядит код commitOffsets
(настоящий код на этот раз):
private void commitOffsets() {
foreach (LinkedList<WorkUnit> workUnits in partitionToWorkUnits.Values) {
WorkUnit lastFinishedWorkUnit = null;
LinkedListNode<WorkUnit> workUnit;
while ((workUnit = workUnits.First) != null && workUnit.Value.IsFinished) {
lastFinishedWorkUnit = workUnit.Value;
workUnits.RemoveFirst();
}
if (lastFinishedWorkUnit != null) {
offsets.Add(lastFinishedWorkUnit.ConsumeResult.TopicPartitionOffset);
}
}
if (offsets.Count > 0) {
consumer.Commit(offsets);
foreach (var offset in offsets) {
logger.Debug(
"{Identifier}: Commited offset {TopicPartitionOffset}",
identifier,
offset
);
}
offsets.Clear();
}
}
Как видно, мы итерируем по ворк юнитам, находим последний завершённый к данному моменту юнит, после которого нет незавершённых, и коммитим соответствующий ему оффсет. Такой цикл позволяет нам избежать «дырявых» коммитов. Например, если у нас в данный момент 4 ворк юнита (0: Finished, 1: Not Finished, 2: Finished, 3: Finished
), мы можем закоммитить только 0й юнит, так как, если закоммитим сразу 3й, это может привести к потенциальной потере 1го, если вдруг сервер умрёт прямо сейчас.
class WorkUnit {
public ConsumeResult<Null, byte[]> ConsumeResult { get; set; }
private int finished = 0;
public bool IsFinished => finished == 1;
public void Finish() {
Interlocked.Increment(ref finished);
}
}
handleWorkUnit
, как было сказано, async void
метод, и он, соответственно, полностью обёрнут в try-catch-finally
. В try
он вызывает нужный сервис, а в finally
— workUnit.Finish()
.
Сервисы довольно тривиальны. Вот, например, какой код выполняется, когда юзер отправляет новое сообщение:
private async Task<ServiceResult> createShareItem(CreateShareItemMessage msg) {
byte[] message;
byte[] messageToPals1 = null;
int?[] partitions1 = null;
// Вытаскиваем UserId из токена.
long? userId = hashService.ValidateSessionIdentifier(msg.SessionIdentifier);
if (userId != null) {
var shareItem = new ShareItemModel(
requestIdentifier: msg.RequestIdentifier,
roomIdentifier: msg.RoomIdentifier,
creatorId: userId,
timeOfCreation: null,
type: msg.ShareItemType,
content: msg.Content
);
// Создаём новое сообщение или возвращаем null,
// если такой комнаты не существует.
long? timeOfCreation = await storageService.CreateShareItem(shareItem);
if (timeOfCreation != null) {
// Ищем всех членов комнаты в кэше.
List<long> pals = await inMemoryStorageService.GetRoomPals(
msg.RoomIdentifier
);
if (pals == null) {
// Если нет в кэше - вытаскиваем из бд и сохраняем в кэш.
pals = await storageService.GetRoomPals(msg.RoomIdentifier);
await inMemoryStorageService.SaveRoomPals(msg.RoomIdentifier, pals);
}
// Хотим отправить сообщение всем, кроме отправителя.
pals.Remove(userId.Value);
if (pals.Count > 0) {
// Создаём ack, чтобы отслеживать, кто не получил и
// кто не прочитал сообщение.
await storageService.CreateAck(
msg.RequestIdentifier, userId.Value, msg.RoomIdentifier,
timeOfCreation.Value, pals
);
// in - список UserId, out - список индексов frontend серверов,
// к которым юзеры подключены. Если какой-то юзер офлайн -
// индекс будет null.
partitions1 = await inMemoryStorageService.GetUserPartitions(pals);
List<long> onlinePals = getOnlinePals(pals, partitions1);
// Если никого нет онлайн, то и слать сообщение никому не нужно.
// Офлайн юзеры получат сообщение при следующем заходе в приложение.
if (onlinePals.Count > 0) {
messageToPals1 = converterService.EncodeNewShareItemMessage(
userId.Value, timeOfCreation.Value, onlinePals, shareItem
);
nullRepeatedPartitions(partitions1);
// Какие-то юзеры могут быть подключены к одному и тому же
// frontend серверу, поэтому здесь мы null'им дупликаты.
}
}
message = converterService.EncodeSuccessfulShareItemCreationMessage(
msg.RequestIdentifier, timeOfCreation.Value
);
} else {
message = converterService.EncodeMessage(
MessageCode.RoomNotFound, msg.RequestIdentifier
);
}
} else {
message = converterService.EncodeMessage(
MessageCode.UserNotFound, msg.RequestIdentifier
);
}
return new ServiceResult(
message: message, // Это сообщение уйдёт отправителю.
messageToPals1: messageToPals1, // Это - всем остальным членам комнаты.
partitions1: partitions1
);
}
База данных
Большая часть функционала сервисов, вызываемых backend серверами, — это просто добавление новых данных в бд и обработка уже имеющихся. Очевидно, как база данных устроена и как мы ей оперируем играет очень важное значение для мессенджера, и тут мне бы хотелось сказать, что я подошёл к вопросу выбора бд очень тщательно после внимательного изучения всех вариантов, но это не так. Я просто выбрал CockroachDb, потому что он обещает много при минимуме усилий и имеет совместимый с postgres синтаксис (я работал с постгрес раньше). Были мысли использовать Кассандру, но в конце концов решил остановиться на чём-то знакомом. Я никогда раньше не работал ни с кафкой, ни с рэббитом, ни с Flutter и дарт, ни с WebRtc, поэтому решил не тащить ещё и Кассандру, так как боялся утонуть во всём множестве новых для меня технологий.
Из всех частей моего проекта дизайн базы данных — вещь, в которой я сомневаюсь больше всего. Я не уверен, что решения, которые я принял, действительно, хорошие решения. Всё работает, но можно было сделать лучше. Например, есть таблицы ShareRooms (так я называю чаты) и ShareItems (так я называю сообщения). Так вот все юзеры, входящие в какую-то комнату, записаны в jsonb поле этой комнаты. Это удобно, но явно очень медленно, так что скорее всего переделаю на использование внешних ключей. Или, например, таблица ShareItems хранит все сообщения. Что тоже удобно, но так как ShareItems является одной из самых нагруженных таблиц (постоянные select
и insert
), возможно стоит создавать новую таблицу для каждой комнаты или что-то в этом роде. Кокроач раскидывает записи по разным нодам, соответственно, нужно тщательно продумывать куда какая запись пойдёт, чтобы добиться максимальной производительности, а я этого не делал. В общем, как можно понять из всего вышесказанного, базы данных не самое моё сильное место. Прямо сейчас я вообще тестирую всё на постгрес, а не кокроач, потому что так меньше нагрузки на мою рабочую машину, она и так бедная от нагрузок скоро взлетит. Благо код для постгрес и кокроач разнится совсем немного, так что переключаться не составляет труда.
Сейчас я нахожусь в процессе изучения, как, собственно, кокроач работает (как происходит mapping между SQL и key-value (кокроач использует RocksDb под капотом), как он распределяет данные между нодами, реплицирует и тд). Стоило, конечно, изучить кокроач перед тем как использовать его, но лучше поздно чем никогда.
Я думаю, что база претерпит большие изменения, когда я стану лучше разбираться в этом вопросе. Прямо сейчас мне не даёт покоя таблица Acks. В этой таблице я храню данные о том, кто ещё не получил и кто ещё не прочитал сообщение (чтобы показывать юзеру галочки). Легко уведомить юзера, что его сообщение прочитано, если юзер сейчас онлайн, но если нет, нам нужно сохранять эту информацию, чтобы уведомить юзера позже. И так как доступны групповые чаты, недостаточно просто хранить флаг, нужны данные про отдельных юзеров. Так вот здесь прямо просится использование битовых строк (одна строка на ещё не получивших юзеров, вторая — на ещё не прочитавших). Тем более кокроач поддерживат bit
и bit varying
. Однако я так и не придумал, как это дело реализовать, учитывая, что состав комнат может постоянно меняться. Чтобы битовые строки сохраняли свой смысл, юзеры в комнате должны оставаться в том же порядке, что довольно затрудительно сделать, когда, например, какой-то юзер покидает комнату. Здесь есть варианты. Возможно стоит записывать -1 вместо того чтобы удалять юзера из jsonb поля, чтобы сохранялся порядок, или использовать какой-то способ версионирования, чтобы мы знали, что вот эта вот битовая строка ссылается на порядок юзеров, который был тогда-то тогда-то, а не на нынешний порядок юзеров. Я всё ещё в процессе продумывания, как это дело лучше реализовать, а пока ещё не получившие и ещё не прочитавшие юзеры — это тоже просто jsonb поля. Учитывая, что запись в таблицу Acks делается при каждом сообщении, объём данных получается большим. Хотя запись, конечно, удаляется, когда сообщение получено и прочитано всеми.
Flutter
Долгое время я работал над серверной частью и использовал простые консольные клиенты для теста, так что даже не создавал Flutter проект. А когда создал, думал, что серверная часть была сложной частью, а приложение это так, фигня, за пару дней разберусь. Пока работал над сервером, пару раз создавал Hello World'ы на флаттер, чтобы прочувствовать фреймворк, и, так как мессенджеру не требуется какой-то замысловатый UI, думал, что полностью готов. Так вот UI, действительно, фигня, но реализация функционала доставила мне проблем (и ещё доставит, так как не всё готово).
State management
Самая популярная тема. Есть тысяча способов управлять состоянием, и рекомендуемый подход меняется раз в полгода. Сейчас мэйнстримом является provider. Лично я для себя выбрал 2 способа: bloc и redux. Bloc (Business Logic Component) для управления локальным состоянием и redux для управления глобальным.
Bloc — это не какая-то библиотека (хотя, конечно, есть и библиотека, уменьшающая бойлерплейт, но я ей не пользуюсь). Bloc — это подход, основанный на стримах. Вообще дарт довольно приятный язык, а стримы так вообще конфетка. Суть этого подхода заключается в том, что мы распихиваем всю бизнес-логику по сервисам, а коммуникацию между UI и сервисами осуществляем посредством контроллёра, который предоставляет нам различные стримы. Пользователь нажал кнопку «найти контакт»? Используя sink
(другой конец стрима) отправляем в контроллёр событие SearchContactsEvent
, контроллёр вызовет нужный сервис, дождётся результата и вернёт список юзеров обратно UI тоже посредством стрима. UI ждёт результаты, используя StreamBuilder
(виджет, который ребилдится каждый раз когда в стрим, на который он подписан, поступают новые данные). Вот, собственно, и всё. В некоторых случаях нам нужно обновлять UI безо всякого участия юзера (например, когда пришло новое сообщение), но это тоже легко делается посредством стримов. Фактически, простой MVC со стримами, никакой магии.
По сравнению с некоторыми другими подходами, bloc требует больше бойлерплейта, но, по моему мнению, лучше использовать родные решения без участия сторонних библиотек, если только использование стороннего решения не даёт какие-то существенные преимущества. Чем больше сверху абстракций, тем сложнее понять, в чём ошибка, когда ошибка случается. Преимущества провайдера я для себя не считаю достаточно существенными, чтобы переходить на него. Но опыта у меня в этой сфере немного, так что вполне вероятно я сменю лагерь в будущем.
Ну а про redux и так все всё знают, так что и говорить нечего. Тем более я его вырезал из приложения :) Использовал для управление аккаунтом, но затем, поняв что в данном случае преимуществ над блоком особых нет, вырезал, чтобы не тащить лишнее. Но в целом считаю redux полезной вещью для управления глобальным состоянием.
Самая мучительная часть
Что делать, если юзер отправил сообщение, но перед тем, как оно было послано, пропало интернет соединение? Что делать, если юзеру пришло подтверждение прочтения, но он закрыл приложение перед тем, как соответствующая запись в базе данных была обновлена? Что делать, если юзер пригласил в комнату своего друга, но перед тем как приглашение было отправлено, у него умерла батарея? Вы когда-нибудь задавались подобными вопросами? Вот и я нет. Раньше. А вот в процессе разработки стал задаваться. Так как соединение может в любой момент пропасть, а телефон в любой момент выключиться, необходимо подтверждать всё. Not fun. Поэтому самое первое сообщение, которое клиент отправляет серверу (Join
, если помните) — это не просто «Hello I am online», это «Hello I am online and here are unconfirmed rooms, here are unconfirmed messages, here are unconfirmed acks, here are unconfirmed room membership operations, and here are last received messages per room». И сервер отвечает похожей простынёй: «Пока ты был офлайн такие-то твои сообщения были прочитаны такими-то юзерами, а ещё в эту комнату пригласили Петю, а ещё из этой комнаты ушла Света, а ещё тебя пригласили в эту комнату, а вот в этих двух комнатах по 40 новых сообщений». Я бы очень хотел знать, как подобные вещи делаются в других мессенджерах, потому что моя реализация не блещет изяществом.
Изображения
В данный момент можно отправлять текст, текст + изображения и просто изображения. Отправка видео ещё не реализована. Изображения немного ужимаются и сохраняются в Firebase storage. В самом сообщении передаются ссылки. По получении сообщения клиент скачивает изображения, генерирует миниатюры и сохраняет всё на файловую систему. В базу записываются пути к файлам. Кстати, генерация миниатюр — единственный код, выполняемый на отдельном треде, так как это compute-heavy операция. Я просто запускаю один воркер-поток, скармливаю ему изображение и в ответ получаю миниатюру. Код предельно прост, так как дарт даёт удобные абстракции для работы с потоками.
class ThumbnailGeneratorService {
SendPort _sendPort;
final Queue<Completer<Uint8List>> _completerQueue =
Queue<Completer<Uint8List>>();
ThumbnailGeneratorService() {
var receivePort = ReceivePort();
Isolate.spawn(startWorker, receivePort.sendPort);
receivePort.listen((data) {
if (data is SendPort) {
_sendPort = data;
} else {
var completer = _completerQueue.removeFirst();
completer.complete(data);
}
});
}
static void startWorker(SendPort sendPort) async {
var receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
receivePort.listen((imageBytes) {
Image image = decodeImage(imageBytes);
Image thumbnail = copyResize(image, width: min(image.width, 200));
sendPort.send(Uint8List.fromList(encodePng(thumbnail)));
});
}
Future<Uint8List> generate(Uint8List imageBytes) {
var completer = Completer<Uint8List>();
_completerQueue.add(completer);
_sendPort.send(imageBytes);
return completer.future;
}
}
Также используется Firebase auth, но только для авторизации доступа к Firebase storage (чтобы юзер не мог, скажем, залить профильную картинку кому-то другому). Вся остальная авторизация осуществляется через мои серверы.
Формат сообщений
Здесь вы наверное ужаснётесь, так как я использую обычные массивы байтов. Json отпадает, потому что требуется эффективность, а про protobuf я не знал, когда начинал. Использование массивов требует большой аккуратности, потому что один неправильный индекс и всё пойдёт наперекосяк.
Первые 4 байта — длина сообщения.
Следующий байт — код сообщения.
Следующие 16 байт — идентификатор запроса (uuid).
Следующие 40 байт — токен авторизации.
Остальная часть сообщения.
Длина сообщения требуется, так как я не использую http или вебсокеты, или какой-то другой протокол, который обеспечивает разделение одного сообщения от другого. Мои frontend сервера видят только потоки байтов, и они должны знать, где одно сообщение заканчивается, и начинается другое. Есть несколько способов разделять сообщения (например, использовать какой-то никогда не встречающийся в сообщениях символ в качестве разделителя), но я предпочёл указывать длину, так как этот способ самый простой, хоть он и влечёт за собой оверхед, так как большинству сообщений хватает и одного байта для указания длины.
Код сообщения это просто один из членов enum'а MessageCode
. Routing осуществляется по коду, и, так как мы можем вытащить код из массива без предварительной десериализации, frontend сервер сам решает в какой топик кафки отправить сообщение вместо того чтобы делегировать эту обязанность кому-то другому.
Идентификатор запроса присутствует в большинстве сообщений, но не во всех. Он выполняет 2 функции: по этому идентификатору клиент устанавливает соответствие между отправленным запросом и полученным ответом (если клиент отправил сообщения А, Б, В в таком порядке, это не означает, что ответы тоже придут по порядку). Вторая функция — избежание дупликатов. Как было сказано ранее, кафка гарантирует at least once delivery. То есть в редких случаях сообщения всё-таки могут быть продублированы. Добавив в нужную таблицу базы данных колонку RequestIdentifier с unique ограничением, мы можем избежать вставки дупликата.
Токен авторизации — это UserId (8 байт) + 32 байта HmacSha256 подпись. Не думаю, что здесь стоит использовать Jwt. Jwt это примерно в 7-8 раз больший размер ради получения чего? У меня юзеры не имеют никаких claims, поэтому простая подпись hmac'ом годится. Авторизации через другие сервисы нет и не планируется.
Аудио и видео звонки
Забавно, что реализацию аудио и видео звонков я сознательно откладывал, так как был уверен, что проблем не оберусь, а на деле это оказалось одной из самых легких в реализации фич. По крайней мере базовый функционал. Вообще просто добавление WebRtc в приложение и получение первого сеанса видеосвязи заняло всего несколько часов, и, о чудо, первый же тест увенчался успехом. До этого я думал, что работающий с первого раза код — это миф. Обычно первый тест новой фичи всегда проваливается из-за какой-нибудь тупой ошибки вроде «добавил сервис, но не зарегистрировал его в DI-контейнере».
Сам сеанс связи в большинстве случаев происходит без участия сервера (peer-to-peer), но для того чтобы установить связь, требуются 3 разных сервера (серверы не обязательно должны быть физически разными, подразумеваются 3 разные роли. Один и тот же сервер в теории может выполнять все 3 функции).
Первый и самый простой — stun сервер. Мы отправляем stun серверу сообщение, и его задача — прочитать Source IP и Source Port пакета и отправить эту информацию обратно, но уже в теле пакета. Для чего это требуется? Прямо сейчас вы скорее всего сидите за каким-то роутером. У роутеров есть внутренний и внешний IP адреса. Когда вы отправляете пакет на какой-то сайт, роутер, получив от вас пакет, заменяет его Source IP и Source Port на свой внешний IP и какой-то сгенерированный порт и делает запись в таблицу NAT вида [ Source IP | Source Port | Router External IP | Router Port ]. Когда роутер получает пакет откуда-то снаружи, он сравнивает Dest IP и Dest Port полученного пакета с колонками Router External IP и Router Port таблицы NAT, и, либо находит соответствующие Source IP — Source Port и пересылает пакет нужному устройству, либо отбрасывает пакет. Важно тут то, что, чтобы пакет попал к вам на устройство, он сначала должен пройти через роутер, а, чтобы пройти через роутер, должна быть соответствующая запись в NAT таблице. Уже сам простой факт отправки сообщения stun серверу генерирует запись в NAT таблице. А в ответ от stun сервера мы получаем пару Router External IP — Router Port. Эта пара — публичный адрес нашего устройства. Отправляя пакеты на данный адрес, устройства «извне» смогут пройти через NAT (NAT traversal) благодаря тому, что нужная запись в таблицу NAT была сделана, когда мы отправили запрос stun серверу.
* Некоторые NAT сложнее, и обойти их не так просто. Собственно, если бы всё было просто, то WebRtc бы и не требовался.
Второй сервер — turn. Это сервер, через который происходит передача потоков между клиентами, когда реальный peer-to-peer невозможен. Fallback сервер. Намного сложнее в реализации, и, в теории, не обязателен, но крайне желателен, потому что peer-to-peer соединение возможно далеко не всегда. Есть свободная реализация turn сервера — coturn, но я его ещё не поднимал.
Третий сервер — сигнальный. Без него тоже в теории можно обойтись, но на практике нет. Этот сервер может быть реализован как угодно, нет никакого специального протокола. Его функция — просто передавать разную конфигурирующую информацию от одного устройства другому и обратно. Эта конфигурирующая информация нужна для установки соединения. Без него возможно обойтись, потому что информацию вы можете передать из уст в уста, например, или используя другой мессенджер :) В передаваемых данных нет ничего особенного — числа и строки.
В WebRtc есть 3 типа сигнальных сообщений: offer, answer и candidate. Инициатор звонка отправляет offer другой стороне через сигнальный сервер, получает в ответ answer, и обе стороны отправляют друг другу кандидатов. Кандидатов может быть много, и, по сути, это такой процесс переговоров, где стороны решают какую транспортную конфигурацию использовать. Возможных транспортных конфигураций (маршрутов от одного устройства к другому) может быть несколько, выбирается наилучший.
Сама по себе технология WebRtc устанавливает соединение и занимается передачей потоков туда-обратно, но это не фреймворк для создания полноценных звонков. Под звонком я подразумеваю сеанс связи с возможностью отменить, отклонить и принять вызов, а также положить трубку. Плюс нужно дать знать звонящему, если другая сторона уже занята. А также реализовать мелочи вроде «ждать ответа на вызов N секунд, затем сбросить». Если просто внедрить WebRtc в приложение в голом виде, то при входящем звонке камера и видео будут спонтанно включаться, что, конечно, неприемлемо.
В чистом виде WebRtc обычно подразумевает как можно более скорую отправку кандидатов другой стороне, чтобы переговоры начались как можно быстрее, что логично. В моих тестах кандидаты принимающей стороне вообще всегда начинали приходить ещё даже до того как придёт offer. Такие «ранние» кандидаты нельзя отбрасывать, их нужно запоминать, чтобы потом, когда придёт оффер, и RTCPeerConnection
будет создан, добавить их в соединение. Тот факт, что кандидаты могут начать приходить ещё до оффера, а также некоторые другие причины, делают реализацию полноценных звонков нетривиальной задачей. Что делать, если нам звонят сразу несколько юзеров? Нам будут приходить кандидаты от всех, и, хотя мы можем отделить кандидатов одного юзера от другого, становится неясно каких кандидатов отбрасывать, потому что мы не знаем чей оффер придёт раньше. Также будут проблемы, если нам начинают приходить кандидаты и затем оффер в момент, когда мы сами кому-то звоним.
Потестировав несколько вариантов с «голым» WebRtc, я пришёл к выводу, что в таком виде будет проблематично и чревато утечками памяти пытаться реализовать звонки, поэтому я решил добавить ещё одну стадию в процесс переговоров WebRtc. Я называю эту стадию Inquire - Grant/Refuse
.
Идея очень проста, но у меня заняло довольно много времени, чтобы до неё дойти. Звонящий ещё до создания стрима и RTCPeerConnection
(и вообще до выполнения любого кода, связанного с WebRtc) отправляет через сигнальный сервер другой стороне сообщение Inquire
. На принимающей стороне проверяется не находится ли юзер в каком-то другом сеансе связи в данный момент (простое bool
поле). Если находится, то обратно посылается сообщение Refuse
, и таким образом мы даем знать звонящему, что юзер занят, а принимающему — что ему звонил такой-то такой-то, пока он был занят другим разговором. Если же юзер в данный момент свободен, то он резервируется. В сообщении Inquire
отправляется идентификатор сессии, и данный идентификатор устанавливается как идентификатор текущей сессии. Если юзер зарезервирован, он откланяет все Inquire/Offer/Candidate
сообщения с идентификаторами сессий, отличных от текущего. После резервации принимающий отправляет через сигнальный сервер звонящему сообщение Grant
. Стоит сказать, что принимающему юзеру этот процесс не виден, так как никакого звонка ещё нет. И главное здесь не забыть на принимающей стороне повесить тайм-аут. Вдруг мы зарезервируем сессию, а никакого оффера не последует.
Звонящий получает Grant
, и вот здесь начинается WebRtc с офферами, кандидатами и вот этим вот всем. Оффер улетает принимающему, и тот, при получении, отображает экран с кнопками Ответить/Отклонить. Но кандидаты, как обычно, никого не ждут. Они снова начинают приходить даже раньше оффера, потому что нет никаких причин ждать, пока юзер ответит на звонок. Он может и не ответить, а отклонить или дождаться пока истечёт тайм-аут — тогда кандидаты просто будут выброшены.
Текущее состояние и дальнейшие планы
- Личные и групповые чаты
- Отправка текста, изображений и видео
- Аудио и видео-звонки
- Подтверждение получения и прочтения
- «Печатает...»
- Уведомления
- Поиск по QR-коду и геолокации
Поиск по QR-коду, неожиданно, довольно проблематично реализовать, потому что почти все плагины для скана кода, которые я пробовал, отказываются заводиться либо работают некорректно. Но, думаю, здесь проблемы будут решены. А за реализацию поиска по геолокации я пока ещё не брался. В теории особых проблем быть не должно.
Уведомления в процессе реализации, как и отправка видео.
Что ещё нужно сделать?
Ох, многое.
Во-первых, нет тестов. Раньше тесты писали коллеги, так что я совсем расслабился.
Во-вторых, пригласить юзеров в существующий чат и покинуть чат в данный момент невозможно. Серверный код для этого готов, клиентский — нет.
В-третьих, если с обработкой ошибок на сервере всё более-менее, то на клиенте никакой обработки ошибок нет. Недостаточно просто сделать запись в лог, нужно повторять попытки операции. Сейчас, например, механизм повторной отправки сообщений не реализован.
В-четвёртых, сервер не пингует клиента, поэтому дисконнект не детектится, если, например, у клиента пропал интернет. Дисконнект детектится только когда клиент закрывает приложение.
В-пятых, индексы в бд не используются.
В-шестых, оптимизация. В коде огромное число мест, где написано что-то вроде // @@TODO: Pool
. Большинство массивов просто new
'ятся. Backend сервер создаёт множество массивов фиксированной длины, так что тут можно и нужно использовать пул.
В-седьмых, на клиенте много мест, где код await
'ится, хотя в этом нет необходимости. Отправка изображений, например, поэтому кажется медленной, потому что код await
'ит сохранение картинок на файловую систему и генерацию миниатюр перед тем как отобразить сообщение, хотя ничего этого и не нужно делать. Или, например, если вы открываете приложение и за время вашего отсутствия вам посылали изображения, то стартап будет медленным, потому что опять же все эти изображения скачиваются, сохраняются на систему, миниатюры генерируются, и только после этого стартап завершается и вас перекидывает со splash скрина на home скрин. Все эти redundant await
'ы были сделаны для более простого дебага, но, конечно, перед релизом от ненужного ожидания нужно избавиться.
В-восьмых, UI сейчас готов на половину, потому что я пока не решил каким хочу его видеть. Поэтому сейчас всё неинтуитивно, половина кнопок неясно, что делают. И кнопки часто не нажимаются с первого раза, так как сейчас это просто иконки с GestureDetector
и без паддинга, поэтому не всегда получается в них попадать. Плюс в некоторых местах pixel overflow не исправлен.
В-девятых, сейчас даже невозможно Sign-in в аккаунт, только Sign Up. Поэтому если удалить приложение и установить заново, зайти в аккаунт не получится :)
В-десятых, код подтвержения не отправляется на почту. Сейчас код вообще всегда одинаковый, опять же потому что так проще дебажить.
В-одиннадцатых, single-responsibility принцип нарушается во многих местах. Нужен рефактор. Классы, отвечающие за взаимодействие с бд (что на клиенте, что на сервере), вообще очень сильно раздуты, потому как занимаются всеми бд операциями.
В-двенадцатых, frontend сервер сейчас всегда ожидает ответ от backend сервера, даже если сообщение не подразумевает отправки ответа (например, сообщение с кодом IsTyping
и некоторые WebRtc-related сообщения). Поэтому не дождавшись ответа, он пишет в консоль ошибку, хоть это и не является ошибкой.
В-тринадцатых, полные изображения не открываются по тапу.
В-сто миллион пятых, некоторые сообщения, которые нужно отправлять пачкой, отправляются отдельно. То же касается и некоторых бд операций. Вместо того чтобы выполнить одну команду, команды выполняются в цикле с await
(брр..).
В-сто миллион шестых, некоторые значения захардкожены, вместо того чтобы быть конфигурируемыми.
В-сто миллион седьмых, логирование на сервере сейчас только в консоль, а на клиенте вообще прямо в виджет. На главном экране есть таб Logs, куда скидываются все логи on tap. Дело в том, что моя рабочая машина отказывается запускать одновременно и эмулятор, и всё необходимое для сервера (кафка, бд, редис и все сервера). Дебажить с подлючённым устройством тоже не выходило, всё просто намертво зависало в половине случаев, потому что компьютер не справлялся с нагрузками. Поэтому приходится каждый раз делать билд, скидывать его на устройство, устанавливать и тестировать вот так. Чтобы видеть логи, скидываю их прямо в виджет. Извращение, знаю, но тут уж выбирать не приходится. По этой же причине многие методы возвращают Future
и await
'ятся (чтобы поймать исключение и скинуть в виджет), хотя и не должны. Если будете смотреть код, то увидите уродливый _logError
метод во многих классах, который этим и занимается. Это всё, конечно, тоже отправится в мусорку.
В-сто миллион восьмых, нет звуков.
В-сто миллион девятых, нужно больше использовать кэширование.
В-сто миллион десятых, много повторяющегося кода. Например, многие action'ы первым делом проверяют валидность токена, и, если он не валиден, отправляют ошибку. Думаю нужно реализовать простенький middleware-pipeline.
И много всего по-мелочи, вроде конкатенации строк вместо использования StringBuilder
'а, Dispose
не везде вызывается, где должен, и тд и тп. В общем, обычное состояние проекта в процессе разработки. Всё вышеперечисленное решаемо, но есть одна фундаментальная проблема, о которой я вообще не думал до последнего момента, потому что вылетело из головы — мессенджер должен работать даже когда приложение не открыто, а мой — не работает. Если честно, решение этой задачи пока не приходит мне в голову. Тут, видимо, не обойтись без нативного кода.
Я бы оценил готовность проекта в 70%.
Итоги
Полгода прошло с момента начала работы над проектом. Совмещал с парт-тайм работой и делал большие перерывы, но всё равно сил и времени ушло прилично. Планирую реализовать все заявленные фичи + добавить что-нибудь необычное вроде крестиков-ноликов или шашек прямо в комнате. Безо всякой причины, просто потому что интересно.
Если есть какие-то вопросы, пишите. Почта есть на гитхаб.
Автор: KoreanGuy