Фреймворк Akka позволяет реализовать многопоточность в Java-приложении, используя концепцию акторов, взаимодействующих посредством посылки друг другу сообщений. Создав несколько копий акторов одного и того же типа, мы можем таким образом распределить нагрузку в приложении между несколькими потоками.
В данной статье приводится пример использования Akka в Spring-приложении, что представляет некоторую сложность, поскольку в силу ее особенностей, акторов нельзя создавать посредством простого вызова new.
Создадим вначале некую модельную проблему
Сгенерируем обычное spring-boot приложение:
@SpringBootApplication
public class AkkaProjectApplication {
public static void main(String[] args) {
SpringApplication.run(AkkaProjectApplication.class, args);
}
}
С тестом на то, что контекст стартует без проблем:
@RunWith(SpringRunner.class)
@SpringBootTest
public class AkkaProjectApplicationTests {
@Test
public void contextLoads() {
}
}
Предположим, что у нас есть некий внешний сервис ExternalService с довольно-таки небыстрой операцией:
@Service
public class ExternalServiceFakeImpl implements ExternalService {
@Value("${delay.base:300}")
long delayBase;
@Value("${delay.spread:300}")
long delaySpread;
@Override
public ServiceResponse timeConsumingOperation(ServiceRequest request) {
try {
sleep(delayBase + (int) (delaySpread * random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
return buildServiceResponse(SUCCESS);
}
private ServiceResponse buildServiceResponse(StatusCodeType statusCode) {
ServiceResponse result = new ServiceResponse();
result.setStatusCode(statusCode);
return result;
}
}
И нам требуется доставать данные из некоторого хранилища и отправлять на обработку в этот небыстрый сервис. Данные предоставляет DataProvider:
@Component
public class DataProvider implements Dao {
@Override
public List<DataItem> retrieveItems(int maxSize) {
List<DataItem> result = new ArrayList<>();
for (int i = 0; i < maxSize * random(); i++) {
result.add(buildDataItem());
}
return result;
}
private DataItem buildDataItem() {
DataItem dataItem = new DataItem();
dataItem.setTime(LocalDateTime.now());
dataItem.setValue(random());
return dataItem;
}
}
Где класс DataItem используется как контейнер данных:
public class DataItem {
private LocalDateTime time;
private Double value;
public LocalDateTime getTime() {
return time;
}
public void setTime(LocalDateTime time) {
this.time = time;
}
public Double getValue() {
return value;
}
public void setValue(Double value) {
this.value = value;
}
}
При условии, что инстансов ExternalService у нас много/несколько, и они стоят за балансером, есть смысл слать запросы в него в несколько потоков. Собственно для этого и используем Akka
Подготовка инфраструктуры для использования Akka в Spring-приложении
Добавляем нужные зависимости:
<properties>
<akka.version>2.5.4</akka.version>
</properties>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.12</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.12</artifactId>
<version>${akka.version}</version>
</dependency>
Для интеграции фреймворка со Spring, необходимо использовать свой класс SpringActorProducer, имплементирующий IndirectActorProducer:
public class SpringActorProducer implements IndirectActorProducer {
private final ApplicationContext applicationContext;
private final String actorBeanName;
public SpringActorProducer(ApplicationContext applicationContext,
String actorBeanName) {
this.applicationContext = applicationContext;
this.actorBeanName = actorBeanName;
}
@Override
public Actor produce() {
return (Actor) applicationContext.getBean(actorBeanName);
}
@Override
public Class<? extends Actor> actorClass() {
return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
}
}
а также SpringExtension, имплементирующий Extension:
@Component
public class SpringExtension implements Extension {
private ApplicationContext applicationContext;
public void initialize(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
public Props props(String actorBeanName) {
return Props.create(SpringActorProducer.class, applicationContext,
actorBeanName);
}
}
Для создания Actor system, создадим отдельный конфиг AkkaConfig:
@Configuration
@Lazy
public class AkkaConfig {
@Autowired
ApplicationContext applicationContext;
@Autowired
SpringExtension springExtension;
@Bean
public ActorSystem actorSystem() {
ActorSystem system = ActorSystem.create("KMT", akkaConfiguration());
// Initialize the application context in the Akka Spring Extension
springExtension.initialize(applicationContext);
return system;
}
/**
* Read configuration from application.conf file
*/
@Bean
public Config akkaConfiguration() {
return ConfigFactory.load();
}
}
# Loggers to register at boot time (akka.event.Logging$DefaultLogger logs to STDOUT)
loggers = [«akka.event.slf4j.Slf4jLogger»]
# Log level used by the configured loggers (see «loggers») as soon
# as they have been started; before that, see «stdout-loglevel»
# Options: OFF, ERROR, WARNING, INFO, DEBUG
loglevel = «INFO»
# Log level for the very basic logger activated during ActorSystem startup.
# This logger prints the log messages to stdout (System.out).
# Options: OFF, ERROR, WARNING, INFO, DEBUG
stdout-loglevel = «INFO»
}
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} %-5level %X{akkaSource} - %msg%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
{akkaSource} позволяет видеть в логах, какой именно поток делает конкретное действие
Теперь акторы в сприн-бинах можем создавать при помощи props метода у SpringExtension. Это и сделаем в методе, аннотированном @PostConstruct в классе Scheduler, который достает записи при помощи DataProvider и шлет акторам:
@Component
public class Scheduler {
private static final Logger LOGGER = LoggerFactory.getLogger(Scheduler.class);
private final long DELAY = 10_000;
@Value("${scheduler.batchSize:5}")
int batchSize;
@Autowired
ApplicationContext context;
@Autowired
Dao dataProvider;
ActorRef migrationActor;
@Scheduled(fixedDelay = DELAY)
public void performRegularAction() {
LOGGER.info("---------------------------------------------------");
LOGGER.info("Try to retrieve {} records from DB...", batchSize);
List<DataItem> dataItems = dataProvider.retrieveItems(batchSize);
LOGGER.info("{} records retrieved", dataItems.size());
int itemNumber = 0;
for (DataItem dataItem : dataItems) {
LOGGER.info("Send to actor item №{}...", itemNumber++);
migrationActor.tell(new MigrationActor.Send(dataItem),
ActorRef.noSender());
}
}
@PostConstruct
public void postConstructMethod() {
ActorSystem system = context.getBean(ActorSystem.class);
SpringExtension springExtension = context.getBean(SpringExtension.class);
// Use the Spring Extension to create props for a named actor bean
migrationActor = system.actorOf(
springExtension.props("migrationActor")
.withRouter(new RoundRobinPool(4)));
}
}
Метод postConstructMethod создает reference на migrationActor, сообщения к которому будут попадать через роутер RoundRobinPool длиной 4, в итоге одновременно будут работать 4 инстанса MigrationActor, каждый в своем потоке. В итоге сообщения в коде шлем одному и тому же объекту — а роутер раскидывает их по разным инстансам
Пишем тест для Scheduler-а (извините, на JMock):
public class SchedulerTest {
@org.junit.Rule
public final JUnitRuleMockery mockery = new JUnitRuleMockery() {{
setImposteriser(ClassImposteriser.INSTANCE);
}};
Scheduler scheduler;
static ActorSystem system;
static TestProbe probe;
@BeforeClass
public static void setup() {
system = ActorSystem.create("TestSystem");
probe = TestProbe.apply(system);
}
@AfterClass
public static void tearDown() {
TestKit.shutdownActorSystem(system);
system = null;
}
@SuppressWarnings("unchecked")
@Before
public void setUp() {
scheduler = new Scheduler();
scheduler.dataProvider = mockery.mock(Dao.class);
scheduler.migrationActor = probe.ref();
scheduler.batchSize = 5;
}
@Test
public void performRegularAction() throws Exception {
DataItem dataItem = new DataItem();
List<DataItem> dataItems = Arrays.asList(dataItem);
mockery.checking(new Expectations() {
{
oneOf(scheduler.dataProvider).retrieveItems(scheduler.batchSize);
will(returnValue(dataItems));
}
});
scheduler.performRegularAction();
MigrationActor.Send send = probe.expectMsgClass(MigrationActor.Send.class);
assertThat("Wrong message in Send", send.dataItem, is(dataItem));
}
}
Наконец описываем сам актор:
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MigrationActor extends AbstractActor {
private final LoggingAdapter LOGGER =
Logging.getLogger(getContext().getSystem(), this);
@Autowired
@Qualifier("externalServiceFakeImpl")
ExternalService externalService;
public MigrationActor() {
}
public MigrationActor(ExternalService externalService) {
this.externalService = externalService;
}
public static Props props(ExternalService externalService) {
return Props.create(MigrationActor.class, externalService);
}
@Override
public void preStart() throws Exception {
super.preStart();
LOGGER.info("Migration actor started");
}
@Override
public void postStop() throws Exception {
LOGGER.info("Migration actor stopped");
super.postStop();
}
public static class Send {
public final DataItem dataItem;
public Send(DataItem dataItem) {
this.dataItem = dataItem;
}
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Send.class, this::onSend)
.build();
}
private void onSend(Send send) {
LOGGER.info("Start call service by onSend...");
ServiceRequest request = new ServiceRequest();
request.setDataItem(send.dataItem);
externalService.timeConsumingOperation(request);
LOGGER.info("Finish call to service by onSend");
}
}
На что здесть стоит обратить внимание:
— актор имеет scope=prototype, поскольку стандартный scope=singleton не подходит для стратегии «актор-на-поток»
— как создается логгер
— классы месседжей, передаваемые актору, определяются прямо внутри него, т.к. по сути это — часть его контракта. Вообще — месседжи, как используемые для обмена между потоками, должны быть immutable
— определяем статический метод props, чтобы создавать Акторы, используя Props.create(). Это понадобится для теста на актор
— как следствие — нам приходится определить конструктор с сервисом в виде параметра
— т.к. акторы создаются как спринг-бины, а конструктор с параметрами уже есть — также объявляем и пустой конструктор
— наличие методов preStart и postStop, где можно отследить моменты старта/запуска актора
— метод createReceive, где собственно и происходит матчинг получаемых сообщений на методы, которые вызываются как реакция на сообщения
Пишем тест для актора:
public class MigrationActorTest {
@org.junit.Rule
public final JUnitRuleMockery mockery = new JUnitRuleMockery() {{
setImposteriser(ClassImposteriser.INSTANCE);
setThreadingPolicy(new Synchroniser());
}};
static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create("TestSystem");
}
@AfterClass
public static void tearDown() {
TestKit.shutdownActorSystem(system);
system = null;
}
@Test
public void testMigrationActorSend() {
ExternalService externalService = mockery.mock(ExternalService.class);
DataItem dataItem = new DataItem();
mockery.checking(new Expectations() {
{
oneOf(externalService).timeConsumingOperation(with(allOf(
any(ServiceRequest.class),
hasProperty("dataItem", is(dataItem))
)));
}
});
TestActorRef<MigrationActor> ref = TestActorRef.create(system,
MigrationActor.props(externalService), "migrationActor");
ref.tell(new MigrationActor.Send(dataItem), ActorRef.noSender());
}
}
Тут понадобилось для JUnitRuleMockery указать ThreadingPolicy, т.к. иначе mockery была бы non-ThreadSafe.
Вот вкратце и все. Стоит только заметить, что в случае необходимости также можно передавать результаты работы одного актора — другому или самому отправителю, если он — тоже актор. Для тестирования таких акторов стоит использовать класс TestKit
Исходный код приведен здесь
Автор: Андрей