Если вы новичок в общении с RxJava или пытались разобраться в этом, но не довели дело до конца, то ниже вы найдете для себя кое-что новое.
Оригинал статьи написан 29 ноября 2017. Перевод вольный.
Нам в GO-JEK требуется выполнять большое количество асинхронных операций в приложениях и мы не можем позволить себе идти на компромиссы в ущерб скорости работы и плавности пользовательского интерфейса.
Написание сложных многопоточных Android приложений может быть достаточно трудоемким процессом, который время от времени будет вас сильно перегружать из-за необходимости заботиться о большом количестве связанных друг с другом вещей. Это и многие другие причины убедили нас использовать RxJava в разрабатываемых Android приложениях.
В этой статье мы поговорим о том как мы использовали реальные возможности работы с многопоточностью в RxJava для того, чтобы сделать процесс разработки приложения максимально простым, легким и веселым. Во всех примерах кода ниже будет использоваться RxJava 2, но описанные концепции можно будет применять и в других реактивных расширениях.
Почему реактивное программирование?
Каждая статья о реактивном программировании начинается с такого обязательного блока и мы не нарушим эту традицию. Существует несколько преимуществ использования реактивного подхода к построению Android приложений, обратим внимание на те, которые вам действительно нужны.
Никаких больше обратных вызовов
Если вы давно разрабатываете под Android, то, должно быть, заметили, как быстро вещи становятся через чур сложными и неподконтрольными с использованием вложенных обратных вызовов.
Это происходит, когда вы выполняете несколько асинхронных операций последовательно и хотите, чтобы дальнейшие действия зависели от результата предыдущих операций. Почти сразу код становится слишком перегруженным и сложным для поддержки.
Простой контроль ошибок
В императивном мире, в ситуации когда выполняется множество сложных асинхронных операций, ошибки могут возникать в большом количестве мест. И в каждом месте вы должны обрабатывать эти ошибки, в результате появляется много повторяющегося шаблонного кода, методы становятся громоздкими.
Очень простое использование многопоточности
Все мы знаем (и тайно признаем) насколько иногда сложной может быть работа с многопоточностью в Java. Например, выполнение части кода в фоновом потоке и возврат результата обратно в главный поток. Это только звучит просто, но на практике появляется много подводных камней, которые нужно обходить.
RxJava делает безумно легким выполнение нескольких сложных операций в любом потоке на ваш выбор, заботясь о корректной синхронизации и позволяя без проблем переключаться между потоками.
Преимущества RxJava бесконечны. Мы можем говорить об этом часами и адски утомить вас, но вместо этого давайте копнем глубже и начнем изучать реальную работу с многопоточностью в RxJava.
RxJava НЕ многопоточна по умолчанию
Да, вы прочли всё верно. RxJava по умолчанию не многопоточна в любом случае. Определение, данное для RxJava на официальном сайте, выглядит примерно следующим образом:
«Библиотека для составления асинхронных и основанных на событиях программ с использованием последовательностей (observable sequences) для виртуальной Java машины».
Увидев слово «асинхронных», многие люди ошибочно полагают, что RxJava многопоточна по умолчанию. Да, RxJava поддерживает многопоточность, предлагает множество мощных возможностей для легкой работы с асинхронными операциями, но это не значит что поведение RxJava по умолчанию многопоточно.
Если вы уже немного работали с RxJava, то её знаете базовые конструкции:
- Наблюдаемый источник (source Observable), далее
- несколько операторов (Operators), затем
- целевой подписчик (Subscriber)
Observable.just(1, 2, 3, 4, 5)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
println("Emitting item on: " + currentThread().getName());
}
})
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
println("Processing item on: " + currentThread().getName());
return integer * 2;
}
})
.subscribeWith(new DisposableObserver<Integer>() {
@Override
public void onNext(@NonNull Integer integer) {
println("Consuming item on: " + currentThread().getName());
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Если вы запустите данный пример кода, то ясно увидите, что все действия выполняются в основном потоке приложения (проследите за именами потоков в логе в консоли). Этот пример показывает, что по умолчанию поведение RxJava блокирующее. Всё выполняется в том же потоке, в котором вызван код.
Бонус: Интересно, что же делает doOnNext()
? Это не что иное, как side-effect оператор. Он помогает внедряться в цепочку объектов observable
и выполнять грязные (impure) операции. Например, внедрять дополнительный код в цепочке вызовов для отладки. Прочитать больше можно здесь.
Простой пример
Для того, чтобы начать работать с многопоточностью с применением RxJava необходимо познакомиться с базовыми классами и методами, такими как Schedulers, observeOn/subscribeOn.
Давайте рассмотрим один из самых простых примеров. Допустим, мы хотим получить список объектов Book
сетевым запросом и показать его в основном потоке приложения. Довольно общий и понятный пример для начала.
getBooks().subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<Book>() {
@Override
public void onNext(@NonNull Book book) {
// Вы можете получить доступ к объектам Book здесь
}
@Override
public void onError(@NonNull Throwable e) {
// Отлавливаем ошибки здесь
}
@Override
public void onComplete() {
// Все объекты Book получены. Готово!
}
});
Здесь мы видим метод getBooks()
, который осуществляет сетевой вызов и собирает список книг для нас. Сетевой вызов занимает время (несколько миллисекунд или секунд), поэтому мы используем subscribeOn()
и указываем планировщик Schedulers.io()
для выполнения операции в потоке ввода-вывода.
Также мы используем оператор observeOn()
вместе с планировщиком AndroidSchedulers.mainThread()
для того, чтобы обрабатывать результат в основном потоке и показать список книг в пользовательском интерфейсе приложения.
Не волнуйтесь, скоро мы перейдем к более продвинутым вещам. Этот пример был предназначен только для того, чтобы вспомнить базовые понятия, прежде чем погрузиться глубже.
Подружимся с планировщиками (Schedulers)
RxJava предоставляет мощный набор планировщиков. Вы не можете получить прямой доступ к потокам или управлять ими. Если вам нужно работать с потоками, то необходимо воспользоваться встроенными планировщиками.
Можете представлять планировщики как потоки или пулы потоков (коллекции потоков) для выполнения разного рода задач.
Говоря проще, если вам нужно выполнить задачу в отдельном потоке — необходимо использовать верный планировщик, который возьмёт поток из своего пула доступных потоков и выполнит в нём задачу.
В RxJava доступны несколько типов планировщиков. Самая сложная часть — выбрать верный планировщик для вашей задачи. Задача никогда не будет выполняться оптимально, если вы не выберете верный планировщик. Давайте разберем каждый планировщик.
Schedulers.io()
Этот планировщик основывается на неограниченном пуле потоков и используется для интенсивной работы с вводом-выводом без использования ЦП, например, доступ к файловой системе, выполнение сетевых вызовов, доступ к базе данных и так далее. Количество потоков в этом планировщике неограничено и может расти по мере необходимости.
Schedulers.computation()
Этот планировщик используется для выполнения работы, высоко нагружающей ЦП, такой как обработка больших объемов данных, изображений и так далее. Планировщик основывается на ограниченном пуле потоков с размером в количество доступных процессоров.
Так как этот планировщик подходит только для интенсивной работы с ЦП — количество его потоков ограничено. Сделано это для того, чтобы потоки не конкурировали за процессорное время и не простаивали.
Schedulers.newThread()
Этот планировщик создает совершенно новый поток при каждом вызове. В данном случае использование пула потоков не принесет никакой выгоды. Потоки очень затратно создавать и уничтожать. Вы должны быть осторожны и не злоупотреблять чрезмерным созданием потоков, так как это может привести в замедлению работы системы и переполнению памяти. Новый поток будет создаваться для обработки каждого элемента, полученного из observable-источника.
В идеале вы должны использовать этот планировщик довольно редко, в основном для выведения в отдельный поток долго работающих частей программы.
Schedulers.single()
Этот планировщик основывается на единственном потоке, который используется для последовательного выполнения задач. Он может быть очень полезен, когда у вас есть набор фоновых заданий в разных местах вашего приложения, но нельзя допустить одновременного выполнения более чем одного из этих заданий.
Schedulers.from(Executor executor)
Этот планировщик будет основываться на вашем собственном Executor
. Может возникнуть ситуация, в которой необходимо будет выполнять определенные задачи в планировщике на основании собственной логики распределения потоков.
Допустим, вы хотите ограничить число параллельных сетевых вызовов, которые делает ваше приложение. Можно создать собственный планировщик, который будет работать на базе ограниченного в размерах пула потоков (Scheduler.from(Executors.newFixedThreadPool(n))
) и использовать его во всех местах, связанных с сетевыми вызовами.
AndroidSchedulers.mainThread()
Это специальный планировщик, который недоступен в библиотеке RxJava. Необходимо использовать расширяющую библиотеку RxAndroid для доступа к этому планировщику. Этот планировщик полезен в Android приложениях для выполнения действий в потоке пользовательского интерфейса.
По умолчанию этот планировщик ставит задания в очередь в Looper
, связанный с основным потоком, но есть возможность переопределения: AndroidSchedulers.from(Looper looper)
.
Заметка: Будьте осторожны в использовании планировщиков, основанных на неограниченных пулах потоков, таких как Schedulers.io()
. Всегда есть риск бесконечного роста количества потоков.
Понимание subscribeOn() и observeOn()
Теперь, когда у вас есть представление о типах планировщиков, разберем subscribeOn() и observeOn() в деталях.
Вы должны глубоко разбираться в том, как эти два оператора работают по отдельности и вместе, чтобы профессионально работать с многопоточностью в RxJava.
subscribeOn()
Простыми словами, этот оператор говорит в какой поток наблюдаемый источник (source observable) будет передавать элементы. Вы должны уяснить важность слова «источник». Когда у вас цепь наблюдаемых элементов (observables), источник (source observable) — это всегда корневой элемент или верхняя часть цепи, откуда происходит создание событий.
Как вы уже видели, если не использовать subscribeOn()
, то все события происходят в том потоке, в котором произошел вызов кода (в нашем случае — main
поток).
Давайте перенаправим события в вычислительный поток с помощью subscribeOn()
и планировщика Schedulers.computation()
. Когда вы запустите нижеследующий пример кода, то увидите, что события происходят в одном из вычислительных потоков, доступных в пуле — RxComputThreadPool-1
.
В целях сокращения кода мы не будем полностью переопределять все методы DisposableSubscriber
, так как нам не нужно переопределять onError()
и onComplete()
. Воспользуемся doOnNext()
и лямбдами.
Observable.just(1, 2, 3, 4, 5, 6)
.subscribeOn(Schedulers.computation())
.doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
Не важно в каком месте в цепочке вызовов вы используете subscribeOn()
. Он работает только с наблюдаемым источником (source observable), и контролирует в какой поток наблюдаемый источник передает события.
В нижеследующем примере после observable-источника создаются другие объекты observable (методами map()
и filter()
), а оператор subscribeOn()
помещен в конце цепочки вызовов. Но как только вы запустите этот код, то заметите, что все события будут возникать в потоке, указанном в subscribeOn()
. Это станет более понятным при добавлении observeOn()
в цепь вызовов. И даже если мы разместим subscribeOn()
ниже observeOn()
, то логика работы не изменится. subscribeOn()
работает только с наблюдаемым источником (source observable).
Observable.just(1, 2, 3, 4, 5, 6)
.doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
.map(integer -> integer * 3)
.filter(integer -> integer % 2 == 0)
.subscribeOn(Schedulers.computation())
.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
Также важно понять, что нельзя использовать subscribeOn()
несколько раз в одной цепочке вызовов. Можно, конечно, написать ещё раз, но никаких изменений это не повлечет. В примере ниже мы последовательно вызываем три различных планировщика, можете ли вы догадаться, какой планировщик сработает при запуске?
Observable.just(1, 2, 3, 4, 5, 6)
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.newThread())
.doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
Если вы ответили Schedulers.io()
, то вы правы! Даже если делать вызов многократно — сработает только первый subscribeOn()
, вызванный после observable-источника.
Под капюшоном
Стоит потратить ещё немного времени на более подробное изучение рассмотренного примера. Почему срабатывает только планировщик Schedulers.io()
? Обычно все думают, что сработает Schedulers.newThread()
, так как этот вызов находится в конце цепочки.
Необходимо понять, что в RxJava подписка создаётся после обратного вызова всех экземпляров Observable
. Код ниже поможет нам разобраться в этом. Это ранее рассмотренный пример, но расписанный подробнее.
Observable<Integer> o1 = Observable.just(1, 2, 3, 4, 5);
Observable<Integer> o2 = o1.filter(integer -> integer % 2 == 0);
Observable<Integer> o3 = o2.map(integer -> integer * 10);
o3.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
Для того, чтобы понять как всё работает — начнем разбирать всё с последней строки примера. В ней целевой подписчик (target subscriber), вызывает метод subscribe()
у observable объекта o3
, который затем делает неявный вызов subscribe()
у своего родительского observable объекта o2
. Реализация наблюдателя (observer), предоставляемая объектом o3
, умножает переданные числа на 10.
Процесс повторяется и o2
неявно вызывает subscribe()
у объекта o1
, передавая реализацию наблюдателя, которая позволяет обрабатывать только четные числа. Теперь мы достигли корневого элемента (o1
), у которого нет родителя для последующего вызова subscribe()
. На этом этапе завершается цепочка наблюдаемых (observable) элементов, после чего observable-источник начинает передавать (emit) элементы.
Теперь для вас должна быть понятна концепция работы подписок в RxJava. К настоящему времени у вас должно появиться понимание того, как формируются цепочки наблюдаемых (observable) объектов и как события распространяются, начиная с observable-источника.
observeOn()
Как мы уже видели, subscribeOn()
указывает observable-источнику передавать элементы в определенный поток и этот поток будет отвечать за продвижение элементов вплоть до подписчика (Subscriber). Поэтому, по умолчанию, подписчик получает обработанные элементы в этом же потоке.
Но это может быть не то поведение, которого вы ожидаете. Предположим, вы хотите получить некие данные из сети и отобразить их в пользовательском интерфейсе.
Нужно выполнить две вещи:
- Сделать сетевой вызов в неблокирующем потоке ввода-вывода
- Получить результат в основном потоке приложения
У вас будет Observable
, который осуществляет сетевой вызов в потоке ввода-вывода и передает результат подписчику. Если вы используете только subscribeOn(Schedulers.io())
, то целевой подписчик будет обрабатывать результат в том же потоке ввода-вывода. И нам не повезло, так как работать с пользовательским интерфейсом в Android можно только в основном потоке.
Теперь нам крайне необходимо переключить потоки и мы используем для этого observeOn()
. Когда observeOn()
встречается в цепочке вызовов, то передаваемые observable-источником элементы незамедлительно перебрасываются в поток, указанный в observeOn()
.
getIntegersFromRemoteSource()
.doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
В этом придуманном примере мы наблюдаем получение целых чисел из сети и их дальнейшую передачу из observable-источника. В реальных примерах это может быть любая другая асинхронная операция, например, чтение большого файла, выборка данных из базы данных и т.д. Вы можете запустить данный пример и посмотреть на результаты, просто следите за логами в консоли.
Теперь рассмотрим более сложный пример, в котором observeOn()
будет вызываться несколько раз для переключения потоков в процессе обработки данных.
getIntegersFromRemoteSource()
.doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.map(integer -> {
println("Mapping item " + integer + " on: " + currentThread().getName());
return integer * integer;
})
.observeOn(Schedulers.newThread())
.filter(integer -> {
println("Filtering item " + integer + " on: " + currentThread().getName());
return integer % 2 == 0;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
В примере выше observable-источник передаёт элементы в цепочку обработчиков в потоке ввода вывода, так как мы использовали subscribeOn()
вместе с Schedulers.io()
. Далее мы хотим преобразовать каждый элемент, используя оператор map()
, но сделать это нужно в вычислительном потоке. Для этого используем observeOn()
вместе с Schedulers.computation()
перед вызовом map()
для переключения потока и передачи элементов в целевой вычислительный поток.
Следующим шагом отфильтруем некоторые элементы и по какой-то причине мы хотим выполнить эту операцию в новом потоке для каждого из элементов. Используем снова observeOn()
, но уже в паре с Schedulers.newThread()
перед вызовом оператора filter()
для передачи каждого элемента в новый поток.
В итоге мы хотим, чтобы подписчик получил результат обработки в потоке пользовательского интерфейса. Для этого используем observeOn()
вместе с планировщиком AndroidSchedulers.mainThread()
.
Но что произойдет, если использовать observeOn()
несколько раз последовательно? В примере ниже в каком потоке подписчик получит результат?
getIntegersFromRemoteSource()
.doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.single())
.observeOn(Schedulers.computation())
.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
Если запустите пример, то увидите, что подписчик получит элементы в вычислительном потоке RxComputationThreadPool-1
. Это значит, что сработал последний вызванный observeOn()
. Интересно почему?
Под капюшоном
Возможно вы уже догадались. Как мы знаем, подписка (subscription) вызывается после обратного обхода всех Obsevable
, но с передачей событий (emissions) всё происходит наоборот, то есть в обычном порядке, как записан код. Вызов происходит от observable-источника и далее вниз по цепочке вызова вплоть до подписчика.
Оператор observeOn()
всегда работает в прямом порядке, поэтому последовательно происходит переключение потоков и последним происходит переключение на вычислительный поток (observeOn(Schedulers.computation())
). Итак, когда нужно переключить поток для обработки данных в новом потоке, то просто сначала вызовите observeOn()
, а далее обрабатывайте элементы. Синхронизация, исключение состояния гонки, всё это и многие другие сложности многопоточности RxJava обрабатывает за вас.
Резюме
Сейчас у вас должно быть достаточно хорошее представление о том, как правильно использовать RxJava для написания многопоточных приложений, обеспечивающих быструю и плавную работу пользовательского интерфейса.
Если понимание не пришло сразу, ничего страшного. Прочитайте статью ещё раз, поэкспериментируйте с примерами кода. Здесь достаточно много нюансов для понимания, не торопитесь.
Автор: tehreh1uneh