В переводе статьи пойдёт речь об UndeliverableException в RxJava2 версии 2.0.6 и новее. Если кто-то столкнулся и не может разобраться, или совсем не слышал об этой проблеме — прошу под кат. Побудили к переводу проблемы в production после перехода с RxJava1 на RxJava2. Оригинал был написан 28 декабря 2017, но лучше узнать поздно, чем никогда.

Все мы хорошие разработчики и ловим ошибки в onError, когда используем RxJava. Это значит что мы обезопасили себя от падений приложения, верно?
Нет, не верно.
Ниже мы рассмотрим пару примеров в которых приложение будет падать из-за RxJava, даже если корректно реализован onError.
Базовый обработчик ошибок в RxJava
В роли базового обработчика ошибок в RxJava используется RxJavaPlugins.onError. Он обрабатывает все ошибки, которые не удается доставить до подписчика. По умолчанию, все ошибки отправляются именно в него, поэтому могут возникать критические сбои приложения.
В примечаниях к релизу 2.0.6 данное поведение описано:
Одна из целей дизайна 2.х — отсутсвие потерянных ошибок. Иногда последовательность кончается или отменяется до того, как источник вызывает
onError. В данном случае ошибке деться некуда и она направляется вRxJavaPlugins.onError
Если у RxJava нет базового обработчика ошибок — подобные ошибки будут скрыты от нас и разработчики будут находится в неведении относительно потенциальных проблем в коде.
Начиная с версии 2.0.6, RxJavaPlugins.onError пытается быть умнее и разделяет ошибки библиотеки/реализации и ситуации когда ошибку доставить невозможно. Ошибки, отнесенные к категории «багов» вызываются как есть, остальные же оборачиваются в UndeliverableException и после вызываются. Всю эту логику можно посмотеть здесь (методы onError и isBug).
Одна из основных ошибок, с которыми сталкиваются новички в RxJava — OnErrorNotImplementedException. Эта ошибка возникает, если observable вызывает ошибку, а в подписчике не реализован метод onError. Данная ошибка — пример ошибки, которая для базового обработчика ошибок RxJava является «багом» и не оборачивается в UndeliverableException.
UndeliverableException
Поскольку ошибки относящиеся к «багам» легко исправить — не будем на них останавливаться. Ошибки, которые RxJava оборачивает в UndeliverableException, интереснее, так как не всегда может быть очевидно почему же ошибка не может быть доставлена до onError.
Случаи, в которых это может произойти, зависят от того, что конкретно делают источники и подписчики. Примеры рассмотрим ниже, но в общем можно сказать, что такая ошибка возникает, если нет активного подписчика, которому может быть доставлена ошибка.
Пример с zipWith()
Первый вариант, в котором можно вызвать UndeliverableException — оператор zipWith.
val observable1 = Observable.error<Int>(Exception())
val observable2 = Observable.error<Int>(Exception())
val zipper = BiFunction<Int, Int, String> { one, two -> "$one - $two" }
observable1.zipWith(observable2, zipper)
.subscribe(
{ System.out.println(it) },
{ it.printStackTrace() }
)
Мы объединяем вместе два источника, каждый из которых вызывает ошибку. Чего мы ожидаем? Можем предположить, что onError будет вызван дважды, но это противоречит спецификации Reactive streams.
После единственного вызова терминального события (
onError,onCompelete) требуется, чтобы никаких вызовов больше не осуществлялось
Получается, что при единственном вызове onError повторный вызов уже невозможен. Что произойдёт при возникновении в источнике второй ошибки? Она будет доставлена в RxJavaPlugins.onError.
Простой способ попасть в подобную ситуациюю — использовать zip для объединения сетевых вызовов (например, два вызова Retrofit, возвращающие Observable). Если в обоих вызовах возникает ошибка (например, нет интернет соединения) — оба источника вызовут ошибки, первая из которых попадёт в реализацию onError, а вторая будет доставлена базовому обработчику ошибок (RxJavaPlugins.onError).
Пример с ConnectableObservable без подписчиков
ConnectableObservable также может вызвать UndeliverableException. Стоит напомнить, что ConnectableObservable вызывает события независимо от наличия активных подписчиков, достаточно вызвать метод connect(). Если при отсутствии подписчиков в ConnectableObservable возникнет ошибка — она будет доставлена базовому обработчику ошибок.
Вот довольно невинный пример, который может вызвать такую ошибку:
someApi.retrofitCall() // Сетевой вызов с использованием Retrofit
.publish()
.connect()
Если someApi.retrofitCall() вызовет ошибку (например, нет подключения к интернету) — приложение упадет, так как сетевая ошибка будет доставлена базовому обработчику ошибок RxJava.
Этот пример кажется выдуманным, но очень легко попасть в ситуацию, когда ConnectableObservable все еще соединен(connected), но подписчиков у него нет. Я столкнулся с этим при использовании autoConnect() при вызове к API. autoConnect() автоматически не отключает Observable. Я отписывался в onStop методе Activity, но результат сетевого вызова возвращался после уничтожения Activity и приложение падало с UndeliverableException.
Обрабатываем ошибки
Итак, что же делать с этими ошибками?
Первый шаг — посмотреть на возникающие ошибки и попытаться определить что их вызывает. Идеально, если вам удастся исправить проблему у её источника, чтобы предотвратить передачу ошибки в RxJavaPlugins.onError.
Решение для примера с zipWith — взять один или оба источника и реализовать в них один из методов для перехватыва ошибок. Например, вы можете использовать onErrorReturn для передачи вместо ошибки значения по умолчанию.
Пример с ConnectableObservable исправить проще — просто убедитесь в том, что вы отсоединили Observable в момент, когда последний подписчик отписывается. autoConnect(), к примеру, имеет перегруженную реализацию, которая принимает функцию, отлавливающую момент соединения (больше можно посмотреть здесь).
Другой путь решения проблемы — подменить базовый обработчик ошибок своим собственным. Метод RxJavaPlugins.setErrorHandler(Consumer<Throwable>) поможет вам в этом. Если это подходящее для вас решение — можете перехватывать все ошибки отправленные в RxJavaPlugins.onError и обрабатывать их по своему усмотрению. Это решение может оказаться довольно сложным — помните, что RxJavaPlugins.onError получает ошибки от всех потоков (streams) RxJava в приложении.
Если вы вручную создаете свои Observable, то можете вместо emitter.onError() вызывать emitter.tryOnError(). Этот метод передает ошибку только если поток (stream) не уничтожен (terminated) и имеет подписчиков. Помните, что данный метод экспериментальный.
Мораль данной статьи в том, что вы не можете быть уверены в отсутсвии ошибок при работе с RxJava, если просто реализовали onError в подписчиках. Вы должны быть в курсе ситуаций, в которых ошибки могут оказаться недоступны для подписчиков, и убедиться, что эти ситуации обрабатываются.
Автор: tehreh1uneh
