Всем привет!
Последний в этом году курс «Разработчик Java Enterprise» успешно запущен и у нас остался последний материал по данной теме, которым мы хотим поделиться с вами, где разбирается использование асинхронного подхода и стейджинга для разработки отзывчивых реактивных приложений.
Поехали.
Реактивное программирование сперва звучит, как название зарождающейся парадигмы, но на самом деле, относится к методу программирования, в котором для работы с асинхронными потоками данных используется событийно-ориентированный подход. Основываясь на постоянно текущих данных, реактивные системы реагируют на них путем выполнения ряда событий.
Реактивное программирование следует шаблону проектирования “Наблюдатель”, который можно определить следующим образом: если в одном объекте происходит изменение состояния, то все прочие объекты оповещаются и обновляются соответствующим образом. Поэтому, вместо того, чтобы опрашивать события на предмет изменений, события пушатся асинхронно, чтобы наблюдатели могли их обработать. В этом примере, наблюдатели — функции, которые исполняются, когда событие отправлено. А упомянутый поток данных — фактический наблюдаемый.
Почти все языки и фреймворки используют этот подход в своей экосистеме, и последние версии Java — не исключение. В этой статье я объясню как можно применить реактивное программирование, используя последнюю версию JAX-RS в Java EE 8 и функционал Java 8.
Реактивный Манифест
В Реактивном Манифесте перечислены четыре фундаментальных аспекта, необходимых приложению, чтобы быть более гибким, слабо связанным и простым для масштабирования, а следовательно и способным быть реактивным. В нем говорится, что приложение должно быть отзывчивым, гибким (а значит и масштабируемым), устойчивым и message-driven.
Основополагающая цель — действительно отзывчивое приложение. Предположим, есть приложение, в котором обработкой запросов пользователей занимается один большой поток, и после выполнения работы этот поток отправляет ответы обратно оригинальным запрашивателям. Когда приложение получает больше запросов, чем может обработать, этот поток становится bottleneck’ом, и приложение теряет свою былую отзывчивость. Чтобы сохранить отзывчивость, приложение должно быть масштабируемым и устойчивым. Устойчивым можно считать приложение, в котором есть функционал для авто-восстановления. По опыту большинства разработчиков, только message-driven архитектура позволяет приложению быть масштабируемым, устойчивым и отзывчивым.
Реактивное программирование стало внедряться в версии Java 8 и Java EE 8. Язык Java представил такие понятия, как CompletionStage
, и его реализацию CompletableFuture
, а Java начал использовать эти функции в таких спецификациях, как Reactive Client API в JAX-RS.
JAX-RS 2.1 Reactive Client API
Посмотрим, как реактивное программирование может использоваться в приложениях Java EE 8. Чтобы разобраться в процессе, нужны базовые знания API Java EE.
JAX-RS 2.1 представил новый способ создания REST клиента с поддержкой реактивного программирования. Дефолтная реализация invoker, предлагаемая в JAX-RS — синхронная, это значит, что создаваемый клиент отправит блокирующий вызов точке назначения (endpoint) сервера. Пример реализации представлен в Listing 1.
Listing 1
Response response =
ClientBuilder.newClient()
.target("http://localhost:8080/service-url")
.request()
.get();
Начиная с версии 2.0, JAX-RS предоставляет поддержку создания асинхронного invoker на клиентском API с помощью простого вызова метода async()
, как показано в Listing 2.
Listing 2
Future<Response> response =
ClientBuilder.newClient()
.target("http://localhost:8080/service-url")
.request()
.async()
.get();
Использование асинхронного invoker на клиенте возвращает инстанс Future
с типом javax.ws.rs.core.Response
. Это может привести к опросу ответа, с вызовом future.get()
, или регистрации колбека, который будет вызываться при наличии доступного HTTP ответа. Обе реализации подходят для асинхронного программирования, но все обычно усложняется, если вы хотите сгруппировать обратные вызовы или добавить условные кейсы в эти асинхронные минимумы выполнения.
JAX-RS 2.1 предоставляет реактивный способ преодоления этих проблем с новым JAX-RS Reactive Client API для сборки клиента. Это так же просто, как вызов rx()
метода во время сборки клиента. В Listing 3 rx()
метод возвращает реактивный invoker, который существует во время выполнения клиента, и клиент возвращает ответ с типом CompletionStage.rx()
, который позволяет переход от синхронного invoker к асинхронному с помощью простого вызова.
Listing 3
CompletionStage<Response> response =
ClientBuilder.newClient()
.target("http://localhost:8080/service-url")
.request()
.rx()
.get();
CompletionStage<Т>
— новый интерфейс, введенный в Java 8. Он представляет вычисление, которое может являться этапом в рамках большего вычисления, как и следует из названия. Это единственный представитель реактивности Java 8, который попал в JAX-RS.
После получения инстанса ответа, я могу вызывать AcceptAsync()
, где я могу предоставить фрагмент кода, который будет выполняться асинхронно, когда ответ станет доступным, как это показано в Listing 4.
Listing 4
response.thenAcceptAsync(res -> {
Temperature t = res.readEntity(Temperature.class);
//do stuff with t
});
Добавление реактивности в точку endpoint REST
Реактивный подход не ограничивается клиентской стороной в JAX-RS; его можно использовать и на стороне сервера. Для примера, сперва я создам простой сценарий, где смогу запросить список местоположений одной точки назначения. Для каждого положения я сделаю отдельный вызов с данными местоположения до другой точки, чтобы получить значения температуры. Взаимодействие точек назначения будет таким, как показано на Figure 1.
Figure 1. Взаимодействие между точками назначения
Сначала я просто определяю модель области определения, а затем сервисы для каждой модели. В Listing 5 показано, как определяется класс Forecast
, который оборачивает классы Location
и Temperature
.
Listing 5
public class Temperature {
private Double temperature;
private String scale;
// getters & setters
}
public class Location {
String name;
public Location() {}
public Location(String name) {
this.name = name;
}
// getters & setters
}
public class Forecast {
private Location location;
private Temperature temperature;
public Forecast(Location location) {
this.location = location;
}
public Forecast setTemperature(
final Temperature temperature) {
this.temperature = temperature;
return this;
}
// getters
}
Для обертки списка прогнозов, класс ServiceResponse
имплементирован в Listing 6.
Listing 6
public class ServiceResponse {
private long processingTime;
private List<Forecast> forecasts = new ArrayList<>();
public void setProcessingTime(long processingTime) {
this.processingTime = processingTime;
}
public ServiceResponse forecasts(List<Forecast> forecasts) {
this.forecasts = forecasts;
return this;
}
// getters
}
LocationResource
, показанный в Listing 7, определяет три образца местоположений, возвращаемых с путем /location
.
Listing 7
@Path("/location")
public class LocationResource {
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getLocations() {
List<Location> locations = new ArrayList<>();
locations.add(new Location("London"));
locations.add(new Location("Istanbul"));
locations.add(new Location("Prague"));
return Response.ok(new GenericEntity<List<Location>>(locations){}).build();
}
}
TemperatureResource
, показанный в Listing 8, возвращает случайно сгенерированное значение температуры между 30 и 50 для заданной локации. Задержка в 500 мс добавлена в имплементацию для симуляции считывания датчика.
Listing 8
@Path("/temperature")
public class TemperatureResource {
@GET
@Path("/{city}")
@Produces(MediaType.APPLICATION_JSON)
public Response getAverageTemperature(@PathParam("city") String cityName) {
Temperature temperature = new Temperature();
temperature.setTemperature((double) (new Random().nextInt(20) + 30));
temperature.setScale("Celsius");
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
ignored.printStackTrace();
}
return Response.ok(temperature).build();
}
}
Сначала я покажу реализацию синхронного ForecastResource
(смотрите Listing 9), который выдает все местоположения. Затем, для каждого положения он вызывает температурный сервис, чтобы получить значения в градусах по Цельсию.
Listing 9
@Path("/forecast")
public class ForecastResource {
@Uri("location")
private WebTarget locationTarget;
@Uri("temperature/{city}")
private WebTarget temperatureTarget;
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getLocationsWithTemperature() {
long startTime = System.currentTimeMillis();
ServiceResponse response = new ServiceResponse();
List<Location> locations = locationTarget
.request()
.get(new GenericType<List<Location>>(){});
locations.forEach(location -> {
Temperature temperature = temperatureTarget
.resolveTemplate("city", location.getName())
.request()
.get(Temperature.class);
response.getForecasts().add(
new Forecast(location).setTemperature(temperature));
});
long endTime = System.currentTimeMillis();
response.setProcessingTime(endTime - startTime);
return Response.ok(response).build();
}
}
Когда точка назначения прогноза запрашивается как /forecast
, вы получите вывод, похожий на тот, что указан в Listing 10. Обратите внимание, что время обработки запроса заняло 1.533 мс, что логично, так как синхронный запрос значений температуры из трех различных местоположений добавляет до 1.5 мс.
Listing 10
{
"forecasts": [
{
"location": {
"name": "London"
},
"temperature": {
"scale": "Celsius",
"temperature": 33
}
},
{
"location": {
"name": "Istanbul"
},
"temperature": {
"scale": "Celsius",
"temperature": 38
}
},
{
"location": {
"name": "Prague"
},
"temperature": {
"scale": "Celsius",
"temperature": 46
}
}
],
"processingTime": 1533
}
Пока все идет по плану. Настало время ввести реактивное программирование на серверной стороне, где вызовы к каждой локации могут выполняться параллельно после получения всех местоположений. Это явно может улучшить синхронный поток, показанный ранее. Это выполняется в Listing 11, где показано определение реактивной версии сервиса прогнозов.
Listing 11
@Path("/reactiveForecast")
public class ForecastReactiveResource {
@Uri("location")
private WebTarget locationTarget;
@Uri("temperature/{city}")
private WebTarget temperatureTarget;
@GET
@Produces(MediaType.APPLICATION_JSON)
public void getLocationsWithTemperature(@Suspended final AsyncResponse async) {
long startTime = System.currentTimeMillis();
// Создать этап (stage) для извлечения местоположений
CompletionStage<List<Location>> locationCS =
locationTarget.request()
.rx()
.get(new GenericType<List<Location>>() {});
// Создав отдельный этап на этапе местоположений,
// описанном выше, собрать список прогнозов,
//как в одном большом CompletionStage
final CompletionStage<List<Forecast>> forecastCS =
locationCS.thenCompose(locations -> {
// Создать этап для получения прогнозов
// как списка СompletionStage
List<CompletionStage<Forecast>> forecastList =
// Стрим местоположений и обработка каждого
// из них по отдельности
locations.stream().map(location -> {
// Создать этап для получения
// значений температуры только одного города
// по его названию
final CompletionStage<Temperature> tempCS =
temperatureTarget
.resolveTemplate("city", location.getName())
.request()
.rx()
.get(Temperature.class);
// Затем создать CompletableFuture, в котором
// содержится инстанс прогноза
// с местоположением и температурным значением
return CompletableFuture.completedFuture(
new Forecast(location))
.thenCombine(tempCS,
Forecast::setTemperature);
}).collect(Collectors.toList());
// Вернуть финальный инстанс CompletableFuture,
// где все представленные объекты completable future
// завершены
return CompletableFuture.allOf(
forecastList.toArray(
new CompletableFuture[forecastList.size()]))
.thenApply(v -> forecastList.stream()
.map(CompletionStage::toCompletableFuture)
.map(CompletableFuture::join)
.collect(Collectors.toList()));
});
// Создать инстанс ServiceResponse,
// в котором содержится полный список прогнозов
// вместе со временем обработки.
// Создать его future и объединить с
// forecastCS, чтобы получить прогнозы
// и вставить в ответ сервиса
CompletableFuture.completedFuture(
new ServiceResponse())
.thenCombine(forecastCS,
ServiceResponse::forecasts)
.whenCompleteAsync((response, throwable) -> {
response.setProcessingTime(
System.currentTimeMillis() - startTime);
async.resume(response);
});
}
}
Реактивная реализация может показаться сложной, на первый взгляд, но после более внимательного изучения вы заметите, что она довольно проста. В реализации ForecastReactiveResource
я сначала создаю клиентский вызов на сервисы местоположений с помощью JAX-RS Reactive Client API. Как я упоминал выше, это дополнение для Java EE 8, и оно помогает создавать реактивный вызов просто с помощью метода rx()
.
Теперь я создаю новый этап на основе местоположения, чтобы собрать список прогнозов. Они будут храниться, как список прогнозов, в одном большом completion stage, названном forecastCS
. В конечном итоге, я создам ответ вызова сервиса, используя только forecastCS
.
А теперь, соберем прогнозы в виде списка completion stage’eй, определенных в переменной forecastList
. Чтобы создать completion stage для каждого прогноза, я передаю данные по местоположениям, а затем создаю переменную tempCS
, снова используя JAX-RS Reactive Client API, который вызывает сервис температуры с названием города. Здесь для сборки клиента я использую метод resolveTemplate()
, и это позволяет мне передавать название города сборщику в качестве параметра.
В качестве последнего шага потоковой передачи, я совершаю вызов CompletableFuture.completedFuture()
, передавая новый инстанс Forecast
в качестве параметра. Я объединяю этот future с tempCS
этапом, чтобы у меня было значение температуры для проитерированных локаций.
Метод CompletableFuture.allOf()
в Listing 11 преобразует список completion stage’ей в forecastCS
. Выполнение этого шага возвращает большой инстанс completable future, когда все предоставленные объекты completable future завершены.
Ответ сервиса — инстанс класса ServiceResponse
, поэтому я создаю completed future, а затем объединяю forecastCS
completion stage со списком прогнозов и вычисляю время отклика сервиса.
Конечно, реактивное программирование заставляет только серверную сторону выполняться асинхронно; клиентская сторона будет заблокирована до тех пор, пока сервер не отправит ответ обратно запрашивающему. Чтобы преодолеть эту проблему, Server Sent Events (SSEs) может быть использован для частичной отправки ответа, как только он окажется доступен, чтобы температурные значения для каждой локации передавались клиенту одно за другим. Вывод ForecastReactiveResource
будет похож на тот, что представлен в Listing 12. Как показано в выводе, время обработки составляет 515 мс, что является идеальным временем выполнения для получения температурных значений из одной локации.
Listing 12
{
"forecasts": [
{
"location": {
"name": "London"
},
"temperature": {
"scale": "Celsius",
"temperature": 49
}
},
{
"location": {
"name": "Istanbul"
},
"temperature": {
"scale": "Celsius",
"temperature": 32
}
},
{
"location": {
"name": "Prague"
},
"temperature": {
"scale": "Celsius",
"temperature": 45
}
}
],
"processingTime": 515
}
Вывод
В примерах этой статьи, я сначала показал синхронный способ получения прогнозов с помощью сервисов местоположения и температуры. Затем, я перешел к реактивному подходу для того, чтобы асинхронная обработка выполнялась между вызовами сервиса. Когда вы используете JAX-RS Reactive Client API в Java EE 8 вместе с классами CompletionStage
и CompletableFuture
, доступными в Java 8, сила асинхронной обработки вырывается на волю, благодаря реактивному программированию.
Реактивное программирование — это больше, чем просто реализация асинхронной модели из синхронной; оно также упрощает работу с такими концепциями, как nesting stage. Чем больше оно используется, тем проще будет управлять сложными сценариями в параллельном программировании.
THE END
Спасибо за внимание. Как всегда ждём ваши комментарии и вопросы.
Автор: MaxRokatansky