Привет!
Напоминаем, что у нас уже открыт предзаказ на долгожданную книгу о языке Kotlin из знаменитой серии Big Nerd Ranch Guides. Сегодня мы решили предложить вашему вниманию перевод статьи, рассказывающей о корутинах Kotlin и о правильной работе с потоками в Android. Тема обсуждается очень активно, поэтому для полноты картины также рекомендуем посмотреть эту статью с Хабра и этот подробный пост из блога компании Axmor Software.
Современный фреймворк для обеспечения конкурентности в Java/Android учиняет ад обратных вызовов и приводит к блокирующим состояниям, так как в Android нет достаточно простого способа гарантировать потокобезопасность.
Корутины Kotlin – это очень эффективный и полный инструментарий, позволяющий гораздо проще и производительнее управлять конкурентностью.
Приостановка и блокирование: в чем разница
Корутины не заменяют потоков, а скорее дают фреймворк для управления ими. Философия корутин заключается в определении контекста, позволяющего ожидать, пока завершатся фоновые операции, не блокируя при этом основного потока.
Цель корутин в данном случае – обойтись без обратных вызовов и упростить конкуренцию.
Простейший пример
Для начала возьмем самый простой пример: запустим корутину в контексте Main
(главный поток). В нем мы извлечем изображение из потока IO
и отправим это изображение на обработку обратно в Main
.
launch(Dispatchers.Main) {
val image = withContext(Dispatchers.IO) { getImage() } // получить из контекста IO
imageView.setImageBitmap(image) // Возвращаемся в главный поток
}
Код прост как однопоточная функция. Причем, пока getImage
выполняется в выделенном пуле потоков IO
, главный поток свободен и может взяться за любую другую задачу! Функция withContext приостанавливает текущую корутину, пока работает ее действие (getImage()
). Как только getImage()
возвратится и looper
из главного потока станет доступен, корутина возобновит работу в главном потоке и вызовет imageView.setImageBitmap(image)
.
Второй пример: теперь нам требуется, чтобы были выполнены 2 фоновые задачи, чтобы ими можно было воспользоваться. Мы применим дуэт async/await, чтобы две эти задачи выполнялись параллельно, и воспользуемся их результатом в главном потоке, как только обе задачи будут готовы:
val job = launch(Dispatchers.Main) {
val deferred1 = async(Dispatchers.Default) { getFirstValue() }
val deferred2 = async(Dispatchers.IO) { getSecondValue() }
useValues(deferred1.await(), deferred2.await())
}
job.join() // приостанавливает выполнение актуальной корутины, пока задача не будет выполнена
async
подобен launch
, но возвращает deferred
(сущность Kotlin, эквивалентная Future
), поэтому ее результат можно получить при помощи await()
. При вызове без параметров она работает в контексте, задаваемом по умолчанию для текущей области видимости.
Опять же, главный поток остается свободен, пока мы дожидаемся наших 2 значений.
Как видите, функция launch
возвращает Job
, который можно использовать для ожидания, пока операция завершится – это делается при помощи функции join()
. Она работает как и в любом другом языке, с той оговоркой, что просто приостанавливает корутину, а не блокирует поток.
Диспетчеризация
Диспетчеризация – ключевая концепция при работе с корутинами. Это действие, позволяющее «перепрыгнуть» от одного потока к другому.
Рассмотрим, как в java выглядит эквивалент для диспетчеризации в Main
, то есть,
runOnUiThread:
public final void runOnUiThread(Runnable action) {
if (Thread.currentThread() != mUiThread) {
mHandler.post(action); // Диспетчеризация
} else {
action.run(); // Немедленное выполнение
}
}
Реализация контекста Main
для Android – это диспетчер на основе Handler
. Итак, это действительно очень подходящая реализация:
launch(Dispatchers.Main) { ... }
vs
launch(Dispatchers.Main, CoroutineStart.UNDISPATCHED) { ... }
// Начиная с kotlinx 0.26:
launch(Dispatchers.Main.immediate) { ... }
launch(Dispatchers.Main)
посылает Runnable
в Handler
, так что его код выполняется не сразу.
launch(Dispatchers.Main, CoroutineStart.UNDISPATCHED)
немедленно выполнит свое лямбда-выражение в текущем потоке.
Dispatchers.Main
гарантирует, что когда корутина возобновит работу, она будет направлена в главный поток; кроме того, Handler используется здесь как нативная реализация Android для отправки в цикл событий приложения.
Точная реализация выглядит так:
val Main: HandlerDispatcher = HandlerContext(mainHandler, "Main")
Вот хорошая статья помогающая разобраться в тонкостях диспетчеризации в Android:
Understanding Android Core: Looper, Handler, and HandlerThread.
Контекст корутины
Контекст корутины (он же – диспетчер корутины) определяет, в каком потоке будет выполняться ее код, что делать, если будет выброшено исключение, и обращается к родительскому контексту для распространения отмены.
val job = Job()
val exceptionHandler = CoroutineExceptionHandler {
coroutineContext, throwable -> whatever(throwable)
}
launch(Disaptchers.Default+exceptionHandler+job) { ... }
job.cancel()
отменит все корутины, родителем которых является job
. A exceptionHandler получит все исключения, выброшенные в этих корутинах.
Область видимости
Интерфейс coroutineScope
упрощает обработку ошибок:
Если откажет какая-либо из его дочерних корутин, то откажет и вся область видимости, и все дочерние корутины будут отменены.
В примере async
, если извлечь значение не удалось, а другая задача при этом продолжила работу – у нас возникает поврежденное состояние, и с этим надо что-то делать.
При работе с coroutineScope
функция useValues
будет вызываться лишь в случае, если извлечение обоих значений прошло успешно. Также, если deferred2
откажет, deferred1
будет отменена.
coroutineScope {
val deferred1 = async(Dispatchers.Default) { getFirstValue() }
val deferred2 = async(Dispatchers.IO) { getSecondValue() }
useValues(deferred1.await(), deferred2.await())
}
Также можно “поместить в область видимости” целый класс, чтобы задать для него контекст CoroutineContext
по умолчанию и использовать его.
Пример класса, реализующего интерфейс CoroutineScope
:
open class ScopedViewModel : ViewModel(), CoroutineScope {
protected val job = Job()
override val coroutineContext = Dispatchers.Main+job
override fun onCleared() {
super.onCleared()
job.cancel()
}
}
Запуск корутин в CoroutineScope
:
Диспетчер launch
или async
, задаваемый по умолчанию, теперь становится диспетчером актуальной области видимости.
launch {
val foo = withContext(Dispatchers.IO) { … }
// лямбда-выражение выполняется в контексте CoroutineContext области видимости
…
}
launch(Dispatchers.Default) {
// лямбда-выражение выполняется в задаваемом по умолчанию пуле потоков
…
}
Автономный запуск корутины (вне какого-либо CoroutineScope):
GlobalScope.launch(Dispatchers.Main) {
// лямбда-выражение выполняется в главном потоке.
…
}
Можно даже определить область видимости для приложения, задав диспетчер Main
по умолчанию:
object AppScope : CoroutineScope by GlobalScope {
override val coroutineContext = Dispatchers.Main.immediate
}
Замечания
- Корутины ограничивают интероперабельность с Java
- Ограничивают изменяемость во избежание блокировок
- Корутины предназначены для ожидания, а не для организации потоков
- Избегайте I/O в
Dispatchers.Default
(иMain
…) — для этого предназначен Dispatchers.IO - Потоки ресурсозатратны, поэтому используются однопоточные контексты
Dispatchers.Default
основан наForkJoinPool
, появившемся в Android 5+- Корутины можно использовать посредством каналов
Избавляемся от блокировок и обратных вызовов при помощи каналов
Определение канала из документации JetBrains:
Канал
Channel
концептуально очень похож наBlockingQueue
. Ключевое отличие заключается в том, что он не блокирует операцию put, он предусматривает приостанавливающийsend
(или неблокирующийoffer
), а вместо блокирования операции take предусматривает приостанавливающийreceive
.
Акторы
Рассмотрим простой инструмент для работы с каналами: Actor
.
Actor
, опять же, очень похож на Handler
: мы определяем контекст корутины (то, есть, поток, в котором собираемся выполнять действия) и работаем с ним в последовательном порядке.
Разница, конечно же, заключается в том, что здесь используются корутины; можно указать мощность, а выполняемый код – приостановить.
В принципе, actor
будет переадресовывать любую команду каналу корутины. Он гарантирует выполнение команды и ограничивает операции в ее контексте. Такой подход отлично помогает избавиться от вызовов synchronize
и держать все потоки свободными!
protected val updateActor by lazy {
actor<Update>(capacity = Channel.UNLIMITED) {
for (update in channel) when (update) {
Refresh -> updateList()
is Filter -> filter.filter(update.query)
is MediaUpdate -> updateItems(update.mediaList as List<T>)
is MediaAddition -> addMedia(update.media as T)
is MediaListAddition -> addMedia(update.mediaList as List<T>)
is MediaRemoval -> removeMedia(update.media as T)
}
}
}
// используем
fun filter(query: String?) = updateActor.offer(Filter(query))
// или
suspend fun filter(query: String?) = updateActor.send(Filter(query))
В данном примере мы пользуемся запечатанными классами Kotlin, выбирая, какое именно действие выполнить.
sealed class Update
object Refresh : Update()
class Filter(val query: String?) : Update()
class MediaAddition(val media: Media) : Update()
Причем, все эти действия будут поставлены в очередь, параллельно выполняться они никогда не будут. Это удобный способ добиться ограничения изменяемости.
Жизненный цикл Android + корутины
Акторы могут очень пригодиться и для управления пользовательским интерфейсом Android, упрощают отмену задач и предотвращают перегрузку главного потока.
Давайте это реализуем и вызовем job.cancel()
при уничтожении активности.
class MyActivity : AppCompatActivity(), CoroutineScope {
protected val job = SupervisorJob() // экземпляр Job для данной активности
override val coroutineContext = Dispatchers.Main.immediate+job
override fun onDestroy() {
super.onDestroy()
job.cancel() // отмена задачи при уничтожении активности
}
}
Класс SupervisorJob
похож на обычный Job
с тем единственным исключением, что отмена распространяется только в нисходящем направлении.
Поэтому мы не отменяем всех корутин в Activity
, когда одна из них отказывает.
Чуть лучше дела обстоят с функцией расширения, позволяющей открыть доступ к этому CoroutineContext
из любого View
в CoroutineScope
.
val View.coroutineContext: CoroutineContext?
get() = (context as? CoroutineScope)?.coroutineContext
Теперь мы можем все это скомбинировать, функция setOnClick
создает объединенный actor для управления ее действиями onClick
. В случае множественных нажатий промежуточные действия будут игнорироваться, исключая таким образом ошибки ANR (приложение не отвечает), и эти действия будут выполняться в области видимости Activity
. Поэтому при уничтожении активности все это будет отменено.
fun View.setOnClick(action: suspend () -> Unit) {
// запускаем один актор в качестве родителя задачи контекста
val scope = (context as? CoroutineScope)?: AppScope
val eventActor = scope.actor<Unit>(capacity = Channel.CONFLATED) {
for (event in channel) action()
}
// устанавливаем слушатель для активации этого актора
setOnClickListener { eventActor.offer(Unit) }
}
В данном примере мы задаем для Channel
значение Conflated
, чтобы он игнорировал часть событий, если их будет слишком много. Можно заменить его на Channel.UNLIMITED
, если вы предпочитаете ставить события в очередь, не теряя ни одного из них, но все равно хотите защитить приложение от ошибки ANR.
Также можно комбинировать корутины и фреймворки Lifecycle, чтобы автоматизировать отмену задач, связанных с пользовательским интерфейсом:
val LifecycleOwner.untilDestroy: Job get() {
val job = Job()
lifecycle.addObserver(object: LifecycleObserver {
@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
fun onDestroy() { job.cancel() }
})
return job
}
// использование
GlobalScope.launch(Dispatchers.Main, parent = untilDestroy) {
/* здесь происходят удивительные вещи! */
}
Упрощаем ситуацию с обратными вызовами (часть 1)
Вот как можно преобразить использование API, основанного на обратных вызовах, благодаря Channel
.
API работает вот так:
requestBrowsing(url, listener)
инициирует синтаксический разбор папки, расположенной по адресуurl
.- Слушатель
listener
получаетonMediaAdded(media: Media)
для любого медиа-файла, обнаруженного в этой папке. listener.onBrowseEnd()
вызывается по завершении синтаксического разбора папки
Вот старая функция refresh
в поставщике контента для обозревателя VLC:
private val refreshList = mutableListOf<Media>()
fun refresh() = requestBrowsing(url, refreshListener)
private val refreshListener = object : EventListener{
override fun onMediaAdded(media: Media) {
refreshList.add(media))
}
override fun onBrowseEnd() {
val list = refreshList.toMutableList()
refreshList.clear()
launch {
dataset.value = list
parseSubDirectories()
}
}
}
Как это улучшить?
Создаем канал, который будет запускаться в refresh
. Теперь обратные вызовы обозревателя будут лишь направлять медиа в этот канал, а затем закрывать его.
Теперь функция refresh
стала понятнее. Она создает канал, вызывает обозреватель VLC, затем формирует список медиа-файлов и обрабатывает его.
Вместо функций select
или consumeEach
можно использовать for
для ожидания медиа, и этот цикл будет разрываться, как только канал browserChannel
закроется.
private lateinit var browserChannel : Channel<Media>
override fun onMediaAdded(media: Media) {
browserChannel.offer(media)
}
override fun onBrowseEnd() {
browserChannel.close()
}
suspend fun refresh() {
browserChannel = Channel(Channel.UNLIMITED)
val refreshList = mutableListOf<Media>()
requestBrowsing(url)
// Приостанавливается на каждой итерации в ожидании медиа
for (media in browserChannel) refreshList.add(media)
// Канал закрыт
dataset.value = refreshList
parseSubDirectories()
}
Упрощаем ситуацию с обратными вызовами (часть 2): Retrofit
Второй подход: мы вообще не используем корутины kotlinx, зато применяем корутинный core-фреймворк.
Смотрите, как на самом деле работают корутины!
Функция retrofitSuspendCall
оборачивает запрос на вызов Retrofit Call
, чтобы сделать из него функцию suspend
.
При помощи suspendCoroutine
мы вызываем метод Call.enqueue
и приостанавливаем корутину. Предоставленный таким образом обратный вызов обратится к continuation.resume(response)
, чтобы возобновить корутину откликом от сервера, как только тот будет получен.
Далее нам остается просто объединить наши функции Retrofit в retrofitSuspendCall
, чтобы с их помощью возвращать результаты запросов.
suspend inline fun <reified T> retrofitSuspendCall(request: () -> Call <T>
) : Response <T> = suspendCoroutine { continuation ->
request.invoke().enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
continuation.resume(response)
}
override fun onFailure(call: Call<T>, t: Throwable) {
continuation.resumeWithException(t)
}
})
}
suspend fun browse(path: String?) = retrofitSuspendCall {
ApiClient.browse(path)
}
// использование (в контексте корутины Main)
livedata.value = Repo.browse(path)
Таким образом, вызов, блокирующий сеть, делается в выделенном потоке Retrofit, корутина находится здесь, ожидая отклика от сервера, а использовать ее в приложении – проще некуда!
Такая реализация вдохновлена библиотекой gildor/kotlin-coroutines-retrofit.
Также имеется JakeWharton/retrofit2-kotlin-coroutines-adapter с другой реализацией, дающей аналогичный результат.
Эпилог
Channel
можно использовать и многими другими способами; посмотрите в BroadcastChannel более мощные реализации, которые могут вам пригодиться.
Также можно создавать каналы при помощи функции Produce.
Наконец, при помощи каналов удобно организовать коммуникацию между компонентами UI: адаптер может передавать события нажатий в свой фрагмент/активность через Channel
или, например, через Actor
.
Автор: ph_piter