
Understanding Kotlin Flows

In this article we will explain what a Kotlin Flow is, how to use it, what benefits it brings and some use cases where it makes sense.

You will be able to understand when to use a Kotlin Flow, a Kotlin Sequence or just plain collections.

Let’s start!

Introduction

The main goal of a Kotlin Flow is to deliver the results of multiple asynchronous operations as part of the same “stream” or “flow”. In other words, a Kotlin Flow is to asynchronous operations what a sequence is to synchronous ones.

To clarify this: inside a sequence builder we can’t call arbitrary suspend functions, whereas inside a flow we can call them directly. Otherwise flows are quite similar to sequences: both produce their elements lazily, one at a time.
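To make the difference concrete, here is a minimal sketch. The function fetchPrice is a hypothetical suspending call standing in for a network or database request; it is not part of any library.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical suspending call, used only for illustration.
suspend fun fetchPrice(id: Int): Int {
    delay(10) // simulated latency
    return id * 100
}

// A sequence builder is restricted: calling fetchPrice here would not compile,
// because only yield/yieldAll may suspend inside sequence { }.
fun localIds(): Sequence<Int> = sequence {
    yield(1)
    yield(2)
    yield(3)
}

// A flow builder is a suspending lambda, so fetchPrice can be called directly.
fun prices(): Flow<Int> = flow {
    for (id in localIds()) {
        emit(fetchPrice(id))
    }
}

fun main() = runBlocking {
    println(prices().toList()) // [100, 200, 300]
}
```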

We could say then that a Kotlin flow is a data stream of results generated from asynchronous operations. This stream of elements can be finite or infinite, but in principle it should at some point either complete, fail with an exception or be cancelled.

[Image: Kotlin flow. Image credit: Author]

In the image shown above you can see how a Kotlin flow works. There’s a producer emitting values as the results of different asynchronous operations. On the other end of the flow, a consumer collects the generated values as a stream of elements after subscribing to the stream by calling the collect method.

What’s their main benefit then? Couldn’t we just perform an operation in the background asynchronously and just return a list of elements?

Well, there are several reasons why Kotlin flows could help us enormously. Let’s go through some of them!

Benefits

There are several reasons to use Kotlin flows in our applications. Some of them are the following:

  • Lower memory footprint
    Flows, and sequences in general, allow us to process each element independently. This is a huge benefit in some applications, because the memory footprint becomes much lower.
    Especially in big data applications, loading the whole data set into memory is not an option, so using flows or sequences is highly recommended to avoid OutOfMemoryError. We process one element at a time, so the heap holds just the memory a single element takes instead of the whole collection.
  • Fail-fast
    Another characteristic of flows is that the producer can’t emit a new value once an exception has been raised anywhere in the data stream, including the terminal operation. The whole flow is cancelled as soon as a single error happens.
  • Structured concurrency
    We’ve already talked about this in two of our previous articles, so we’ll just mention that a construct that implements structured concurrency allows us to transparently obtain safer code with no resource leaks.
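The element-by-element processing behind the lower memory footprint can be sketched as follows. The names naturals and produced are made up for this illustration. The producer is unbounded, so it could never be loaded into a list, yet the pipeline only generates the three values that the collector asks for.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Records which values the producer actually generated.
val produced = mutableListOf<Int>()

// An unbounded producer: it would never finish if materialised as a collection.
fun naturals(): Flow<Int> = flow {
    var n = 1
    while (true) {
        produced += n
        emit(n++)
    }
}

fun main() = runBlocking {
    val squares = naturals()
        .map { it * it }
        .take(3) // cancels the upstream producer after the third element
        .toList()
    println(squares)  // [1, 4, 9]
    println(produced) // [1, 2, 3]
}
```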

Now that we understand some of their benefits, let’s try to understand how flows work internally.

Internals

One of the main characteristics of Kotlin flows is that they’re fully compliant with the Reactive Streams initiative. The goal of the Reactive Streams initiative is to provide a standard for asynchronous stream processing with non-blocking backpressure, that is, for exchanging data between threads or thread pools within an asynchronous context.

For additional information around reactive standards, you can also check the Reactive Manifesto.

Before we continue, it’s also worth mentioning that the addition of virtual threads to the JDK (finalized in JDK 21) could change things radically for Reactive Streams. You can read our article “Understand Java Virtual Threads: The Death of Async Programming” for further details.

As part of Reactive Streams, one of the most important aspects that must be implemented is backpressure. We’ll see how backpressure works very soon, but first let’s see the two principles which any implementation of Kotlin Flow should be built on.

Principles

Although Kotlin provides its own Flow implementation, Kotlin Flow is just an interface, a contract to which new implementers will have to adhere:

public interface Flow<out T> {
    public suspend fun collect(collector: FlowCollector<T>)
}

Any implementation of the Flow interface will have to fulfil two key principles:

  • Context preservation
  • Exception transparency

Let’s explain briefly what they mean.

Context preservation

Context preservation means that each flow encapsulates its own execution context and never propagates or leaks it downstream.

This concept is related to something we’ve talked about previously, structured concurrency. You can read our article “Discover Structured Concurrency” if you want to know more about this topic.

The way contexts work in Kotlin flows is very similar to Kotlin coroutines. Kotlin coroutines implement structured concurrency; therefore, our Kotlin flow will never leak resources once it’s completed.

A Kotlin flow always uses its own execution context, which will never be leaked. There is only one way to change the context of a flow: the flowOn operator, which affects only the operators upstream of it.
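As a rough sketch of how flowOn behaves, the example below moves the producer to Dispatchers.Default while the collector stays in the caller’s context. The function name producerThread is invented for this illustration, and the exact thread names printed depend on your environment.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Emits the name of the thread that runs the flow body.
fun producerThread(): Flow<String> = flow {
    emit(Thread.currentThread().name)
}
    .map { "produced on $it" }   // upstream of flowOn: also runs on Dispatchers.Default
    .flowOn(Dispatchers.Default) // changes the context of the operators above it only

fun main() = runBlocking {
    producerThread().collect { message ->
        // The collector still runs in the caller's context (runBlocking here).
        println("$message, collected on ${Thread.currentThread().name}")
    }
}
```

The producer’s context does not leak: code inside collect keeps running wherever the caller started it.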

If you feel that you need to understand Kotlin coroutines better before continuing reading, you can read our article “Understanding Kotlin Coroutines”.

Exception transparency

When an exception is raised in the producer, the flow must stop immediately and emit no further values. The exception is propagated downstream by the flow. If for any reason you need to emit a value when something fails, you can use the catch operator and pass it a function that handles the failure.

For instance, this example from the Kotlin Flow documentation uses catch:

flow { emitData() }
    .map { computeOne(it) }
    .catch { doSomething() } // catches exceptions from emitData and computeOne
    .map { computeTwo(it) }
    .collect { process(it) } // throws exceptions from process and computeTwo

Kotlin Flow will throw an IllegalStateException when a producer tries to emit a value after an exception has already been thrown by a previous emission, enforcing exception transparency at runtime.
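For a runnable variant of the same idea, here is a minimal sketch in which the producer fails after two values and catch emits a fallback. The function name withFallback and the fallback value -1 are arbitrary choices made for this example.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// The producer fails after two values; catch handles the upstream failure
// and emits a fallback value instead of letting the exception reach the collector.
fun withFallback(): Flow<Int> = flow {
    emit(1)
    emit(2)
    throw IllegalStateException("upstream failure")
}
    .catch { emit(-1) }

fun main() = runBlocking {
    println(withFallback().toList()) // [1, 2, -1]
}
```

Note that catch only sees exceptions from the operators above it. An exception thrown inside the collector would still propagate to the caller.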

Now that we understand the two principles that compose Kotlin flows, let’s get back to backpressure and try to understand it better.

Backpressure

As we just said, one of the main requirements of the Reactive Streams initiative is backpressure. We don’t want consumers to buffer an unbounded amount of data and, at the same time, especially in the JVM world, we need to guarantee that we can work safely with a fixed number of threads in our thread pools.

This is the main reason why backpressure must be enforced in any implementation of Reactive streams, to guarantee that producers can “slow down” if the consumers are not able to keep up with the pace at which producers are producing messages. Backpressure is a key component in the resilience of any Reactive stream.

We will now show a brief example of how backpressure works in Kotlin flows.

The first thing you will need to do is add the coroutines dependency. In Gradle it looks like this:

implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4'

Once we have access to coroutines libraries, we can create a flow in our producer:

package com.theboreddev.flow

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

class Producer {

    fun produce(): Flow<Int> {
        return flow {
            (1..10000).forEach { value ->
                println("Emitting value $value")
                emit(value)
            }
        }
    }
}

In this example we are creating a flow in our producer that will produce 10,000 elements without any delay.

Now we will create a consumer, but a very slow consumer. We’ll achieve that by adding a delay when the consumer processes the element.

package com.theboreddev.flow

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.withContext

class Consumer {

    suspend fun subscribe(flow: Flow<Int>) {
        withContext(Dispatchers.IO) {
            flow.collect { value ->
                delay(1000)
                println("Consumed value $value")
            }
        }
    }
}

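Our consumer waits one second before reporting that an element has been consumed. We might expect the consumer to have to buffer elements, or even to be overwhelmed, but that doesn’t happen: emit is a suspending function and, by default, the producer and the collector run sequentially in the same coroutine, so emit doesn’t return until the collector has finished processing the value. The producer is “slowed down” for us.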

We will write a test to simulate this scenario:

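package com.theboreddev.flow

import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test // assumes JUnit 5; with JUnit 4, import org.junit.Test instead

class BackpressureTest {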

    private val producer = Producer()
    private val consumer = Consumer()

    @Test
    fun `should be able to handle when producers are much faster than consumers`() {

        val flow = producer.produce()

        runBlocking {
            consumer.subscribe(flow)
        }
    }
}

If we run this test, the output looks like this:

Emitting value 1
Consumed value 1
Emitting value 2
Consumed value 2
Emitting value 3
Consumed value 3
...

The producer will wait until the consumer consumes the first element to produce the next one. That’s great, right?

As you can see, Kotlin coroutines are doing this work for us to make our stream safe and resilient.

What happens if we add an additional consumer?

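package com.theboreddev.flow

import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test // assumes JUnit 5; with JUnit 4, import org.junit.Test instead

class BackpressureTest {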

    private val producer = Producer()
    private val consumer1 = Consumer("consumer 1")
    private val consumer2 = Consumer("consumer 2")

    @Test
    fun `should be able to handle when producers are much faster than consumers`() {

        val flow = producer.produce()

        runBlocking {
            launch {
                consumer1.subscribe(flow)
            }
            launch {
                consumer2.subscribe(flow)
            }
        }
    }
}
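The test above passes a name to Consumer, which the earlier Consumer listing doesn’t accept. A minimal sketch of the variant it presumably relies on could look like this. The name parameter and the message format are inferred from the test and its output, so treat them as assumptions.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Assumed variant: the name parameter is inferred from the test and its output;
// it does not appear in the original Consumer listing.
class Consumer(private val name: String) {

    suspend fun subscribe(flow: Flow<Int>) {
        withContext(Dispatchers.IO) {
            flow.collect { value ->
                delay(1000) // simulate slow processing
                println("Consumed value $value by consumer $name")
            }
        }
    }
}

// Small usage example with a two-element flow.
fun main() = runBlocking {
    Consumer("consumer 1").subscribe(flowOf(1, 2))
}
```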

What would you expect in this case? Both consumers are “subscribed” to the same flow. Does that mean they will take turns, consuming one element each?

Not really. Every consumer will have independent “subscriptions”, therefore, the same element will be consumed by both consumers. The output in this case will look like this:

Emitting value 1
Emitting value 1
Consumed value 1 by consumer consumer 1
Emitting value 2
Consumed value 1 by consumer consumer 2
Emitting value 2
Consumed value 2 by consumer consumer 2
Emitting value 3
Consumed value 2 by consumer consumer 1

As you will notice, backpressure is independent for each consumer. This means that if one of the consumers is faster than the other, the slowness of the second consumer won’t affect the first one at all.

These “standard” Kotlin flows are of a type we call “cold” flows, because they only start once a collector has subscribed to consume or process their elements.
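The cold behaviour can be observed with a small sketch. The counter starts and the function numbers are names made up for this example. Building the flow runs nothing, and each collector triggers a fresh execution of the flow body.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Counts how many times the flow body has been started.
var starts = 0

fun numbers(): Flow<Int> = flow {
    starts++
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    val flow = numbers()
    println(starts)        // 0: building the flow runs nothing
    println(flow.toList()) // [1, 2]
    println(flow.toList()) // [1, 2]: the second collector restarts the body
    println(starts)        // 2
}
```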

What would we need if we wanted multiple consumers to share the same flow? To achieve that, we would need to use channels. For now we’ll focus on flows, but we promise to talk about channels very soon!

Conclusion

In this article we’ve tried to explain as clearly as possible what a Kotlin Flow is and what the main benefits of using it are. On our journey we’ve discovered some interesting terms like structured concurrency, context preservation, exception transparency and backpressure.

We hope that you’ve found all of these terms interesting if you didn’t know them already. If you did, we hope we managed to make them clearer in your mind.

This is all from us today, hoping to see you back with us very soon! As always, if you’re interested in reading more of our articles, please subscribe with your email to get notified when a new article gets published!

Thanks for reading us!