Asynchronous Data Streams with Kotlin Flow
A suspending function returns a single value asynchronously. To return multiple computed values asynchronously, Kotlin provides the Flow type.
Representing Multiple Values
Collections in Kotlin can represent multiple values. For example, a function that returns a list of numbers:
fun fetchNumbers(): List<Int> = listOf(10, 20, 30)
fun main() {
fetchNumbers().forEach { num -> println(num) }
}
Output:
10
20
30
Sequences for Synchronous Computation
If numbers are computed with CPU-intensive blocking code, a Sequence can be used:
fun fetchNumbers(): Sequence<Int> = sequence {
for (i in 1..3) {
Thread.sleep(200) // Simulate computation
yield(i)
}
}
fun main() {
fetchNumbers().forEach { num -> println(num) }
}
This prints the same numbers but waits 200 ms before each.
Suspending Functions for Asynchronous Results
When values are computed asynchronously, mark the function with suspend:
suspend fun fetchNumbers(): List<Int> {
delay(1200) // Simulate async work
return listOf(10, 20, 30)
}
fun main() = runBlocking {
fetchNumbers().forEach { num -> println(num) }
}
This prints after a 1.2-second delay.
Using Flow for Asynchronous Streams
List returns all values at once. For an asynchronous stream of values, use Flow:
fun numberStream(): Flow<Int> = flow {
for (i in 1..3) {
delay(150)
emit(i)
}
}
fun main() = runBlocking {
launch {
for (j in 1..3) {
println("Main thread active $j")
delay(150)
}
}
numberStream().collect { num -> println(num) }
}
Output:
Main thread active 1
1
Main thread active 2
2
Main thread active 3
3
Key differences with Flow:
- The
flowbuilder function. - Code inside
flow { ... }can suspend. - The emitting function is not marked
suspend. - Values are emitted using
emit. - Values are collected using
collect.
Flows are Cold
Flow is a cold stream—code inside a flow builder runs only when the flow is collected.
fun numberStream(): Flow<Int> = flow {
println("Stream initialization")
for (i in 1..3) {
delay(120)
emit(i)
}
}
fun main() = runBlocking {
println("Requesting stream...")
val stream = numberStream()
println("Starting collection...")
stream.collect { num -> println(num) }
println("Collecting again...")
stream.collect { num -> println(num) }
}
Output:
Requesting stream...
Starting collection...
Stream initialization
1
2
3
Collecting again...
Stream initialization
1
2
3
The stream starts anew on each collection.
Flow Cancellation
Flow collection adheres to cooperative cancellation. Collection can be cancelled when the flow is suspended in a cancellable suspending function.
fun numberStream(): Flow<Int> = flow {
for (i in 1..3) {
delay(130)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking {
withTimeoutOrNull(300) {
numberStream().collect { num -> println(num) }
}
println("Operation completed")
}
Output:
Emitting 1
1
Emitting 2
2
Operation completed
Flow Builders
Basic builders include flowOf and .asFlow().
fun main() = runBlocking {
(1..3).asFlow().collect { num -> println(num) }
}
Intermediate Flow Operators
Operators like map and filter transform upstream flows into downstream flows. Code within these operators can call suspending functions.
suspend fun processRequest(req: Int): String {
delay(1100)
return "Result for $req"
}
fun main() = runBlocking {
(1..3).asFlow()
.map { req -> processRequest(req) }
.collect { res -> println(res) }
}
Output (each line appears after ~1.1 seconds):
Result for 1
Result for 2
Result for 3
The Transform Operator
transform can emit any value any number of times.
fun main() = runBlocking {
(1..3).asFlow()
.transform { req ->
emit("Starting request $req")
emit(processRequest(req))
}
.collect { output -> println(output) }
}
Output:
Starting request 1
Result for 1
Starting request 2
Result for 2
Starting request 3
Result for 3
Size-Limiting Operators
Operators like take cancel flow execution when the limit is reached.
fun numberFlow(): Flow<Int> = flow {
try {
emit(10)
emit(20)
println("This line won't execute")
emit(30)
} finally {
println("Cleanup in numberFlow")
}
}
fun main() = runBlocking {
numberFlow()
.take(2)
.collect { num -> println(num) }
}
Output:
10
20
Cleanup in numberFlow
Terminal Flow Operators
Terminal operators are suspending functions that start collection. Besides collect, others include toList, first, reduce, and fold.
fun main() = runBlocking {
val total = (1..5).asFlow()
.map { it * it }
.reduce { acc, value -> acc + value }
println(total)
}
Output:
55
Flow are Sequential
Each collection proceeds sequentially unless operators specifically enable concurrency.
fun main() = runBlocking {
(1..5).asFlow()
.filter {
println("Filtering $it")
it % 2 == 0
}
.map {
println("Mapping $it")
"text $it"
}
.collect { str -> println("Collecting $str") }
}
Output:
Filtering 1
Filtering 2
Mapping 2
Collecting text 2
Filtering 3
Filtering 4
Mapping 4
Collecting text 4
Filtering 5
Flow Context
Flow collection occurs in the context of the calling coroutine. This property is called context preservation.
fun simpleFlow(): Flow<Int> = flow {
log("Flow started")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking {
simpleFlow().collect { num -> log("Collected $num") }
}
Output shows both logs on the main thread.
Incorrect Context Emission
Emitting from a different context inside flow { ... } violates the context preservation property and throws an exception.
fun wrongFlow(): Flow<Int> = flow {
withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(150)
emit(i)
}
}
}
The flowOn Operator
Use flowOn to change the context for flow emission.
fun numberFlow(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(150)
log("Emitting $i")
emit(i)
}
}.flowOn(Dispatchers.Default)
fun main() = runBlocking {
numberFlow().collect { num -> log("Collected $num") }
}
Emission occurs on a background thread while collection is on the main thread.
Buffering
Use buffer to run emission and collection concurrently.
fun slowEmitter(): Flow<Int> = flow {
for (i in 1..3) {
delay(110)
emit(i)
}
}
fun main() = runBlocking {
val duration = measureTimeMillis {
slowEmitter()
.buffer()
.collect { num ->
delay(320)
println(num)
}
}
println("Collected in $duration ms")
}
Conflation
Use conflate to skip intermediate values when the collector is slow.
fun main() = runBlocking {
val duration = measureTimeMillis {
slowEmitter()
.conflate()
.collect { num ->
delay(320)
println(num)
}
}
println("Collected in $duration ms")
}
Output may skip the second value.
Processing the Latest Value
collectLatest cancels its block and restarts on each new emission.
fun main() = runBlocking {
val duration = measureTimeMillis {
slowEmitter()
.collectLatest { num ->
println("Processing $num")
delay(320)
println("Finished $num")
}
}
println("Collected in $duration ms")
}
Combining Multiple Flows
Zip
Combines corresponding values from two flows.
fun main() = runBlocking {
val numbers = (1..3).asFlow()
val labels = flowOf("A", "B", "C")
numbers.zip(labels) { a, b -> "$a -> $b" }
.collect { combo -> println(combo) }
}
Output:
1 -> A
2 -> B
3 -> C
Combine
Recomputes whenever either flow emits a new value.
fun main() = runBlocking {
val numbers = (1..3).asFlow().onEach { delay(310) }
val labels = flowOf("X", "Y", "Z").onEach { delay(410) }
val start = System.currentTimeMillis()
numbers.combine(labels) { a, b -> "$a & $b" }
.collect { combo ->
println("$combo at ${System.currentTimeMillis() - start} ms")
}
}
Flattening Flows
Flatten a flow of flows into a single flow.
fun dataStream(id: Int): Flow<String> = flow {
emit("$id: Start")
delay(520)
emit("$id: End")
}
flatMapConcat
Processes inner flows sequentially.
fun main() = runBlocking {
val start = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(110) }
.flatMapConcat { dataStream(it) }
.collect { value ->
println("$value at ${System.currentTimeMillis() - start} ms")
}
}
flatMapMerge
Concurrently collects all inner flows.
fun main() = runBlocking {
val start = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(110) }
.flatMapMerge { dataStream(it) }
.collect { value ->
println("$value at ${System.currentTimeMillis() - start} ms")
}
}
flatMapLatest
Cancels the previous inner flow when a new one arrives.
fun main() = runBlocking {
val start = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(110) }
.flatMapLatest { dataStream(it) }
.collect { value ->
println("$value at ${System.currentTimeMillis() - start} ms")
}
}
Flow Exceptions
Collector try/catch
fun faultyStream(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking {
try {
faultyStream().collect { num ->
println(num)
check(num <= 1) { "Received $num" }
}
} catch (e: Throwable) {
println("Caught: $e")
}
}
Output:
Emitting 1
1
Emitting 2
2
Caught: java.lang.IllegalStateException: Received 2
The catch Operator
Preserves exception transparency.
fun main() = runBlocking {
faultyStream()
.catch { e -> emit("Recovered: $e") }
.collect { output -> println(output) }
}
Declarative Catching
fun main() = runBlocking {
faultyStream()
.onEach { num ->
check(num <= 1) { "Received $num" }
println(num)
}
.catch { e -> println("Caught: $e") }
.collect()
}
Flow Completion
Imperative finally Block
fun main() = runBlocking {
try {
(1..3).asFlow().collect { num -> println(num) }
} finally {
println("Done")
}
}
Declarative onCompletion
fun main() = runBlocking {
(1..3).asFlow()
.onCompletion { println("Stream finished") }
.collect { num -> println(num) }
}
onCompletion receives a nullable Throwable parameter.
Launching Flows
Use launchIn to start collection in a separate coroutine.
fun eventStream(): Flow<Int> = (1..3).asFlow().onEach { delay(120) }
fun main() = runBlocking {
eventStream()
.onEach { event -> println("Event: $event") }
.launchIn(this)
println("Ready")
}
Output:
Ready
Event: 1
Event: 2
Event: 3