Прошло всего лишь каких-то 11577635 секунд с конца осенней школы GoTo в ИТМО. Неделя направления Распределённых систем началась с прототипирования распределённой системы на Cloud Haskell. Мы начали бодро и потому быстро выяснили, что существующую документацию без PhD понять сложновато — и решили написать методичку.
Под катом введение в p2p cloud haskell, немножко функционального стека прототипирования РС, мотивация и «но зачем».
Положим, вам захотелось сделать что-то такое распределённое (скажем, %sCoin), что не покрывается хорошо существующими системами (YARN таки не ответ на все вопросы). Если начать делать всё руками, можно быстро обнаружить огромное количество проблем — от мультиплексирования соединений и шифрования до пробива NAT и peer routing, которые совсем, очень не хочется решать (не в первый раз в истории человечества), особенно если цель — конечный продукт или красивый, работающий прототип.
Любому прикладному программисту от такой постановки задачи быстро придёт в голову слово «библиотека». И действительно. Можно взять discovery и кусочки routing из, например, Kademlia, стандартные механизмы пробива NAT — STUN, TURN, ICE — в общем, тоже известны, для шифрования — ну, прибьём TCP (зная специфику своей сети) и сделаем TLS 1.3 с захардкоженными шифрами, etc.
Но это всё ещё будет требовать много времени и экспертизы. Инвесторам терпения может и не хватить.
Здесь более опытным коллегам придёт мысль: «нужен фреймворк!». И правда. Для Прототипирования Распределённых Систем и Приложений.
А кто-то даже скажет: б-же, так это же libp2p! И будет прав. Частично.
libp2p решает проблему транспорта, его мультиплексирования и шифрования, discovery, peer routing, пробива NAT, connection upgrade и т.д. — в общем, многие сетевые и криптопотребности распределённых приложений. На Go и JS.
Это отличный фреймворк, но у него есть пара проблем. Это Go и JS. Кроме того, было бы приятно иметь во фреймворке что-нибудь для репликации.
the fragmented nature of the tutorials, some of which didn’t work at all, convinced me to not use Cloud Haskell
Наш проект начался с амбиции сделать блокчейн (простите, инновации) на Хаскелле — поэтому libp2p у нас не было — и за четыре дня. Мы начали искать нечто, что сделало бы сеть (транспорт, discovery, сериализацию) за нас. Нашли Cloud Haskell. Обнаружили, что с документацией сложновато. Решили написать своё введение. Итак:
Пишем небесных пчелок на Cloud Haskell
В примере мы напишем систему из пчелок: есть улей — кластер машин, и пчелки — ноды (машины). Пчелки отправляются на разведку искать цветочки и возвращаются с координатами вкусных цветочков в улей, а все другие пчелки должны об этих координатах узнать.
Вам вовсе не обязательно запускать программу на нескольких компьютерах — достаточно и ноутбука, на котором мы параллельно запустим нашу программу.
Cloud Haskell работает по принципу обмена сообщений между нодами (такая модель называется message passing), потому что ноды не разделяют общее пространство ресурсов (RAM, …) — модель shared state легко использовать не получится. Actor Model — частный пример модели message passing, когда сообщения рассылают акторы другим акторам и принимают сообщения в свой mailbox — так message passing выглядит в Cloud Haskell.
1. Для начала определим типы данных, представителями которых будут обмениваться пчелы: src/Types.hs
type Flower = (Int, Int) -- координаты цветка
type Flowers = GSet Flower -- Grow-Only Set цветков
Пчелы должны знать о каждом цветке, известном хотя бы одной пчелке в улье, следовательно им необходимо поддерживать единое состояние «базы данных» цветков в своем пчелином мозгу — решить проблему репликации данных, для чего нужно уметь достигать консенсус: разрешать конфликты между базами данных разных пчел. Для этого мы будем использовать структуру данных GSet (Grow-only Set) — множество, в которое элементы можно только добавлять, но не удалять. Это одна из структур данных CRDT.
Log A Log B
| |
logA.append("one") logA.append("two")
| |
v v
+-----+ +-------+
|"one"| |"hello"|
+-----+ +-------+
| |
logA.append("two") logA.append("world")
| |
v v
+-----------+ +---------------+
|"one","two"| |"hello","world"|
+-----------+ +---------------+
| |
| |
logA.join(logB) <----------+
|
v
+---------------------------+
|"one","hello","two","world"|
+---------------------------+
Необходим интерфейс, который будет служить рецепторами пчелки: добавление элемента в GSet, а также просмотр вкусных цветочков, известных улью — реализуем это в виде REPL (интерактивной оболочки).
2. Приступим к реализации ноды, которую в дальнейшем будем запускать из командной строки: app/Main.hs
main = do -- точка входа
[port, bootstrapPort] <- getArgs -- (1) считываем порт ноды и bootstrap ноды из аргументов командной строки
let hostName = "127.0.0.1" -- IP ноды
P2P.bootstrap -- вызываем функцию инициализации ноды со следующими аргументами:
hostName
port -- порт ноды
(port -> (hostName, port))
initRemoteTable -- (2) создаем remote table
[P2P.makeNodeId (hostName ++ ":" ++ bootstrapPort)] -- список из одной bootstrap ноды
spawnNode -- функция запуска логики ноды, ее код мы напишем потом
Для начала ноды должны как-то друг о друге узнать, то есть совершить peer discovery. В Cloud Haskell есть решение ‘из коробки’ — при инициализации ноды нам достаточно указать хотя бы одну другую bootstrap ноду: нода совершает с bootstrap нодой Peer Exchange — они обмениваются адресами знакомых им нод (aka пиров).
Remote table — штука, которая позволяет пирам обмениваться типами haskell, если они поддерживают сериализацию, то есть их можно представить в формате, который можно переслать по сети и восстановить обратно в обьект Haskell. Тип поддерживает сериализацию, если он реализовывает тайпкласс class (Binary a, Typeable) => Serializable a. Вам не надо самому придумывать реализацию Serializable, Binary и Typeable — haskell сделает это за вас (с помощью магического механизма automatic deriving):
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-} -- прагмы языка, позволяющие автоматически реализовать Binary
data Example = Example
deriving (Typeable, Generic)
instance Binary Example
Далее мы будем опускать deriving ..., instance Binary и прагмы ради краткости кода.
3. Теперь напишем логику запуска ноды:
spawnNode :: Process () -- (1) функция запуска логики ноды
spawnNode = do
liftIO $ threadDelay 3000000 -- даем bootstrap ноде время чтобы запуститься
let flowers = S.initial :: Flowers -- инициализирум GSet для хранения координат цветков
self <- getSelfPid -- (3) получаем наш Pid чтобы REPL мог посылать нам сообщения
repl <- spawnLocal $ runRepl self -- (2) создаем REPL в отдельном потоке
register "bees" self -- теперь нода будет получать сообщения из канала "bees"
spawnLocal $ forever $ do -- (3) запускаем тикер:
send self Tick -- оповестить основной поток что надо передать пирам свое состояние
liftIO $ threadDelay $ 10^6 -- ждемс 0.1 секунды перед тем, как снова отослать состояние
runNode (NodeConfig repl) flowers -- (5) запускаем ноду
В Cloud Haskell основная функциональная единица это Process (не путайте с процессом ОС). Они основаны на легковесных зеленых потоках и могут посылать другим процессам сообщения (функция send чтобы послать определенному процессу или P2P.nsendPeers чтобы послать всем знакомым нодам), принимать сообщения в свой mailbox (функция expect или receive*), запускать другие процессы (например локально с помощью spawnLocal) и т.д.
Нам необходимо реализовать REPL в отдельном потоке, иначе будет происходить блокировка основного потока (ноды), следовательно надо сделать потоко-безопасный интерфейс для GSet, чтобы он был доступен для изменения как для REPL, так и для ноды. Так как система основана на акторах, мы будем посылать сообщения на изменение множества и обрабатывать их последовательно в бесконечном цикле обработки соообщений в главном потоке.
Мы запускаем REPL как отдельный процесс cloud-haskell (т.е. как зеленый поток), и также передаем ему Pid основного процесса (уникальный идентификатор процесса) чтобы REPL знал, куда посылать команды, введенные пользователем, в виде сообщений. Далее мы получаем Pid REPL’а (его возвращает spawnLocal) чтобы посылать ему ответы на команды. Код REPL лежит тут.
Как будет работать репликация цветков?
Каждая нода будет периодически рассылать свое состояние всем пирам (broadcast) — и это в совокупности с CRDT решает проблему репликации:
Пусть есть ноды A и B. Предположим у A нет элемента x, а у Bx есть. После того, как B совершит broadcast, A добавит x — консенсус достигнут, ч.т.д.
Если бы мы имели обыкновенное множество, а не GSet, то ничего бы не получилось: Предположим у A и B есть элемент y. Пусть A удалит y. После того, как B совершит broadcast, A получит y обратно.
Когда мы посылаем сообщение всем нодам, мы должны указать имя сервиса — на самом деле мы посылаем сообщение только тем нодам, которые зарегистрировали себя в регистре как поддерживающиe этот сервис. Здесь мы регистрируем нашу ноду как поддерживающую сервис “bees”: register "bees" self.
Нода должна знать, когда надо послать другим свое состояние. Самое простое решение — делать это по таймеру: ждать секунду, а потом действовать, но тогда бы мы блокировали основной поток обработки сообщений. Здесь мы запускаем процесс через spawnLocal, который сначала посылает сообщение Tick главному процессу (когда главный процесс видет Tick, он посылает нодам свое состояние), а потом ждет 1 секунду и повторяет.
4. Ок, теперь (наконец-то!) мы можем приступить к логике работы основного процесса — код исполнения ноды:
runNode :: NodeConfig -> Flowers -> Process () -- (1) функция логики ноды
runNode config@(NodeConfig repl) flowers = do
let run = runNode config
receiveWait -- (2) ждем сообщений
[ match (command -> -- (3) если нам пришло что-то типа Command от REPL, то
newFlowers <- handleReplCommand config flowers -- получаем новое состояние цветков
run newFlowers)
, match (Tick -> do -- сигнал о том, что надо поделиться своим состоянием с другими
P2P.nsendPeers "bees" flowers -- отправить всем пирам цветки
run flowers)
, match (newFlowers -> do -- кто-то отправил ноде цветочки
run $ newFlowers `union` flowers) -- добавляем новые в базу - по сути обьединение множеств
]
Посмотрим на сигнатуру: runNode принимает конфигурацию ноды типа NodeConfig — та информация, которая не будет меняться во время исполнения. В нашем случае это просто Pid REPL. Еще она принимает свое текущее состояние — GSet цветочков. Но как добавить цветок, ведь GSet — неизменяемый тип данных? Очень просто: сделаем нашу функцию рекурсивной, и при каждом изменении состояния будем запускать ее заново.
receiveWait принимает список функций с одним аргументом (входящим сообщением), вытаскивает сообщение и вызывает функцию, подходящую по типу сообщения.
Если нам пришло сообщение такого типа: data Command = Add Flower | Show, то это — команда от REPL. handleReplCommmand — функция для обработки команды:
handleReplCommand :: NodeConfig -> Flowers -> Command -> Process Flowers
handleReplCommand (NodeConfig repl) flowers (Add flower) = do -- команда добавления элемента от пользователя
send repl (Added flower) -- отправить REPLу сообщение, что цветок добавлен
return $ S.add flower flowers -- запускаем ее уже с новым цветком
handleReplCommand (NodeConfig repl) flowers Show = do -- запрос показать цветочки
send repl (HereUR $ toList flowers) -- отправить цветочки в виде списка
return flowers
Если пришел Tick от тикера — значит надо отправить свое состояние: P2P.nsendPeers "bees" flowers. Здесь “bees” — имя сервиса, то есть мы пересылаем цветки только тем нодам, которые зарегистрировали себя как “bees”.
Если же нам пришли цветочки от какой-то другой пчелки, нам надо все незнакомые цветки добавить себе, то есть попросту объединить множество новых с множеством существующих.
5. Вот и все! Загрузим полный исходный код и скомпилируем:
REPL выведет приглашение. Попробуйте в одном терминале ввести Add (1, 2), т.е. добавить цветок с координатами (1, 2), а в другом — Show, и увидите, что и у второй ноды теперь есть такой цветок.
Часть 2>/dev/null нужна чтобы скрыть stderr, в который Cloud Haskell выводит лог. Если этого не сделать, то мы не сможем нормально пользоваться REPL. Можете заменить /dev/null на log.txt и потом посмотреть, что же он вывел.
Надеюсь, мы убедили вас, что создание распределенных систем на Haskell — это не так уж и страшно :)
Можно придумать много реальных юз-кейсов для похожей системы: например, решение проблемы зайцев в общественном транспорте: человек, проходя в транспорт по карточке, маркируется как зашедший (добавляем его id в первый GSet), а на выходе — как вышедший (добавляем id во второй GSet). Ночью (когда транспорт не работает) происходит проверка — если человек вошел и вышел, то он не заяц.