This guide explains the key differences between Kotlin coroutines and reactive streams and shows how they can be used together for the greater good. Prior familiarity with basic coroutine concepts, which are covered in the Guide to kotlinx.coroutines, is not required, but is a big plus. If you are familiar with reactive streams, you may find this guide a better introduction to the world of coroutines.
There are several modules in the kotlinx.coroutines project that are related to reactive streams:
- kotlinx-coroutines-reactive -- utilities for Reactive Streams
- kotlinx-coroutines-reactor -- utilities for Reactor
- kotlinx-coroutines-rx1 -- utilities for RxJava 1.x
- kotlinx-coroutines-rx2 -- utilities for RxJava 2.x
This guide is mostly based on the Reactive Streams specification and uses its Publisher interface, with some examples based on RxJava 2.x, which implements that specification.
You are welcome to clone the kotlinx.coroutines project from GitHub to your workstation in order to run all the presented examples. They are contained in the reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide directory of the project.
This section outlines key differences between reactive streams and coroutine-based channels.
The Channel is a concept somewhat similar to the following reactive stream classes:
- Reactive Streams Publisher;
- RxJava 1.x Observable;
- RxJava 2.x Flowable, which implements Publisher.
They all describe an asynchronous stream of elements (aka items in Rx), either infinite or finite, and all of them support backpressure.
However, the Channel always represents a hot stream of items, in Rx terminology. Elements are sent into the channel by producer coroutines and are received from it by consumer coroutines. Every receive invocation consumes an element from the channel.
Let us illustrate it with the following example:
fun main(args: Array<String>) = runBlocking<Unit> {
    // create a channel that produces numbers from 1 to 3 with 200ms delays between them
    val source = produce<Int>(coroutineContext) {
        println("Begin") // mark the beginning of this coroutine in output
        for (x in 1..3) {
            delay(200) // wait for 200ms
            send(x) // send number x to the channel
        }
    }
    // print elements from the source
    println("Elements:")
    source.consumeEach { // consume elements from it
        println(it)
    }
    // print elements from the source AGAIN
    println("Again:")
    source.consumeEach { // consume elements from it
        println(it)
    }
}
You can get full code here
This code produces the following output:
Elements:
Begin
1
2
3
Again:
Notice, how "Begin" line was printed just once, because produce coroutine builder, when it is executed, launches one coroutine to produce a stream of elements. All the produced elements are consumed with ReceiveChannel.consumeEach extension function. There is no way to receive the elements from this channel again. The channel is closed when the producer coroutine is over and the attempt to receive from it again cannot receive anything.
Let us rewrite this code using the publish coroutine builder from the kotlinx-coroutines-reactive module instead of produce from the kotlinx-coroutines-core module. The code stays the same, but where source used to have the ReceiveChannel type, it now has the reactive streams Publisher type.
fun main(args: Array<String>) = runBlocking<Unit> {
    // create a publisher that produces numbers from 1 to 3 with 200ms delays between them
    val source = publish<Int>(coroutineContext) {
    //           ^^^^^^^  <--- Difference from the previous examples is here
        println("Begin") // mark the beginning of this coroutine in output
        for (x in 1..3) {
            delay(200) // wait for 200ms
            send(x) // send number x to the channel
        }
    }
    // print elements from the source
    println("Elements:")
    source.consumeEach { // consume elements from it
        println(it)
    }
    // print elements from the source AGAIN
    println("Again:")
    source.consumeEach { // consume elements from it
        println(it)
    }
}
You can get full code here
Now the output of this code changes to:
Elements:
Begin
1
2
3
Again:
Begin
1
2
3
This example highlights the key difference between a reactive stream and a channel. A reactive stream is a higher-order functional concept. While a channel is a stream of elements, a reactive stream defines a recipe for how the stream of elements is produced. It becomes an actual stream of elements on subscription. Each subscriber may receive the same or a different stream of elements, depending on how the corresponding implementation of Publisher works.
The publish coroutine builder used in the above example launches a fresh coroutine on each subscription. Every Publisher.consumeEach invocation creates a fresh subscription. We have two of them in this code, and that is why we see "Begin" printed twice.
In Rx lingo this is called a cold publisher. Many standard Rx operators produce cold streams, too. We can iterate over them from a coroutine, and every subscription produces the same stream of elements.
Note that we can replicate the same behaviour that we saw with channels by using the Rx publish operator together with its connect method.
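For illustration, here is a minimal sketch of that technique. It is not part of the guide's example set, and the exact output depends on timing: the publish operator turns a cold Flowable into a ConnectableFlowable, and connect starts the emissions regardless of subscribers, so a late subscriber misses earlier elements, just like a late receiver on a channel.
fun main(args: Array<String>) {
    val connectable = Flowable.interval(100, TimeUnit.MILLISECONDS).publish() // cold source made connectable
    connectable.connect()                        // emissions begin even though nobody is subscribed yet
    Thread.sleep(250)                            // a couple of elements go by unobserved
    connectable.subscribe { println("Got $it") } // a late subscriber only sees the elements from now on
    Thread.sleep(300)                            // observe a few more elements, then exit
}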
The example in the previous section uses the source.consumeEach { ... } snippet to open a subscription and receive all the elements from it. If we need more control over what to do with the elements received from the channel, we can use Publisher.openSubscription as shown in the following example:
fun main(args: Array<String>) = runBlocking<Unit> {
    val source = Flowable.range(1, 5) // a range of five numbers
        .doOnSubscribe { println("OnSubscribe") } // provide some insight
        .doFinally { println("Finally") }         // ... into what's going on
    var cnt = 0
    source.openSubscription().use { channel -> // open channel to the source
        for (x in channel) { // iterate over the channel to receive elements from it
            println(x)
            if (++cnt >= 3) break // break when 3 elements are printed
        }
        // `use` will close the channel when this block of code is complete
    }
}
You can get full code here
It produces the following output:
OnSubscribe
1
2
3
Finally
With an explicit openSubscription we should close the corresponding subscription to unsubscribe from the source. However, instead of invoking close explicitly, this code relies on the use function from Kotlin's standard library. The installed doFinally listener prints "Finally" to confirm that the subscription is actually being closed.
We do not need to use an explicit close if we iterate over all the items that are emitted by the publisher, because the channel is closed automatically by consumeEach:
fun main(args: Array<String>) = runBlocking<Unit> {
    val source = Flowable.range(1, 5) // a range of five numbers
        .doOnSubscribe { println("OnSubscribe") } // provide some insight
        .doFinally { println("Finally") }         // ... into what's going on
    // iterate over the source fully
    source.consumeEach { println(it) }
}
You can get full code here
We get the following output:
OnSubscribe
1
2
3
4
Finally
5
Notice, how "Finally" is printed before the last element "5". It happens because our main
function in this
example is a coroutine that we start with runBlocking coroutine builder.
Our main coroutine receives on the channel using source.consumeEach { ... }
expression.
The main coroutine is suspended while it waits for the source to emit an item.
When the last item is emitted by Flowable.range(1, 5)
it
resumes the main coroutine, which gets dispatched onto the main thread to print this
last element at a later point in time, while the source completes and prints "Finally".
Backpressure is one of the most interesting and complex aspects of reactive streams. Coroutines can suspend and they provide a natural answer to handling backpressure.
In RxJava 2.x a backpressure-capable class is called Flowable. In the following example we use the rxFlowable coroutine builder from the kotlinx-coroutines-rx2 module to define a flowable that sends three integers from 1 to 3. It prints a message to the output after each invocation of the suspending send function, so that we can study how it operates.
The integers are generated in the context of the main thread, but they are observed on another thread using the Rx observeOn operator with a buffer of size 1. The subscriber is slow. It takes 500 ms to process each item, which is simulated using Thread.sleep.
fun main(args: Array<String>) = runBlocking<Unit> {
    // coroutine -- fast producer of elements in the context of the main thread
    val source = rxFlowable(coroutineContext) {
        for (x in 1..3) {
            send(x) // this is a suspending function
            println("Sent $x") // print after successfully sent item
        }
    }
    // subscribe on another thread with a slow subscriber using Rx
    source
        .observeOn(Schedulers.io(), false, 1) // specify buffer size of 1 item
        .doOnComplete { println("Complete") }
        .subscribe { x ->
            Thread.sleep(500) // 500ms to process each item
            println("Processed $x")
        }
    delay(2000) // suspend the main thread for a few seconds
}
You can get full code here
The output of this code nicely illustrates how backpressure works with coroutines:
Sent 1
Processed 1
Sent 2
Processed 2
Sent 3
Processed 3
Complete
We see here how the producer coroutine puts the first element into the buffer and is suspended while trying to send the next one. Only after the consumer processes the first item does the producer resume and send the second one, and so on.
RxJava has the concept of a Subject, which is an object that effectively broadcasts elements to all its subscribers. The matching concept in the coroutines world is called a BroadcastChannel. There is a variety of subjects in Rx, with BehaviorSubject being the one used to manage state:
fun main(args: Array<String>) {
    val subject = BehaviorSubject.create<String>()
    subject.onNext("one")
    subject.onNext("two") // updates the state of BehaviorSubject, "one" value is lost
    // now subscribe to this subject and print everything
    subject.subscribe(System.out::println)
    subject.onNext("three")
    subject.onNext("four")
}
You can get full code here
This code prints the current state of the subject on subscription and all its further updates:
two
three
four
You can subscribe to subjects from a coroutine just as with any other reactive stream:
fun main(args: Array<String>) = runBlocking<Unit> {
    val subject = BehaviorSubject.create<String>()
    subject.onNext("one")
    subject.onNext("two")
    // now launch a coroutine to print everything
    launch(Unconfined) { // launch coroutine in unconfined context
        subject.consumeEach { println(it) }
    }
    subject.onNext("three")
    subject.onNext("four")
}
You can get full code here
The result is the same:
two
three
four
Here we use the Unconfined coroutine context to launch a consuming coroutine with the same behaviour as a subscription in Rx. It basically means that the launched coroutine is going to be immediately executed in the same thread that is emitting elements. Contexts are covered in more detail in a separate section.
The advantage of coroutines is that it is easy to get conflation behavior for single-threaded UI updates. A typical UI application does not need to react to every state change. Only the most recent state is relevant. A sequence of back-to-back updates to the application state needs to be reflected in the UI only once, as soon as the UI thread is free. For the following example we are going to simulate this by launching the consuming coroutine in the context of the main thread and using the yield function to simulate a break in the sequence of updates and to release the main thread:
fun main(args: Array<String>) = runBlocking<Unit> {
    val subject = BehaviorSubject.create<String>()
    subject.onNext("one")
    subject.onNext("two")
    // now launch a coroutine to print the most recent update
    launch(coroutineContext) { // use the context of the main thread for a coroutine
        subject.consumeEach { println(it) }
    }
    subject.onNext("three")
    subject.onNext("four")
    yield() // yield the main thread to the launched coroutine <--- HERE
    subject.onComplete() // now complete subject's sequence to cancel consumer, too
}
You can get full code here
Now the coroutine processes (prints) only the most recent update:
four
The corresponding behavior in a pure coroutines world is implemented by ConflatedBroadcastChannel, which provides the same logic directly on top of coroutine channels, without going through the bridge to reactive streams:
fun main(args: Array<String>) = runBlocking<Unit> {
    val broadcast = ConflatedBroadcastChannel<String>()
    broadcast.offer("one")
    broadcast.offer("two")
    // now launch a coroutine to print the most recent update
    launch(coroutineContext) { // use the context of the main thread for a coroutine
        broadcast.consumeEach { println(it) }
    }
    broadcast.offer("three")
    broadcast.offer("four")
    yield() // yield the main thread to the launched coroutine
    broadcast.close() // now close broadcast channel to cancel consumer, too
}
You can get full code here
It produces the same output as the previous example based on BehaviorSubject:
four
Another implementation of BroadcastChannel is ArrayBroadcastChannel. It delivers every event to every subscriber from the moment the corresponding subscription is opened. It corresponds to PublishSubject in Rx. The capacity of the buffer in the constructor of ArrayBroadcastChannel controls the number of elements that can be sent before the sender is suspended waiting for a receiver to receive those elements.
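To make that concrete, here is a rough sketch in the same style as the ConflatedBroadcastChannel example above. It is not one of the guide's numbered examples and assumes that the same consumeEach extension works for ArrayBroadcastChannel; only elements sent after the subscription is opened are received, and with a capacity of one the second send suspends until the first element has been taken.
fun main(args: Array<String>) = runBlocking<Unit> {
    val broadcast = ArrayBroadcastChannel<String>(1) // buffer capacity of one element
    // launch a coroutine to print every element sent after its subscription is opened
    launch(coroutineContext) {
        broadcast.consumeEach { println(it) }
    }
    yield()               // let the launched coroutine open its subscription first
    broadcast.send("one")
    broadcast.send("two") // the one-element buffer is full, so this send suspends until "one" is received
    yield()               // give the consumer a chance to drain the rest
    broadcast.close()     // now close the broadcast channel to complete the consumer, too
}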
Full-featured reactive stream libraries, like Rx, come with a very large set of operators to create, transform, combine and otherwise process the corresponding streams. Creating your own operators with support for back-pressure is notoriously difficult.
Coroutines and channels are designed to provide an opposite experience. There are no built-in operators, but processing streams of elements is extremely simple and back-pressure is supported automatically without you having to explicitly think about it.
This section shows coroutine-based implementation of several reactive stream operators.
Let's roll out our own implementation of the range operator for the reactive streams Publisher interface. The asynchronous clean-slate implementation of this operator for reactive streams is explained in this blog post. It takes a lot of code. Here is the corresponding code with coroutines:
fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
    for (x in start until start + count) send(x)
}
In this code CoroutineContext is used instead of an Executor, and all the backpressure aspects are taken care of by the coroutines machinery. Note that this implementation depends only on the small Reactive Streams library that defines the Publisher interface and its friends.
It is straightforward to use from a coroutine:
fun main(args: Array<String>) = runBlocking<Unit> {
    range(CommonPool, 1, 5).consumeEach { println(it) }
}
You can get full code here
The result of this code is quite expected:
1
2
3
4
5
Reactive operators like filter and map are trivial to implement with coroutines. For a bit of a challenge and a showcase, let us combine them into a single fusedFilterMap operator:
fun <T, R> Publisher<T>.fusedFilterMap(
    context: CoroutineContext, // the context to execute this coroutine in
    predicate: (T) -> Boolean, // the filter predicate
    mapper: (T) -> R           // the mapper function
) = publish<R>(context) {
    consumeEach {              // consume the source stream
        if (predicate(it))     // filter part
            send(mapper(it))   // map part
    }
}
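For comparison, a standalone filter written in the same style is just a couple of lines. This is a sketch rather than part of the guide's test code, but it follows the exact same pattern:
fun <T> Publisher<T>.filter(context: CoroutineContext, predicate: (T) -> Boolean) = publish<T>(context) {
    consumeEach { if (predicate(it)) send(it) } // resend only the elements that satisfy the predicate
}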
Using range from the previous example, we can test our fusedFilterMap by filtering for even numbers and mapping them to strings:
fun main(args: Array<String>) = runBlocking<Unit> {
    range(coroutineContext, 1, 5)
        .fusedFilterMap(coroutineContext, { it % 2 == 0 }, { "$it is even" })
        .consumeEach { println(it) } // print all the resulting strings
}
You can get full code here
It is not hard to see that the result is going to be:
2 is even
4 is even
Let's implement our own version of the takeUntil operator. It is quite a tricky one to implement, because of the need to track and manage subscriptions to two streams. We need to relay all the elements from the source stream until the other stream either completes or emits anything. However, we have the select expression to rescue us in the coroutine implementation:
fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
    this@takeUntil.openSubscription().use { thisChannel -> // explicitly open channel to Publisher<T>
        other.openSubscription().use { otherChannel ->     // explicitly open channel to Publisher<U>
            whileSelect {
                otherChannel.onReceive { false }          // bail out on any received element from `other`
                thisChannel.onReceive { send(it); true }  // resend element from this channel and continue
            }
        }
    }
}
This code uses whileSelect as a nicer shortcut for a while(select { ... }) {} loop, and Kotlin's use function to close the channels on exit, which unsubscribes from the corresponding publishers.
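To make that shortcut explicit, here is a sketch of the same operator written with a plain select expression inside a while loop. The takeUntilExplicit name is hypothetical; the behaviour should be the same, just with more ceremony:
fun <T, U> Publisher<T>.takeUntilExplicit(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
    this@takeUntilExplicit.openSubscription().use { thisChannel ->
        other.openSubscription().use { otherChannel ->
            while (select<Boolean> {                     // loop while the selected clause returns true
                otherChannel.onReceive { false }         // bail out on any received element from `other`
                thisChannel.onReceive { send(it); true } // resend the element and keep looping
            }) { /* all the work happens inside the select clauses */ }
        }
    }
}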
The following hand-written combination of range with interval is used for testing. It is coded using the publish coroutine builder (its pure-Rx implementation is shown in later sections):
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
    for (x in start until start + count) {
        delay(time) // wait before sending each number
        send(x)
    }
}
The following code shows how takeUntil works:
fun main(args: Array<String>) = runBlocking<Unit> {
    val slowNums = rangeWithInterval(coroutineContext, 200, 1, 10) // numbers with 200ms interval
    val stop = rangeWithInterval(coroutineContext, 500, 1, 10)     // the first one after 500ms
    slowNums.takeUntil(coroutineContext, stop).consumeEach { println(it) } // let's test it
}
You can get full code here
Producing the following output:
1
2
There are always at least two ways to process multiple streams of data with coroutines. One way, involving select, was shown in the previous example. The other way is just to launch multiple coroutines. Let us implement a merge operator using the latter approach:
fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
    consumeEach { pub ->                 // for each publisher received on the source channel
        launch(coroutineContext) {       // launch a child coroutine
            pub.consumeEach { send(it) } // resend all elements from this publisher
        }
    }
}
Notice the use of coroutineContext in the invocation of the launch coroutine builder. It refers to the CoroutineScope.coroutineContext that is provided by the publish builder. This way, all the coroutines launched here are children of the publish coroutine and will get cancelled when the publish coroutine is cancelled or otherwise completes. Moreover, since the parent coroutine waits until all its children are complete, this implementation fully merges all the received streams.
For a test, let us start with the rangeWithInterval function from the previous example and write a producer that sends its results twice with some delay:
fun testPub(context: CoroutineContext) = publish<Publisher<Int>>(context) {
    send(rangeWithInterval(context, 250, 1, 4))  // number 1 at 250ms, 2 at 500ms, 3 at 750ms, 4 at 1000ms
    delay(100)                                   // wait for 100 ms
    send(rangeWithInterval(context, 500, 11, 3)) // number 11 at 600ms, 12 at 1100ms, 13 at 1600ms
    delay(1100)                                  // wait for 1.1s - done in 1.2 sec after start
}
The test code uses merge on testPub and displays the results:
fun main(args: Array<String>) = runBlocking<Unit> {
    testPub(coroutineContext).merge(coroutineContext).consumeEach { println(it) } // print the whole stream
}
You can get full code here
And the results should be:
1
2
11
3
4
12
13
All the example operators shown in the previous section have an explicit CoroutineContext parameter. In the Rx world it roughly corresponds to a Scheduler.
The following example shows the basics of threading context management with Rx. Here rangeWithIntervalRx is an implementation of the rangeWithInterval function using the Rx zip, range, and interval operators.
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main(args: Array<String>) {
    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}
You can get full code here
We are explicitly passing the Schedulers.computation() scheduler to our rangeWithIntervalRx operator, and it is going to be executed in the Rx computation thread pool. The output is going to be similar to the following:
1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1
In the world of coroutines, Schedulers.computation() roughly corresponds to CommonPool, so the previous example is similar to the following one:
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
    for (x in start until start + count) {
        delay(time) // wait before sending each number
        send(x)
    }
}

fun main(args: Array<String>) {
    Flowable.fromPublisher(rangeWithInterval(CommonPool, 100, 1, 3))
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}
You can get full code here
The produced output is going to be similar to:
1 on thread ForkJoinPool.commonPool-worker-1
2 on thread ForkJoinPool.commonPool-worker-1
3 on thread ForkJoinPool.commonPool-worker-1
Here we've used the Rx subscribe operator, which does not have its own scheduler and operates on the same thread as the publisher -- on CommonPool in this example.
In Rx you use special operators to modify the threading context for operations in the chain. You can find some good guides about them if you are not already familiar with them.
For example, there is the observeOn operator. Let us modify the previous example to observe on Schedulers.computation():
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
    for (x in start until start + count) {
        delay(time) // wait before sending each number
        send(x)
    }
}

fun main(args: Array<String>) {
    Flowable.fromPublisher(rangeWithInterval(CommonPool, 100, 1, 3))
        .observeOn(Schedulers.computation()) // <-- THIS LINE IS ADDED
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}
You can get full code here
Here is the difference in output, notice "RxComputationThreadPool":
1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1
A coroutine is always working in some context. For example, let us start a coroutine in the main thread with runBlocking and iterate over the result of our Rx rangeWithIntervalRx operator, instead of using the Rx subscribe operator:
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main(args: Array<String>) = runBlocking<Unit> {
    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
        .consumeEach { println("$it on thread ${Thread.currentThread().name}") }
}
You can get full code here
The resulting messages are going to be printed in the main thread:
1 on thread main
2 on thread main
3 on thread main
Most Rx operators do not have any specific thread (scheduler) associated with them and work in whatever thread they happen to be invoked in. We've seen this with the subscribe operator in the threads with Rx section.
In the world of coroutines, the Unconfined context serves a similar role. Let us modify our previous example, but instead of iterating over the source Flowable from the runBlocking coroutine that is confined to the main thread, we launch a new coroutine in the Unconfined context, while the main coroutine simply waits for its completion using Job.join:
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(Unconfined) { // launch new coroutine in Unconfined context (without its own thread pool)
        rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
            .consumeEach { println("$it on thread ${Thread.currentThread().name}") }
    }
    job.join() // wait for our coroutine to complete
}
You can get full code here
Now the output shows that the code of the coroutine is executing in the Rx computation thread pool, just like our initial example using the Rx subscribe operator:
1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1
Note that the Unconfined context should be used with care. It may improve overall performance in certain tests due to the increased stack-locality of operations and less scheduling overhead, but it also produces deeper stacks and makes it harder to reason about the asynchronicity of the code that uses it.
If a coroutine sends an element to a channel, then the thread that invoked the send may start executing the code of a coroutine with the Unconfined dispatcher. The original producer coroutine that invoked send is paused until the unconfined consumer coroutine hits its next suspension point. This is very similar to lock-step single-threaded onNext execution in the Rx world in the absence of thread-shifting operators. It is a normal default for Rx, because operators usually do very small chunks of work and you have to combine many operators for complex processing. However, this is unusual with coroutines, where you can have arbitrarily complex processing in a coroutine. Usually, you only need to chain stream-processing coroutines for complex pipelines with fan-in and fan-out between multiple worker coroutines.
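As an illustration of this hand-off, consider the following rough sketch (not from the guide's example set), which assumes a plain rendezvous Channel. The consumer runs in the Unconfined context, so when the producer coroutine on CommonPool invokes send, the consumer's println executes right there on the producer's pool thread:
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>() // rendezvous channel, no buffer
    launch(Unconfined) {         // unconfined consumer -- runs in whatever thread resumes it
        channel.consumeEach { println("Consumed $it on thread ${Thread.currentThread().name}") }
    }
    launch(CommonPool) {         // producer on a background thread
        channel.send(1)          // resumes the unconfined consumer right here, on this pool thread
        channel.close()
    }
    delay(500)                   // give the background coroutines time to finish
}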