Не так давно вышла статья, в которой автор описывал свой framework для написания приложений с использованием Ruby, Sinatra и websoсket. Но в том решении не был затронут вопрос горизонтального масштабирования. Так при подключении к одному из узлов, пользователи могут получать уведомления/данные только о событиях/изменениях, вызванных пользователями этого же узла, а при изменениях, внесенных через другой, они не узнают. Для решения данной задачи необходимо организовать общую шину данных. Рассматривать данную задачу буду в контексте обмена сообщениями клиент-клиент.
Шина данных
Требования, которые будем предъявлять к шине следующие:
- простота работа;
- передача в «реальном времени»;
- производительность.
Организовать шину можно через хранилище с периодическим опросом, либо через сервер очередей.
Первый вариант не удовлетворяет второму условию, т.к. задержка в передаче будет ровна периоду опроса хранилища. Уменьшение периода приведет к росту нагрузки на него. Поэтому этот вариант отметаем сразу.
Второй вариант подходит лучше всего. В данном случае можно воспользоваться специализированными решениями на подобии RabbitMQ, ActiveMQ. Оба этих продукта представляют из себя серьезные решения, со множеством функций, хорошим масштабированием. Можно использовать и их, но нужно оценить, не будет ли это пушкой по воробьям. Кроме подобных решений функционал очередей предоставляет и Redis, в добавок получаем key-value хранилище, которое нам тоже понадобится.
Redis предоставляет простейший механизм Pub-Sub, которого достаточно для нашей задачи. Он достаточно быстр, прост в работе и имеет малые задержки при передаче.
Решение
Наша система будет иметь следующую схему.
Сообщения между пользователями одного узла передаются напрямую, а сообщения между узлами через шину.
Для этого:
- узел генерирует уникальное имя;
- подписывается по нему на сообщения в Redis;
- все клиенты подключенные к этому узлу записывают пару ключ-значение в виде идентификатора клиента и идентификатора узла, к которому он подключен;
- при отправке сообщения другому клиенту, узнаем имя узла и передаем сообщения в его очередь для обработки.
А теперь реализуем
В качестве библиотеки для websocket выбран faye-websocket-ruby. Для работы с Redis стандартный гем redis (hiredis) + код примера для PubSub через EventMachine, так как реализация из гема работает в блокирующем режиме, а при работе в одном потоке с web-сервером это не допустимо.
module App
class << self
def configuration
yield(config) if block_given?
config.sessions = Metriks.counter('total_sessions')
config.active = Metriks.counter('active_sessions')
end
def config
@config ||= OpenStruct.new( redis: nil, root: nil )
end
def id
@instance_id ||= SecureRandom.hex
end
def logger
@logger ||= Logger.new $stderr
end
def register
config.redis.multi do
config.redis.set "node_#{App.id}", true
config.redis.expire "node_#{App.id}", 60*10
end if config.redis
EM.next_tick do
config.sub = PubSub.connect
config.sub.subscribe App.id do |type, channel, message|
case type
when 'message'
begin
json = Oj.load(message, mode: :compat)
WS::Base.remote_messsage json
rescue => ex
App.logger.error "ERROR: #{message.class} #{message} #{ex.to_s}"
end
else
App.logger.debug "(#{type}) #{channel}:: #{message}"
end
end
@pingpong = EM.add_periodic_timer(30) do
App.config.redis.expire "node_#{App.id}", 60
end
end
rescue
config.redis = nil
end
end
end
Основная работа этого модуля заключается в методе register, который регистрирует себя на шине и ожидает входящие сообщения. Для мониторинга создается ключ вида node_%node_id% c TTL в 60 секунд и периодом обновления 30 секунд, на случай если узел отвалится. Таким образом можно всегда узнать сколько узлов сейчас находится в сети и их имена.
module WS
class Base
NEXT_RACK = [404, {}, []].freeze
def self.call(*args)
instance.call(*args)
end
def self.instance
@instance ||= self.new
end
def self.remote_messsage(json)
user = User.get json['from']
instance.send :process, user, json if user
rescue => ex
user.error( { error: ex.to_s } )
end
def initialize
@ws_cache = {}
end
def call(env)
return NEXT_RACK unless Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, ['xmpp'], ping: 5)
user = User.register(ws)
ws.onmessage = lambda do |event|
json = Oj.load(event.data, mode: :compat)
process(user, json )
end
ws.onclose = lambda do |event|
App.logger.info [:close, event.code, event.reason]
user.unregister
user = nil
end
ws.rack_response
rescue WS::User::NotUnique => ex
ws.send Oj.dump({ action: :error, data: { error: 'not unique session' } })
ws.close
ws.rack_response
end
private
def process(user, json)
action = json['action'].to_s
data = json['data']
return App.logger.info([:message, 'Empty action']) if action.empty?
return App.logger.info([:message, "Unknown action #{json['action']}"]) unless user.respond_to? "on_#{action}"
user.send "on_#{action}", data
rescue => ex
user.error({ error: ex.to_s })
puts ex.to_s
puts ex.backtrace
end
end
end
Данный класс отвечает за установление соединения и обработку сообщений. В методе call создается новый клиент и вешаются обработчики. Метод класса remote_messsage используется для приема внешних сообщений (из шины). Метод process — единая точка для сообщений пришедших напрямую от клиента и для сообщений пришедших по шине.
module WS
class User
include UserBehavior
attr_reader :id
class Error < StandardError; end
class RoomFull < Error; end
class NotFound < Error
attr_reader :id
def initialize(id); @id = id end
def to_s; "User '@#{id}' not found" end
end
class NotUnique < Error; end
class << self
def cache
@ws_cache ||= {}
end
def get(id)
fail NotFound.new(id) if id.to_s.empty?
@ws_cache.fetch(id)
rescue KeyError
WS::RemoteUser.new(id)
end
def register(ws)
self.new(ws)
end
def unregister(ws)
url = URI.parse(ws.url)
id = url.path.split('/').last
get(id).unregister
end
end
def initialize(ws)
@ws = ws
register
@pingpong = EM.add_periodic_timer(5) do
@ws.ping('') do
App.config.redis.expire @id, 15 if App.config.redis
end
end
end
def unregister
on_close if respond_to? :on_close
App.config.active.decrement
App.config.redis.del @id if App.config.redis
User.cache.delete(@id)
@pingpong.cancel
@pingpong = nil
@ws = nil
@id = nil
end
def send_client(from, action, data)
return unless @ws
data = Oj.dump({ from: from.id, action: action.to_s, data: data }, mode: :compat)
@ws.send(data)
end
private
def register
url = URI.parse(@ws.url)
@id = url.path.split('/').last
if App.config.redis
App.config.redis.multi do
App.config.redis.set @id, App.id
App.config.redis.expire @id, 15
end
App.config.sessions.increment
App.config.active.increment
end
User.cache[@id] = self
App.logger.info [:open, @ws.url, @ws.version, @ws.protocol]
on_register if respond_to? :on_close
self
end
end
class RemoteUser
include UserBehavior
attr_reader :id
attr_reader :node
def initialize(id)
@id = id.to_s
fail WS::User::NotFound.new(id) if @id.empty?
@node = App.config.redis.get(@id).to_s
fail WS::User::NotFound.new(id) if @node.empty?
end
def send_client(from, action, data)
return if node.to_s.empty?
App.logger.info ['REMOTE', self.id, from.id, action]
data = Oj.dump({ from: from.id, action: action.to_s, data: data }, mode: :compat)
App.config.redis.publish node, data
end
end
end
Метод register регистрирует пользователя в хранилище, сопоставляя его ID с ID узла куда он подключен и кэширует его в локальном списке. Метод unregister напротив убирает все записи о клиенте и удаляет таймер. Таймер используется для периодической проверки состояние клиента и обновления TTL для его записи, чтобы в Redis не было мертвых душ.
ID клиента получается из URL по которому был запрос на подключение. Он имеет формат ws://%hostname%/ws/%user_id% где user_id случайно сгенерированная уникальаня последовательность.
Метод send_client отправляет данные уже самому клиенту.
Отдельное место занимает метод класса get. Данный метод возвращает по ID экземпляр класса WS::User либо если пользователь не найден в локальном кэше создает экземпляр класса WS::RemoteUser. При его создании проверяется есть ли такой ID в хранилище и какому узлу он принадлежит. Если ID не найдет кидается исключение.
Класс WS::RemoteUser в отличии от WS::User имеет только один метод send_client, который пересылает сформированные сообщения через шину на требуемый узел.
Таким образом, неважно где находится клиент вызов метода send_client доставит данные до адресата.
module UserBehavior
module ClassMethods
def register_action(action, params = {})
return App.logger.info ['register_action', "Method #{action} already defined"] if respond_to? action
block = lambda do |*args |
if block_given?
data, from = yield(self, *args)
send_client from || self, action, data
else
send_client self, action, args.first
end
end
define_method action, &block
define_method "on_#{action}" do |data|
self.send action, data
end if params[:passthrough]
end
end
def self.included(base)
base.instance_exec do
extend ClassMethods
register_action :message do |user, from, text|
[{ to: user.id, text: text }, from]
end
register_action :error, passthrough: true
end
end
def on_message(data)
App.logger.info ['MESSAGE', id, data.to_s]
to_user_id = data['to']
to_user = WS::User.get(to_user_id)
to_user.message self, data['text']
rescue WS::User::NotFound => ex
error({ error: ex.to_s })
end
end
Обработка самих событий вынесена в отдельный модуль UserBehavior, который расширяет предыдущие два класса методами для реакции на сообщения. Каждое сообщение имеет поля FROM, ACTION и DATA. Первое идентифицирует от кого пришло, второе определяет метод, а третья сопутствующие данные. Так для ACTION со значением «message» будет вызван метод on_message, в который будет передано значение поля DATA.
Используя такой подход получилось реализовать прозрачную передачу сообщений между подключенными клиентами, при этом не важно находятся они на одном узле или на разных. Для тестирования запускал несколько экземпляров на разных портах, сообщения корректно отправлялись и получались.
Для желающих попробовать, код рабочего приложения выложил на github. Запускается просто, через rackup
PS
Данное решения не является законченным, думаю есть куда его улучшить и убрать лишнее, но как отправная точка вполне сгодится.
Автор: fuCtor