Обмен сообщениями → PubSub внутри OTP

в 15:24, , рубрики: Elixir/Phoenix, messaging, pub/sub

OTP расшифровывается как Open Telecom Platform; так исторически сложилось, потому что платформа создавалась для нужд и на деньги Ericsson. Но, в принципе, это название имеет примерно столько же коннотаций с ее функциональностью, сколько и яблоки со среднего качества телефонами.

Основной отличительной характеристикой OTP по замыслу авторов является отказоустойчивость. Не многопоточность, не модель акторов, не богатые возможности pattern matching’а, даже не прозрачная кластеризация и не hot code upgrades. Отказоустойчивость.

Виртуальная машина эрланга на поверхностный взгляд устроена очень просто: есть куча «процессов» (не системных процессов, эрланг-процессов) с изолированной памятью, которые могут обмениваться сообщениями. Все. Вот, что говорил Джо Армстронг по этому поводу:

In my blog I argued that processes should behave pretty much like people. People have private memories and exchange data by message passing.
Why I don't like shared memory

Обмен сообщениями внутри OTP очень прост: один процесс посылает сообщение другому (или группе других процессов), синхронно, или асинхронно. Но для этого необходимо знать, кому эти сообщения посылать. То есть менеджером обмена выступает отправитель. Но что если мы хотим просто слать broadcast и дать возможность всем заинтересованным процессам подписаться на это сообщение?

Да, это обычный PubSub, но из коробки в OTP он не реализован. Ну да не беда, все кирпичики для того, чтобы за часик воплотить его на коленке у нас есть. Приступим.

Варианты реализации

В принципе, Elixir включает модуль Registry, который может использован как scaffold для pubsub. Немного домотканного кода, аккуратный присмотр за всеми участниками (супервизор для каждого), — и все готово. Единственная проблема — Registry локален и не умеет в кластеризацию. То есть в распределенной среде (distributed nodes) эта красота работать не будет.

На нашу удачу, есть и распределенная реализация Phoenix.PubSub, которая поставляется с двумя готовыми реализациями: Phoenix.PubSub.PG2 и Phoenix.PubSub.Redis. Ну, Redis — это явно лишнее звено в нашей цепочке, а вот PG2, работающий поверх эрланговских групп процессов pg2 — самое оно. Тоже, правда, без boilerplate не обойдется.

Итак, у нас есть все, чтобы наладить удобные PubSub подписки в нашем приложении. Пора открывать текстовый редактор? — Не совсем. Я не люблю дублировать код из проекта в проект и все, что я могу вычленить в библиотеку — обособляется для повторного использования.

Envío

Таким образом родился пакет Envío. Поскольку, болтовня, как известно, гроша выломанного не сто́ит, начнем с примеров использования.

Локальная рассылка → Registry

defmodule MyApp.Sub do
  use Envio.Subscriber, channels: [{MyApp.Pub, :main}]

  def handle_envio(message, state) do
    # optionally call the default implementation
    {:noreply, state} = super(message, state)
    # handle it!
    IO.inspect({message, state}, label: "Received")
    # respond with `{:noreply, state}` as by contract
    {:noreply, state}
  end
end

Вот, в общем-то, и все. Осталось запихнуть MyApp.Sub в наше дерево супервизоров, и этот процесс начнет получать все сообщения, высланные при помощи функций из MyApp.Pub, который тоже не перегружен кодом.

defmodule MyApp.Pub do
  use Envio.Publisher, channel: :main

  def publish(channel, what), do: broadcast(channel, what)
  def publish(what), do: broadcast(what) # send to :main
end

Распределенная рассылка → PG2

Для распределенных систем, состоящих из множества узлов, такой способ не подойдет. Нам нужно уметь подписываться на сообщения от других узлов, и Registry тут не помощник. Зато есть PG2, реализующий то же behaviour.

defmodule Pg2Sucker do
  use Envio.Subscriber, channels: ["main"], manager: :phoenix_pub_sub

  def handle_envio(message, state) do
    {:noreply, state} = super(message, state)
    IO.inspect({message, state}, label: "Received")
    {:noreply, state}
  end
end

Единственное отличие от автономного кода выше — manager: :phoenix_pub_sub параметр, который мы передаем в use Envio.Subscriberuse Envio.Publisher), чтобы построить модуль на основе :pg2 вместо локального Registry. Теперь сообщения, отправленные с помощью такого Publisher будут доступны на всех узлах в кластере.

Применение

Envío поддерживает так называемые backends. В поставке идет Envio.Slack, который позволяет донельзя упростить отсылку сообщений в Slack. Все, что требуется от приложения — отправить сообщение в канал, сконфигурированный в файле config/prod.exs — все остальное сделает Envío. Вот пример конфигурации:

config :envio, :backends, %{
  Envio.Slack => %{
    {MyApp.Pub, :slack} => [
      hook_url: {:system, "SLACK_ENVIO_HOOK_URL"}
    ]
  }
}

Теперь все сообщения, отправленные при помощи вызова MyApp.Pub.publish(:slack, %{foo: :bar}) будут доставлены в соответствующий канал в Slack, красиво отформатированные. Для того, чтобы перестать слать сообщения в Slack, достаточно остановить процесс Envio.Slack. Больше примеров (например, лог в IO) можно найти в тестах.

Да чего я распинаюсь, попробуйте сами.

def deps do
  [
    {:envio, "~> 0.8"}
  ]
end

Удачного сообщательства!

Автор: Aleksei Matiushkin

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js