Skip to content

Commit

Permalink
chore: create simple custom flow
Browse files Browse the repository at this point in the history
  • Loading branch information
giacomoaccursi committed Jan 12, 2024
1 parent e13d2a1 commit 611ca97
Showing 1 changed file with 62 additions and 0 deletions.
62 changes: 62 additions & 0 deletions src/main/kotlin/flow/CustomFlow.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2024. Accursi Giacomo
*
* Use of this source code is governed by an MIT-style
* license that can be found in the LICENSE file or at
* https://opensource.org/licenses/MIT.
*/

package flow

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.withContext
import java.util.concurrent.CountDownLatch

/**
* A custom mutableSharedFlow waiting for the consumption of an emitted element.
*/
class CustomMutableSharedFlow<T>(private val flow: MutableSharedFlow<T>) : MutableSharedFlow<T> by flow {

private lateinit var latch: CountDownLatch

override suspend fun emit(value: T) {
latch = CountDownLatch(subscriptionCount.value)
flow.emit(value)
withContext(Dispatchers.IO) {
latch.await()
}
latch = CountDownLatch(0)
}

/**
* A function for notifying the consumption.
*/
fun notifyConsumed() {
latch.countDown()
}
}

/**
* A custom mutableStateFlow waiting for the consumption of an emitted element.
*/
class CustomMutableStateFlow<T>(private val flow: MutableStateFlow<T>) : MutableStateFlow<T> by flow {
private lateinit var latch: CountDownLatch

override suspend fun emit(value: T) {
latch = CountDownLatch(subscriptionCount.value)
flow.emit(value)
withContext(Dispatchers.IO) {
latch.await()
}
latch = CountDownLatch(0)
}

/**
* A function for notifying the consumption.
*/
fun notifyConsumed() {
latch.countDown()
}
}

0 comments on commit 611ca97

Please sign in to comment.