flow kotlin что это

kotlinx.coroutines 1.4.0: представляем StateFlow и SharedFlow

В преддверии старта курса «Kotlin Backend Developer» приглашаем всех желающих записаться на открытый урок по теме «Kotlin multiplatform: front/back на одном языке».

А сейчас предлагаем к прочтению традиционный перевод статьи.

API-интерфейс Flow в Kotlin предназначен для асинхронной обработки потока данных, который выполняется последовательно. По сути, Flow — это последовательность. В Kotlin с помощью Flow можно выполнять такие же операции, как с помощью Sequences: преобразовывать, фильтровать, сопоставлять и т. д. Основное различие между Sequences и Flow в Kotlin заключается в том, что Flow позволяет приостанавливать выполнение.

Во Flow приостановку можно выполнить в любом месте: в функции сборки или в любом из операторов, предоставляемых API-интерфейсом Flow. Приостановка во Flow работает как контроль backpressure, при этом вам не нужно ничего делать — всю работу выполняет компилятор.

Интерфейс Flow так же прост в использовании, как и Sequences. Однако Flow несет в себе все преимущества реактивного программирования, в котором не требуется управлять backpressure.

Flow является удобным API-интерфейсом, однако он не обеспечивает возможность управлять состоянием, которая требуется в некоторых случаях. Например, у процесса может быть несколько промежуточных и одно конечное состояние. Примером такого процесса является загрузка файла: процесс загрузки длится некоторое время и мы можем определить такие промежуточные состояния процесса, как «Запущен» и «Выполняется», и конечные состояния «Успешно» и «Сбой». В этом случае нам интересны только результаты: успешно была выполнена загрузка или нет.

При реализации описанного выше сценария с помощью API-интерфейса Flow нам нужно публиковать изменения состояния для наблюдателей, которые могут совершать те или иные действия исходя из этих изменений. Ранее мы всегда рекомендовали использовать для этого ConflatedBroadcastChannel. Однако ConflatedBroadcastChannel является слишком сложным для этой задачи. Кроме того, имеются некоторые логические нестыковки, которые возникают при использовании каналов для управления состоянием. Например, канал может быть закрыт или отменен. Это не очень хорошо сочетается с управлением состоянием, поскольку состояние-то нельзя отменить!

Мы решили отказаться от ConflatedBroadcastChannel и вместо этого внедрить пару новых API-интерфейсов — StateFlow и SharedFlow!

StateFlow

StateFlow имеет две разновидности: StateFlow и MutableStateFlow:

Состояние представлено значением. Любое изменение значения будет отражено во всех коллекторах потока путем выдачи значения с изменениями состояния.

Давайте посмотрим, как можно реализовать описанный ранее пример с загрузкой файла с помощью нового API-интерфейса.

Как видите, никаких API для работы с каналами здесь не используется. Мы не запускаем никаких дополнительных корутин, и нет нужды изучать какие-либо новые концепции. Только простой императивный код, в котором для описания реализации используется переменная, а клиентам предоставляется state как Flow.

SharedFlow

Что, если вместо управления состоянием нам потребуется управлять рядом обновлений состояния, то есть потоком событий? Для таких случаев у нас есть новый API-интерфейс под названием SharedFlow. Этот API-интерфейс подходит для обработки ряда выдаваемых значений, например для вычисления скользящего среднего из потока данных.

Общий поток — это просто поток, где есть кэш повтора, который можно использовать в качестве атомарного моментального снимка. Каждый новый подписчик сначала получает значения из кэша повтора, а затем получает новые выданные значения.

Вместе с SharedFlow мы также предоставляем MutableSharedFlow.

С помощью MutableSharedFlow можно выдавать значения из приостанавливающего и неприостанавливающего контекста. Как можно заключить из имени, кэш повтора MutableSharedFlow можно сбрасывать. Кроме того, он предоставляет количество своих коллекторов как поток.

Реализовать собственный MutableSharedFlow может быть довольно сложно. Поэтому мы предоставляем несколько удобных методов для работы с SharedFlow.

Чтобы инициализировать экземпляр MutableSharedFlow с помощью приведенной выше функции, можно указать количество значений, которые повторяются для новых подписчиков, емкость буфера, а также действия, которые следует выполнять в случае заполнения буфера. Например, если буфер заполнится, можно приостановить поток.

Резюме

Новые API-интерфейсы StateFlow и SharedFlow обеспечивают более элегантный способ работы с состоянием в программах на Kotlin с корутинами. Они намного проще и понятнее, чем использование широковещательных каналов для публикации изменений состояния из контекста потока.

Попробуйте новые API, испытайте их на прочность и отправьте нам свой отзыв!

Подробные сведения о нововведениях в Kotlin Coroutines можно узнать, посмотрев выступление Всеволода Толстопятова на конференции Kotlin 1.4 Online Event.

Источник

Android, Kotlin Flow во ViewModel — все сложно

Загрузка данных для UI в приложении Android может быть непростой задачей. Нам надо учитывать жизненный цикл компонентов Android и изменения конфигурации, потому что все это приводит к уничтожению и восстановлению Activity.

Отдельные экраны приложения постоянно переключаются между активным и неактивным состоянием, когда пользователь ходит вперед назад по экранам, переключается с одного приложения на другое, блокирует и разблокирует экран. Каждый компонент должен выполнять активную работу в нужном состоянии экрана.

Изменения конфигурации происходят в случаях:

при изменении ориентации экрана;

когда приложение переключается в мульти-оконный режим;

при переключении визуальной темы смартфона;

Повышаем эффективность

Для улучшения пользовательского опыта, эффективная загрузка данных во Fragment и Activity должна учитывать следующие правила:

Кеширование: актуальные загруженные данные, должны быть доставлены немедленно и не загружаться повторно. В частности, когда существующий Fragment или Activity становятся видимыми снова или Activity пересоздается после изменения конфигурации.

Избегать фоновую работу: когда Activity или Fragment скрываются (состояние изменяется со STARTED на STOPPED ), любая работа по загрузке внешних данных должна вставать на паузу или отменяться для экономии ресурсов. Это особенно важно для бесконeчных потоков данных, как например геолокация или периодическое обновление каких-либо данных.

Работа не прерывается при изменении конфигурации: это исключение из правила #2, Во время смены конфигурации, текущая Activity заменяется новым экземпляром с сохранением состояния, поэтому отменять текущую работу в старом экземпляре Activity и перезапускать работу при создании нового экземпляра Activiti было бы контр продуктивно.

Современный подход: ViewModel и LiveData

В 2017 Google зарелизила первый набор библиотек Architecture Components<:target="_blank">, там появились ViewModel и LiveData компоненты, которые помогают разработчикам эффективно работать с данными, поддерживая все 3 правила выше.

ViewModel<:target="_blank">, сохраняет данные при изменении конфигурации, используется для достижения правил #1 и #3: операции загрузки выполняются непрерывно во время изменения конфигурации, полученные данные могут кешироваться и совместно использоваться одним, или несколькими Fragment или Activity.

LiveData<:target="_blank">, простой контейнер данных, поддерживающий подписку на изменения и учитывающий жизненный цикл компонентов Android. Новые данные отправляются наблюдателям только когда их жизненный цикл в состоянии не менее STARTED (видимый), наблюдатели отписываются автоматически, что избавляет от утечек памяти. LiveData используется для достижения правил #1 и #2: кеширует последнее значение данных и это значение автоматически отправляется новым наблюдателям. В дополнение, LiveData уведомляет, когда в состоянии STARTED больше нет наблюдателей и можно избежать ненужной фоновой работы.

ViewModel scope.

Опытный разработчик, как правило уже знаком со всем этим. Но важно вспомнить все возможности, чтобы сравнить их с Flow.

LiveData + Coroutines

LiveData довольна ограничена по сравнению с реактивными решениями (например RxJava):

передает и берет данные только на главном потоке (main thread). Интересно, что оператор map выполняет трансформацию объектов на главном потоке и не может использоваться на I/O потоках или для тяжелых вычислений на CPU. Для этого используется оператор switchMap совместно с ручным запуском асинхронной операции в нужном потоке, даже если в основной поток надо отправить единственное значение.

Чтобы преодолеть эти ограничения, библиотеки Jetpack дают специальные «мосты» из LiveData для других технологий, таких как RxJava или Kotlin корутины.

Самый простой и наиболее элегантный из них, по мнению автора, это LiveData coroutine builder, подключается через androidx.lifecycle:lifecycle-livedata-ktx Gradle зависимость. Этот функционал похож на flow <> builder function из библиотеки Kotlin Coroutines и позволяет грамотно обернуть корутину в экземпляр LiveData:

Вы можете использовать все силу корутин, их контекстов для написания асинхронного кода в синхронной манере без колбеков, автоматически переключаясь между нужными потоками;

Новые значения отправляются наблюдателям LiveData в главном потоке через suspending методы emit() или emitSource() из корутины;

Корутина использует специальную область видимости (scope) и жизненный цикл привязанный к экземпляру LiveData. Когда LiveData становится неактивной (это значит, что больше нет наблюдателей в состоянии STARTED ), то корутина будет автоматически отменена, работает правило #2;

В реальности отмена корутины будет задержана на 5 секунд после того как LiveData станет неактивной для правильной обработки смены конфигурации: если новая Activity немедленно заменит старую и LiveData станет активной до срабатывания задержки, то отмена корутины не будет и цена перезапуска будет нулевой (правило #3);

если пользователь вернется назад на экран и LiveData станет активной, то корутина автоматически перезапустится, но только если она была отменена до завершения. Как только корутина завершится, она больше не будет перезапускаться, те же данные не будут загружаться дважды, если входные параметры не изменились (правило #1).

Вывод: используйте LiveData coroutines builder, это дает простой код и лучшее поведение по умолчанию.

А что если, репозиторий возвращает поток значений в форме Flow (вместо suspend функций с единственным значением)? В этом случае также возможно сконвертировать поток в LiveData и использовать все преимущества перечисленные выше, используя asLiveData() функцию-расширение.

Внутри asLiveData() также использует LiveData coroutines builder для создания простой корутины обрабатывающий Flow пока LiveData активна:

Но давайте остановимся ненадолго – что такое Flow и можно ли им полностью заменить LiveData?

Введение в Kotlin Flow

Flow это класс из библиотеки Kotlin Coroutines представленной в 2019 году, класс является потоком значений, вычисляемый асинхронно. Концептуально похож на RxJava Observable, но основан на корутинах и имеет более простой API.

Читайте также:  при какой температуре выпекается выпечка

Изначально были доступны только холодные потоки (cold flows): потоки без состояний, которые создаются по требованию каждый раз, когда наблюдатель начинает собирать значения в области видимости (scope) корутины. Каждый наблюдатель получает собственную последовательность значений, они не общие.

SharedFlow публикует данные, которые распространяются всем слушателям. Класс может управлять дополнительным кешем и/или буфером и фактически заменяет все варианты устаревшего BroadcastChannel API.

StateFlow и LiveData много общего:

Эти классы наблюдаемые (observable)

Они хранят и распространяют последнее значение любому количеству наблюдателей

Но есть и важные отличия:

StateFlow не учитывает жизненный цикл (not lifecycle-aware). Однако, Flow может быть использовано в корутине с учетом жизненного цикла, это требует некоторого кода для настройки без использования LiveData (детали ниже).

Наблюдение за LiveData против сбора данных в Flow

Организовать наблюдение за экземпляром LiveData довольно просто:

Эта операция однократная и дальше LiveData берет на себя синхронизацию потока данных с жизненным циклом наблюдателей.

Аналогичная операция для Flow называется сбором (collecting) и сбор должен выполняться в корутине. Из-за того, что Flow не знает ничего о жизненном цикле, ответственность за жизненный цикл возлагают на корутину, работающую с Flow.

Чтобы создать корутину для работы с Flow, учитывающую жизненный цикл Activity/Fragment (запускать работу с данными при состоянии STARTED и автоматически отменять эту работу при уничтожении):

Однако, есть и другие виды Flow:

горячие потоки, которые всегда активные и посылают результаты всем текущим наблюдателями (включая приостановленные);

холодные потоки с колбэком или поддержкой канала, которые подписываются на активный источник данных, когда сбор данных запускается и останавливает подписку, когда сбор данных отменяется (не приостанавливается).

В этих случаях, основной производитель Flow будет оставаться активным даже когда корутина будет приостановлена, сохраняя (в буфер) новые результаты в фоновом режиме. Ресурсы расходуются впустую, правило #2 нарушается.

Как видно, эффективно и безопасно работать с данными в Actvivity или Fragment проще с помощью LiveData.

Заменяем LiveData на StateFlow во ViewModel

Давайте-ка вернемся к ViewModel. Мы убедились, что это простой и эффективный способ работы с данными в асинхронном режиме:

Похоже, что мы сумели выполнить 3 наших правила и воспроизвести почти такое же поведение как у LiveData с использованием более сложного кода.

Это проблематично? Для простых случаев нет, Activity или Fragment могут сделать дополнительную проверку, чтобы не делать лишнее обновление UI, если данные не изменились.

Проблемы возникают в более сложных, реальных случаях, как мы увидим в следующем разделе.

Использование StateFlow как триггер во ViewModel

MutableLiveData для этого работает очень хорошо:

При обновлении, оператор switchMap() подключает наблюдателей к новому источнику LiveData, заменяя старый. И так как в примере выше, используется LiveData coroutine builder, старая LiveData автоматически отменит связанную с ним корутину через 5 секунд после отключения от своих наблюдателей. Работа с устаревшими данными прекращается с небольшой задержкой.

Этот код достаточно прост и соблюдает все правила по эффективности выше.

Наивный подход

API MutableLiveData и MutableLiveData выглядят очень похоже, код триггера выглядит почти одинаково. Самое большое различие это использование mapLatest, это эквивалент функции switchMap() в LiveData для возвращения единственного значения (для возвращения нескольких значений, надо использовать flatMapLatest).

Вроде выглядит неплохо. Однако здесь всплывает основная проблема: так как StateFlow не поддерживает версионность, триггер отправит повторно последнее значение, когда Flow перезапустится. Это случается каждый раз, когда Activity/Frgament становится видимым опять, после того, как был невидимым более 5 секунд.

Триггер выдает значение повторно, mapLatest() снова запускается, еще раз дергается метод в репозитории с теми же аргументами, хотя результат уже был получен и обработан. Правило #1 не работает: актуальные данные не должны загружаться повторно.

Чиним повторную отправку последнего сообщения

Вопросы приходящие на ум: должны ли мы предотвращать повторную отправку и как это сделать? StateFlow уже позаботился об этом внутри коллекции flow и оператор distinctUntilChanged() делает то же самое для других типов flow. При этом нет стандартного оператора для отмены повторной отправки среди множества коллекций одного и того же flow, потому что flow коллекции должны быть самодостаточные. Это главная разница с LiveData.

К сожалению, логика в коде выше несовершенна и перестанет работать, как задумано, когда трансформация flow будет отменена до завершения.

TL;DR Использование StateFlow в качестве триггера в ViewModel приводит к дублирующейся работе каждый раз когда Activity/Fragment становится видимой повторно и здесь нет простого пути избежать такого поведения.

Выводы

Мои рекомендации на основании примеров выше:

Продолжайте использовать LiveData в вашем Android-UI слое и ViewModels, особенно в качестве триггера. Используйте это везде, для передачи данных в Activity/Fragment: код будет простым и эффективным;

LiveData coroutine builder ваш друг и может заменить Flows во ViewModels в большинстве случаев;

Вы можете использовать мощь Flow операторов при необходимости, конвертируя Flows в LiveData;

Теперь вы знаете все компромиcсы при переходе от LiveData к подходу «полностью на Flow» в вашем Android-UI слое.

Источник

Введение в Kotlin Flow

Into the Flow: Kotlin cold streams primer — одна из лучших статей о новой возможности Kotlin под названием Flow.

Введение в Kotlin Flow

В Kotlin уже есть мощный механизм асинхронного программирования под названием короутины (coroutines). Они позволяют писать чистый асинхронный неблокируемый код, построенный на последовательном вызове функций. Например, мы можем объявить такую функцию:

Допустим, она будет выполнять сложные расчеты в фоне, а затем вернет результат в виде списка. Это отлично работает, за исключением того, что функция должна заранее выполнить расчет всех элементов списка. Если элементов слишком много или мы имеем дело с неопределенным количеством элементов, начинаются проблемы.

Для их решения в Kotlin есть другой инструмент под названием Flow (поток):

Именно поэтому разработчики Kotlin называют потоки холодными, в противовес горячим каналам (Channel), которые также присутствуют в языке.

Для создания самого потока можно использовать функцию flow :

В данном случае функция «выпускает» в поток три объекта типа Int с перерывом в 100 миллисекунд. Обрати внимание, что flow — это suspend-функция, которая может запускать другие suspend-функции (в данном случае delay ( ) ).

Как уже было сказано выше, функция, возвращающая поток, не должна быть suspend-функцией. Но метод collect ( ) объекта типа Flow — suspend-функция, которая должна работать внутри CoroutineScope :

Создать поток можно и другими способами, например с помощью метода asFlow ( ) :

По умолчанию функции flow и collect запускаются внутри текущего CoroutineScope, но его можно изменить, используя метод flowOn ( ) :

Несколько потоков можно объединить в один с помощью метода zip ( ) :

Этот код объединит каждый элемент первого потока с соответствующим элементом второго потока:

Другой вариант объединения — функция combine ( ) :

В данном случае каждый элемент первого потока будет объединен с последним элементом второго потока:

После трансформации потоков мы можем получить структуры данных, включающие в себя потоки потоков ( Flow Flow X > > ). Чтобы «выровнять» такие данные, можно использовать один из следующих методов:

На этом все. Теперь вы знаете, что представляет Kotlin Flow.

Источник

Asynchronous Flow

A suspending function asynchronously returns a single value, but how can we return multiple asynchronously computed values? This is where Kotlin Flows come in.

Representing multiple values

Multiple values can be represented in Kotlin using collections. For example, we can have a simple function that returns a List of three numbers and then print them all using forEach:

You can get the full code from here.

Sequences

If we are computing the numbers with some CPU-consuming blocking code (each computation taking 100ms), then we can represent the numbers using a Sequence:

You can get the full code from here.

This code outputs the same numbers, but it waits 100ms before printing each one.

Suspending functions

However, this computation blocks the main thread that is running the code. When these values are computed by asynchronous code we can mark the simple function with a suspend modifier, so that it can perform its work without blocking and return the result as a list:

You can get the full code from here.

This code prints the numbers after waiting for a second.

Flows

Using the List result type, means we can only return all the values at once. To represent the stream of values that are being asynchronously computed, we can use a Flow type just like we would use the Sequence type for synchronously computed values:

You can get the full code from here.

This code waits 100ms before printing each number without blocking the main thread. This is verified by printing «I’m not blocked» every 100ms from a separate coroutine that is running in the main thread:

Notice the following differences in the code with the Flow from the earlier examples:

A builder function for Flow type is called flow.

The simple function is no longer marked with suspend modifier.

Values are emitted from the flow using emit function.

Values are collected from the flow using collect function.

Flows are cold

Flows are cold streams similar to sequences — the code inside a flow builder does not run until the flow is collected. This becomes clear in the following example:

You can get the full code from here.

This is a key reason the simple function (which returns a flow) is not marked with suspend modifier. By itself, simple() call returns quickly and does not wait for anything. The flow starts every time it is collected, that is why we see «Flow started» when we call collect again.

Читайте также:  какой кенгуру для ребенка лучше выбрать

Flow cancellation basics

Flow adheres to the general cooperative cancellation of coroutines. As usual, flow collection can be cancelled when the flow is suspended in a cancellable suspending function (like delay). The following example shows how the flow gets cancelled on a timeout when running in a withTimeoutOrNull block and stops executing its code:

You can get the full code from here.

Notice how only two numbers get emitted by the flow in the simple function, producing the following output:

See Flow cancellation checks section for more details.

Flow builders

flowOf builder that defines a flow emitting a fixed set of values.

So, the example that prints the numbers from 1 to 3 from a flow can be written as:

You can get the full code from here.

Intermediate flow operators

Flows can be transformed with operators, just as you would with collections and sequences. Intermediate operators are applied to an upstream flow and return a downstream flow. These operators are cold, just like flows are. A call to such an operator is not a suspending function itself. It works quickly, returning the definition of a new transformed flow.

The basic operators have familiar names like map and filter. The important difference to sequences is that blocks of code inside these operators can call suspending functions.

For example, a flow of incoming requests can be mapped to the results with the map operator, even when performing a request is a long-running operation that is implemented by a suspending function:

You can get the full code from here.

It produces the following three lines, each line appearing after each second:

Transform operator

Among the flow transformation operators, the most general one is called transform. It can be used to imitate simple transformations like map and filter, as well as implement more complex transformations. Using the transform operator, we can emit arbitrary values an arbitrary number of times.

For example, using transform we can emit a string before performing a long-running asynchronous request and follow it with a response:

You can get the full code from here.

The output of this code is:

Size-limiting operators

You can get the full code from here.

Terminal flow operators

Terminal operators on flows are suspending functions that start a collection of the flow. The collect operator is the most basic one, but there are other terminal operators, which can make it easier:

Conversion to various collections like toList and toSet.

Operators to get the first value and to ensure that a flow emits a single value.

Reducing a flow to a value with reduce and fold.

You can get the full code from here.

Prints a single number:

Flows are sequential

Each individual collection of a flow is performed sequentially unless special operators that operate on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator. No new coroutines are launched by default. Each emitted value is processed by all the intermediate operators from upstream to downstream and is then delivered to the terminal operator after.

See the following example that filters the even integers and maps them to strings:

You can get the full code from here.

Flow context

Collection of a flow always happens in the context of the calling coroutine. For example, if there is a simple flow, then the following code runs in the context specified by the author of this code, regardless of the implementation details of the simple flow:

This property of a flow is called context preservation.

You can get the full code from here.

Running this code produces:

Since simple().collect is called from the main thread, the body of simple ‘s flow is also called in the main thread. This is the perfect default for fast-running or asynchronous code that does not care about the execution context and does not block the caller.

Wrong emission withContext

Try running the following code:

You can get the full code from here.

This code produces the following exception:

flowOn operator

The exception refers to the flowOn function that shall be used to change the context of the flow emission. The correct way to change the context of a flow is shown in the example below, which also prints the names of the corresponding threads to show how it all works:

You can get the full code from here.

Another thing to observe here is that the flowOn operator has changed the default sequential nature of the flow. Now collection happens in one coroutine («coroutine#1») and emission happens in another coroutine («coroutine#2») that is running in another thread concurrently with the collecting coroutine. The flowOn operator creates another coroutine for an upstream flow when it has to change the CoroutineDispatcher in its context.

Buffering

Running different parts of a flow in different coroutines can be helpful from the standpoint of the overall time it takes to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case when the emission by a simple flow is slow, taking 100 ms to produce an element; and collector is also slow, taking 300 ms to process an element. Let’s see how long it takes to collect such a flow with three numbers:

You can get the full code from here.

It produces something like this, with the whole collection taking around 1200 ms (three numbers, 400 ms for each):

We can use a buffer operator on a flow to run emitting code of the simple flow concurrently with collecting code, as opposed to running them sequentially:

You can get the full code from here.

It produces the same numbers just faster, as we have effectively created a processing pipeline, having to only wait 100 ms for the first number and then spending only 300 ms to process each number. This way it takes around 1000 ms to run:

Note that the flowOn operator uses the same buffering mechanism when it has to change a CoroutineDispatcher, but here we explicitly request buffering without changing the execution context.

Conflation

When a flow represents partial results of the operation or operation status updates, it may not be necessary to process each value, but instead, only most recent ones. In this case, the conflate operator can be used to skip intermediate values when a collector is too slow to process them. Building on the previous example:

You can get the full code from here.

We see that while the first number was still being processed the second, and third were already produced, so the second one was conflated and only the most recent (the third one) was delivered to the collector:

Processing the latest value

Conflation is one way to speed up processing when both the emitter and collector are slow. It does it by dropping emitted values. The other way is to cancel a slow collector and restart it every time a new value is emitted. There is a family of xxxLatest operators that perform the same essential logic of a xxx operator, but cancel the code in their block on a new value. Let’s try changing conflate to collectLatest in the previous example:

You can get the full code from here.

Since the body of collectLatest takes 300 ms, but new values are emitted every 100 ms, we see that the block is run on every value, but completes only for the last value:

Composing multiple flows

There are lots of ways to compose multiple flows.

Just like the Sequence.zip extension function in the Kotlin standard library, flows have a zip operator that combines the corresponding values of two flows:

You can get the full code from here.

This example prints:

Combine

When flow represents the most recent value of a variable or operation (see also the related section on conflation), it might be needed to perform a computation that depends on the most recent values of the corresponding flows and to recompute it whenever any of the upstream flows emit a value. The corresponding family of operators is called combine.

For example, if the numbers in the previous example update every 300ms, but strings update every 400 ms, then zipping them using the zip operator will still produce the same result, albeit results that are printed every 400 ms:

We use a onEach intermediate operator in this example to delay each element and make the code that emits sample flows more declarative and shorter.

You can get the full code from here.

However, when using a combine operator here instead of a zip:

You can get the full code from here.

We get quite a different output, where a line is printed at each emission from either nums or strs flows:

Читайте также:  при смещении позвонков поясничного отдела какие симптомы

Flattening flows

Flows represent asynchronously received sequences of values, so it is quite easy to get in a situation where each value triggers a request for another sequence of values. For example, we can have the following function that returns a flow of two strings 500 ms apart:

Now if we have a flow of three integers and call requestFlow for each of them like this:

Then we end up with a flow of flows ( Flow > ) that needs to be flattened into a single flow for further processing. Collections and sequences have flatten and flatMap operators for this. However, due to the asynchronous nature of flows they call for different modes of flattening, as such, there is a family of flattening operators on flows.

flatMapConcat

Concatenating mode is implemented by flatMapConcat and flattenConcat operators. They are the most direct analogues of the corresponding sequence operators. They wait for the inner flow to complete before starting to collect the next one as the following example shows:

You can get the full code from here.

The sequential nature of flatMapConcat is clearly seen in the output:

flatMapMerge

Another flattening mode is to concurrently collect all the incoming flows and merge their values into a single flow so that values are emitted as soon as possible. It is implemented by flatMapMerge and flattenMerge operators. They both accept an optional concurrency parameter that limits the number of concurrent flows that are collected at the same time (it is equal to DEFAULT_CONCURRENCY by default).

You can get the full code from here.

The concurrent nature of flatMapMerge is obvious:

Note that the flatMapMerge calls its block of code ( < requestFlow(it) >in this example) sequentially, but collects the resulting flows concurrently, it is the equivalent of performing a sequential map < requestFlow(it) >first and then calling flattenMerge on the result.

flatMapLatest

In a similar way to the collectLatest operator, that was shown in «Processing the latest value» section, there is the corresponding «Latest» flattening mode where a collection of the previous flow is cancelled as soon as new flow is emitted. It is implemented by the flatMapLatest operator.

You can get the full code from here.

The output here in this example is a good demonstration of how flatMapLatest works:

Note that flatMapLatest cancels all the code in its block ( < requestFlow(it) >in this example) on a new value. It makes no difference in this particular example, because the call to requestFlow itself is fast, not-suspending, and cannot be cancelled. However, it would show up if we were to use suspending functions like delay in there.

Flow exceptions

Flow collection can complete with an exception when an emitter or code inside the operators throw an exception. There are several ways to handle these exceptions.

Collector try and catch

A collector can use Kotlin’s try/catch block to handle exceptions:

This code successfully catches an exception in collect terminal operator and, as we see, no more values are emitted after that:

Everything is caught

The previous example actually catches any exception happening in the emitter or in any intermediate or terminal operators. For example, let’s change the code so that emitted values are mapped to strings, but the corresponding code produces an exception:

You can get the full code from here.

This exception is still caught and collection is stopped:

Exception transparency

But how can code of the emitter encapsulate its exception handling behavior?

The emitter can use a catch operator that preserves this exception transparency and allows encapsulation of its exception handling. The body of the catch operator can analyze an exception and react to it in different ways depending on which exception was caught:

Exceptions can be turned into emission of values using emit from the body of catch.

Exceptions can be ignored, logged, or processed by some other code.

For example, let us emit the text on catching an exception:

You can get the full code from here.

The output of the example is the same, even though we do not have try/catch around the code anymore.

Transparent catch

Catching declaratively

We can combine the declarative nature of the catch operator with a desire to handle all the exceptions, by moving the body of the collect operator into onEach and putting it before the catch operator. Collection of this flow must be triggered by a call to collect() without parameters:

You can get the full code from here.

Flow completion

When flow collection completes (normally or exceptionally) it may need to execute an action. As you may have already noticed, it can be done in two ways: imperative or declarative.

Imperative finally block

You can get the full code from here.

This code prints three numbers produced by the simple flow followed by a «Done» string:

Declarative handling

For the declarative approach, flow has onCompletion intermediate operator that is invoked when the flow has completely collected.

The previous example can be rewritten using an onCompletion operator and produces the same output:

You can get the full code from here.

The key advantage of onCompletion is a nullable Throwable parameter of the lambda that can be used to determine whether the flow collection was completed normally or exceptionally. In the following example the simple flow throws an exception after emitting the number 1:

You can get the full code from here.

As you may expect, it prints:

The onCompletion operator, unlike catch, does not handle the exception. As we can see from the above example code, the exception still flows downstream. It will be delivered to further onCompletion operators and can be handled with a catch operator.

Successful completion

Another difference with catch operator is that onCompletion sees all exceptions and receives a null exception only on successful completion of the upstream flow (without cancellation or failure).

We can see the completion cause is not null, because the flow was aborted due to downstream exception:

Imperative versus declarative

Now we know how to collect flow, and handle its completion and exceptions in both imperative and declarative ways. The natural question here is, which approach is preferred and why? As a library, we do not advocate for any particular approach and believe that both options are valid and should be selected according to your own preferences and code style.

Launching flow

It is easy to use flows to represent asynchronous events that are coming from some source. In this case, we need an analogue of the addEventListener function that registers a piece of code with a reaction for incoming events and continues further work. The onEach operator can serve this role. However, onEach is an intermediate operator. We also need a terminal operator to collect the flow. Otherwise, just calling onEach has no effect.

As you can see, it prints:

The launchIn terminal operator comes in handy here. By replacing collect with launchIn we can launch a collection of the flow in a separate coroutine, so that execution of further code immediately continues:

The required parameter to launchIn must specify a CoroutineScope in which the coroutine to collect the flow is launched. In the above example this scope comes from the runBlocking coroutine builder, so while the flow is running, this runBlocking scope waits for completion of its child coroutine and keeps the main function from returning and terminating this example.

Note that launchIn also returns a Job, which can be used to cancel the corresponding flow collection coroutine only without cancelling the whole scope or to join it.

Flow cancellation checks

You can get the full code from here.

We get only numbers up to 3 and a CancellationException after trying to emit number 4:

However, most other flow operators do not do additional cancellation checks on their own for performance reasons. For example, if you use IntRange.asFlow extension to write the same busy loop and don’t suspend anywhere, then there are no checks for cancellation:

You can get the full code from here.

All numbers from 1 to 5 are collected and cancellation gets detected only before return from runBlocking :

Making busy flow cancellable

You can get the full code from here.

With the cancellable operator only the numbers from 1 to 3 are collected:

Flow and Reactive Streams

For those who are familiar with Reactive Streams or reactive frameworks such as RxJava and project Reactor, design of the Flow may look very familiar.

Indeed, its design was inspired by Reactive Streams and its various implementations. But Flow main goal is to have as simple design as possible, be Kotlin and suspension friendly and respect structured concurrency. Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in Reactive Streams and Kotlin Flows article.

Источник

Сказочный портал