Привет! Меня зовут Константинов Александр, я Android-разработчик в «Студии Олега Чулакова». Сегодня мы сравим операторы RxJava 3 и Flow. Статья будет полезна как для изучения операторов, так и для более легкого перехода с RxJava на Flow. Буду рад вашему фидбеку и комметариям.
Ну что ж, давайте начинать!
В этой статье рассмотрим только самые популярые операторы и примеры кода:
Операторы преобразования
RxJava map / Flow map:
Преобразует элементы, применяя функцию к каждому из них. Работает последовательно.
Пример кода
//RxJava map
fun main() {
Observable.range(1, 5)
.map { it * 2 }
.subscribe(::println) // Вывод: 2, 4, 6, 8, 10
}
//Flow map
fun main() = runBlocking {
(1..5).asFlow()
.map { it * 2 }
.collect { println(it) } // Вывод: 2, 4, 6, 8, 10
}
RxJava flatMap / Flow flatMapMerge:
Преобразует каждое значение в поток. RxJava flatMap
/ Flow flatMapMerge
, в отличие от RxJava concatMap
/ Flow flatMapConcat
, обрабатывает элементы исходного потока параллельно, а не последовательно. Потоки для каждого элемента запускаются одновременно и эмитят значения в порядке их готовности, а не поочередно, поэтому порядок значений может отличаться и не соответствовать исходному. Следует использовать, когда порядок обработки не критичен и можно выиграть в производительности от параллельной обработки.
Пример кода
//RxJava flatMap
fun main() {
Observable.just(1, 2, 3)
.flatMap { i ->
Observable.just(i * 10)
.concatWith(Observable.just(i * 20).delay(10, TimeUnit.MILLISECONDS))
}
.blockingSubscribe(::println)
}
//Flow flatMapMerge
fun main() = runBlocking {
flowOf(1, 2, 3)
.flatMapMerge { flow { emit(it * 10); delay(10); emit(it * 20) } }
.collect { println(it) }
}
RxJava concatMap / Flow flatMapConcat:
Преобразует каждое значение в поток и объединяет их последовательно. Берет каждый элемент из исходного потока и, для каждого элемента, создает новый поток. Эти потоки затем объединяются в один линейный поток. Важно отметить, что объединение происходит поочередно: следующий поток начинается только после того, как завершен предыдущий.
Пример кода
//RxJava concatMap
fun main() {
Observable.just(1, 2, 3)
.concatMap { i -> Observable.just(i * 10, i * 20) }
.subscribe(::println) // Вывод: 10, 20, 20, 40, 30, 60
}
//Flow flatMapConcat
fun main() = runBlocking {
flowOf(1, 2, 3)
.flatMapConcat { flowOf(it * 10, it * 20) }
.collect { println(it) } // Вывод: 10, 20, 20, 40, 30, 60
}
RxJava buffer / Flow buffer:
Группирует элементы в списки фиксированного размера.
Используется для обработки элементов в буферах (или пакетами), что позволяет повысить производительность, особенно в случаях, когда обработка каждого элемента по отдельности блокирует лишние ресурсы.
Пример кода
//RxJava buffer
fun main() {
Observable.just(1, 2, 3, 4, 5)
.doOnNext { println("Emitting $it") }
.buffer(2) // Буферизуем элементы по 2
.subscribe { buffer ->
println("Collected $buffer")
}
Thread.sleep(1000)
}
//Flow buffer
fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.onEach { println("Emitting $it") }
.buffer() // Буферизуем все элементы
.collect { value ->
println("Collected $value")
}
}
RxJava scan / Flow scan:
Последовательно применяет функцию к элементам, возвращая промежуточные результаты. Используется для накопления значений с возможностью учета предыдущих накопленных значений.
Пример кода
//RxJava scan
fun main() {
Observable.fromArray(1, 2, 3)
.scan(0) { accumulator, value ->
accumulator + value
}
.subscribe { result ->
println(result) // Вывод: 0, 1, 3, 6
}
}
//Flow scan
fun main() = runBlocking<Unit> {
// Создаем Flow от 1 до 3
val flow = (1..3).asFlow()
// Применяем оператор scan
flow.scan(0) { accumulator, value ->
accumulator + value
}.collect { result ->
println(result) // Вывод: 0, 1, 3, 6
}
}
RxJava groupBy / Аналог на Flow отсутствует:
Разбивает поток на несколько потоков, сгруппированных по ключам.
Пример кода
fun main() {
val observable = Observable.just(
"one", "two", "three", "four", "five", "six"
)
observable.groupBy { it.length }
.flatMapSingle { group ->
group.toList().map { list -> Pair(group.key, list) }
}
.subscribe { pair ->
println("Length: ${pair.first}, Words: ${pair.second}")
}
}
Фильтрация
RxJava distinctUntilChanged / Flow distinctUntilChanged
Исключает повторяющиеся последовательные элементы.
Пример кода
//RxJava distinctUntilChanged
fun main() {
Observable.just(1, 1, 2, 2, 3, 1)
.distinctUntilChanged()
.subscribe(::println) // Вывод: 1, 2, 3, 1
}
//Flow distinctUntilChanged
fun main() = runBlocking {
flowOf(1, 1, 2, 2, 3, 1)
.distinctUntilChanged()
.collect { println(it) } // Вывод: 1, 2, 3, 1
}
RxJava filter / Flow filter:
Отбирает элементы, соответствующие условию.
Пример кода
//RxJava filter
fun main() {
Observable.range(1, 5)
.filter { it % 2 == 0 }
.subscribe(::println) // Вывод: 2, 4
}
//Flow filter
fun main() = runBlocking {
(1..5).asFlow()
.filter { it % 2 == 0 }
.collect { println(it) } // Вывод: 2, 4
}
RxJava take / Flow take:
Берет первые N элементов из входящего потока.
Пример кода
//RxJava take
fun main() {
Observable.range(1, 10)
.take(3)
.subscribe(::println) // Вывод: 1, 2, 3
}
//Flow take
fun main() = runBlocking {
(1..10).asFlow()
.take(3)
.collect { println(it) } // Вывод: 1, 2, 3
}
RxJava skip / Flow drop:
Пропускает первые N элементов.
Пример кода
//Flow drop
fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.drop(2) // Пропускаем первые 2 элемента
.collect { value ->
println(value) // Выведет 3, 4, 5
}
}
//RxJava skip
fun main() {
Observable.just(1, 2, 3, 4, 5)
.skip(2) // Пропускаем первые 2 элемента
.subscribe { value ->
println(value) // Выведет 3, 4, 5
}
}
Комбинирование
RxJava merge / Flow merge:
Объединяет заданные потоки в один поток. Все потоки объединяются одновременно, без ограничения на количество одновременно собираемых потоков.
Пример кода
//Flow merge
fun main() = runBlocking {
val flow1 = flow {
repeat(3) {
emit("From flow1: $it")
delay(500L) // Эмиссия каждые 500 мс
}
}
val flow2 = flow {
repeat(3) {
emit("From flow2: $it")
delay(1000L) // Эмиссия каждые 1000 мс
}
}
val startTime = System.currentTimeMillis()
merge(flow1, flow2).collect { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms")
}
}
//RxJava merge
fun main() {
val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS)
.take(3)
.map { "From observable1: $it" }
val observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.take(3)
.map { "From observable2: $it" }
val startTime = System.currentTimeMillis()
Observable.merge(observable1, observable2)
.subscribe { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms")
}
// Ожидание завершения потоков
Thread.sleep(4000)
}
RxJava zip / Flow zip:
Объединяет потоки, применяя функцию комбинирования к элементам. Элемент для которого нет пары не испускается.
Пример кода
//RxJava zip
fun main() {
val nums = Observable.range(1, 3)
val letters = Observable.just("A", "B", "C", "D")
Observable.zip(nums, letters) { n, l -> "$n -> $l" }
.subscribe(::println)
// Вывод: 1 -> A, 2 -> B, 3 -> C //D - не эмиттится
}
//Fllow zip
fun main() = runBlocking {
val numbers = (1..3).asFlow()
val letters = flowOf("A", "B", "C", "D")
numbers.zip(letters) { n, l -> "$n -> $l" }
.collect { println(it) }
// Вывод: 1 -> A, 2 -> B, 3 -> C //D - не эмиттится
}
RxJava combineLatest / Flow combine:
Объединяет потоки, используя последние элементы из каждого.
Пример кода
//Flow combine
fun main() = runBlocking {
val flow1 = flow {
repeat(3) {
emit("String: s$it")
delay(500L) // Эмиссия каждые 500 мс
}
}
val flow2 = flow {
repeat(3) {
emit(it)
delay(1000L) // Эмиссия каждые 1000 мс
}
}
flow1.combine(flow2) { str, num ->
"$str and Int: $num"
}.collect { value ->
println(value)
}
}
//RxJava combineLatest
fun main() {
val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS)
.take(3)
.map { "String: s$it" }
val observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.take(3)
Observable.combineLatest(observable1, observable2) { str: String, num: Long ->
"$str and Int: $num"
}
.subscribe { value ->
println(value)
}
Thread.sleep(5000L)
}
Агрегация
RxJava reduce / Flow reduce:
В Kotlin Flow оператор reduce аналогичен одноименному оператору в RxJava. Оба используются для преобразования коллекции значений в одиночное значение, используя начальное значение и функцию агрегирования (аккумулятора). Применяет функцию агрегирования ко всем элементам, возвращает одно значение.
Пример кода
//Flow reduce
fun main() = runBlocking {
val result = (1..5).asFlow()
.reduce { accumulator, value ->
accumulator + value
}
println("Flow Reduce Result: $result")
}
//RxJava reduce
fun main() {
Observable.fromIterable(listOf(1, 2, 3, 4, 5))
.reduce { accumulator, value ->
accumulator + value
}
.subscribe { result ->
println("RxJava Reduce Result: $result")
}
}
-
count:
Используются для подсчета количества элементов в потоке, которые удовлетворяют определенному условию (или всех элементов, если условие отсутствует)
RxJava count / Flow count:
Используются для подсчета количества элементов в потоке, которые удовлетворяют определенному условию (или всех элементов, если условие отсутствует)
Пример кода
//Flow count
fun main() = runBlocking {
val count = (1..10).asFlow()
.count { it % 2 == 0 } // Считаем количество четных чисел
println("Flow Count Result: $count")
}
//RxJava count
fun main() {
Observable.fromIterable(1..10)
.filter { it % 2 == 0 } // Отфильтруем четные числа
.count() // Подсчитываем количество элементов после фильтрации
.subscribe { count ->
println("RxJava Count Result: $count")
}
}
Управление временем
RxJava debounce / Flow debounce:
Пропускает элементы, если они генерируются слишком быстро.
Пример кода
//RxJava debounce
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit
fun main() {
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(10)
.debounce(150, TimeUnit.MILLISECONDS) // Испускает только элементы,
//за которыми следует 150мс тишины
.subscribe(::println)
Thread.sleep(2000)
}
//Flow debounce
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
(1..10).asFlow()
.onEach { delay(100L) }
.debounce(150L) // Испускает только элементы,
//за которыми следует 150мс тишины
.collect { println(it) }
}
RxJava delay / Flow - другой подход
В RxJava Задерживает эмиссию элементов на указанное время. В Flow используется delay из kotlinx.coroutines.delay внутри построителя flow
или внутри операторов.
Пример кода
//RxJava delay
fun main() {
val startTime = System.currentTimeMillis()
Observable.range(1, 5)
.concatMap { item ->
Observable.just(item)
.delay(1000, TimeUnit.MILLISECONDS) // Задержка каждого элемента
}
.subscribe { value ->
println("RxJava emitted $value at ${System.currentTimeMillis() - startTime} ms")
}
// Чтобы приложение не завершилось раньше времени
Thread.sleep(6000)
}
//Flow: delay в onEach
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
(1..5).asFlow()
.onEach { delay(1000) } // Задержка перед каждым элементом
.collect { value ->
println("Flow emitted $value " +
"at ${System.currentTimeMillis() - startTime} ms")
}
}
Надеюсь статья дала Вам более развернутое понимание, как сопоставляются различые операторы в RxJava и Kotlin Flow и поможет более быстро мигрировать с одоного на другое. Так же в статье постарался использовать много временЫх диаграмм для лучшего поимания принципов работы оператора.
Спасибо за внимание!
Автор: AleksandrKonst