Виктор Васнецов, Рыцарь на распутье; fatcatart.com
Привет! Здесь краткий пересказ интересной баги c GitHub. Для воспроизведения см. проект spring-flux-callstack.
Не так давно я заметил, что при ошибках приложения, стектрейс иногда довольно длинный. И в нем повторялось по многу раз один и тот же набор строк (сам стектрейс под катом):
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
at reactor.core.publisher.Operators.complete(Operators.java:135)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
Как вы уже поняли, это методы из Project Reactor, который обеспечивает асинхронную работу для Router Function в WebFlux.
Налицо неэффективность, ведь каждый такой блок кода порождает несколько объектов в куче, а таких блоков много. И создаюся они на каждый вызов.
at org.springframework.web.reactive.function.server.CoRouterFunctionDsl$asHandlerFunction$1$1.invokeSuspend(CoRouterFunctionDsl.kt:599)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:313)
at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:26)
at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:109)
at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:158)
at kotlinx.coroutines.reactor.MonoKt$monoInternal$1.accept(Mono.kt:55)
at kotlinx.coroutines.reactor.MonoKt$monoInternal$1.accept(Mono.kt)
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2267)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2075)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1949)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
at reactor.core.publisher.Operators.complete(Operators.java:135)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
at reactor.core.publisher.Operators.complete(Operators.java:135)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
at reactor.core.publisher.Operators.complete(Operators.java:135)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
at reactor.core.publisher.Operators.complete(Operators.java:135)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
at reactor.core.publisher.Operators.complete(Operators.java:135)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
at reactor.core.publisher.Operators.complete(Operators.java:135)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
at reactor.core.publisher.Operators.complete(Operators.java:135)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
at reactor.core.publisher.Operators.complete(Operators.java:135)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
at reactor.core.publisher.Operators.complete(Operators.java:135)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
at reactor.core.publisher.Operators.complete(Operators.java:135)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
at reactor.core.publisher.Operators.complete(Operators.java:135)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
at reactor.core.publisher.Operators.complete(Operators.java:135)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
at reactor.core.publisher.Operators.complete(Operators.java:135)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:441)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:211)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:161)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
at reactor.netty.http.server.HttpServerHandle.onStateChange(HttpServerHandle.java:64)
at reactor.netty.tcp.TcpServerBind$ChildObserver.onStateChange(TcpServerBind.java:228)
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:465)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:170)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$1.run(AbstractEpollChannel.java:387)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Router Function
Для начала, что такое Router Function. Для регистрации асинхронного API, начиная с Spring Framework v5, можно использовать следующий подход:
@Bean
open fun httpEndpoints(): RouterFunction<ServerResponse> {
return coRouter {
GET("/api/users") {
ServerResponse
.ok()
.bodyValueAndAwait("Ok !!!!")
}
GET("/api/developers") {
ServerResponse
.badRequest()
.bodyValueAndAwait("Not ok (((")
}
}
}
Метод GET
принимает на вход асинхронную функцию suspend (ServerRequest) -> ServerResponse
, в которой можно по ServerRequest
создать ServerResponse
. Всё просто и логично. И если в вашем приложении есть набор сервисов, которые работают по более-менее одинаковой схеме, то вы можете их свести к следующей модели (казалось бы):
@Bean
open fun httpEndpoints(): RouterFunction<ServerResponse> {
// сразу скажу - так делать не надо
val urlToMethod: Map<String, RequestProcessor> = createRequestProcessors()
return coRouter {
urlToMethod.forEach { url, processor -> // forEach здесь не уместен
GET("/$url") {
processor.createResponse(it)
}
}
}
}
В итоге, если у вас большинство API методов в приложении берут на вход примерно одно и то же, то можно выделить интерфейс и сделать один обработчик (как описано выше). Получается явный код, без магии аннотаций и пр.
Как работает Router Function?
Интерфейс RouterFunction здесь, однако на момент написания статьи он выглядел так:
@FunctionalInterface
public interface RouterFunction<T extends ServerResponse> {
Mono<HandlerFunction<T>> route(ServerRequest request);
default RouterFunction<T> and(RouterFunction<T> other) {
return new RouterFunctions.SameComposedRouterFunction<>(this, other);
}
default RouterFunction<?> andOther(RouterFunction<?> other) {
return new RouterFunctions.DifferentComposedRouterFunction(this, other);
}
default RouterFunction<T> andRoute(RequestPredicate predicate, HandlerFunction<T> handlerFunction) {
return and(RouterFunctions.route(predicate, handlerFunction));
}
default RouterFunction<T> andNest(RequestPredicate predicate, RouterFunction<T> routerFunction) {
return and(RouterFunctions.nest(predicate, routerFunction));
}
default <S extends ServerResponse> RouterFunction<S> filter(HandlerFilterFunction<T, S> filterFunction) {
return new RouterFunctions.FilteredRouterFunction<>(this, filterFunction);
}
default void accept(RouterFunctions.Visitor visitor) {
visitor.unknown(this);
}
}
Как видно, он позволяет асинхронно и каскадно находить правильный обработчик для входного url. Условно, у функции есть три варианта:
- Обработать запрос
- Сделегировать запрос вложенной функции (и откусив префикс)
- Отвергнуть запрос, вернув
Mono.empty()
, что означает "за этот url я не отвечаю".
Как результат, есть абстрактый интерфейс, к которому можно добавлять новые и новые реализации. Можно даже сделать прокси-сервер, который в каждый момент времени не знает, корректен ли URL (т.е. он может спросить о корректности у одного из других серверов). В итоге, мы получаем расширяемую систему, в которой есть уже готовая реализация (с методами GET
, как выше), однако никто не мешает сделать свою (с рулем, седлом, педалями и т.д.).
А теперь вернемся к примеру выше:
@Bean
open fun httpEndpoints(): RouterFunction<ServerResponse> {
// сразу скажу - так делать не надо
val urlToMethod: Map<String, RequestProcessor> = createRequestProcessors()
return coRouter {
urlToMethod.forEach { url, processor -> // forEach здесь не уместен
GET("/$url") {
processor.createResponse(it)
}
}
}
}
Как я думаю, вы уже поняли, что на деле каждый вызов метода GET
создает новую RouterFunction
. И каждая из них проверяет, что URL совпал (или не совпал), а потом переключается дальше. Если на вход придет неподдерживаемый адрес, то каждая функция его проверит. То есть, для адреса размера N
, сложность поиска рабочей функции будет O(N*M*K)
, где M
— средняя длина адреса в функции, а K
— число RouterFunction
(говоря другими словами, если удвоить число маршрутов сайта, то время поиска правильного увеличится вдвое, что странно, так как тот же switch
в Java не обладает таким свойством). Хуже всего то, что на каждой итерации создается еще несколько объектов в куче (для асинхронности), что вряд ли дает ускорение всей системе.
Казалось бы, если все адреса статические (или, по-другому, сервер заранее знает все поддерживаемые), то можно использовать префиксное дерево. Этот алгоритм зачастую используется, например, для поиска на web страницах. Его минус в том, что надо заранее знать все возможные варианты, однако далее он работает со сложностью O(min(M, N))
, где N
— это длина адреса, M
— это или длина совпавшего префикса, или максимальная длина префикса.
Как с этим бороться?
Вариант 1: использовать pattern. Например, если все поддерживаемые адреса можно выразить строкой /[static-prefix]/[suffix]
, где static-prefix
для всех один, то можно выразить запрос так:
GET("/static-prefix/{suffix}"),
req -> ok().body(
execute(
req.pathVariable("suffix")
)
)
Полный пример можно посмотреть здесь. Если устроить switch
по суфиксу, то решение будет работать за линейное время от запроса (напомню, что в Java
/Kotlin
/Scala
switch
по строке высчитывает сначала хеш, и только потом делает сравнение символов, чтобы избежать коллизий).
Вариант 2: использовать иерархию в функциях. Как пример кода отсюда:
public RouterFunction<ServerResponse> productSearch(ProductService ps) {
return route().nest(
RequestPredicates.path("/product"),
builder -> { // мы можем добавить сколько угодно функций сюда
builder.GET("/name/{name}", req -> ok().body(ps.findByName(req.pathVariable("name"))));
}
).build();
}
...
И в заключении: если у вашего приложения нет более-менее серьезной нагрузки, то подобные оптимизации бессмысленны. Сборщик мусора стерпит, а заодно код получается простым для чтения. Однако если ваш сайт постоянно грузят пользователи, то можно ускорить сервис путем уменьшения число роутеров.
Как видно, из-за абстранций и идеи расширяемости, не стоит полагаться на то, что команда Spring всё волшебным образом ускорит. Лучше просто упростить конфигурацию.
Автор: Игорь Манушин