Skip to content

Commit

Permalink
Update unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
d4rken committed Oct 28, 2023
1 parent 82e8769 commit 4a2f7c2
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,80 +3,69 @@ package eu.darken.capod.common.flow
import eu.darken.capod.common.collections.mutate
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.instanceOf
import io.kotest.matchers.types.shouldBeInstanceOf
import io.mockk.coEvery
import io.mockk.coVerify
import io.mockk.mockk
import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.test.*
import kotlinx.coroutines.test.advanceUntilIdle
import org.junit.jupiter.api.Test
import testhelpers.BaseTest
import testhelpers.coroutine.runTest2
import testhelpers.flow.test
import java.io.IOException
import java.lang.Thread.sleep
import kotlin.concurrent.thread
import kotlin.coroutines.EmptyCoroutineContext

class DynamicStateFlowTest : BaseTest() {

// Without an init value, there isn't a way to keep using the flow
@Test
fun `exceptions on initialization are rethrown`() {
val testScope =
createTestCoroutineScope(TestCoroutineDispatcher() + TestCoroutineExceptionHandler() + EmptyCoroutineContext)
val hotData = eu.darken.capod.common.flow.DynamicStateFlow<String>(
fun `exceptions on initialization are rethrown`() = runTest2(expectedError = IOException::class) {
val testScope = this

val hotData = DynamicStateFlow<String>(
loggingTag = "tag",
parentScope = testScope,
coroutineContext = Dispatchers.Unconfined,
startValueProvider = { throw IOException() }
)
runBlocking {
withTimeoutOrNull(500) {
// This blocking scope gets the init exception as the first caller
hotData.flow.firstOrNull()
} shouldBe null
}

testScope.advanceUntilIdle()

testScope.uncaughtExceptions.single() shouldBe instanceOf(IOException::class)
// This blocking scope gets the init exception as the first caller
hotData.flow.firstOrNull()
}


@Test
fun `subscription doesn't end when no subscriber is collecting, mode Lazily`() {
val testScope =
createTestCoroutineScope(TestCoroutineDispatcher() + TestCoroutineExceptionHandler() + EmptyCoroutineContext)
fun `subscription doesn't end when no subscriber is collecting, mode Lazily`() = runTest2(autoCancel = true) {
val testScope = this

val valueProvider = mockk<suspend CoroutineScope.() -> String>()
coEvery { valueProvider.invoke(any()) } returns "Test"

val hotData = eu.darken.capod.common.flow.DynamicStateFlow(
val hotData = DynamicStateFlow(
loggingTag = "tag",
parentScope = testScope,
coroutineContext = Dispatchers.Unconfined,
startValueProvider = valueProvider,
)

testScope.apply {
runTest2(autoCancel = true) {
hotData.flow.first() shouldBe "Test"
hotData.flow.first() shouldBe "Test"
}
coVerify(exactly = 1) { valueProvider.invoke(any()) }
}
hotData.flow.first() shouldBe "Test"
hotData.flow.first() shouldBe "Test"

coVerify(exactly = 1) { valueProvider.invoke(any()) }
}

@Test
fun `value updates`() {
val testScope =
createTestCoroutineScope(TestCoroutineDispatcher() + TestCoroutineExceptionHandler() + EmptyCoroutineContext)
fun `value updates`() = runTest2(autoCancel = true) {
val testScope = this

val valueProvider = mockk<suspend CoroutineScope.() -> Long>()
coEvery { valueProvider.invoke(any()) } returns 1

val hotData = eu.darken.capod.common.flow.DynamicStateFlow(
val hotData = DynamicStateFlow(
loggingTag = "tag",
parentScope = testScope,
startValueProvider = valueProvider,
Expand All @@ -88,7 +77,6 @@ class DynamicStateFlowTest : BaseTest() {
(1..16).forEach { _ ->
thread {
(1..200).forEach { _ ->
sleep(10)
hotData.updateAsync(
onUpdate = { this + 1L },
onError = { throw it }
Expand All @@ -97,27 +85,30 @@ class DynamicStateFlowTest : BaseTest() {
}
}

runBlocking {
testCollector.await { list, _ -> list.size == 3201 }
testCollector.latestValues shouldBe (1..3201).toList()
}
advanceUntilIdle()

testCollector.await { list, _ -> list.size == 3201 }
testCollector.latestValues shouldBe (1..3201).toList()

advanceUntilIdle()

coVerify(exactly = 1) { valueProvider.invoke(any()) }

testCollector.cancelAndJoin()
}

data class TestData(
val number: Long = 1
)

@Test
fun `check multi threading value updates with more complex data`() {
val testScope =
createTestCoroutineScope(TestCoroutineDispatcher() + TestCoroutineExceptionHandler() + EmptyCoroutineContext)
val valueProvider =
mockk<suspend CoroutineScope.() -> Map<String, eu.darken.capod.common.flow.DynamicStateFlowTest.TestData>>()
coEvery { valueProvider.invoke(any()) } returns mapOf("data" to eu.darken.capod.common.flow.DynamicStateFlowTest.TestData())

val hotData = eu.darken.capod.common.flow.DynamicStateFlow(
fun `check multi threading value updates with more complex data`() = runTest2(autoCancel = true) {
val testScope = this

val valueProvider = mockk<suspend CoroutineScope.() -> Map<String, TestData>>()
coEvery { valueProvider.invoke(any()) } returns mapOf("data" to TestData())

val hotData = DynamicStateFlow(
loggingTag = "tag",
parentScope = testScope,
startValueProvider = valueProvider,
Expand All @@ -140,20 +131,23 @@ class DynamicStateFlowTest : BaseTest() {
}
}

runBlocking {
testCollector.await { list, _ -> list.size == 4001 }
testCollector.latestValues.map { it.values.single().number } shouldBe (1L..4001L).toList()
}
advanceUntilIdle()

testCollector.await { list, _ -> list.size == 4001 }
testCollector.latestValues.map { it.values.single().number } shouldBe (1L..4001L).toList()

advanceUntilIdle()

coVerify(exactly = 1) { valueProvider.invoke(any()) }

testCollector.cancelAndJoin()
}

@Test
fun `only emit new values if they actually changed updates`() {
val testScope =
createTestCoroutineScope(TestCoroutineDispatcher() + TestCoroutineExceptionHandler() + EmptyCoroutineContext)
fun `only emit new values if they actually changed updates`() = runTest2(autoCancel = true) {
val testScope = this

val hotData = eu.darken.capod.common.flow.DynamicStateFlow(
val hotData = DynamicStateFlow(
loggingTag = "tag",
parentScope = testScope,
startValueProvider = { "1" },
Expand All @@ -167,26 +161,28 @@ class DynamicStateFlowTest : BaseTest() {
hotData.updateAsync { "2" }
hotData.updateAsync { "1" }

runBlocking {
testCollector.await { list, _ -> list.size == 3 }
testCollector.latestValues shouldBe listOf("1", "2", "1")
}
advanceUntilIdle()

testCollector.await { list, _ -> list.size == 3 }
testCollector.latestValues shouldBe listOf("1", "2", "1")
}

@Test
fun `multiple subscribers share the flow`() = runTest2(autoCancel = true) {
val testScope = this

val valueProvider = mockk<suspend CoroutineScope.() -> String>()
coEvery { valueProvider.invoke(any()) } returns "Test"

val hotData = eu.darken.capod.common.flow.DynamicStateFlow(
val hotData = DynamicStateFlow(
loggingTag = "tag",
parentScope = this,
parentScope = testScope,
startValueProvider = valueProvider,
)

val sub1 = hotData.flow.test(tag = "sub1", scope = this)
val sub2 = hotData.flow.test(tag = "sub2", scope = this)
val sub3 = hotData.flow.test(tag = "sub3", scope = this)
val sub1 = hotData.flow.test(tag = "sub1", scope = testScope)
val sub2 = hotData.flow.test(tag = "sub2", scope = testScope)
val sub3 = hotData.flow.test(tag = "sub3", scope = testScope)

hotData.updateAsync { "A" }
hotData.updateAsync { "B" }
Expand All @@ -207,17 +203,19 @@ class DynamicStateFlowTest : BaseTest() {

@Test
fun `value is persisted between unsubscribes`() = runTest2(autoCancel = true) {
val testScope = this

val valueProvider = mockk<suspend CoroutineScope.() -> Long>()
coEvery { valueProvider.invoke(any()) } returns 1

val hotData = eu.darken.capod.common.flow.DynamicStateFlow(
val hotData = DynamicStateFlow(
loggingTag = "tag",
parentScope = this,
parentScope = testScope,
coroutineContext = this.coroutineContext,
startValueProvider = valueProvider,
)

val testCollector1 = hotData.flow.test(tag = "collector1", scope = this)
val testCollector1 = hotData.flow.test(tag = "collector1", scope = testScope)
testCollector1.silent = false

(1..10).forEach { _ ->
Expand All @@ -233,7 +231,7 @@ class DynamicStateFlowTest : BaseTest() {

testCollector1.cancelAndJoin()

val testCollector2 = hotData.flow.test(tag = "collector2", scope = this)
val testCollector2 = hotData.flow.test(tag = "collector2", scope = testScope)
testCollector2.silent = false

advanceUntilIdle()
Expand All @@ -246,10 +244,10 @@ class DynamicStateFlowTest : BaseTest() {
}

@Test
fun `blocking update is actually blocking`() = runBlocking {
val testScope =
createTestCoroutineScope(TestCoroutineDispatcher() + TestCoroutineExceptionHandler() + EmptyCoroutineContext)
val hotData = eu.darken.capod.common.flow.DynamicStateFlow(
fun `blocking update is actually blocking`() = runTest2(autoCancel = true) {
val testScope = this

val hotData = DynamicStateFlow(
loggingTag = "tag",
parentScope = testScope,
coroutineContext = testScope.coroutineContext,
Expand All @@ -270,17 +268,19 @@ class DynamicStateFlowTest : BaseTest() {

hotData.updateBlocking { this - 3 } shouldBe 0

advanceUntilIdle()

testCollector.await { _, i -> i == 3 }
testCollector.latestValues shouldBe listOf(2, 3, 0)

testCollector.cancelAndJoin()
}

@Test
fun `blocking update rethrows error`() = runBlocking {
val testScope =
createTestCoroutineScope(TestCoroutineDispatcher() + TestCoroutineExceptionHandler() + EmptyCoroutineContext)
val hotData = eu.darken.capod.common.flow.DynamicStateFlow(
fun `blocking update rethrows error`() = runTest2(autoCancel = true) {
val testScope = this

val hotData = DynamicStateFlow(
loggingTag = "tag",
parentScope = testScope,
coroutineContext = testScope.coroutineContext,
Expand All @@ -300,16 +300,17 @@ class DynamicStateFlowTest : BaseTest() {
hotData.flow.first() shouldBe 2

hotData.updateBlocking { 3 } shouldBe 3
hotData.flow.first() shouldBe 3

testScope.uncaughtExceptions.singleOrNull() shouldBe null
advanceUntilIdle()

hotData.flow.first() shouldBe 3

testCollector.cancelAndJoin()
}

@Test
fun `async updates error handler`() = runTest2(expectedError = IOException::class) {
val hotData = eu.darken.capod.common.flow.DynamicStateFlow(
val hotData = DynamicStateFlow(
loggingTag = "tag",
parentScope = this,
startValueProvider = { 1 },
Expand All @@ -321,14 +322,15 @@ class DynamicStateFlowTest : BaseTest() {
hotData.updateAsync { throw IOException("Surprise") }

advanceUntilIdle()

testCollector.cancelAndJoin()
}

@Test
fun `async updates rethrow errors on HotDataFlow scope if no error handler is set`() = runBlocking {
val testScope =
createTestCoroutineScope(TestCoroutineDispatcher() + TestCoroutineExceptionHandler() + EmptyCoroutineContext)
fun `async updates rethrow errors on HotDataFlow scope if no error handler is set`() = runTest2(autoCancel = true) {
val testScope = this

val hotData = eu.darken.capod.common.flow.DynamicStateFlow(
val hotData = DynamicStateFlow(
loggingTag = "tag",
parentScope = testScope,
startValueProvider = { 1 },
Expand All @@ -346,31 +348,29 @@ class DynamicStateFlowTest : BaseTest() {

testScope.advanceUntilIdle()
thrownError!!.shouldBeInstanceOf<IOException>()
testScope.uncaughtExceptions.singleOrNull() shouldBe null

testCollector.cancelAndJoin()
}

@Test
fun `clean up function is called when parent scope is cancelled`() = runTest {
val testScope =
createTestCoroutineScope(TestCoroutineDispatcher() + TestCoroutineExceptionHandler() + EmptyCoroutineContext)

fun `clean up function is called when parent scope is cancelled`() {
var onReleaseValue: String? = null

val hotData = eu.darken.capod.common.flow.DynamicStateFlow(
loggingTag = "tag",
parentScope = testScope,
coroutineContext = Dispatchers.Unconfined,
startValueProvider = { "Test" },
onRelease = {
onReleaseValue = it
}
)
runTest2(autoCancel = true) {
val testScope = this

hotData.flow.first() shouldBe "Test"
val hotData = DynamicStateFlow(
loggingTag = "tag",
parentScope = testScope,
coroutineContext = Dispatchers.Unconfined,
startValueProvider = { "Test" },
onRelease = {
onReleaseValue = it
}
)

testScope.cancel()
hotData.flow.first() shouldBe "Test"
}

onReleaseValue shouldBe "Test"
}
Expand Down
Loading

0 comments on commit 4a2f7c2

Please sign in to comment.