Привет! Представляю вашему вниманию перевод статьи Understanding Publish, Connect, RefCount and Share in RxSwift.
Примечание переводчика.
В оригинале статьи используется Swift второй версии и соответствующая версия RxSwift. Я имел смелость переписать приведенные ниже куски кода под Swift 3.
Так же хочется отметить, что такие понятия, как Observable и Sequence, можно считать одним и тем же. То же касается Observer и Subscriber.
В этой статье я постараюсь объяснить такие операторы для работы с Connectable Observable в RxSwift, как publish, connect, refCount
и share
. Они используются вместе в различных комбинациях. Очень важно понимать разницу между:
publish().connect()
- и
publish().refcount()
(или простоshare()
)
Активные и пассивные Observables
Прежде чем перейти к сути, мне хотелось бы сказать пару слов о hot и cold Observables. Как по мне, так понятия горячих и холодных Observables немного размыты.
Давайте горячий Observable мы будем называть Active Sequence, а холодный Passive Sequence.
- Active Sequence эмитит элементы постоянно, независимо от того, подписан на нее кто-нибудь или нет
- Passive Sequence начинает эмитить элементы по запросу
Примером Passive Sequence может служить запрос в сеть, который начинается только тогда, когда мы подписались на последовательность. Примерами Active Sequence могут служить web-socket соединение, события таймера или текст, производимый UITextField
'ом.
И это все. Думайте об активных и пассивных последовательностях. Понятия горячих/холодных/теплых/прохладных Observables слишком запутанны и могут сбить с толку.
Несколько подписок на один Observable
Если Вы когда-нибудь подписывались дважды (или больше) на один и тот же Observable, Вы могли бы быть удивлены результатами.
Взгляните на следующий кусочек кода:
let url = URL(string: "https://habrahabr.ru/")!
let requestObservable = URLSession.shared
.rx.data(request: URLRequest(url: url))
requestObservable.subscribe(onNext: {
print($0)
})
requestObservable.subscribe(onNext: {
print($0)
})
Взглянув в консоль, мы увидим два HTTP респонса. Observable выполнил запрос дважды, хоть это противоречит нашим ожиданиям.
share()
как спасение
Очевидно, что это не то, чего мы хотим от обычного HTTP-реквеста. Но мы можем изменить такое поведение и выполнить всего один запрос. Надо просто применить оператор share()
к нашему Observable.
let url = URL(string: "https://habrahabr.ru/")!
let requestObservable = URLSession.shared
.rx.data(request: URLRequest(url: url))
.share()
requestObservable.subscribe(onNext: {
print($0)
})
requestObservable.subscribe(onNext: {
print($0)
})
Как и ожидалось, выполнился только один HTTP-запрос.
По сути, оператор share()
— это просто обертка над publish().refcount()
.
Стоп-стоп-стоп! Что еще за publish()
, что за refcount()
?
publish()
и его друг connect()
Тогда, когда применен оператор publish(), то Observable трансформируется в Connectable Observable. В документации ReactiveX говорится:
Connectable Observable похож на обычный Observable за исключением одного момента. Он начинает производить элементы не тогда, когда на него подписываются, а только тогда, когда на нем вызван оператор
connect()
.
let myObservable = Observable.just(1).publish()
print("Subscribing")
myObservable.subscribe(onNext: {
print("first = ($0)")
})
myObservable.subscribe(onNext: {
print("second = ($0)")
})
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("Calling connect after 3 seconds")
myObservable.connect()
}
/* Output:
Subscribing
Calling connect after 3 seconds
first = 1
second = 1
*/
В приведенном выше примере, Observer'ы подписываются на myObservable
сразу после того, как он был создан. Но срабатывают они только через 3 секунды, когда был вызван оператор connect()
. Проще говоря, connect()
активирует Connectable Observable и включает подписчиков.
Интересная штука в том, как происходит очистка ресурсов. Посмотрите на этот код.
let myObservable = Observable<Int>
.interval(1, scheduler: MainScheduler.instance)
.publish()
myObservable.connect()
print("Starting at 0 seconds")
let mySubscription = myObservable.subscribe(onNext: {
print("Next: ($0)")
})
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("Disposing at 3 seconds")
mySubscription.dispose()
}
DispatchQueue.main.asyncAfter(deadline: .now() + 6) {
print("Subscribing again at 6 seconds")
myObservable.subscribe(onNext: {
print("Next: ($0)")
})
}
// Output:
/*
Starting at 0 seconds
Next: 0
Next: 1
Next: 2
Disposing at 3 seconds
Subscribing again at 6 seconds
Next: 6
Next: 7
Next: 8
Next: 9
...
*/
Даже если все подписчики отписались от нашего Observable, то последний все еще живет и продолжает производить события под капотом.
connect()
возвращает Disposable
. Таким образом, остановить продюсинг элементов можно, вызвав метод dispose()
у данного Disposable
, либо предоставить эту возможность DisposeBag
'у.
Теперь давайте сравним это с publish().refcount()
.
Разница между publish().connect()
и publish().refcount()
Вы можете воспринимать оператор refcount()
как магию, которая за Вас обрабатывают отписку Observer'ов. refcount()
вызывает connect()
автоматически, когда подписывается первый Observer, так что нет нужды делать это самостоятельно.
let myObservable = Observable<Int>
.interval(1, scheduler: MainScheduler.instance)
.publish()
.refCount()
print("Starting at 0 seconds")
let mySubscription = myObservable.subscribe(onNext: {
print("Next: ($0)")
})
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("Disposing at 3 seconds")
mySubscription.dispose()
}
DispatchQueue.main.asyncAfter(deadline: .now() + 6) {
print("Subscribing again at 6 seconds")
myObservable.subscribe(onNext: {
print("Next: ($0)")
})
}
// Output:
/*
Starting at 0 seconds
Next: 0
Next: 1
Next: 2
Disposing at 3 seconds
Subscribing again at 6 seconds
Next: 0
Next: 1
Next: 2
Next: 3
...
*/
Обратите внимание вот на что. Когда мы подписались заново, Observable начал эмитить элементы с начала.
Заключение
Чувствуете разницу теперь? publish().connect()
и publish().refcount()
(или share()
) управляют механизмом отписки от Obervable'ов по-разному.
Когда Вы используете publish().connect()
, Вам необходимо вручную управлять механизмом очистки ресурсов вашего Observable (об этом говорилось в примечании под спойлером). Ваша последовательность ведет себя как активная и производит элементы все время, независимо от подписок.
С другой стороны, publish().refcount()/share()
следит за том, как много Observer'ов подписано на Observable и не отключает первых от последнего до тех пор, пока существует хотя бы один подписчик. Другими словами, когда счетчик подписчиков падает до нуля, Observable «умирает» и перестает производить какие-либо элементы.
Если что-то осталось не до конца ясным, пожалуйста, дайте знать об этом в комментариях. Спасибо.
Автор: dolenko_alexey