Реактивные акторы на java

в 11:00, , рубрики: actor, actor model, akka, java, Parallels, высокая производительность, Программирование

Существует много технологий для организации параллельных вычислений, одна из наиболее перспективных и простых (да-да) — модель акторов. Она позволяет частично избавится от насущных проблем параллелизма, вроде состояния гонки, блокирующих ожиданий окончания операций, бесконечных мьютексов и синхронизаций и многого иного. Так же подобный подход существенно облегчает распараллеливание кода.

Знакомится будем на примере фреймворка akka используя язык java (сам akka написан на scala).

Теория и принципы

Актор — это изолированный (нет прямых ссылок на экземпляр) объект, занимающийся асинхронной обработкой входящих сообщений. Сообщение — любой неизменяемый объект реализующий интерфейс Serializable. Они складируются в очередь, и когда система передаёт управление актору (об этом далее) — объекты извлекаются из очереди по принципу FIFO и обрабатываются согласно внутренней логике, которую мы должны явно задать. Акторы образуют древовидную иерархию, каждый из них имеет актора (или систему акторов) в качестве создателя.
Чтобы отправить сообщение актору нужно иметь на него ссылку (не путать ссылкой на объект). Она бывает двух видов — ActorRef и ActorSelection. Ссылка имеет ключевой метод .tell(message, sender) отправляющий сообщение по указанному пути. Sender — это тоже ссылка типа ActorRef, именно её актор получатель получит при вызове метода getSender() при обработке сообщения.

ActorRef — ссылка на конкретный, гарантированно существующий актор, аналог StrongReference. Отправляя по ней сообщение существуют гарантии, что оно дойдёт, или же будет брошен exception. В явном виде ActorRef можно получить, вызвав метод getSender() при обработке сообщений, получив прямую ссылку на отправителя, или же при создании дочерних акторов.

ActorSelection — слабая ссылка, существенным отличием которой является возможность создать ActorSelection зная лишь имя и путь к актору, но обратной стороной медали является отсутствие гарантий того, что по указанному пути хоть что-то существует. Отправить сообщение можно, но его доставку никто не гарантирует. Поэтому если нужно отправить данные с гарантией, но не имея прямой ссылки на нужный актор, ему отправляется произвольное сообщение, из ответа на которое можно взять сильную ссылку ActorRef.

Собственно, этот базовый минимум, после которого можно продолжать изучение уже на практике. Подключаем библиотеки, их в стандартной комплектации акки достаточно много, дабы избежать ручного разрешения зависимостей проще подключить сразу все (или использовать maven/etc).

Немного практики

Асинхронный мир начинает с системы акторов, создадим её. Обычно нет необходимости иметь более одной системы акторов в приложении.

ActomSystem system = ActorSystem.create("learning2hard");

Система имеет множество настроек, которые нужно выставить до её запуска (а она активна сразу после создания). Это настраивается в внешнем config файле или передаётся вторым аргументом методу-конструктору, о котором подробнее можно почитать на сайте разработчика, нам пока это не нужно, просто стоит знать.
Теперь следует создать актор ядра, дочерними к которому будет вся будущая логика. Технически — актор это объект, наследующий UntypedActor. Сделаем это.

public class Kernel extends UntypedActor {
	@Override
	public void onReceive(Object arg0) throws Exception {		
		unhandled(arg0);		
	}
}

Собственно, класс актора готов. Метод unhandled(arg0) помечает пришедший объект как необработанный и пишет об этом в лог. Добавим экземпляр kernel'а системе акторов:

final ActorRef kernel = system.actorOf(Props.create(Kernel.class), "kernel");

Мы создали по отношению к системе (она является корнем иерархического древа акторов) дочерний актор класса Kernel и дали ему название «kernel». В акке повсеместно используются строковые названия и пути. Если в конструктор Kernel'а необходимо передать какие-то объекты — это делается при помощи Props.create(Kernel.class, obj1, obj2, obj3...).

Сделали эталон программирования — программу, которая не делает ничего полезного, добавим функционал, например — обработку команд, читаемых из консоли. В контексте актора нельзя использовать блокирующие поток вызовы/операции, совсем (есть способ, как их таки можно туда засунуть, о загадочных монадах Future немного позже), а ожидание ввода команды в консоль — одна из таких. Поэтому целесообразно всячески выносить из мира акторов и избегать блокирующих вызовов, применим первое — будем вне системы акторов читать команды с консоли и отправлять их в виде сообщений ядру.

Scanner sc = new Scanner(System.in);
		while (sc.hasNext()) {
			String f = sc.nextLine();
			if (f.equals("exit")) break;
			kernel.tell(f, ActorRef.noSender());
		}
		system.shutdown();

Единственное, что стоит отметить — мы посылаем из «внешнего мира» сообщение в мир акторов, ссылки на отправляющего, ясное дело, нет, поэтому применяется заглушка.

Теперь классу kernel нужно дописать обработку пришедшей строки, сделаем простое эхо.

@Override
	public void onReceive(Object arg0) throws Exception {
		if (arg0 instanceof String) {
			String s = (String) arg0;
			System.out.println(s);
			return;
		}
		unhandled(arg0);		
	}

Примерно так выглядит логика любого актора, список блоков с условием instanceof и return при окончании + unhandled(arg0) после всех блоков, авось что-то не обработалось. Мы в качестве sender'а использовали ActorRef.noSender(), теперь самое время посмотреть, как выглядит адрес:

System.out.println("Адрес отправителя: " + getSender());

Получаем Actor[akka://learning2hard/deadLetters]
И видим, что адрес существует, а не какой-нибудь там null, указывает на заглушку корзины, которая принимает объекты и ничего с ними не делает. Такой тип адреса называется локальным и абсолютным, он верный только в рамках данной системы акторов. Подробнее о адресах ниже.

Но один в поле не воин, создадим ещё акторов, и придумаем простую распределённую задачу — проверка числа на простоту. Чтобы жизнь мёдом не казалась — будем их перебирать, а чтобы акторы не ленились — без оптимизаций. Вводится с консоли число, отправляется ядру, оно должно оценить количество делителей, разослать работу рабочим, где каждый перебирает свой диапазон делителей, и когда хотя бы один заявит, что число составное — написать результат, если все закончили свою работу и не нашли ничего — число простое.

Написать проще чем рассказать, сделаем работника с прозрачной логикой.

public class PrimeWorker extends UntypedActor {

	@Override
	public void onReceive(Object arg0) throws Exception {
		if (arg0 instanceof Job) {
			Job task = (Job) arg0;
			for (int i = task.from; i < task.to; i++) 
				if (task.number % i == 0) { //Число составное
					getSender().tell(new JobResult(task.jobID,false), getSelf());
					return;
				}
			getSender().tell(new JobResult(task.jobID,true), getSelf());
			
		}
		unhandled(arg0);		
	}
	
	public static class Job implements Serializable {
		private static final long serialVersionUID = 5095931000566324969L;
		
		public final int jobID; //Идентификатор задачи, используется для аггрегации результатов
		public final int number; //Исследуемое число
		public final int from; //Нижняя граница диапазона перебора
		public final int to; //Верхняя граница диапазона перебора
		public Job(int jobID, int number, int from, int to) {
			this.jobID = jobID; this.number = number; this.from = from; this.to = to;
		}
	}
	
	public static class JobResult implements Serializable {
		private static final long serialVersionUID = -1788069759380966076L;
		public final int jobID;
		public final boolean isPrime;
		public JobResult(int jobID, boolean isPrime) {
			this.jobID = jobID; this.isPrime = isPrime;
		}
	}

}

Ничего сложного быть не должно. Теперь займёмся кодом ядра, оно выполняет две функции — распределяет работу и объединяет результаты

public class Kernel extends UntypedActor {		
	
	//work ID -> (currentWorkerCount, isPrime)
	private TreeMap<Integer, Pair<Integer, Boolean>> jobs = new TreeMap<Integer, Pair<Integer, Boolean>>();
	private int job_id_counter = 0;

	@Override
	public void onReceive(Object arg0) throws Exception {
		if (arg0 instanceof String) {			
			int i = Integer.valueOf((String) arg0);
			for (int j = 2; j < i; j++) 
				getContext().actorOf(Props.create(PrimeWorker.class)).tell(new PrimeWorker.Job(job_id_counter, i, j, j+1), getSelf());			
			jobs.put(job_id_counter, Pair.get(i - 2, true));
			job_id_counter++;
			return;
		}
		
		if (arg0 instanceof JobResult) {
			JobResult jr = (JobResult) arg0;			
			Pair<Integer, Boolean> task = jobs.get(jr.jobID);
			if (!jr.isPrime) task.second = false;
			task.first--;
			if (task.first < 1){
				System.out.println("Число " + jr.number + (task.second ? " простое" : " составное"));
				jobs.remove(jr.jobID);
			}
			getSender().tell(PoisonPill.getInstance(), getSelf()); //Актор сделал свою работу, отправляем ему команду уничтожения
		}
		unhandled(arg0);		
	}
	
	public static class Pair<A,B> {
		public A first;
		public B second;
		public static <C,D> Pair<C,D> get(C a, D b) {
			Pair<C,D> p = new Pair();
			p.first = a;
			p.second = b;
			return p;
		}
	}
}

Не останавливаемся на отсутствии проверок, не это цель. Сразу в глаза бросаются некоторая расточительность — мы создаем акторы для одной операции и потом их уничтожаем. Конечно, можно поместить ссылки на них в массив, и по очереди вызывать оттуда, но, к счастью, всё уже сделано. Попробуем улучшить код применяя готовые решения.

Роутер — специализированный объект, передающий входящие сообщения акторам, используя определённую стратегию их выбора. Бывает двух типов — пул и группа. Группа — выбирает акторы согласно стратегии по указанному пути, пул — создает их сам. Роутер редко используется как самостоятельный объект, зачастую он инкапсулируется в актор, именно с такими роутерами-акторами мы будем иметь дело. Стратегий выбора существует много, о них можно почитать на сайте авторов фреймворка, наиболее универсальные это SmallestMailboxRouter(SM) и BroadcastRouter. Первый выбирает наименее загруженный актор из набора (по размеру мэилбокса), второй — рассылает сообщение всем. Объявим роутер с пулом акторов и стратегией SM.

private ActorRef router = getContext().actorOf(new SmallestMailboxPool(5).props(Props.create(PrimeWorker.class)), "workers");

В данном случае мы статически задаём размер пула, есть возможность сделать динамически изменяющиеся пулы, и даже написать свою логику изменения, но это выходит за рамки статьи.

Модифицируем код отправки заданий ядра (и убираем отправку сообщения для убиения актора):

if (arg0 instanceof String) {			
			int i = Integer.valueOf((String) arg0);
			for (int j = 2; j < i; j++) 
				router.tell(new PrimeWorker.Job(job_id_counter, i, j, j+1), getSelf());			
			jobs.put(job_id_counter, Pair.get(i - 2, true));
			job_id_counter++;
			return;
		}

Всё, готово. Теперь роутер сам позаботится о распределении задач. Базовые вещи продемонстрированы, можно переходить к более тонким моментам.

Контекст выполнения. Блокирующие операции. Передача состояния

Выше говорилось, что акторам передаётся управление, но не было уточнения — как. За это отвечает контекст выполнения, он явно указывается в файле конфигурации, по умолчанию стоит fork-join executor, универсальный и наиболее производительный на общих задачах. Создаётся некоторое количество потоков (неявно указано в конфиге) и они передают управление акторам, выбирая их в n потоков из списка. Критерием переключения к другому актору является или пустая очередь сообщений или обработка k сообщений подряд, если не указано другое. Очевидно, что происходит при попытке актора вызвать блокирующую операцию — поток блокируется, и остальные акторы начинают обрабатываться куда медленнее. А если количество заблокированных акторов становится = количеству потоков то система замирает.

Даже из такой неловкой ситуации есть выход — Future. Это совсем не те Future, к которым привыкли в java, а концентрированное функциональное добро прямиком из scala. Сначала о Future в java. Это объект, который может содержать результат выполнения асинхронной операции. Или нет. Главное отличие этого фьючера — колбэк, возможность совершить некоторые действия после завершения задачи, причём абсолютно асинхронно. Именно в этих фьючерах заключается львиная доля асинхронной «мощи» фреймворка. Как это выглядит?

			Future<String> f = future(new Callable<String>() {

				@Override
				public String call() throws Exception {
					//Some blocking stuff
					return "hello habr";
				}
				
			}, getContext().dispatcher());
			f.onComplete(new OnComplete<String>(){

				@Override
				public void onComplete(Throwable arg0, String arg1)	throws Throwable {
					System.out.println(arg0 != null ? arg0 : arg1);					
				}
				
			}, getContext().dispatcher());

При создании мы помимо Callable (аналог Runnable с возвращаемым значением) передаём некий dispatcher. Это то, что выполняет актор. Очевидное замечание — если мы заставляем контекст актора выполнить блокирующее действие — а не повиснет ли оно? Тут дело в хитрой акке, которая имеет два пула потоков, один — для акторов, который нельзя блокировать, второй — для всякой ерунды, вроде Future, у этого пула потоков переменный размер. Они оба ограничены, но вызывая блокирующую операцию таким образом мы практически (в рамках разумного) не рискуем замедлить основной набор акторов.

Вернувшись к коду выше — видны эти невероятные Callback. На автора они произвели неизгладимое впечатление. Те, кто знакомы с функциональными языками могут заметить, что Future ведёт себя как монада (ей и является). Значит, можно устраивать их композиции, частично применять функции — всё это есть, модуль Futures.*

Но даже если не вдаваться в далёкие от классической явы вещи — это возможность без создания потоков, продумывания архитектуры, просто создавать асинхронные операции посреди кода. А ещё их можно вкладывать, организуя цепочки логики вида «прочитай файл, спроси пользователя и в это же время обрабатывай данные, в зависимости от ответа сохрани результат». И это быстро, просто, компактно.

Конечно, и тут не без ложки дёгтя — отладка. Код реактивнее некуда, и executor акки всячески помогает вам — исключительные ситуации внутри фьючера не выводятся. Совсем. Если там что-то случилось — вы это узнаете только по отсутствию вызова OnComplete, а что именно — вообще никак. Для более удобной отладки есть один грязный хак — recover. Акка известна своей failsafe политикой, фьючеры — не исключения, поэтому они имеют штатные средства для устранения некорректного результата future'а. Как это выглядит:

Future<String> f = future(new Callable<String>() {

				@Override
				public String call() throws Exception {
					//Some blocking stuff
					return "hello habr";
				}
				
			}, getContext().dispatcher());
			f.recover(new Recover<String>() {

				@Override
				public String recover(Throwable arg0) throws Throwable {
					arg0.printStackTrace();
					return null;
				}
				
			}, getContext().dispatcher());
			f.onComplete(new OnComplete<String>(){

				@Override
				public void onComplete(Throwable arg0, String arg1) throws Throwable {
					System.out.println(arg0 != null ? arg0 : arg1);					
				}
				
			}, getContext().dispatcher());

Видно, что в данном примере recover ошибку не исправляет, возвращая null в onComplete, зато хотя бы пишет о ней. Как итог — ясно, что Future являются невероятно мощным инструментом, позволяющий реализовывать асинхронные ветвления прямо посреди синхронного кода и не боятся блокировок, но требуют некоторой осторожности.

Для мира акторов есть полезный и распространённый шаблон на основании Future — ask.

Future<Object> ask = Patterns.ask(ActorRef/ActorSelection, Object, timeout)

Он отправляет сообщение Object актору ActorRef/ActorSelection и ждёт timeout ответа. Если не дождался — будет эксепшин, дождался — вызван метод OnComplete. После выполнения временный актор удаляется. Future по прежнему не использует пул потоков для акторов, так что блокирующих операций можно не опасаться. Как и обычные future ask'и могут будут неограниченно вложенными. Подобный шаблон позволяет не плодить лишних акторов для выполнения простых задач и организовывать event-driven последовательности действий.
Другим полезным производным от Future является агент — переносчик асинхронной смены состояния.

Агент — сериализуемый объект, все копии которого связаны последовательной шиной данных. Состояние каждого экземпляра агента доступно мгновенно, и, вместе с тем, есть возможность асинхронно обновить состояние всех агентов. Как это работает:

Создадим агент

Agent<Config> agent = Agent.create(new Config(), getContext())

Можно его послать в виде сообщения, даже передать на другую машину (об этом позже, немного терпения). В любой момент времени из него можно извлечь экземпляр Config'а методом .get(). Но самое полезное свойство — метод send(config) асинхронно передаст аргумент в качестве состояния всем экземплярам агента, независимо от их местоположения. На этом основные инструменты организации базовой архитектуры закончились, самое время переходить в сеть.

TCP/IP

Если нырнуть не очень глубоко под капот (достаточно посмотреть список библиотек в поставке) становится ясно, что сеть работает на netty. Но в фреймворке приходится иметь дело с куда большим уровнем абстракции.

 final ActorRef tcpManager = Tcp.get(getContext().system()).manager();

Именно с ним и придётся иметь дело в дальнейшем. Для создания/приёма соединений нужно объявить актор-обработчик, например так

@Override
public void preStart() throws Exception {
    final ActorRef tcp = Tcp.get(getContext().system()).manager();
    tcp.tell(TcpMessage.bind(getSelf(),
         new InetSocketAddress("localhost", 0), 100), getSelf());
}
 
@Override
public void onReceive(Object msg) throws Exception {
    if (msg instanceof Bound) {
         manager.tell(msg, getSelf()); 
    } else if (msg instanceof CommandFailed) {
        getContext().stop(getSelf());
    } else if (msg instanceof Connected) {
        final Connected conn = (Connected) msg;
       manager.tell(conn, getSelf());
       final ActorRef handler = getContext().actorOf(
           Props.create(SomePacketHandler.class));
      getSender().tell(TcpMessage.register(handler), getSelf());
   }
}

Данный код взят с оф. сайта, но хорошо иллюстрирует, что для каждого соединения создаётся отдельный актор-хэндлер. Перезаписываемый метод preStart() активируется при пересоздании (или пересоздании) актора. На способе организации исходящих коннектов останавливаться не буду, особых тонкостей там нет (или не было найдено), дабы поскорее перебраться к интересному и важному.

Кластеризация и адресация

Вот и добрались до основного преимущества и конька фреймворка — эффективной кластеризации из коробки, без каких-либо усилий со стороны программиста. Но для этого нужно разобраться с адресацией акторов, о которой мы упоминали вскользь. Существует 3 типа адресации, с примерами:

Локальная относительная: «kernel/core/worker»;
Локальная абсолютная: "/learning2hard/user/kernel/core/worker" — адрес начинается с слеша;
Сетевая абсолютная: «akka.tcp://learning2hard@testhost.com:100500/user/kernel/core/worker».

Зная сетевой адрес можно отправить сообщение удалённому актору, если он входит в тот же кластер, что и текущая система акторов.

ActorSelection remove = getContext().actorSelection( "akka.tcp://learning2hard@testhost.com/user/kernel/core/worker");

Кластеризация требует правки конфига, однако это можно сделать и без него, не покидая кода программы.

ActorSystem system = ActorSystem.create("learning2hard", ConfigFactory.parseString(
              "akka {n" +		  
			"  actor.provider = "akka.cluster.ClusterActorRefProvider"n"+
			"  remote.netty.tcp {n"+
			"    hostname = "127.0.0.1"n"+
			"    port = 0" +
			"  }n"+
			"  cluster.seed-nodes = ["akka.tcp://learning2hardt@testhost.com+"]n"+
            "}"));

Хорошо видно, что используется некий модуль remote, о котором ничего не упоминалось — это именно то, что обеспечивает возможность коммуникации между акторами посредством сети, кластеризация — надстройка над remote, но мы на этом останавливаться не будем. Параметр «port=0» будет заменён на любой

На этом этапе кластер уже готов и может выполнять свои функции, о которых стоит рассказать подробнее.

Все важные события (подключение новых нод, отключение, проблемы с связью, метрика) доступны подписчикам шины кластера. Как это работает посмотрим на примере:

Cluster cluster = Cluster.get(getContext().system()); //инициализировали шину
cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class); //подписались на получение интересующих событий

Класс таких шин есть в акке — EventBus. Удобные, но ничего уникального сравнимо с им подобными не делающие — 3 основных метода, publish, subscribe, unsubscribe.

Для балансировки нагрузки с каждой машины снимается метрика, данные о памяти, процессоре, загруженности сети. akka поддерживает несколько сборщиков информации, наиболее продвинутым и точным (и рекомендуемым разработчиками) является sigar. Чтобы akka начала его использовать — достаточно подключить в список импортируемых библиотек sigar.jar и добавить необходимые нативки, всё есть на гуглабельном сайте разработчиков. Сообщения метрики регулярно отправляются в шину кластера.

Как бы это странно не звучало — с теорией кластера всё. Безусловно, ещё есть много нюансов, роли ноды, сообщения о достижении определённого размера, циклы жизни нод (и циклы жизни акторов, этот вопрос мы вообще не рассматривали), но с ними можно ознакомится самостоятельно, тем более, что для начала они не критически необходимы. В качестве примера работы с шиной рассмотрим типичную ситуацию — отправка работы вновь подключившимся нодам.

Cluster cluster = Cluster.get(getContext().system());
cluster.subscribe(getSelf(), MemberUp.class);
@Override
public void onReceive(Object message) {
    if (message instanceof MemberUp) {
         if (member.hasRole("lazyslave")) getContext().actorSelection(member.address() + "/user/kernel").tell("hey, slave", getSelf());
        return;
    }
    unhandled(message);
}

Хорошо видно, как адресуются удалённые ноды, есть нечто новое — роль, в данном случае — «lazyslave» это просто параметр конкретной ноды, точнее — список строк, задаётся в конфиге akka.cluster.roles.

Нельзя не упомянуть о специфических роутерах для кластеров. Они фундаментально ничем не отличаются от вышеописанных роутеров, кроме того, что размещают (или ищут) акторы не только на локальной машине, а на всех (или с определённой ролью) нодах кластера. Типичный пример:

    int totalInstances = 100;
    int maxInstancesPerNode = 3;
    boolean allowLocalRoutees = false;
    String useRole = "compute";
    ActorRef workerRouter = getContext().actorOf(new ClusterRouterPool(new ConsistentHashingPool(0), 
new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode,
    allowLocalRoutees, useRole)).props(Props.create(Worker.class)), "workerRouter");

Вот и всё. Акка это хороший, производительный фреймворк для параллельных вычислений, я бы его назвал лучшим в своём классе, при этом обладающий низким порогом вхождения (сравнимо с «ручным» параллелизмом, синхронизациями, мьютексами, условиями гонки и прочими чудесами).

Вопросы, которые показались автору интересными и/или нетривиальными были рассмотрены, как и основные моменты использования данного фреймворка. В качестве послесловия добавлю небольшой список ответов на возникающие вопросы (или те, которые мне задавали ранее). Буду рад дополнением и пожеланиям, если что-то из не упомянутого интересует — допишу.

Вопросник

— Было упомянуто о failsafe, однако нигде, кроме «молчащих» о эксепшинах future'ов это не встречалось. В чём дело ?
Стратегии поведения акторов в случае возникновения эксепшина намеренно не рассматривались, экономя время читателя. В двух словах — у актора есть стратегия супервайзинга, что он будет делать, если кто-то из его дочерних акторов бросит exception. Их 4 основных: убить дочерний актор, перезапустить дочерний актор, ничего не делать, бросить эксепшин самому. Если интереснее подробнее — на сайт фреймворка.

— Предлагалось создавать даже целые цепочки из Future'ов, полученных посредством Patterns.ask, которые для каждого экземпляра создают временный актор, а потом он уничтожается — насколько это ресурсозатратно? Ведь немногим ранее способ, когда для задачи создавались акторы и уничтожались после неё, назван неэффективным.
Акторы акки очень легки и расходуют мало ресурсов, результаты бенчей можно нагуглить. А временные акторы, создаваемые ask'ом — расходуют их ещё менее, устранясь сразу после окончания таймаута. Они легче, потом как не несут стратегий супервайзинга, их жизненный цикл не контролируется и всё такое. Так что избыточного создания акторов стоит избегать, но не опасаться.

— Какое оптимальное количество акторов в системе?
Какое нужно. Для бизнес-решений предлагается создавать актор на каждый набор хранимых состояний, и иногда их количество переваливает за 4-5 млн/ноду. И ничего, всё успешно работает.

— Насколько производителен поиск акторов по имени?
Он хорошо оптимизирован и использует чёрно-красные деревья, так что вполне производителен. Так же очевидно то, что по относительным путям скорость поиска значительно выше.

Автор: harati

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js