Fading Coder

One Final Commit for the Last Sprint

Home > Tech > Content

Asynchronous Data Streams with Kotlin Flow

Tech 1

A suspending function returns a single value asynchronously. To return multiple computed values asynchronously, Kotlin provides the Flow type.

Representing Multiple Values

Collections in Kotlin can represent multiple values. For example, a function that returns a list of numbers:

fun fetchNumbers(): List<Int> = listOf(10, 20, 30)

fun main() {
    fetchNumbers().forEach { num -> println(num) }
}

Output:

10
20
30

Sequences for Synchronous Computation

If numbers are computed with CPU-intensive blocking code, a Sequence can be used:

fun fetchNumbers(): Sequence<Int> = sequence {
    for (i in 1..3) {
        Thread.sleep(200) // Simulate computation
        yield(i)
    }
}

fun main() {
    fetchNumbers().forEach { num -> println(num) }
}

This prints the same numbers but waits 200 ms before each.

Suspending Functions for Asynchronous Results

When values are computed asynchronously, mark the function with suspend:

suspend fun fetchNumbers(): List<Int> {
    delay(1200) // Simulate async work
    return listOf(10, 20, 30)
}

fun main() = runBlocking {
    fetchNumbers().forEach { num -> println(num) }
}

This prints after a 1.2-second delay.

Using Flow for Asynchronous Streams

List returns all values at once. For an asynchronous stream of values, use Flow:

fun numberStream(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(150)
        emit(i)
    }
}

fun main() = runBlocking {
    launch {
        for (j in 1..3) {
            println("Main thread active $j")
            delay(150)
        }
    }
    numberStream().collect { num -> println(num) }
}

Output:

Main thread active 1
1
Main thread active 2
2
Main thread active 3
3

Key differences with Flow:

  • The flow builder function.
  • Code inside flow { ... } can suspend.
  • The emitting function is not marked suspend.
  • Values are emitted using emit.
  • Values are collected using collect.

Flows are Cold

Flow is a cold stream—code inside a flow builder runs only when the flow is collected.

fun numberStream(): Flow<Int> = flow {
    println("Stream initialization")
    for (i in 1..3) {
        delay(120)
        emit(i)
    }
}

fun main() = runBlocking {
    println("Requesting stream...")
    val stream = numberStream()
    println("Starting collection...")
    stream.collect { num -> println(num) }
    println("Collecting again...")
    stream.collect { num -> println(num) }
}

Output:

Requesting stream...
Starting collection...
Stream initialization
1
2
3
Collecting again...
Stream initialization
1
2
3

The stream starts anew on each collection.

Flow Cancellation

Flow collection adheres to cooperative cancellation. Collection can be cancelled when the flow is suspended in a cancellable suspending function.

fun numberStream(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(130)
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking {
    withTimeoutOrNull(300) {
        numberStream().collect { num -> println(num) }
    }
    println("Operation completed")
}

Output:

Emitting 1
1
Emitting 2
2
Operation completed

Flow Builders

Basic builders include flowOf and .asFlow().

fun main() = runBlocking {
    (1..3).asFlow().collect { num -> println(num) }
}

Intermediate Flow Operators

Operators like map and filter transform upstream flows into downstream flows. Code within these operators can call suspending functions.

suspend fun processRequest(req: Int): String {
    delay(1100)
    return "Result for $req"
}

fun main() = runBlocking {
    (1..3).asFlow()
        .map { req -> processRequest(req) }
        .collect { res -> println(res) }
}

Output (each line appears after ~1.1 seconds):

Result for 1
Result for 2
Result for 3

The Transform Operator

transform can emit any value any number of times.

fun main() = runBlocking {
    (1..3).asFlow()
        .transform { req ->
            emit("Starting request $req")
            emit(processRequest(req))
        }
        .collect { output -> println(output) }
}

Output:

Starting request 1
Result for 1
Starting request 2
Result for 2
Starting request 3
Result for 3

Size-Limiting Operators

Operators like take cancel flow execution when the limit is reached.

fun numberFlow(): Flow<Int> = flow {
    try {
        emit(10)
        emit(20)
        println("This line won't execute")
        emit(30)
    } finally {
        println("Cleanup in numberFlow")
    }
}

fun main() = runBlocking {
    numberFlow()
        .take(2)
        .collect { num -> println(num) }
}

Output:

10
20
Cleanup in numberFlow

Terminal Flow Operators

Terminal operators are suspending functions that start collection. Besides collect, others include toList, first, reduce, and fold.

fun main() = runBlocking {
    val total = (1..5).asFlow()
        .map { it * it }
        .reduce { acc, value -> acc + value }
    println(total)
}

Output:

55

Flow are Sequential

Each collection proceeds sequentially unless operators specifically enable concurrency.

fun main() = runBlocking {
    (1..5).asFlow()
        .filter {
            println("Filtering $it")
            it % 2 == 0
        }
        .map {
            println("Mapping $it")
            "text $it"
        }
        .collect { str -> println("Collecting $str") }
}

Output:

Filtering 1
Filtering 2
Mapping 2
Collecting text 2
Filtering 3
Filtering 4
Mapping 4
Collecting text 4
Filtering 5

Flow Context

Flow collection occurs in the context of the calling coroutine. This property is called context preservation.

fun simpleFlow(): Flow<Int> = flow {
    log("Flow started")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking {
    simpleFlow().collect { num -> log("Collected $num") }
}

Output shows both logs on the main thread.

Incorrect Context Emission

Emitting from a different context inside flow { ... } violates the context preservation property and throws an exception.

fun wrongFlow(): Flow<Int> = flow {
    withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(150)
            emit(i)
        }
    }
}

The flowOn Operator

Use flowOn to change the context for flow emission.

fun numberFlow(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(150)
        log("Emitting $i")
        emit(i)
    }
}.flowOn(Dispatchers.Default)

fun main() = runBlocking {
    numberFlow().collect { num -> log("Collected $num") }
}

Emission occurs on a background thread while collection is on the main thread.

Buffering

Use buffer to run emission and collection concurrently.

fun slowEmitter(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(110)
        emit(i)
    }
}

fun main() = runBlocking {
    val duration = measureTimeMillis {
        slowEmitter()
            .buffer()
            .collect { num ->
                delay(320)
                println(num)
            }
    }
    println("Collected in $duration ms")
}

Conflation

Use conflate to skip intermediate values when the collector is slow.

fun main() = runBlocking {
    val duration = measureTimeMillis {
        slowEmitter()
            .conflate()
            .collect { num ->
                delay(320)
                println(num)
            }
    }
    println("Collected in $duration ms")
}

Output may skip the second value.

Processing the Latest Value

collectLatest cancels its block and restarts on each new emission.

fun main() = runBlocking {
    val duration = measureTimeMillis {
        slowEmitter()
            .collectLatest { num ->
                println("Processing $num")
                delay(320)
                println("Finished $num")
            }
    }
    println("Collected in $duration ms")
}

Combining Multiple Flows

Zip

Combines corresponding values from two flows.

fun main() = runBlocking {
    val numbers = (1..3).asFlow()
    val labels = flowOf("A", "B", "C")
    numbers.zip(labels) { a, b -> "$a -> $b" }
        .collect { combo -> println(combo) }
}

Output:

1 -> A
2 -> B
3 -> C

Combine

Recomputes whenever either flow emits a new value.

fun main() = runBlocking {
    val numbers = (1..3).asFlow().onEach { delay(310) }
    val labels = flowOf("X", "Y", "Z").onEach { delay(410) }
    val start = System.currentTimeMillis()
    numbers.combine(labels) { a, b -> "$a & $b" }
        .collect { combo ->
            println("$combo at ${System.currentTimeMillis() - start} ms")
        }
}

Flattening Flows

Flatten a flow of flows into a single flow.

fun dataStream(id: Int): Flow<String> = flow {
    emit("$id: Start")
    delay(520)
    emit("$id: End")
}

flatMapConcat

Processes inner flows sequentially.

fun main() = runBlocking {
    val start = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(110) }
        .flatMapConcat { dataStream(it) }
        .collect { value ->
            println("$value at ${System.currentTimeMillis() - start} ms")
        }
}

flatMapMerge

Concurrently collects all inner flows.

fun main() = runBlocking {
    val start = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(110) }
        .flatMapMerge { dataStream(it) }
        .collect { value ->
            println("$value at ${System.currentTimeMillis() - start} ms")
        }
}

flatMapLatest

Cancels the previous inner flow when a new one arrives.

fun main() = runBlocking {
    val start = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(110) }
        .flatMapLatest { dataStream(it) }
        .collect { value ->
            println("$value at ${System.currentTimeMillis() - start} ms")
        }
}

Flow Exceptions

Collector try/catch

fun faultyStream(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking {
    try {
        faultyStream().collect { num ->
            println(num)
            check(num <= 1) { "Received $num" }
        }
    } catch (e: Throwable) {
        println("Caught: $e")
    }
}

Output:

Emitting 1
1
Emitting 2
2
Caught: java.lang.IllegalStateException: Received 2

The catch Operator

Preserves exception transparency.

fun main() = runBlocking {
    faultyStream()
        .catch { e -> emit("Recovered: $e") }
        .collect { output -> println(output) }
}

Declarative Catching

fun main() = runBlocking {
    faultyStream()
        .onEach { num ->
            check(num <= 1) { "Received $num" }
            println(num)
        }
        .catch { e -> println("Caught: $e") }
        .collect()
}

Flow Completion

Imperative finally Block

fun main() = runBlocking {
    try {
        (1..3).asFlow().collect { num -> println(num) }
    } finally {
        println("Done")
    }
}

Declarative onCompletion

fun main() = runBlocking {
    (1..3).asFlow()
        .onCompletion { println("Stream finished") }
        .collect { num -> println(num) }
}

onCompletion receives a nullable Throwable parameter.

Launching Flows

Use launchIn to start collection in a separate coroutine.

fun eventStream(): Flow<Int> = (1..3).asFlow().onEach { delay(120) }

fun main() = runBlocking {
    eventStream()
        .onEach { event -> println("Event: $event") }
        .launchIn(this)
    println("Ready")
}

Output:

Ready
Event: 1
Event: 2
Event: 3

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong reference are the most prevalent type of object referencing in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Comprehensive Guide to SSTI Explained with Payload Bypass Techniques

Introduction Server-Side Template Injection (SSTI) is a vulnerability in web applications where user input is improper handled within the template engine and executed on the server. This exploit can r...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particular useful for bloggi...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.