Описание задачи
Часто у разработчиков веб приложений, а также разработчиков для мобильных платформ возникает задача отсылать нотификации о завершении некоторого события. Например: уведомить пользователя о запланированной встрече, звонке, определенной записи в расписании (todo листы) или просто создать напоминание о любом другом действии в будущем.
Если подобные приложения расположены в соц. сетях, то для непосредственной отсылки уведомлений сеть ВКонтакте предоставляет для этого метод secure.sendNotification;
Для Одноклассников: notifications.sendSimple;
А для устройств Apple и Андроид, такие уведомления могут быть отосланы через Apple Push Notification Service и Android Cloud to Device Messaging (C2DM) соответственно.
Выбор инструментов для решения
Для решения поставленной задачи были выбраны два основных средства:
Node.js (0.6.x) и Redis. Плюс express для реализации примитивного UI для показа статистики по принятым и отосланным уведомлениям и Vows для тестов.
В первом привлекла возможность легко задействовать все ядра через модуль Cluster (включен в 0.6), для Node.js версии 0.4 Cluster выполнен в виде отдельного пакета Cluster.
Redis был выбран за поддержку различных типов данных, прямо из коробки (По сравнению с тем же Memcache). А также наличию удобных команд для организации работы очередей. А еще потому, что Youporn.com is now a 100% Redis Site
Основное API
server_name/send_delayed – отложенная нотификация (принимает как GET, так и POST параметры)
- delay — интервал, через который необходимо выполнить отсылку (задается в секундах)
- url – непосредственно адрес, который необходимо запросить, когда придет время. Для соц.сетей это отправка через sendNotification или notifications.sendSimple, для отправки пушей это может быть отсылка на дополнительный сервис, занимающийся непосредственно отправкой в APNS или C2DN.
- uid – идентификатор пользователя.
- type – параметр для статситики (appointment_1, planned_call)
- recheck_url – дополнительный адрес, который необходимо запросить перед отправкой уведомления. Это может быть полезно для отслеживания актуального состояния задания. Например строительство ускорили или вообще отменили. url должен ответить “ok” или “error”, в случае положительного ответа будет осуществлена отправка уведомления. (параметр не обязательный и если он отсутствует никакой проверки не будет)
- send_than_online – дополнительный параметр, если он присутствует, то события складываются в очередь, но отправка будет осуществлена только если будет вызван запрос /user_online. Введение этого параметра связано с тем, что многие сети ограничивают кол-во посылок на одного юзера в день.
server_name/user_online – взять событие для опреденного пользователя (если пришло время) и отправить
- uid – идентификатор пользователя
Дизайн приложения
Основные структуры для организации очереди:
Очередь сообщений — SORTEDSET в котором rank будет время срабатывания события;
Очередь онлайн сообщений — SORTEDSET для каждого uid отдельная, в котором rank будет время срабатывания события;
Очередь для процессинга – LIST – при попадании в этот список сообщение будет обработано.
Мастер процесс
1. Стартует несколько воркеров равных количеству ядер на сервере;
2. Стартует REPL;
3. Запускает сканер очереди сообщений (очередь онлайн сообщений не сканируется);
4. Отслеживает падение воркеров и перезапускает их в случае падения.
5. Восстанавливает очередь сообщений из отдельных событий в случае полного падения;
6. Запускает UI сервер для отслеживания статистики.
Сканер получает пачку событий через ZREVRANGEBYSCORE из очереди сообщений в количестве равной 100 в обратном порядке от текущего времени назад. В транзакции кладет их в очередь для процессинга на обработку и удаляет из очереди сообщений.
Отдельный воркер
1. Принимает команды send_delayed или user_online. Данные события записывются в HASH в формате json, через JSON.stringify. Также было опробовано хранение событий в msgpack. Хотя по приведенным тестам и написано, что это должно быть быстрее, но для простых объектов оказалось никакого прироста скорости нет. Идентификатор события создается рандомом и ставится в очередь сообщений. На каждый ключ события ставится время жизни равное времени срабатывания + задержка из настроек.
2. Запускает сканер очереди для процессинга
Сканер получает события из LIST через BLPOP (для того чтобы брать их, только когда они есть в очереди). Если запустить команду MONITOR в ./redis-cli при запущеном сервере то лог может выглядеть примерно так:
1336484301.674405 "zrevrangebyscore" "actual_queue_test-queue" "1336484301673" "-inf" "limit" "0" "100"
1336484301.675102 "zrevrangebyscore" "actual_queue_test-queue" "1336484301674" "-inf" "limit" "0" "100"
1336484301.675387 "zrevrangebyscore" "actual_queue_test-queue" "1336484301675" "-inf" "limit" "0" "100"
// fill processing queue
Worker.prototype.fetch = function(fn) {
var self = this;
this.client.zrevrangebyscore(this.queue_key, Date.now(),
"-inf", "limit", 0, self.pack, function(err, reply) {
process.nextTick(function() {
self.fetch();
}); // re-schedule fetch
...
В данном случае мы каждый раз ставим выполнение fetch на следующий tick. Как можно видеть, если в очереди нет событий, то и смысла проверять ее на каждом тике нет.
Делает JSON.parse, проверяем ключ последней отправки если он отсутствует или (текущее время — ключ > USER_POLL_THRESHOLD — 30 мин) — инициируем событие 'event-data' при котором UrlSender отправит http запрос, предварительно проверив по recheck_url, если он есть. Если на http запрос получен 200 код то инициируем событие ‘event-sent’, стираем событие в базе, устанавливаем ключ последней отправки для юзера текущим временем.
Последовательность обработки online событий следующая: получаем очередь онлайн сообщений для конкретного пользователя, Через ZREVRANGEBYSCORE получает самое ближайшее событие. Проверяем по настройке USET_POLL_THRESHOLD_ONLINE пришло ли время отправки и кладет их в очередь для процессинга на обработку и удаляет из очереди сообщений.
Тестирование
Автоматические тесты написаны с использованием фреймворка vows. Основная идея тестов это запись последовательности событий в Redis LIST через специальный метод logger’a и потом сравнение с эталонной последовательностью событий.
logger.spec('event-processed'); //only test env
…
logger.spec('event-fetched'); //only test env
Запуск тестов:
1. Скопировать config/settings.test.js -> config/settings.js и изменить настройки на свои;
2. Запуск сервера: APP_ENV=test npm start
3. Запуск тестов: APP_ENV=test vows --spec test/delayed-request-test.js
Примеры скрипта для тестирования (ruby)
require 'uri'
require 'net/http'
api_url = "http://127.0.0.1:8000/send_delayed"
url = "http://127.0.0.1:8000/health"
recheck_url = "http://127.0.0.1:8000/recheck_url_correct"
# recheck_url = "http://127.0.0.1:8000/recheck_url_error"
uid=rand(1000)
type="test_common"
delay=1
post_url = URI.parse(api_url)
# res = Net::HTTP.post_form(post_url,{"url" => url, "delay" => delay,
# "uid" => uid, "type" => type, "recheck_url" => recheck_url, "send_than_online" => true})
res = Net::HTTP.post_form(post_url,{"url" => url, "delay" => delay,
"uid" => uid, "type" => type, "recheck_url" => recheck_url})
p res.inspect
Пример интерфейса просмотра статистики
Исходный код сервера
Написан совместно с pinkey
https://github.com/catz/actual_queue
Автор: roman_truschev