Совсем недавно я перешел с горячо любимого мной объектно-ориентированного C++ на новый для меня и еще не совсем понятный функциональный Scala. Причины перехода — совершенно отдельная история. Но одной из них было наличие достаточно хорошей, судя по отзывам, поддержки модели акторов — с помощью библиотеки Akka. Я давно мечтал опробовать на собственном опыте все описываемые преимущества этой технологии, а существующие реализации на C++ (CAF_C++ и Theron), которые я немного повертел в небольших тестах, оказались достаточно сырыми для моих нужд. Наиболее каноническое же (по моему мнению) решение модели акторов — Erlang, — я отмел, так как посчитал, что для его освоения мне понадобится слишком много времени, да и не факт, что я смогу найти необходимые мне сторонние библиотеки для этого далеко не универсального языка. Поэтому в результате выбор мой пал именно на Scala в связке с Akka, тем более что Scala я когда-то давно уже начинал изучать, но забросил за нецелесообразностью. Однако, как оказалось, на этот раз время для своего эксперимента я выбрал не самое удачное, в чем я убедился только спустя некоторое время.
Начало
Разработка с самого начала шла ударными темпами: почти весь необходимый функционал, который был нужен для моего приложения, был в изобилии доступен в интернете в качестве сторонних библиотек. А отсутствие того, что еще не написано конкретно для Scala, с лихвой компенсировалось богатейшим разнообразием различных компонент для Java. Однако, проблема, как всегда, возникла в самом неожиданном месте.
Дело в том, что в определенный момент моему приложению необходимо подключаться к IMAP-серверу для чтения и обработки полученных почтовых сообщений. А так как модель акторов подразумевает асинхронную работу с сетью, — мне понадобилась такая библиотека, которая и смогла бы подключаться к почтовому серверу и получать почту асинхронно, чтобы красиво и органично вписаться в структуру моего нового приложения. После недолгих поисков я наткнулся на модуль akka-camel, который позволяет использовать библиотеку apache-camel в качестве канала сообщений для акторов. А camel, как выяснилось, умеет подключаться, в числе всего прочего, к почтовым серверам. Кроме того, при указании нужных параметров подключения, camel может читать только свежие (с флагом Recent) сообщения, удалять прочитанные сообщения, или копировать/перемещать их в специально созданную папку. О большем я и мечтать не мог. Для начала работы в SBT всего лишь нужно было упомянуть в зависимостях akka-camel, camel-core и camel-mail.
Первая попытка
Создание актора и подключение его к IMAP-серверу заняло буквально несколько строк кода. И вот в лог приложения вывалился текст сообщения, которое я сам себе отправил на почту для теста. Я уже начал удовлетворенно потирать руки и думать над следующей задачей, но решил на всякий случай попробовать подключиться к рабочему ящику, в который будет в результате приходить почта для обработки. И вот тут мой актор выкинул исключение и «упал». Как оказалось, он не смог правильно распарсить ответ сервера. В интернете никакой информации об этой ошибке и возможных путях решения я не нашел. И немного приуныл. Тратить время на изучение спецификаций протокола и написание своего клиента как-то не очень хотелось. И я, скрепя сердце, во имя экономии времени, решил отступить от намеченного курса полной асинхронности и использовать синхронную блокирующую библиотеку JavaMail. Однако, в том же самом месте с тем же самым исключением упала и эта библиотека. После этого я уже твердо решил, что отказываться от идеалов — путь для слабаков и лентяев, а я все-таки напишу свого IMAP-клиента, с асинхронностью и акторами. Тем более, реализовывать весь IMAP целиком мне было не нужно, функционал требовался весьма ограниченный: авторизация, выбор папки INBOX, получение списка сообщений, чтение конкретного сообщения, копирование сообщения в другую папку и удаление.
Вторая попытка
Выбирать, с чего начать, мне особенно не пришлось. Как известно, разработчики Akka на определенном этапе отказались от Netty для сетевого ввода-вывода в пользу Spray. В дальнейшем, разработка Akka и Spray настолько тесно переплелась, что даже документация их взаимно ссылается друг на друга, а куски кода из spray.io плавно перекочевали в akka.io. И вот здесь-то меня и ждал основной подвох: когда-то, в период разработки версий 2.x, Akka взяла на вооружение используемую в Spray идею каналов (они же «трубопроводы», они же pipelines на англ.), которые позволяют (по утверждению авторов) с легкостью создавать сетевые протоколы, поддерживают обратное давление — то есть возможность «прикрутить вентиль» чтобы «трубы не забивались» на случай, если получатель не успевает обрабатывать поток данных от отправителя, фильтровать, разделять, умножать данные и что еще только с ними не делать. Но что-то в этих «трубопроводах» пошло не так, и они, так и не выйдя из стадии experimental, были объявлены deprecated. Последнее анонсированное новшество от Akka, цель которого — полноценно заменить каналы, — это «реакционные потоки» (reactive streams), о которых уже писали на хабре. Но так как это новшество все еще в стадии анонса, то в последней версии akka 2.3.6 его еще нет, а каналов — уже нет. Каналы остались в spray, но вся документация по ним ведет на устаревшую документацию Akka Version 2.2.0-RC1, которая уже не отражает всей нынешней действительности. А новая документация по Akka говорит, что каналы остались в Spray. В общем, первая версия моего почтового клиента получилась приблизительно похожей на многострадальное дитя Франкенштейна — собранной из разных кусочков разрозненной и местами противоречивой документации. От «трубопровода» я сразу решил отказаться в виду показавшейся мне заоблачной сложности этой концепции, и поэтому работал мой клиент напрямую с потоком символов от сервера в виде ByteString. Точнее сказать, с обрывками этого потока, так как никто не гарантирует, что интересующий ответ придет одним цельным куском, или два ответа не слепятся вместе. Каким-то чудесным образом, через кучу излитых в монитор матюков и переписанных кусков кода, мне удалось таки прикрутить к моему актору шифрование SSL/TLS с помощью нескольких найденных в разных местах кусков кода. Код, найденный мной только в какой-то (сильно устаревшей) конкретной из множества версий официальной документации работать отказался.
С реализацией каждого последующего этапа своего скромного функционала мой клиент становился все монструознее. В конце концов, после очередной итерации, часа в 3 ночи, я понял, что дальше так жить нельзя, с досады написал себе в TODO попробовать все тот же блокирующий JavaMail, но через POP3, без возможности перемещения сообщений по папкам, и ушел спать.
Третья попытка
Однако, будучи упертым отморозком, я на следующее утро (а точнее, в обед), вместо попыток приручить JavaMail, первым делом полез в исходники Spray на гитхабе. Я потратил пару дней на их изучение и адаптирование полученной информации под свои нужды, но потраченное время окупилось сторицей. В первую очередь, в исходниках я наткнулся на не описанный нигде в документации класс ConnectionHandler, который намного упростил мне и моему созданию жизнь, расставив многое по своим местам. Именно изучая применение этого класса в spray-can я понял, каким образом и где можно использовать эти самые «трубопроводы», о которых из документации я выяснил только то, какие задачи они призваны решать, но не как они это делают. Оттуда же, из исходников, я выяснил, как можно «соединять трубы» — то есть, объединять несколько «труб» (стадий) пайплайна (PipelineStage) в один общий «трубопровод», к чему это ведет, и как это предполагается использовать. И тут же я понял, как именно и почему работает прикрученное мной накануне SSL-шифрование, которое до этого момента оставалось для меня черным ящиком, который «просто работает и не надо лезть».
Просветление
Для тех, кому интересны подробности: «трубопровод» складывается из частей, в оригинале они называются «стадиями» или «этапами» (англ. stages), но я их буду называть «трубами» для поддержания образности. Объединяются эти «трубы» в коде с помощью оператора >>, порядок имеет значение. Первыми идут «трубы», наиболее «близкие» к клиенту, последними — к серверу. То есть, все, что идет от клиента — проходит по «трубопроводу» слева направо, от сервера — наоборот, справа налево. К примеру, «труба», осуществляющая шифрование, указывается последней, поэтому все, что посылает в «трубопровод» клиент, сначала проходит все необходимые трансформации, и только потом шифруется, и на сервер в итоге отправляются шифрованные данные. И наоборот, все, что присылает сервер — сначала расшифровывается, потом трансформируется оставшейся частью «трубопровода». Для чего же вообще нужна вся эта сантехника? Для самых разнообразных вещей. Например, для фильтрации посылаемых или получаемых данных. Или для трансформации одних сущностей в другие, что полезно при реализации протоколов. Скажем, существует некий case class DeleteMessage(id: String). Клиент посылает в «трубопровод» экземпляр DeleteMessage(«23»), и на одной из стадий (в одной из «труб») этот класс преобразовывается в понятную серверу команду «a001 STORE 23 +FLAGS.SILENT (Deleted)». Еще «трубы» могут задерживать доставку данных, если, например, ответ от сервера неполный, и ожидается дополнение.
Основной момент, который поначалу совершенно сбил меня с толку, — это наличие концептуальных понятий «событие» (Event) и «команда» (Command), и соответственное разбиение трубопровода на два: событийный (event pipeline) и командный (command pipeline) в рамках одного класса: PipelineStage. Именно эти не понятые мной в самом начале концепции (ну мануалы же, тем более такие разрозненные и непонятные, дочитывают до конца только неудачники, нормальные парни сразу идут напролом и набивают шишек) заставили меня подумать о трубопроводах плохое и решить для себя, что это слишком сложно и не стоит потраченного времени. Мне показалось, что это как-то связано с тем самым «обратным давлением», которое обязательно придется учитывать и реализовывать, хотя мне оно совершенно не нужно. И это впридачу к тому, что я вообще поначалу не понимал, куда и одну-то «трубу» «втыкать», и как в нее что-то засунуть, чтобы это что-то дошло до сервера. И как потом из нее вытащить ответ. А тут этих труб еще и две оказалось. С другой стороны, если бы не это недопонимание — я бы не ощутил в полной мере всей мощи «сантехнического» подхода после изобретения своего маленького чудовища. На самом деле, идея оказалось настолько простой, что мне стало даже смешно: Event — это то, что приходит от сервера клиенту, Command — это то, что идет от клиента на сервер. Труба в результате оказалась одна, просто внутри она сама для себя разделяет два встречных потока, чтоб не запутаться, что куда откуда идет.
Результат
В общем, в результате моих изысканий у меня появился новый класс, отвечающий за соединение с IMAP-сервером, который принял такой краткий и лаконичный вид:
class Connection(client: ActorRef, remoteAddress: InetSocketAddress, sslEncryption: Boolean, connectTimeout: Duration)(implicit sslEngineProvider: ClientSSLEngineProvider) extends ConnectionHandler { actor =>
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
def tcp = IO(Tcp)(context.system)
log.debug("Attempting connection to {}", remoteAddress)
tcp ! Tcp.Connect(remoteAddress)//, timeout = Some(Duration(connectTimeout, TimeUnit.SECONDS)))
context.setReceiveTimeout(connectTimeout)
val pipeline = eventFrontend >> ResponseParsing() >> SslTlsSupport(512, publishSslSessionInfo = false)
override def receive: Receive = {
case connected: Tcp.Connected =>
val connection = sender()
connection ! Tcp.Register(self, keepOpenOnPeerClosed = sslEncryption)
client ! connected
context.watch(connection)
context.become(running(connection, pipeline, pipelineContext(connected)))
case Tcp.CommandFailed(_: Tcp.Connect) =>
throw new ConnectionFailure(1, "Failed to connect to IMAP server")
case ReceiveTimeout =>
log.warning("Connect timed out after {}", connectTimeout)
throw new ConnectionFailure(2, "Connect timed out")
}
def eventFrontend = new PipelineStage {
def apply(context: PipelineContext, commandPL: CPL, eventPL: EPL): Pipelines = new Pipelines {
val commandPipeline: CPL = commandPL
val eventPipeline: EPL = {
case event => client ! event
}
}
}
def pipelineContext(connected: Tcp.Connected) = new SslTlsContext {
def actorContext = context
def remoteAddress = connected.remoteAddress
def localAddress = connected.localAddress
def log = actor.log
def sslEngine = if (sslEncryption) sslEngineProvider(this) else None
}
}
Этот класс я получил упрощением класса spray.can.client.HttpClientConnection. Он наследуется от spray.io.ConnectionHandler, а тот, в свою очередь — от akka.Actor. То есть, он представляет собой обычного актора. По сути — это сантехник, назовем его Станиславом. Станислав отвечает за трубопровод от клиента к серверу и доставку данных по этому трубопроводу, туда и обратно. Он прокладывает этот трубопровод при инициализации актора (т.е., его самого) стандартным context.actorOf(…). Свойство pipeline — и есть тот самый трубопровод, собранный в данном случае из трех труб — фронтенда, парсера ответов от сервера и SSL/TLS-шифровальщика. И теперь, все данные, отправленные сантехнику Станиславу (обычной посылкой ему, как актору, сообщения оператором !, методом tell или любыми другими доступными способами), он бережно сложит в трубу и отправит на сервер. А все, что придет в ответ от сервера, так же бережно достанет из трубы и отошлет клиенту. Такой вот он, наш работящий парень Стасик.
Что касается использованных мною «труб».
SslTlsSupport — это реализованная в Spray стандартная возможность подключения SSL/TLS-шифрования. Требует особого контекста (возвращаемого методом pipelineContext), а также требует поддерживать клиентскую сторону соединения открытой, даже после того как сервер закроет соединение со своей стороны (т.н. полуоткрытое соединение).
ResponseParsing — это написанный уже мной объект с функцией apply(), которая возращает экземпляр «трубы», отвечающей за парсинг — разбор потока символов от сервера (в виде «сырых» сообщений Tcp.Received) на case-классы конкретных ответов, понимаемые и обрабатываемые уже целевым актором (коим и является мой IMAP-клиент). На парсер также возложена ответственность следить за целостностью возвращаемых данных: ждать дополнительных данных, если ответ от сервера не полный, а также отделять несколько ответов друг от друга, если они пришли одним куском. Это очень сильно разгрузило код моего клиента, который теперь из ужасного лоскутного монстра превратился в простого, понятного, прямого и незамысловатого парня Василия, закадычного друга нашего аккуратиста Стасика (еще бы им не дружить, Стас просто пришел и кучу грязной работы взял на себя). Масса тестов, необходимых для поддержания Васиной работоспособности, тоже заметно поуменьшилась.
Наконец, eventFrontend — функция, возвращающая экземпляр «трубы» — PipelineStage, суть которой в одном: передавать все «события» (то есть, данные, прошедшие по всему трубопроводу от сервера, и уже претерпевшие все необходимые изменения) клиенту, то есть Васе, адрес которого Станислав знает благодаря переданной в конструкторе класса переменной.
Какого-то специального рендеринга команд я не делал, за неимением такой необходимости. Все команды на сервер посылаются с помощью простого Tcp.Write.
Эпилог
Вот, собственно, и вся сантехника. В качестве эпилога могу сказать, что сам клиент представляет из себя конечный автомат (Finite-state machine), основанный на Akka.FSM. Я просто влюбился в реализацию этой концепции в Akka, так как написание автомата и юнит-тестов для него — это такая себе увлекательная мини-игра.
Автор: PHmaster