Отложенные уведомления пользователей на Node.js & Redis

в 17:19, , рубрики: node.js, redis, очереди, очередь сообщений, метки: , , ,
Описание задачи

Часто у разработчиков веб приложений, а также разработчиков для мобильных платформ возникает задача отсылать нотификации о завершении некоторого события. Например: уведомить пользователя о запланированной встрече, звонке, определенной записи в расписании (todo листы) или просто создать напоминание о любом другом действии в будущем.


Если подобные приложения расположены в соц. сетях, то для непосредственной отсылки уведомлений сеть ВКонтакте предоставляет для этого метод secure.sendNotification;
Для Одноклассников: notifications.sendSimple;
А для устройств Apple и Андроид, такие уведомления могут быть отосланы через Apple Push Notification Service и Android Cloud to Device Messaging (C2DM) соответственно.

Выбор инструментов для решения

Для решения поставленной задачи были выбраны два основных средства:
Node.js (0.6.x) и Redis. Плюс express для реализации примитивного UI для показа статистики по принятым и отосланным уведомлениям и Vows для тестов.

В первом привлекла возможность легко задействовать все ядра через модуль Cluster (включен в 0.6), для Node.js версии 0.4 Cluster выполнен в виде отдельного пакета Cluster.

Redis был выбран за поддержку различных типов данных, прямо из коробки (По сравнению с тем же Memcache). А также наличию удобных команд для организации работы очередей. А еще потому, что Youporn.com is now a 100% Redis Site

Основное API

server_name/send_delayed – отложенная нотификация (принимает как GET, так и POST параметры)

  • delay — интервал, через который необходимо выполнить отсылку (задается в секундах)
  • url – непосредственно адрес, который необходимо запросить, когда придет время. Для соц.сетей это отправка через sendNotification или notifications.sendSimple, для отправки пушей это может быть отсылка на дополнительный сервис, занимающийся непосредственно отправкой в APNS или C2DN.
  • uid – идентификатор пользователя.
  • type – параметр для статситики (appointment_1, planned_call)
  • recheck_url – дополнительный адрес, который необходимо запросить перед отправкой уведомления. Это может быть полезно для отслеживания актуального состояния задания. Например строительство ускорили или вообще отменили. url должен ответить “ok” или “error”, в случае положительного ответа будет осуществлена отправка уведомления. (параметр не обязательный и если он отсутствует никакой проверки не будет)
  • send_than_online – дополнительный параметр, если он присутствует, то события складываются в очередь, но отправка будет осуществлена только если будет вызван запрос /user_online. Введение этого параметра связано с тем, что многие сети ограничивают кол-во посылок на одного юзера в день.

server_name/user_online – взять событие для опреденного пользователя (если пришло время) и отправить

  • uid – идентификатор пользователя
Дизайн приложения

Основные структуры для организации очереди:
Очередь сообщений — SORTEDSET в котором rank будет время срабатывания события;
Очередь онлайн сообщений — SORTEDSET для каждого uid отдельная, в котором rank будет время срабатывания события;
Очередь для процессинга – LIST – при попадании в этот список сообщение будет обработано.

Мастер процесс

1. Стартует несколько воркеров равных количеству ядер на сервере;
2. Стартует REPL;
3. Запускает сканер очереди сообщений (очередь онлайн сообщений не сканируется);
4. Отслеживает падение воркеров и перезапускает их в случае падения.
5. Восстанавливает очередь сообщений из отдельных событий в случае полного падения;
6. Запускает UI сервер для отслеживания статистики.

Сканер получает пачку событий через ZREVRANGEBYSCORE из очереди сообщений в количестве равной 100 в обратном порядке от текущего времени назад. В транзакции кладет их в очередь для процессинга на обработку и удаляет из очереди сообщений.

Отдельный воркер

1. Принимает команды send_delayed или user_online. Данные события записывются в HASH в формате json, через JSON.stringify. Также было опробовано хранение событий в msgpack. Хотя по приведенным тестам и написано, что это должно быть быстрее, но для простых объектов оказалось никакого прироста скорости нет. Идентификатор события создается рандомом и ставится в очередь сообщений. На каждый ключ события ставится время жизни равное времени срабатывания + задержка из настроек.

2. Запускает сканер очереди для процессинга
Сканер получает события из LIST через BLPOP (для того чтобы брать их, только когда они есть в очереди). Если запустить команду MONITOR в ./redis-cli при запущеном сервере то лог может выглядеть примерно так:

1336484301.674405 "zrevrangebyscore" "actual_queue_test-queue" "1336484301673" "-inf" "limit" "0" "100"
1336484301.675102 "zrevrangebyscore" "actual_queue_test-queue" "1336484301674" "-inf" "limit" "0" "100"
1336484301.675387 "zrevrangebyscore" "actual_queue_test-queue" "1336484301675" "-inf" "limit" "0" "100"

// fill processing queue
Worker.prototype.fetch = function(fn) {
var self = this;
this.client.zrevrangebyscore(this.queue_key, Date.now(),
"-inf", "limit", 0, self.pack, function(err, reply) {
process.nextTick(function() {
self.fetch();
}); // re-schedule fetch
...

В данном случае мы каждый раз ставим выполнение fetch на следующий tick. Как можно видеть, если в очереди нет событий, то и смысла проверять ее на каждом тике нет.
Делает JSON.parse, проверяем ключ последней отправки если он отсутствует или (текущее время — ключ > USER_POLL_THRESHOLD — 30 мин) — инициируем событие 'event-data' при котором UrlSender отправит http запрос, предварительно проверив по recheck_url, если он есть. Если на http запрос получен 200 код то инициируем событие ‘event-sent’, стираем событие в базе, устанавливаем ключ последней отправки для юзера текущим временем.

Последовательность обработки online событий следующая: получаем очередь онлайн сообщений для конкретного пользователя, Через ZREVRANGEBYSCORE получает самое ближайшее событие. Проверяем по настройке USET_POLL_THRESHOLD_ONLINE пришло ли время отправки и кладет их в очередь для процессинга на обработку и удаляет из очереди сообщений.

Тестирование

Автоматические тесты написаны с использованием фреймворка vows. Основная идея тестов это запись последовательности событий в Redis LIST через специальный метод logger’a и потом сравнение с эталонной последовательностью событий.

logger.spec('event-processed'); //only test env

logger.spec('event-fetched'); //only test env

Запуск тестов:
1. Скопировать config/settings.test.js -> config/settings.js и изменить настройки на свои;
2. Запуск сервера: APP_ENV=test npm start
3. Запуск тестов: APP_ENV=test vows --spec test/delayed-request-test.js

Примеры скрипта для тестирования (ruby)

require 'uri'
require 'net/http'

api_url = "http://127.0.0.1:8000/send_delayed"
url = "http://127.0.0.1:8000/health"
recheck_url = "http://127.0.0.1:8000/recheck_url_correct"
# recheck_url = "http://127.0.0.1:8000/recheck_url_error"
uid=rand(1000)
type="test_common"
delay=1
post_url = URI.parse(api_url)
# res = Net::HTTP.post_form(post_url,{"url" => url, "delay" => delay,
# "uid" => uid, "type" => type, "recheck_url" => recheck_url, "send_than_online" => true})
res = Net::HTTP.post_form(post_url,{"url" => url, "delay" => delay,
"uid" => uid, "type" => type, "recheck_url" => recheck_url})
p res.inspect

Пример интерфейса просмотра статистики

Отложенные уведомления пользователей на Node.js & Redis

Исходный код сервера

Написан совместно с pinkey
https://github.com/catz/actual_queue

Автор: roman_truschev

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js