From 76ea53552c738be8ca371e7d59a18d45125ae44c Mon Sep 17 00:00:00 2001 From: bidetofevil Date: Fri, 27 Sep 2024 11:38:49 -0700 Subject: [PATCH] Fully test implemented Scheduling Service functionality --- .../PeriodicBackgroundActivityCacherTest.kt | 11 +- .../scheduling/SchedulingServiceImpl.kt | 86 ++++---- .../internal/storage/StorageService2.kt | 3 + .../scheduling/SchedulingServiceImplTest.kt | 205 ++++++++++++++---- .../embracesdk/assertions/TestAssertions.kt | 6 +- .../BlockingScheduledExecutorService.kt | 11 + .../embracesdk/fixtures/DeliveryFixtures.kt | 6 +- 7 files changed, 235 insertions(+), 93 deletions(-) diff --git a/embrace-android-core/src/test/java/io/embrace/android/embracesdk/internal/session/caching/PeriodicBackgroundActivityCacherTest.kt b/embrace-android-core/src/test/java/io/embrace/android/embracesdk/internal/session/caching/PeriodicBackgroundActivityCacherTest.kt index c0f8e5f4f2..123b29b39c 100644 --- a/embrace-android-core/src/test/java/io/embrace/android/embracesdk/internal/session/caching/PeriodicBackgroundActivityCacherTest.kt +++ b/embrace-android-core/src/test/java/io/embrace/android/embracesdk/internal/session/caching/PeriodicBackgroundActivityCacherTest.kt @@ -41,9 +41,7 @@ internal class PeriodicBackgroundActivityCacherTest { fun `do not save more than once if delay time has not elapsed`() { queueScheduleSave() queueScheduleSave() - val latch = executor.queueCompletionTask() - executor.blockingMode = false - latch.assertCountedDown() + executor.awaitExecutionCompletion() assertEquals(1, executionCount.get()) } @@ -56,8 +54,7 @@ internal class PeriodicBackgroundActivityCacherTest { val latch2 = queueScheduleSave() assertEquals(1, executor.scheduledTasksCount()) executor.moveForwardAndRunBlocked(1999) - executor.blockingMode = false - executor.queueCompletionTask().assertCountedDown() + executor.awaitExecutionCompletion() assertEquals(1, latch2.count) assertEquals(1, executionCount.get()) executor.moveForwardAndRunBlocked(2) @@ -69,9 +66,7 @@ internal class PeriodicBackgroundActivityCacherTest { fun `stopping cacher prevents execution of the pending scheduled save`() { queueScheduleSave() cacher.stop() - val latch = executor.queueCompletionTask() - executor.blockingMode = false - latch.assertCountedDown() + executor.awaitExecutionCompletion() assertEquals(0, executionCount.get()) } diff --git a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImpl.kt b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImpl.kt index 9b0088e7fe..4fe0880d74 100644 --- a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImpl.kt +++ b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImpl.kt @@ -56,48 +56,30 @@ internal class SchedulingServiceImpl( */ private fun deliveryLoop() { try { - var deliveryQueue = getDeliveryQueue() + var deliveryQueue = createPayloadQueue() while (deliveryQueue.isNotEmpty() && readyToSend()) { deliveryQueue.poll()?.let { payload -> - queueDelivery(payload) - } - - if (queryForPayloads.compareAndSet(true, false)) { - deliveryQueue = getDeliveryQueue() + if (payload.shouldSendPayload()) { + payload.envelopeType.endpoint.updateBlockedEndpoint() + queueDelivery(payload) + } } - if (deliveryQueue.isEmpty()) { - deliveryQueue = getDeliveryQueue() + if (queryForPayloads.compareAndSet(true, false) || deliveryQueue.isEmpty()) { + deliveryQueue = createPayloadQueue() } } } finally { sendLoopActive.set(false) - scheduleNextCheck() + scheduleNextDeliveryLoop() } } - private fun getDeliveryQueue() = LinkedList(getReadyPayloads()) - - private fun getReadyPayloads(): List = + private fun createPayloadQueue() = LinkedList( storageService.getPayloadsByPriority() .filter { it.shouldSendPayload() } .sortedWith(storedTelemetryComparator) - - private fun StoredTelemetryMetadata.shouldSendPayload(): Boolean { - // determine if the given payload is eligible to be sent - // i.e. not already being sent, endpoint not blocked by 429, and isn't waiting to be retried - updateBlockedEndpoint(envelopeType.endpoint) - - return if (activeSends.contains(this)) { - false - } else if (blockedEndpoints.containsKey(envelopeType.endpoint)) { - false - } else { - payloadsToRetry[this]?.run { - clock.now() >= nextRetryTimeMs - } ?: true - } - } + ) private fun queueDelivery(payload: StoredTelemetryMetadata): Future { activeSends.add(payload) @@ -118,18 +100,19 @@ internal class SchedulingServiceImpl( // If delivery of this payload should be retried, add or replace the entry in the retry map // with the new values for how many times it has failed, and when the next retry should happen val retryAttempts = payloadsToRetry[payload]?.failedAttempts ?: 0 - val nextRetryTimeMs = calculateNextRetryTime(retryAttempts = retryAttempts) + val nextRetryTimeMs = if (this is ApiResponse.TooManyRequests && retryAfter != null) { + val unblockedTimestampMs = clock.now() + retryAfter as Long + blockedEndpoints[endpoint] = unblockedTimestampMs + unblockedTimestampMs + } else { + calculateNextRetryTime(retryAttempts = retryAttempts) + } + payloadsToRetry[payload] = RetryInstance( failedAttempts = retryAttempts + 1, nextRetryTimeMs = nextRetryTimeMs ) } - - if (this is ApiResponse.TooManyRequests) { - retryAfter?.let { delayMs -> - blockedEndpoints[endpoint] = clock.now() + delayMs - } - } } } else { // Could not find payload. Do not retry. @@ -141,7 +124,7 @@ internal class SchedulingServiceImpl( } } - private fun scheduleNextCheck() { + private fun scheduleNextDeliveryLoop() { payloadsToRetry.map { it.value.nextRetryTimeMs }.minOrNull()?.let { timestampMs -> if (timestampMs <= clock.now()) { startDeliveryLoop() @@ -156,20 +139,39 @@ internal class SchedulingServiceImpl( } private fun readyToSend(): Boolean { - // TODO: determine if the SDK is in a state where it's ready to send payloads, i.e. have network connection and free thread + // TODO: determine if the SDK is in a state where it's ready to send payloads, e.g. have network connection, etc. return true } - private fun updateBlockedEndpoint(endpoint: Endpoint) { - blockedEndpoints[endpoint]?.let { - if (it <= clock.now()) { - blockedEndpoints.remove(endpoint) - } + private fun StoredTelemetryMetadata.shouldSendPayload(): Boolean { + // determine if the given payload is eligible to be sent + // i.e. not already being sent, endpoint not blocked by 429, and isn't waiting to be retried + return if (activeSends.contains(this)) { + false + } else if (isEndpointBlocked()) { + false + } else { + payloadsToRetry[this]?.run { + clock.now() >= nextRetryTimeMs + } ?: true } } private fun StoredTelemetryMetadata.toStream(): InputStream? = storageService.loadPayloadAsStream(this) + private fun StoredTelemetryMetadata.isEndpointBlocked(): Boolean = + blockedEndpoints[envelopeType.endpoint]?.let { timestampMs -> + timestampMs > clock.now() + } ?: false + + private fun Endpoint.updateBlockedEndpoint() { + blockedEndpoints[this]?.let { + if (it <= clock.now()) { + blockedEndpoints.remove(this) + } + } + } + private fun calculateDelay(nextRetryTimeMs: Long): Long = nextRetryTimeMs - clock.now() private fun calculateNextRetryTime(retryAttempts: Int): Long = clock.now() + (INITIAL_DELAY_MS * (1 shl retryAttempts)) diff --git a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/StorageService2.kt b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/StorageService2.kt index 350d927c11..c9a9ffd2d7 100644 --- a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/StorageService2.kt +++ b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/StorageService2.kt @@ -3,6 +3,9 @@ package io.embrace.android.embracesdk.internal.storage import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata import java.io.InputStream +/** + * TODO: merge this into [PayloadStorageService] + */ interface StorageService2 { fun getPayloadsByPriority(): List fun loadPayloadAsStream(payloadMetadata: StoredTelemetryMetadata): InputStream? diff --git a/embrace-android-delivery/src/test/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImplTest.kt b/embrace-android-delivery/src/test/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImplTest.kt index cb702c8a4c..f6720a0505 100644 --- a/embrace-android-delivery/src/test/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImplTest.kt +++ b/embrace-android-delivery/src/test/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImplTest.kt @@ -1,6 +1,5 @@ package io.embrace.android.embracesdk.internal.delivery.scheduling -import io.embrace.android.embracesdk.assertions.assertCountedDown import io.embrace.android.embracesdk.concurrency.BlockingScheduledExecutorService import io.embrace.android.embracesdk.fakes.FakeClock import io.embrace.android.embracesdk.fakes.FakeRequestExecutionService @@ -10,9 +9,12 @@ import io.embrace.android.embracesdk.fixtures.fakeLogStoredTelemetryMetadata import io.embrace.android.embracesdk.fixtures.fakeSessionStoredTelemetryMetadata import io.embrace.android.embracesdk.fixtures.fakeSessionStoredTelemetryMetadata2 import io.embrace.android.embracesdk.internal.comms.api.ApiResponse +import io.embrace.android.embracesdk.internal.comms.api.Endpoint import io.embrace.android.embracesdk.internal.delivery.scheduling.SchedulingServiceImpl.Companion.INITIAL_DELAY_MS +import io.embrace.android.embracesdk.internal.payload.SessionPayload import io.embrace.android.embracesdk.internal.worker.Worker import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue import org.junit.Before import org.junit.Test @@ -22,22 +24,25 @@ internal class SchedulingServiceImplTest { private lateinit var executionService: FakeRequestExecutionService private lateinit var schedulingExecutor: BlockingScheduledExecutorService private lateinit var deliveryExecutor: BlockingScheduledExecutorService - private lateinit var clock: FakeClock private lateinit var schedulingService: SchedulingServiceImpl + @Volatile + private lateinit var clock: FakeClock + @Before fun setup() { val workerModule = FakeWorkerThreadModule( testWorkerName = Worker.Background.IoRegWorker, anotherTestWorkerName = Worker.Background.DeliveryWorker ) - schedulingExecutor = workerModule.executor.apply { blockingMode = true } + schedulingExecutor = workerModule.executor.apply { blockingMode = false } deliveryExecutor = workerModule.anotherExecutor.apply { blockingMode = false } clock = workerModule.executorClock storageService = FakeStorageService2( - listOf(fakeSessionStoredTelemetryMetadata, fakeLogStoredTelemetryMetadata) + listOf(fakeLogStoredTelemetryMetadata, fakeSessionStoredTelemetryMetadata) ) executionService = FakeRequestExecutionService() + allSendsSucceed() schedulingService = SchedulingServiceImpl( storageService = storageService, executionService = executionService, @@ -49,77 +54,203 @@ internal class SchedulingServiceImplTest { @Test fun `new payload will trigger new delivery loop if the previous one is done`() { + schedulingExecutor.blockingMode = true schedulingService.onPayloadIntake() assertEquals(1, schedulingExecutor.submitCount) - val latch = schedulingExecutor.queueCompletionTask() - schedulingExecutor.runCurrentlyBlocked() - latch.assertCountedDown() + schedulingExecutor.awaitExecutionCompletion() schedulingService.onPayloadIntake() assertEquals(3, schedulingExecutor.submitCount) } @Test fun `new payload will not trigger new delivery loop job if one is running`() { + schedulingExecutor.blockingMode = true schedulingService.onPayloadIntake() schedulingService.onPayloadIntake() assertEquals(1, schedulingExecutor.submitCount) } @Test - fun `all payloads ready to be sent are queued up`() { - schedulingService.onPayloadIntake() - schedulingExecutor.runCurrentlyBlocked() - val latch = deliveryExecutor.queueCompletionTask() - latch.assertCountedDown() + fun `all payloads ready to be sent are sent in priority order`() { + executionService.constantResponse = success + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() assertEquals(2, executionService.sendAttempts()) + assertEquals(0, storageService.cachedPayloads.size) + assertTrue(executionService.attemptedHttpRequests.first().data is SessionPayload) } @Test - fun `payloads remaining in storage will not be resent if retry period has not ended`() { - executionService.constantResponse = ApiResponse.Failure(code = 500, emptyMap()) - schedulingExecutor.blockingMode = false - schedulingService.onPayloadIntake() - deliveryExecutor.queueCompletionTask().assertCountedDown() - schedulingService.onPayloadIntake() - deliveryExecutor.queueCompletionTask().assertCountedDown() + fun `payloads being sent will not be resent when a new payload arrives`() { + deliveryExecutor.blockingMode = true + waitForOnPayloadIntakeTaskCompletion() + storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata2) + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.blockingMode = false + deliveryExecutor.awaitExecutionCompletion() + assertEquals(3, executionService.sendAttempts()) + } + + @Test + fun `payloads that fail to send will not be ready to be resent immediately`() { + allSendsFail() + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() + assertEquals(2, executionService.sendAttempts()) + assertEquals(2, storageService.cachedPayloads.size) + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() assertEquals(2, executionService.sendAttempts()) + assertEquals(2, storageService.cachedPayloads.size) + } + + @Test + fun `payloads that fail to send will be retried with exponential back off`() { + allSendsFail() + schedulingExecutor.blockingMode = true + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() + var delay = INITIAL_DELAY_MS + repeat(10) { iteration -> + schedulingExecutor.moveForwardAndRunBlocked(delay + 1) + schedulingExecutor.awaitExecutionCompletion() + deliveryExecutor.awaitExecutionCompletion() + assertEquals( + "Send attempt ${iteration + 1} did not result in the right number of sends after $delay ms", + 2 * (iteration + 2), + executionService.sendAttempts() + ) + assertEquals("Send attempt $iteration failed", 2, storageService.cachedPayloads.size) + delay *= 2 + } } @Test fun `payloads remaining in storage will resent if retry period has ended`() { - executionService.constantResponse = ApiResponse.Failure(code = 500, emptyMap()) - schedulingExecutor.blockingMode = false - schedulingService.onPayloadIntake() - deliveryExecutor.queueCompletionTask().assertCountedDown() + allSendsFail() + schedulingExecutor.blockingMode = true + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() clock.tick(INITIAL_DELAY_MS + 1) - schedulingService.onPayloadIntake() - deliveryExecutor.queueCompletionTask().assertCountedDown() + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() assertEquals(4, executionService.sendAttempts()) } @Test - fun `new payload arrival will trigger it to be sent and not resend in progress payloads`() { + fun `new payload arrival during delivery loop will be picked up and sent without delay`() { + schedulingExecutor.blockingMode = true + deliveryExecutor.blockingMode = true schedulingService.onPayloadIntake() - schedulingExecutor.runCurrentlyBlocked() storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata2) - schedulingService.onPayloadIntake() - val scheduleLatch = schedulingExecutor.queueCompletionTask() - schedulingExecutor.runCurrentlyBlocked() - scheduleLatch.assertCountedDown() - val deliveryLatch = deliveryExecutor.queueCompletionTask() - deliveryLatch.assertCountedDown() + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.blockingMode = false + deliveryExecutor.awaitExecutionCompletion() assertEquals(3, executionService.sendAttempts()) } + @Test + fun `payloads to blocked endpoint will not be sent or retried until duration lapses`() { + val longBlockedDuration = 90_000L + storageService.cachedPayloads.clear() + storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata) + executionService.constantResponse = ApiResponse.TooManyRequests(endpoint = Endpoint.SESSIONS_V2, longBlockedDuration) + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() + assertEquals(1, executionService.sendAttempts()) + allSendsSucceed() + schedulingExecutor.moveForwardAndRunBlocked(INITIAL_DELAY_MS + 1) + deliveryExecutor.awaitExecutionCompletion() + assertEquals(1, executionService.sendAttempts()) + schedulingExecutor.moveForwardAndRunBlocked(longBlockedDuration - INITIAL_DELAY_MS) + deliveryExecutor.awaitExecutionCompletion() + assertEquals(2, executionService.sendAttempts()) + assertEquals(0, storageService.cachedPayloads.size) + } + + @Test + fun `payloads that fail to deliver because of a 429 will be retried before the default delay if endpoint is unblocked earlier`() { + storageService.cachedPayloads.clear() + storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata) + executionService.constantResponse = ApiResponse.TooManyRequests(endpoint = Endpoint.SESSIONS_V2, SHORT_BLOCKED_DURATION) + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() + allSendsSucceed() + schedulingExecutor.moveForwardAndRunBlocked(SHORT_BLOCKED_DURATION + 1) + deliveryExecutor.awaitExecutionCompletion() + assertEquals(2, executionService.sendAttempts()) + assertEquals(0, storageService.cachedPayloads.size) + } + + @Test + fun `payloads to unblocked endpoint will not affect other endpoints`() { + storageService.cachedPayloads.clear() + storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata) + executionService.constantResponse = ApiResponse.TooManyRequests(endpoint = Endpoint.SESSIONS_V2, SHORT_BLOCKED_DURATION) + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() + assertEquals(1, executionService.sendAttempts()) + executionService.constantResponse = success + storageService.cachedPayloads.add(fakeLogStoredTelemetryMetadata) + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() + assertEquals(2, executionService.sendAttempts()) + assertEquals(1, storageService.cachedPayloads.size) + } + + @Test + fun `concurrent payload sending to the same endpoint will result in only one delivery attempt`() { + storageService.cachedPayloads.clear() + storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata) + storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata2) + executionService.constantResponse = ApiResponse.TooManyRequests(endpoint = Endpoint.SESSIONS_V2, SHORT_BLOCKED_DURATION) + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() + assertEquals(1, executionService.sendAttempts()) + schedulingExecutor.moveForwardAndRunBlocked(SHORT_BLOCKED_DURATION + 1) + deliveryExecutor.awaitExecutionCompletion() + assertEquals(2, executionService.sendAttempts()) + } + + @Test + fun `payloads to already blocked endpoint will not be sent`() { + storageService.cachedPayloads.clear() + storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata) + executionService.constantResponse = ApiResponse.TooManyRequests(endpoint = Endpoint.SESSIONS_V2, SHORT_BLOCKED_DURATION) + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() + storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata2) + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() + assertEquals(1, executionService.sendAttempts()) + } + @Test fun `no sent attempt will be made if a payload cannot be found on disk`() { deliveryExecutor.blockingMode = true - schedulingService.onPayloadIntake() - schedulingExecutor.runCurrentlyBlocked() + waitForOnPayloadIntakeTaskCompletion() storageService.cachedPayloads.remove(fakeLogStoredTelemetryMetadata) deliveryExecutor.blockingMode = false - val deliveryLatch = deliveryExecutor.queueCompletionTask() - deliveryLatch.assertCountedDown() + deliveryExecutor.awaitExecutionCompletion() assertEquals(1, executionService.sendAttempts()) } + + private fun waitForOnPayloadIntakeTaskCompletion() { + schedulingService.onPayloadIntake() + schedulingExecutor.awaitExecutionCompletion() + } + + private fun allSendsSucceed() { + executionService.constantResponse = success + } + + private fun allSendsFail() { + executionService.constantResponse = failure + } + + private companion object { + const val SHORT_BLOCKED_DURATION = 30_000L + val success = ApiResponse.Success(body = "", headers = emptyMap()) + val failure = ApiResponse.Failure(code = 500, emptyMap()) + } } diff --git a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/assertions/TestAssertions.kt b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/assertions/TestAssertions.kt index eb626625dd..6da4fb22e6 100644 --- a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/assertions/TestAssertions.kt +++ b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/assertions/TestAssertions.kt @@ -4,7 +4,7 @@ import org.junit.Assert.assertEquals import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit -fun CountDownLatch.assertCountedDown() { - await(1, TimeUnit.SECONDS) - assertEquals(0, count) +fun CountDownLatch.assertCountedDown(waitTimeMs: Long = 1000L) { + await(waitTimeMs, TimeUnit.MILLISECONDS) + assertEquals("Operation timed out after $waitTimeMs ms", 0, count) } diff --git a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/concurrency/BlockingScheduledExecutorService.kt b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/concurrency/BlockingScheduledExecutorService.kt index de54c27f27..272e95e0b2 100644 --- a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/concurrency/BlockingScheduledExecutorService.kt +++ b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/concurrency/BlockingScheduledExecutorService.kt @@ -1,5 +1,6 @@ package io.embrace.android.embracesdk.concurrency +import io.embrace.android.embracesdk.assertions.assertCountedDown import io.embrace.android.embracesdk.fakes.FakeClock import java.util.LinkedList import java.util.concurrent.AbstractExecutorService @@ -211,9 +212,19 @@ class BlockingScheduledExecutorService( submit { latch.countDown() } + if (blockingMode) { + runCurrentlyBlocked() + } return latch } + /** + * Wait [waitTimeMs] for all the queued up jobs on this executor to finish running + */ + fun awaitExecutionCompletion(waitTimeMs: Long = 1000L) { + queueCompletionTask().assertCountedDown(waitTimeMs) + } + private fun submitOrQueue(delay: Long, futureTask: BlockedFutureScheduledTask) { if (delay <= 0L) { submit(futureTask) diff --git a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fixtures/DeliveryFixtures.kt b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fixtures/DeliveryFixtures.kt index c00f7c59dc..058d2db1c8 100644 --- a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fixtures/DeliveryFixtures.kt +++ b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fixtures/DeliveryFixtures.kt @@ -5,19 +5,19 @@ import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata import io.embrace.android.embracesdk.internal.delivery.SupportedEnvelopeType val fakeSessionStoredTelemetryMetadata = StoredTelemetryMetadata( - timestamp = DEFAULT_FAKE_CURRENT_TIME, + timestamp = DEFAULT_FAKE_CURRENT_TIME + 2000L, uuid = "30690ad1-6b87-4e08-b72c-7deca14451d8", envelopeType = SupportedEnvelopeType.SESSION ) val fakeSessionStoredTelemetryMetadata2 = StoredTelemetryMetadata( - timestamp = DEFAULT_FAKE_CURRENT_TIME + 1, + timestamp = DEFAULT_FAKE_CURRENT_TIME + 10_000L, uuid = "30690ad1-6b87-4e08-b72c-7deca14451d8", envelopeType = SupportedEnvelopeType.SESSION ) val fakeLogStoredTelemetryMetadata = StoredTelemetryMetadata( - timestamp = DEFAULT_FAKE_CURRENT_TIME, + timestamp = DEFAULT_FAKE_CURRENT_TIME + 500L, uuid = "6bda3896-d4fd-42ce-89f6-47bec86f1c80", envelopeType = SupportedEnvelopeType.LOG )