From 0eb8512e87731ffaa69e9f316dd667704ee8fb47 Mon Sep 17 00:00:00 2001 From: bidetofevil Date: Tue, 24 Sep 2024 23:06:15 -0700 Subject: [PATCH 1/3] Partially implemented and tested SchedulignService --- .../embracesdk/internal/worker/Worker.kt | 5 + .../delivery/SupportedEnvelopeType.kt | 12 +- .../execution/RequestExecutionService.kt | 2 +- .../execution/RequestExecutionServiceImpl.kt | 2 +- .../delivery/scheduling/SchedulingService.kt | 6 + .../scheduling/SchedulingServiceImpl.kt | 152 +++++++++++++++++- .../internal/injection/DeliveryModule2Impl.kt | 4 +- .../internal/storage/PayloadReference.kt | 10 ++ .../internal/storage/StorageService2.kt | 9 ++ .../scheduling/SchedulingServiceImplTest.kt | 118 ++++++++++++++ .../fakes/FakeRequestExecutionService.kt | 8 +- .../embracesdk/fakes/FakeStorageService2.kt | 23 +++ .../embracesdk/fixtures/DeliveryFixtures.kt | 17 ++ 13 files changed, 355 insertions(+), 13 deletions(-) create mode 100644 embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/PayloadReference.kt create mode 100644 embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/StorageService2.kt create mode 100644 embrace-android-delivery/src/test/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImplTest.kt create mode 100644 embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fakes/FakeStorageService2.kt create mode 100644 embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fixtures/DeliveryFixtures.kt diff --git a/embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/worker/Worker.kt b/embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/worker/Worker.kt index 2b12a1ee6f..b297d435e2 100644 --- a/embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/worker/Worker.kt +++ b/embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/worker/Worker.kt @@ -53,5 +53,10 @@ sealed class Worker(internal val threadName: String) { * Monitor thread that checks the main thread for ANRs. */ object AnrWatchdogWorker : Background("anr-watchdog") + + /** + * Delivery Worker + */ + object DeliveryWorker : Background("delivery") } } diff --git a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/SupportedEnvelopeType.kt b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/SupportedEnvelopeType.kt index 149b951430..65b08c43df 100644 --- a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/SupportedEnvelopeType.kt +++ b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/SupportedEnvelopeType.kt @@ -1,5 +1,6 @@ package io.embrace.android.embracesdk.internal.delivery +import io.embrace.android.embracesdk.internal.comms.api.Endpoint import io.embrace.android.embracesdk.internal.payload.Envelope import java.lang.reflect.Type @@ -8,13 +9,14 @@ import java.lang.reflect.Type */ enum class SupportedEnvelopeType( val serializedType: Type, - val description: String + val description: String, + val endpoint: Endpoint, ) { - CRASH(Envelope.logEnvelopeType, "crash"), - SESSION(Envelope.sessionEnvelopeType, "session"), - LOG(Envelope.logEnvelopeType, "log"), - NETWORK(Envelope.logEnvelopeType, "network"); + CRASH(Envelope.logEnvelopeType, "crash", Endpoint.LOGS), + SESSION(Envelope.sessionEnvelopeType, "session", Endpoint.SESSIONS_V2), + LOG(Envelope.logEnvelopeType, "log", Endpoint.LOGS), + NETWORK(Envelope.logEnvelopeType, "network", Endpoint.LOGS); companion object { private val valueMap = diff --git a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/execution/RequestExecutionService.kt b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/execution/RequestExecutionService.kt index 95fd801e8f..f27b3691e2 100644 --- a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/execution/RequestExecutionService.kt +++ b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/execution/RequestExecutionService.kt @@ -14,7 +14,7 @@ interface RequestExecutionService { * Takes an [InputStream] of a payload and attempts an HTTP request to the Embrace backend. */ fun attemptHttpRequest( - payloadStream: () -> InputStream, + payloadStream: () -> InputStream?, envelopeType: SupportedEnvelopeType ): ApiResponse } diff --git a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/execution/RequestExecutionServiceImpl.kt b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/execution/RequestExecutionServiceImpl.kt index 23f4c77d0e..e2327d105a 100644 --- a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/execution/RequestExecutionServiceImpl.kt +++ b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/execution/RequestExecutionServiceImpl.kt @@ -7,7 +7,7 @@ import java.io.InputStream internal class RequestExecutionServiceImpl : RequestExecutionService { override fun attemptHttpRequest( - payloadStream: () -> InputStream, + payloadStream: () -> InputStream?, envelopeType: SupportedEnvelopeType ): ApiResponse { return ApiResponse.None diff --git a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingService.kt b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingService.kt index 0c4025f674..d75c5b4f0a 100644 --- a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingService.kt +++ b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingService.kt @@ -13,3 +13,9 @@ interface SchedulingService : CrashTeardownHandler { */ fun onPayloadIntake() } + +class NoopSchedulingService : SchedulingService { + override fun onPayloadIntake() { } + + override fun handleCrash(crashId: String) { } +} diff --git a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImpl.kt b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImpl.kt index 70acce9c6c..064a4fc732 100644 --- a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImpl.kt +++ b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImpl.kt @@ -1,14 +1,162 @@ +@file:Suppress("FunctionOnlyReturningConstant") + package io.embrace.android.embracesdk.internal.delivery.scheduling +import io.embrace.android.embracesdk.internal.clock.Clock +import io.embrace.android.embracesdk.internal.comms.api.ApiResponse +import io.embrace.android.embracesdk.internal.comms.api.Endpoint import io.embrace.android.embracesdk.internal.delivery.execution.RequestExecutionService +import io.embrace.android.embracesdk.internal.storage.PayloadReference +import io.embrace.android.embracesdk.internal.storage.StorageService2 +import io.embrace.android.embracesdk.internal.worker.BackgroundWorker +import java.io.InputStream +import java.util.Collections +import java.util.LinkedList +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Future +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean internal class SchedulingServiceImpl( - @Suppress("unused") private val requestExecutionService: RequestExecutionService + private val storageService: StorageService2, + private val executionService: RequestExecutionService, + private val schedulingWorker: BackgroundWorker, + private val deliveryWorker: BackgroundWorker, + private val clock: Clock ) : SchedulingService { + private val blockedEndpoints: MutableMap = ConcurrentHashMap() + private val sendLoopActive = AtomicBoolean(false) + private val activeSends: MutableSet = Collections.synchronizedSet(HashSet()) + private val payloadsToRetry: MutableMap = ConcurrentHashMap() + + override fun onPayloadIntake() { + // When a payload arrives, check to see if there's already an active job try to deliver payloads + // If not, schedule job. If so, do nothing. + if (sendLoopActive.compareAndSet(false, true)) { + schedulingWorker.submit { + queueLoop() + } + } + } + override fun handleCrash(crashId: String) { + // TODO: get ready to die } - override fun onPayloadIntake() { + /** + * Loop through the payloads ready to be sent by priority and queue for delivery + */ + private fun queueLoop() { + try { + var readyPayloads = getReadyPayloads() + while (readyPayloads.isNotEmpty() && readyToSend()) { + readyPayloads.forEach { payload -> + queueSend(payload) + } + readyPayloads = getReadyPayloads() + } + } finally { + sendLoopActive.set(false) + scheduleNextCheck() + } + } + + private fun getReadyPayloads(): List { + val payloads = storageService.getPayloadsByPriority() + val readyPayloads = LinkedList() + payloads.forEach { payload -> + if (payload.shouldSendPayload()) { + readyPayloads.push(payload) + } + } + return readyPayloads + } + + private fun PayloadReference.shouldSendPayload(): Boolean { + // determine if the given payload is eligible to be sent + // i.e. not already being sent, endpoint not blocked by 429, and isn't waiting to be retried + updateBlockedEndpoint(endpoint) + + return if (activeSends.contains(this)) { + false + } else if (blockedEndpoints.containsKey(endpoint)) { + false + } else { + payloadsToRetry[this]?.run { + clock.now() >= nextRetryTimeMs + } ?: true + } + } + + private fun queueSend(payload: PayloadReference): Future { + activeSends.add(payload) + return deliveryWorker.submit { + executionService.attemptHttpRequest( + payloadStream = payload.toInputStream(), + envelopeType = payload.envelopeType + ).apply { + if (!shouldRetry) { + payloadsToRetry.remove(payload) + storageService.deletePayload(payload) + } else { + val failedAttempts = payloadsToRetry[payload]?.let { it.failedAttempts + 1 } ?: 1 + payloadsToRetry[payload] = RetryInstance( + failedAttempts = failedAttempts, + nextRetryTimeMs = calculateNextRetryTime(retryCount = failedAttempts - 1) + ) + } + + if (this is ApiResponse.TooManyRequests) { + retryAfter?.let { delayMs -> + blockedEndpoints[endpoint] = clock.now() + delayMs + } + } + + activeSends.remove(payload) + } + } + } + + private fun scheduleNextCheck() { + payloadsToRetry.map { it.value.nextRetryTimeMs }.minOrNull()?.let { timestampMs -> + if (timestampMs <= clock.now()) { + onPayloadIntake() + } else if (timestampMs != Long.MAX_VALUE) { + schedulingWorker.schedule( + ::onPayloadIntake, + calculateDelay(timestampMs), + TimeUnit.MILLISECONDS + ) + } + } + } + + private fun readyToSend(): Boolean { + // TODO: determine if the SDK is in a state where it's ready to send payloads, i.e. have network connection and free thread + return true + } + + private fun updateBlockedEndpoint(endpoint: Endpoint) { + blockedEndpoints[endpoint]?.let { + if (it <= clock.now()) { + blockedEndpoints.remove(endpoint) + } + } + } + + private fun PayloadReference.toInputStream(): () -> InputStream? = { storageService.loadPayloadAsStream(this) } + + private fun calculateDelay(nextRetryTimeMs: Long): Long = nextRetryTimeMs - clock.now() + + private fun calculateNextRetryTime(retryCount: Int): Long = clock.now() + (INITIAL_DELAY_MS * (1 shl retryCount)) + + private data class RetryInstance( + val failedAttempts: Int, + val nextRetryTimeMs: Long + ) + + companion object { + const val INITIAL_DELAY_MS = 60_000L } } diff --git a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/injection/DeliveryModule2Impl.kt b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/injection/DeliveryModule2Impl.kt index 1d4fc3b577..380b0bb3b2 100644 --- a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/injection/DeliveryModule2Impl.kt +++ b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/injection/DeliveryModule2Impl.kt @@ -8,8 +8,8 @@ import io.embrace.android.embracesdk.internal.delivery.intake.IntakeService import io.embrace.android.embracesdk.internal.delivery.intake.IntakeServiceImpl import io.embrace.android.embracesdk.internal.delivery.resurrection.PayloadResurrectionService import io.embrace.android.embracesdk.internal.delivery.resurrection.PayloadResurrectionServiceImpl +import io.embrace.android.embracesdk.internal.delivery.scheduling.NoopSchedulingService import io.embrace.android.embracesdk.internal.delivery.scheduling.SchedulingService -import io.embrace.android.embracesdk.internal.delivery.scheduling.SchedulingServiceImpl import io.embrace.android.embracesdk.internal.delivery.storage.PayloadStorageService import io.embrace.android.embracesdk.internal.delivery.storage.PayloadStorageServiceImpl import io.embrace.android.embracesdk.internal.worker.Worker @@ -73,6 +73,6 @@ internal class DeliveryModule2Impl( if (configModule.configService.isOnlyUsingOtelExporters()) { return@singleton null } - SchedulingServiceImpl(checkNotNull(requestExecutionService)) + NoopSchedulingService() } } diff --git a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/PayloadReference.kt b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/PayloadReference.kt new file mode 100644 index 0000000000..c6b37cb202 --- /dev/null +++ b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/PayloadReference.kt @@ -0,0 +1,10 @@ +package io.embrace.android.embracesdk.internal.storage + +import io.embrace.android.embracesdk.internal.comms.api.Endpoint +import io.embrace.android.embracesdk.internal.delivery.SupportedEnvelopeType + +data class PayloadReference( + val fileName: String, + val endpoint: Endpoint, + val envelopeType: SupportedEnvelopeType, +) diff --git a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/StorageService2.kt b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/StorageService2.kt new file mode 100644 index 0000000000..e6056c6349 --- /dev/null +++ b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/StorageService2.kt @@ -0,0 +1,9 @@ +package io.embrace.android.embracesdk.internal.storage + +import java.io.InputStream + +interface StorageService2 { + fun getPayloadsByPriority(): List + fun loadPayloadAsStream(payload: PayloadReference): InputStream? + fun deletePayload(payload: PayloadReference) +} diff --git a/embrace-android-delivery/src/test/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImplTest.kt b/embrace-android-delivery/src/test/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImplTest.kt new file mode 100644 index 0000000000..d7addff812 --- /dev/null +++ b/embrace-android-delivery/src/test/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImplTest.kt @@ -0,0 +1,118 @@ +package io.embrace.android.embracesdk.internal.delivery.scheduling + +import io.embrace.android.embracesdk.assertions.assertCountedDown +import io.embrace.android.embracesdk.concurrency.BlockingScheduledExecutorService +import io.embrace.android.embracesdk.fakes.FakeClock +import io.embrace.android.embracesdk.fakes.FakeRequestExecutionService +import io.embrace.android.embracesdk.fakes.FakeStorageService2 +import io.embrace.android.embracesdk.fakes.injection.FakeWorkerThreadModule +import io.embrace.android.embracesdk.fixtures.fakeLogPayloadReference +import io.embrace.android.embracesdk.fixtures.fakeSessionPayloadReference +import io.embrace.android.embracesdk.internal.comms.api.ApiResponse +import io.embrace.android.embracesdk.internal.delivery.scheduling.SchedulingServiceImpl.Companion.INITIAL_DELAY_MS +import io.embrace.android.embracesdk.internal.worker.Worker +import org.junit.Assert.assertEquals +import org.junit.Before +import org.junit.Test + +internal class SchedulingServiceImplTest { + + private lateinit var storageService: FakeStorageService2 + private lateinit var executionService: FakeRequestExecutionService + private lateinit var schedulingExecutor: BlockingScheduledExecutorService + private lateinit var deliveryExecutor: BlockingScheduledExecutorService + private lateinit var clock: FakeClock + private lateinit var schedulingService: SchedulingServiceImpl + + @Before + fun setup() { + val workerModule = FakeWorkerThreadModule( + testWorkerName = Worker.Background.IoRegWorker, + anotherTestWorkerName = Worker.Background.DeliveryWorker + ) + schedulingExecutor = workerModule.executor.apply { blockingMode = true } + deliveryExecutor = workerModule.anotherExecutor.apply { blockingMode = true } + clock = workerModule.executorClock + storageService = FakeStorageService2( + listOf(fakeSessionPayloadReference, fakeLogPayloadReference) + ) + executionService = FakeRequestExecutionService() + schedulingService = SchedulingServiceImpl( + storageService = storageService, + executionService = executionService, + schedulingWorker = workerModule.backgroundWorker(worker = Worker.Background.IoRegWorker), + deliveryWorker = workerModule.backgroundWorker(worker = Worker.Background.DeliveryWorker), + clock = clock, + ) + } + + @Test + fun `new payload will trigger new delivery loop if the previous one is done`() { + schedulingService.onPayloadIntake() + assertEquals(1, schedulingExecutor.submitCount) + val latch = schedulingExecutor.queueCompletionTask() + schedulingExecutor.runCurrentlyBlocked() + latch.assertCountedDown() + schedulingService.onPayloadIntake() + assertEquals(3, schedulingExecutor.submitCount) + } + + @Test + fun `new payload will not trigger new delivery loop job if one is running`() { + schedulingService.onPayloadIntake() + schedulingService.onPayloadIntake() + assertEquals(1, schedulingExecutor.submitCount) + } + + @Test + fun `all payloads ready to be sent are queued up`() { + schedulingService.onPayloadIntake() + schedulingExecutor.runCurrentlyBlocked() + val latch = deliveryExecutor.queueCompletionTask() + deliveryExecutor.runCurrentlyBlocked() + latch.assertCountedDown() + assertEquals(3, deliveryExecutor.submitCount) + } + + @Test + fun `payloads remaining in storage will not be resent if retry period has not ended`() { + executionService.constantResponse = ApiResponse.None + schedulingExecutor.blockingMode = false + deliveryExecutor.blockingMode = false + schedulingService.onPayloadIntake() + deliveryExecutor.queueCompletionTask().assertCountedDown() + val submittedDeliveries = deliveryExecutor.submitCount + schedulingService.onPayloadIntake() + deliveryExecutor.queueCompletionTask().assertCountedDown() + assertEquals(submittedDeliveries + 1, deliveryExecutor.submitCount) + } + + @Test + fun `payloads remaining in storage will resent if retry period has ended`() { + executionService.constantResponse = ApiResponse.None + schedulingExecutor.blockingMode = false + deliveryExecutor.blockingMode = false + schedulingService.onPayloadIntake() + deliveryExecutor.queueCompletionTask().assertCountedDown() + val submittedDeliveries = deliveryExecutor.submitCount + clock.tick(INITIAL_DELAY_MS + 1) + schedulingService.onPayloadIntake() + deliveryExecutor.queueCompletionTask().assertCountedDown() + assertEquals(submittedDeliveries + 3, deliveryExecutor.submitCount) + } + + @Test + fun `new payload arrival will trigger it to be sent and not resend in progress payloads`() { + schedulingService.onPayloadIntake() + schedulingExecutor.runCurrentlyBlocked() + storageService.cachedPayloads.add(fakeSessionPayloadReference.copy(fileName = "session2.json")) + schedulingService.onPayloadIntake() + val scheduleLatch = schedulingExecutor.queueCompletionTask() + schedulingExecutor.runCurrentlyBlocked() + scheduleLatch.assertCountedDown() + val deliveryLatch = deliveryExecutor.queueCompletionTask() + deliveryExecutor.runCurrentlyBlocked() + deliveryLatch.assertCountedDown() + assertEquals(4, deliveryExecutor.submitCount) + } +} diff --git a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fakes/FakeRequestExecutionService.kt b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fakes/FakeRequestExecutionService.kt index 483e7a5b64..a7c399c648 100644 --- a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fakes/FakeRequestExecutionService.kt +++ b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fakes/FakeRequestExecutionService.kt @@ -13,6 +13,7 @@ class FakeRequestExecutionService : RequestExecutionService { private val serializer = TestPlatformSerializer() var responseAction: (intake: Envelope<*>) -> ApiResponse = { _ -> ApiResponse.None } val attemptedHttpRequests = mutableListOf>() + var constantResponse: ApiResponse? = null @Suppress("UNCHECKED_CAST") inline fun getRequests(): List> { @@ -23,10 +24,13 @@ class FakeRequestExecutionService : RequestExecutionService { } override fun attemptHttpRequest( - payloadStream: () -> InputStream, + payloadStream: () -> InputStream?, envelopeType: SupportedEnvelopeType ): ApiResponse { - val json: Envelope<*> = serializer.fromJson(payloadStream().buffered(), envelopeType.serializedType) + constantResponse?.run { return this } + + val bufferedStream = payloadStream()?.buffered() ?: return ApiResponse.None + val json: Envelope<*> = serializer.fromJson(bufferedStream, envelopeType.serializedType) return responseAction(json) } } diff --git a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fakes/FakeStorageService2.kt b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fakes/FakeStorageService2.kt new file mode 100644 index 0000000000..133723a7b5 --- /dev/null +++ b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fakes/FakeStorageService2.kt @@ -0,0 +1,23 @@ +package io.embrace.android.embracesdk.fakes + +import io.embrace.android.embracesdk.internal.storage.PayloadReference +import io.embrace.android.embracesdk.internal.storage.StorageService2 +import java.io.InputStream + +class FakeStorageService2( + payloads: List = emptyList() +) : StorageService2 { + val cachedPayloads = LinkedHashSet().apply { + addAll(payloads) + } + + override fun getPayloadsByPriority(): List = cachedPayloads.toList() + + override fun loadPayloadAsStream(payload: PayloadReference): InputStream? { + return null + } + + override fun deletePayload(payload: PayloadReference) { + cachedPayloads.remove(payload) + } +} diff --git a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fixtures/DeliveryFixtures.kt b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fixtures/DeliveryFixtures.kt new file mode 100644 index 0000000000..a033f02896 --- /dev/null +++ b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fixtures/DeliveryFixtures.kt @@ -0,0 +1,17 @@ +package io.embrace.android.embracesdk.fixtures + +import io.embrace.android.embracesdk.internal.comms.api.Endpoint +import io.embrace.android.embracesdk.internal.delivery.SupportedEnvelopeType +import io.embrace.android.embracesdk.internal.storage.PayloadReference + +val fakeSessionPayloadReference = PayloadReference( + fileName = "session.json", + endpoint = Endpoint.SESSIONS_V2, + envelopeType = SupportedEnvelopeType.SESSION +) + +val fakeLogPayloadReference = PayloadReference( + fileName = "log.json", + endpoint = Endpoint.LOGS, + envelopeType = SupportedEnvelopeType.LOG +) From e3079c6f1c0d2068bd74e7be8230254e0ea89fab Mon Sep 17 00:00:00 2001 From: bidetofevil Date: Thu, 26 Sep 2024 16:30:16 -0700 Subject: [PATCH 2/3] Review feedback --- .../internal/comms/api/ApiResponse.kt | 5 + .../internal/comms/api/EmbraceApiService.kt | 2 +- .../embracesdk/internal/worker/Worker.kt | 2 + .../execution/RequestExecutionService.kt | 2 +- .../execution/RequestExecutionServiceImpl.kt | 2 +- .../scheduling/SchedulingServiceImpl.kt | 125 +++++++++++------- .../internal/storage/PayloadReference.kt | 10 -- .../internal/storage/StorageService2.kt | 7 +- .../scheduling/SchedulingServiceImplTest.kt | 41 +++--- .../fakes/FakeRequestExecutionService.kt | 15 ++- .../embracesdk/fakes/FakeStorageService2.kt | 38 ++++-- .../embracesdk/fixtures/DeliveryFixtures.kt | 22 +-- 12 files changed, 164 insertions(+), 107 deletions(-) delete mode 100644 embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/PayloadReference.kt diff --git a/embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/comms/api/ApiResponse.kt b/embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/comms/api/ApiResponse.kt index 387779e7a6..ccdc42dd80 100644 --- a/embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/comms/api/ApiResponse.kt +++ b/embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/comms/api/ApiResponse.kt @@ -47,6 +47,11 @@ sealed class ApiResponse { */ data class Incomplete(val exception: Throwable) : ApiResponse() + /** + * API call not executed because payload was not found + */ + object NoPayload : ApiResponse() + /** * No response was received */ diff --git a/embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/comms/api/EmbraceApiService.kt b/embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/comms/api/EmbraceApiService.kt index f2b57738d5..c9c275c098 100644 --- a/embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/comms/api/EmbraceApiService.kt +++ b/embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/comms/api/EmbraceApiService.kt @@ -73,7 +73,7 @@ internal class EmbraceApiService( null } - is ApiResponse.Failure, ApiResponse.None -> { + is ApiResponse.Failure, ApiResponse.None, ApiResponse.NoPayload -> { null } diff --git a/embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/worker/Worker.kt b/embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/worker/Worker.kt index b297d435e2..3dbfe41ea8 100644 --- a/embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/worker/Worker.kt +++ b/embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/worker/Worker.kt @@ -56,6 +56,8 @@ sealed class Worker(internal val threadName: String) { /** * Delivery Worker + * + * TODO: Make this a PriorityWorker */ object DeliveryWorker : Background("delivery") } diff --git a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/execution/RequestExecutionService.kt b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/execution/RequestExecutionService.kt index f27b3691e2..95fd801e8f 100644 --- a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/execution/RequestExecutionService.kt +++ b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/execution/RequestExecutionService.kt @@ -14,7 +14,7 @@ interface RequestExecutionService { * Takes an [InputStream] of a payload and attempts an HTTP request to the Embrace backend. */ fun attemptHttpRequest( - payloadStream: () -> InputStream?, + payloadStream: () -> InputStream, envelopeType: SupportedEnvelopeType ): ApiResponse } diff --git a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/execution/RequestExecutionServiceImpl.kt b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/execution/RequestExecutionServiceImpl.kt index e2327d105a..23f4c77d0e 100644 --- a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/execution/RequestExecutionServiceImpl.kt +++ b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/execution/RequestExecutionServiceImpl.kt @@ -7,7 +7,7 @@ import java.io.InputStream internal class RequestExecutionServiceImpl : RequestExecutionService { override fun attemptHttpRequest( - payloadStream: () -> InputStream?, + payloadStream: () -> InputStream, envelopeType: SupportedEnvelopeType ): ApiResponse { return ApiResponse.None diff --git a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImpl.kt b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImpl.kt index 064a4fc732..9b0088e7fe 100644 --- a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImpl.kt +++ b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImpl.kt @@ -5,8 +5,9 @@ package io.embrace.android.embracesdk.internal.delivery.scheduling import io.embrace.android.embracesdk.internal.clock.Clock import io.embrace.android.embracesdk.internal.comms.api.ApiResponse import io.embrace.android.embracesdk.internal.comms.api.Endpoint +import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata import io.embrace.android.embracesdk.internal.delivery.execution.RequestExecutionService -import io.embrace.android.embracesdk.internal.storage.PayloadReference +import io.embrace.android.embracesdk.internal.delivery.storedTelemetryComparator import io.embrace.android.embracesdk.internal.storage.StorageService2 import io.embrace.android.embracesdk.internal.worker.BackgroundWorker import java.io.InputStream @@ -27,34 +28,47 @@ internal class SchedulingServiceImpl( private val blockedEndpoints: MutableMap = ConcurrentHashMap() private val sendLoopActive = AtomicBoolean(false) - private val activeSends: MutableSet = Collections.synchronizedSet(HashSet()) - private val payloadsToRetry: MutableMap = ConcurrentHashMap() + private val queryForPayloads = AtomicBoolean(true) + private val activeSends: MutableSet = Collections.newSetFromMap(ConcurrentHashMap()) + private val payloadsToRetry: MutableMap = ConcurrentHashMap() override fun onPayloadIntake() { + queryForPayloads.set(true) + startDeliveryLoop() + } + + override fun handleCrash(crashId: String) { + // TODO: get ready to die + } + + private fun startDeliveryLoop() { // When a payload arrives, check to see if there's already an active job try to deliver payloads // If not, schedule job. If so, do nothing. if (sendLoopActive.compareAndSet(false, true)) { schedulingWorker.submit { - queueLoop() + deliveryLoop() } } } - override fun handleCrash(crashId: String) { - // TODO: get ready to die - } - /** * Loop through the payloads ready to be sent by priority and queue for delivery */ - private fun queueLoop() { + private fun deliveryLoop() { try { - var readyPayloads = getReadyPayloads() - while (readyPayloads.isNotEmpty() && readyToSend()) { - readyPayloads.forEach { payload -> - queueSend(payload) + var deliveryQueue = getDeliveryQueue() + while (deliveryQueue.isNotEmpty() && readyToSend()) { + deliveryQueue.poll()?.let { payload -> + queueDelivery(payload) + } + + if (queryForPayloads.compareAndSet(true, false)) { + deliveryQueue = getDeliveryQueue() + } + + if (deliveryQueue.isEmpty()) { + deliveryQueue = getDeliveryQueue() } - readyPayloads = getReadyPayloads() } } finally { sendLoopActive.set(false) @@ -62,25 +76,21 @@ internal class SchedulingServiceImpl( } } - private fun getReadyPayloads(): List { - val payloads = storageService.getPayloadsByPriority() - val readyPayloads = LinkedList() - payloads.forEach { payload -> - if (payload.shouldSendPayload()) { - readyPayloads.push(payload) - } - } - return readyPayloads - } + private fun getDeliveryQueue() = LinkedList(getReadyPayloads()) + + private fun getReadyPayloads(): List = + storageService.getPayloadsByPriority() + .filter { it.shouldSendPayload() } + .sortedWith(storedTelemetryComparator) - private fun PayloadReference.shouldSendPayload(): Boolean { + private fun StoredTelemetryMetadata.shouldSendPayload(): Boolean { // determine if the given payload is eligible to be sent // i.e. not already being sent, endpoint not blocked by 429, and isn't waiting to be retried - updateBlockedEndpoint(endpoint) + updateBlockedEndpoint(envelopeType.endpoint) return if (activeSends.contains(this)) { false - } else if (blockedEndpoints.containsKey(endpoint)) { + } else if (blockedEndpoints.containsKey(envelopeType.endpoint)) { false } else { payloadsToRetry[this]?.run { @@ -89,30 +99,43 @@ internal class SchedulingServiceImpl( } } - private fun queueSend(payload: PayloadReference): Future { + private fun queueDelivery(payload: StoredTelemetryMetadata): Future { activeSends.add(payload) return deliveryWorker.submit { - executionService.attemptHttpRequest( - payloadStream = payload.toInputStream(), - envelopeType = payload.envelopeType - ).apply { - if (!shouldRetry) { - payloadsToRetry.remove(payload) - storageService.deletePayload(payload) - } else { - val failedAttempts = payloadsToRetry[payload]?.let { it.failedAttempts + 1 } ?: 1 - payloadsToRetry[payload] = RetryInstance( - failedAttempts = failedAttempts, - nextRetryTimeMs = calculateNextRetryTime(retryCount = failedAttempts - 1) - ) - } - - if (this is ApiResponse.TooManyRequests) { - retryAfter?.let { delayMs -> - blockedEndpoints[endpoint] = clock.now() + delayMs + try { + val payloadStream = payload.toStream() + if (payloadStream != null) { + executionService.attemptHttpRequest( + payloadStream = { payloadStream }, + envelopeType = payload.envelopeType + ).apply { + if (!shouldRetry) { + // If the response is such that we should not ever retry the delivery of this payload, + // delete it from both the in memory retry payloads map and on disk + payloadsToRetry.remove(payload) + storageService.deletePayload(payload) + } else { + // If delivery of this payload should be retried, add or replace the entry in the retry map + // with the new values for how many times it has failed, and when the next retry should happen + val retryAttempts = payloadsToRetry[payload]?.failedAttempts ?: 0 + val nextRetryTimeMs = calculateNextRetryTime(retryAttempts = retryAttempts) + payloadsToRetry[payload] = RetryInstance( + failedAttempts = retryAttempts + 1, + nextRetryTimeMs = nextRetryTimeMs + ) + } + + if (this is ApiResponse.TooManyRequests) { + retryAfter?.let { delayMs -> + blockedEndpoints[endpoint] = clock.now() + delayMs + } + } } + } else { + // Could not find payload. Do not retry. + ApiResponse.NoPayload } - + } finally { activeSends.remove(payload) } } @@ -121,10 +144,10 @@ internal class SchedulingServiceImpl( private fun scheduleNextCheck() { payloadsToRetry.map { it.value.nextRetryTimeMs }.minOrNull()?.let { timestampMs -> if (timestampMs <= clock.now()) { - onPayloadIntake() + startDeliveryLoop() } else if (timestampMs != Long.MAX_VALUE) { schedulingWorker.schedule( - ::onPayloadIntake, + ::startDeliveryLoop, calculateDelay(timestampMs), TimeUnit.MILLISECONDS ) @@ -145,11 +168,11 @@ internal class SchedulingServiceImpl( } } - private fun PayloadReference.toInputStream(): () -> InputStream? = { storageService.loadPayloadAsStream(this) } + private fun StoredTelemetryMetadata.toStream(): InputStream? = storageService.loadPayloadAsStream(this) private fun calculateDelay(nextRetryTimeMs: Long): Long = nextRetryTimeMs - clock.now() - private fun calculateNextRetryTime(retryCount: Int): Long = clock.now() + (INITIAL_DELAY_MS * (1 shl retryCount)) + private fun calculateNextRetryTime(retryAttempts: Int): Long = clock.now() + (INITIAL_DELAY_MS * (1 shl retryAttempts)) private data class RetryInstance( val failedAttempts: Int, diff --git a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/PayloadReference.kt b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/PayloadReference.kt deleted file mode 100644 index c6b37cb202..0000000000 --- a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/PayloadReference.kt +++ /dev/null @@ -1,10 +0,0 @@ -package io.embrace.android.embracesdk.internal.storage - -import io.embrace.android.embracesdk.internal.comms.api.Endpoint -import io.embrace.android.embracesdk.internal.delivery.SupportedEnvelopeType - -data class PayloadReference( - val fileName: String, - val endpoint: Endpoint, - val envelopeType: SupportedEnvelopeType, -) diff --git a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/StorageService2.kt b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/StorageService2.kt index e6056c6349..350d927c11 100644 --- a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/StorageService2.kt +++ b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/StorageService2.kt @@ -1,9 +1,10 @@ package io.embrace.android.embracesdk.internal.storage +import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata import java.io.InputStream interface StorageService2 { - fun getPayloadsByPriority(): List - fun loadPayloadAsStream(payload: PayloadReference): InputStream? - fun deletePayload(payload: PayloadReference) + fun getPayloadsByPriority(): List + fun loadPayloadAsStream(payloadMetadata: StoredTelemetryMetadata): InputStream? + fun deletePayload(payloadMetadata: StoredTelemetryMetadata) } diff --git a/embrace-android-delivery/src/test/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImplTest.kt b/embrace-android-delivery/src/test/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImplTest.kt index d7addff812..cb702c8a4c 100644 --- a/embrace-android-delivery/src/test/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImplTest.kt +++ b/embrace-android-delivery/src/test/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImplTest.kt @@ -6,8 +6,9 @@ import io.embrace.android.embracesdk.fakes.FakeClock import io.embrace.android.embracesdk.fakes.FakeRequestExecutionService import io.embrace.android.embracesdk.fakes.FakeStorageService2 import io.embrace.android.embracesdk.fakes.injection.FakeWorkerThreadModule -import io.embrace.android.embracesdk.fixtures.fakeLogPayloadReference -import io.embrace.android.embracesdk.fixtures.fakeSessionPayloadReference +import io.embrace.android.embracesdk.fixtures.fakeLogStoredTelemetryMetadata +import io.embrace.android.embracesdk.fixtures.fakeSessionStoredTelemetryMetadata +import io.embrace.android.embracesdk.fixtures.fakeSessionStoredTelemetryMetadata2 import io.embrace.android.embracesdk.internal.comms.api.ApiResponse import io.embrace.android.embracesdk.internal.delivery.scheduling.SchedulingServiceImpl.Companion.INITIAL_DELAY_MS import io.embrace.android.embracesdk.internal.worker.Worker @@ -31,10 +32,10 @@ internal class SchedulingServiceImplTest { anotherTestWorkerName = Worker.Background.DeliveryWorker ) schedulingExecutor = workerModule.executor.apply { blockingMode = true } - deliveryExecutor = workerModule.anotherExecutor.apply { blockingMode = true } + deliveryExecutor = workerModule.anotherExecutor.apply { blockingMode = false } clock = workerModule.executorClock storageService = FakeStorageService2( - listOf(fakeSessionPayloadReference, fakeLogPayloadReference) + listOf(fakeSessionStoredTelemetryMetadata, fakeLogStoredTelemetryMetadata) ) executionService = FakeRequestExecutionService() schedulingService = SchedulingServiceImpl( @@ -69,50 +70,56 @@ internal class SchedulingServiceImplTest { schedulingService.onPayloadIntake() schedulingExecutor.runCurrentlyBlocked() val latch = deliveryExecutor.queueCompletionTask() - deliveryExecutor.runCurrentlyBlocked() latch.assertCountedDown() - assertEquals(3, deliveryExecutor.submitCount) + assertEquals(2, executionService.sendAttempts()) } @Test fun `payloads remaining in storage will not be resent if retry period has not ended`() { - executionService.constantResponse = ApiResponse.None + executionService.constantResponse = ApiResponse.Failure(code = 500, emptyMap()) schedulingExecutor.blockingMode = false - deliveryExecutor.blockingMode = false schedulingService.onPayloadIntake() deliveryExecutor.queueCompletionTask().assertCountedDown() - val submittedDeliveries = deliveryExecutor.submitCount schedulingService.onPayloadIntake() deliveryExecutor.queueCompletionTask().assertCountedDown() - assertEquals(submittedDeliveries + 1, deliveryExecutor.submitCount) + assertEquals(2, executionService.sendAttempts()) } @Test fun `payloads remaining in storage will resent if retry period has ended`() { - executionService.constantResponse = ApiResponse.None + executionService.constantResponse = ApiResponse.Failure(code = 500, emptyMap()) schedulingExecutor.blockingMode = false - deliveryExecutor.blockingMode = false schedulingService.onPayloadIntake() deliveryExecutor.queueCompletionTask().assertCountedDown() - val submittedDeliveries = deliveryExecutor.submitCount clock.tick(INITIAL_DELAY_MS + 1) schedulingService.onPayloadIntake() deliveryExecutor.queueCompletionTask().assertCountedDown() - assertEquals(submittedDeliveries + 3, deliveryExecutor.submitCount) + assertEquals(4, executionService.sendAttempts()) } @Test fun `new payload arrival will trigger it to be sent and not resend in progress payloads`() { schedulingService.onPayloadIntake() schedulingExecutor.runCurrentlyBlocked() - storageService.cachedPayloads.add(fakeSessionPayloadReference.copy(fileName = "session2.json")) + storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata2) schedulingService.onPayloadIntake() val scheduleLatch = schedulingExecutor.queueCompletionTask() schedulingExecutor.runCurrentlyBlocked() scheduleLatch.assertCountedDown() val deliveryLatch = deliveryExecutor.queueCompletionTask() - deliveryExecutor.runCurrentlyBlocked() deliveryLatch.assertCountedDown() - assertEquals(4, deliveryExecutor.submitCount) + assertEquals(3, executionService.sendAttempts()) + } + + @Test + fun `no sent attempt will be made if a payload cannot be found on disk`() { + deliveryExecutor.blockingMode = true + schedulingService.onPayloadIntake() + schedulingExecutor.runCurrentlyBlocked() + storageService.cachedPayloads.remove(fakeLogStoredTelemetryMetadata) + deliveryExecutor.blockingMode = false + val deliveryLatch = deliveryExecutor.queueCompletionTask() + deliveryLatch.assertCountedDown() + assertEquals(1, executionService.sendAttempts()) } } diff --git a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fakes/FakeRequestExecutionService.kt b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fakes/FakeRequestExecutionService.kt index a7c399c648..a16dcb4cfa 100644 --- a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fakes/FakeRequestExecutionService.kt +++ b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fakes/FakeRequestExecutionService.kt @@ -11,26 +11,27 @@ import java.io.InputStream class FakeRequestExecutionService : RequestExecutionService { private val serializer = TestPlatformSerializer() - var responseAction: (intake: Envelope<*>) -> ApiResponse = { _ -> ApiResponse.None } + var constantResponse: ApiResponse = ApiResponse.None + var responseAction: (intake: Envelope<*>) -> ApiResponse = { _ -> constantResponse } val attemptedHttpRequests = mutableListOf>() - var constantResponse: ApiResponse? = null @Suppress("UNCHECKED_CAST") inline fun getRequests(): List> { - if (T::class != SessionPayload::class || T::class != LogPayload::class) { + if (T::class != SessionPayload::class && T::class != LogPayload::class) { error("Unsupported type: ${T::class}") } return attemptedHttpRequests.filter { it.data is T } as List> } override fun attemptHttpRequest( - payloadStream: () -> InputStream?, + payloadStream: () -> InputStream, envelopeType: SupportedEnvelopeType ): ApiResponse { - constantResponse?.run { return this } - - val bufferedStream = payloadStream()?.buffered() ?: return ApiResponse.None + val bufferedStream = payloadStream().buffered() val json: Envelope<*> = serializer.fromJson(bufferedStream, envelopeType.serializedType) + attemptedHttpRequests.add(json) return responseAction(json) } + + fun sendAttempts() = getRequests().size + getRequests().size } diff --git a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fakes/FakeStorageService2.kt b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fakes/FakeStorageService2.kt index 133723a7b5..e9a86c8ec6 100644 --- a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fakes/FakeStorageService2.kt +++ b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fakes/FakeStorageService2.kt @@ -1,23 +1,45 @@ package io.embrace.android.embracesdk.fakes -import io.embrace.android.embracesdk.internal.storage.PayloadReference +import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata +import io.embrace.android.embracesdk.internal.payload.Envelope +import io.embrace.android.embracesdk.internal.payload.LogPayload +import io.embrace.android.embracesdk.internal.payload.SessionPayload +import io.embrace.android.embracesdk.internal.serialization.EmbraceSerializer import io.embrace.android.embracesdk.internal.storage.StorageService2 import java.io.InputStream class FakeStorageService2( - payloads: List = emptyList() + payloads: List = emptyList() ) : StorageService2 { - val cachedPayloads = LinkedHashSet().apply { + private val serializer = EmbraceSerializer() + + val cachedPayloads = LinkedHashSet().apply { addAll(payloads) } - override fun getPayloadsByPriority(): List = cachedPayloads.toList() + override fun getPayloadsByPriority(): List = cachedPayloads.toList() - override fun loadPayloadAsStream(payload: PayloadReference): InputStream? { - return null + override fun loadPayloadAsStream(payloadMetadata: StoredTelemetryMetadata): InputStream? { + return if (cachedPayloads.contains(payloadMetadata)) { + when (payloadMetadata.envelopeType.serializedType) { + Envelope.sessionEnvelopeType -> + serializer.toJson( + Envelope(data = SessionPayload()), + Envelope.sessionEnvelopeType + ).byteInputStream() + Envelope.logEnvelopeType -> + serializer.toJson( + Envelope(data = LogPayload()), + Envelope.logEnvelopeType + ).byteInputStream() + else -> null + } + } else { + null + } } - override fun deletePayload(payload: PayloadReference) { - cachedPayloads.remove(payload) + override fun deletePayload(payloadMetadata: StoredTelemetryMetadata) { + cachedPayloads.remove(payloadMetadata) } } diff --git a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fixtures/DeliveryFixtures.kt b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fixtures/DeliveryFixtures.kt index a033f02896..c00f7c59dc 100644 --- a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fixtures/DeliveryFixtures.kt +++ b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fixtures/DeliveryFixtures.kt @@ -1,17 +1,23 @@ package io.embrace.android.embracesdk.fixtures -import io.embrace.android.embracesdk.internal.comms.api.Endpoint +import io.embrace.android.embracesdk.fakes.FakeClock.Companion.DEFAULT_FAKE_CURRENT_TIME +import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata import io.embrace.android.embracesdk.internal.delivery.SupportedEnvelopeType -import io.embrace.android.embracesdk.internal.storage.PayloadReference -val fakeSessionPayloadReference = PayloadReference( - fileName = "session.json", - endpoint = Endpoint.SESSIONS_V2, +val fakeSessionStoredTelemetryMetadata = StoredTelemetryMetadata( + timestamp = DEFAULT_FAKE_CURRENT_TIME, + uuid = "30690ad1-6b87-4e08-b72c-7deca14451d8", envelopeType = SupportedEnvelopeType.SESSION ) -val fakeLogPayloadReference = PayloadReference( - fileName = "log.json", - endpoint = Endpoint.LOGS, +val fakeSessionStoredTelemetryMetadata2 = StoredTelemetryMetadata( + timestamp = DEFAULT_FAKE_CURRENT_TIME + 1, + uuid = "30690ad1-6b87-4e08-b72c-7deca14451d8", + envelopeType = SupportedEnvelopeType.SESSION +) + +val fakeLogStoredTelemetryMetadata = StoredTelemetryMetadata( + timestamp = DEFAULT_FAKE_CURRENT_TIME, + uuid = "6bda3896-d4fd-42ce-89f6-47bec86f1c80", envelopeType = SupportedEnvelopeType.LOG ) From 76ea53552c738be8ca371e7d59a18d45125ae44c Mon Sep 17 00:00:00 2001 From: bidetofevil Date: Fri, 27 Sep 2024 11:38:49 -0700 Subject: [PATCH 3/3] Fully test implemented Scheduling Service functionality --- .../PeriodicBackgroundActivityCacherTest.kt | 11 +- .../scheduling/SchedulingServiceImpl.kt | 86 ++++---- .../internal/storage/StorageService2.kt | 3 + .../scheduling/SchedulingServiceImplTest.kt | 205 ++++++++++++++---- .../embracesdk/assertions/TestAssertions.kt | 6 +- .../BlockingScheduledExecutorService.kt | 11 + .../embracesdk/fixtures/DeliveryFixtures.kt | 6 +- 7 files changed, 235 insertions(+), 93 deletions(-) diff --git a/embrace-android-core/src/test/java/io/embrace/android/embracesdk/internal/session/caching/PeriodicBackgroundActivityCacherTest.kt b/embrace-android-core/src/test/java/io/embrace/android/embracesdk/internal/session/caching/PeriodicBackgroundActivityCacherTest.kt index c0f8e5f4f2..123b29b39c 100644 --- a/embrace-android-core/src/test/java/io/embrace/android/embracesdk/internal/session/caching/PeriodicBackgroundActivityCacherTest.kt +++ b/embrace-android-core/src/test/java/io/embrace/android/embracesdk/internal/session/caching/PeriodicBackgroundActivityCacherTest.kt @@ -41,9 +41,7 @@ internal class PeriodicBackgroundActivityCacherTest { fun `do not save more than once if delay time has not elapsed`() { queueScheduleSave() queueScheduleSave() - val latch = executor.queueCompletionTask() - executor.blockingMode = false - latch.assertCountedDown() + executor.awaitExecutionCompletion() assertEquals(1, executionCount.get()) } @@ -56,8 +54,7 @@ internal class PeriodicBackgroundActivityCacherTest { val latch2 = queueScheduleSave() assertEquals(1, executor.scheduledTasksCount()) executor.moveForwardAndRunBlocked(1999) - executor.blockingMode = false - executor.queueCompletionTask().assertCountedDown() + executor.awaitExecutionCompletion() assertEquals(1, latch2.count) assertEquals(1, executionCount.get()) executor.moveForwardAndRunBlocked(2) @@ -69,9 +66,7 @@ internal class PeriodicBackgroundActivityCacherTest { fun `stopping cacher prevents execution of the pending scheduled save`() { queueScheduleSave() cacher.stop() - val latch = executor.queueCompletionTask() - executor.blockingMode = false - latch.assertCountedDown() + executor.awaitExecutionCompletion() assertEquals(0, executionCount.get()) } diff --git a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImpl.kt b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImpl.kt index 9b0088e7fe..4fe0880d74 100644 --- a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImpl.kt +++ b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImpl.kt @@ -56,48 +56,30 @@ internal class SchedulingServiceImpl( */ private fun deliveryLoop() { try { - var deliveryQueue = getDeliveryQueue() + var deliveryQueue = createPayloadQueue() while (deliveryQueue.isNotEmpty() && readyToSend()) { deliveryQueue.poll()?.let { payload -> - queueDelivery(payload) - } - - if (queryForPayloads.compareAndSet(true, false)) { - deliveryQueue = getDeliveryQueue() + if (payload.shouldSendPayload()) { + payload.envelopeType.endpoint.updateBlockedEndpoint() + queueDelivery(payload) + } } - if (deliveryQueue.isEmpty()) { - deliveryQueue = getDeliveryQueue() + if (queryForPayloads.compareAndSet(true, false) || deliveryQueue.isEmpty()) { + deliveryQueue = createPayloadQueue() } } } finally { sendLoopActive.set(false) - scheduleNextCheck() + scheduleNextDeliveryLoop() } } - private fun getDeliveryQueue() = LinkedList(getReadyPayloads()) - - private fun getReadyPayloads(): List = + private fun createPayloadQueue() = LinkedList( storageService.getPayloadsByPriority() .filter { it.shouldSendPayload() } .sortedWith(storedTelemetryComparator) - - private fun StoredTelemetryMetadata.shouldSendPayload(): Boolean { - // determine if the given payload is eligible to be sent - // i.e. not already being sent, endpoint not blocked by 429, and isn't waiting to be retried - updateBlockedEndpoint(envelopeType.endpoint) - - return if (activeSends.contains(this)) { - false - } else if (blockedEndpoints.containsKey(envelopeType.endpoint)) { - false - } else { - payloadsToRetry[this]?.run { - clock.now() >= nextRetryTimeMs - } ?: true - } - } + ) private fun queueDelivery(payload: StoredTelemetryMetadata): Future { activeSends.add(payload) @@ -118,18 +100,19 @@ internal class SchedulingServiceImpl( // If delivery of this payload should be retried, add or replace the entry in the retry map // with the new values for how many times it has failed, and when the next retry should happen val retryAttempts = payloadsToRetry[payload]?.failedAttempts ?: 0 - val nextRetryTimeMs = calculateNextRetryTime(retryAttempts = retryAttempts) + val nextRetryTimeMs = if (this is ApiResponse.TooManyRequests && retryAfter != null) { + val unblockedTimestampMs = clock.now() + retryAfter as Long + blockedEndpoints[endpoint] = unblockedTimestampMs + unblockedTimestampMs + } else { + calculateNextRetryTime(retryAttempts = retryAttempts) + } + payloadsToRetry[payload] = RetryInstance( failedAttempts = retryAttempts + 1, nextRetryTimeMs = nextRetryTimeMs ) } - - if (this is ApiResponse.TooManyRequests) { - retryAfter?.let { delayMs -> - blockedEndpoints[endpoint] = clock.now() + delayMs - } - } } } else { // Could not find payload. Do not retry. @@ -141,7 +124,7 @@ internal class SchedulingServiceImpl( } } - private fun scheduleNextCheck() { + private fun scheduleNextDeliveryLoop() { payloadsToRetry.map { it.value.nextRetryTimeMs }.minOrNull()?.let { timestampMs -> if (timestampMs <= clock.now()) { startDeliveryLoop() @@ -156,20 +139,39 @@ internal class SchedulingServiceImpl( } private fun readyToSend(): Boolean { - // TODO: determine if the SDK is in a state where it's ready to send payloads, i.e. have network connection and free thread + // TODO: determine if the SDK is in a state where it's ready to send payloads, e.g. have network connection, etc. return true } - private fun updateBlockedEndpoint(endpoint: Endpoint) { - blockedEndpoints[endpoint]?.let { - if (it <= clock.now()) { - blockedEndpoints.remove(endpoint) - } + private fun StoredTelemetryMetadata.shouldSendPayload(): Boolean { + // determine if the given payload is eligible to be sent + // i.e. not already being sent, endpoint not blocked by 429, and isn't waiting to be retried + return if (activeSends.contains(this)) { + false + } else if (isEndpointBlocked()) { + false + } else { + payloadsToRetry[this]?.run { + clock.now() >= nextRetryTimeMs + } ?: true } } private fun StoredTelemetryMetadata.toStream(): InputStream? = storageService.loadPayloadAsStream(this) + private fun StoredTelemetryMetadata.isEndpointBlocked(): Boolean = + blockedEndpoints[envelopeType.endpoint]?.let { timestampMs -> + timestampMs > clock.now() + } ?: false + + private fun Endpoint.updateBlockedEndpoint() { + blockedEndpoints[this]?.let { + if (it <= clock.now()) { + blockedEndpoints.remove(this) + } + } + } + private fun calculateDelay(nextRetryTimeMs: Long): Long = nextRetryTimeMs - clock.now() private fun calculateNextRetryTime(retryAttempts: Int): Long = clock.now() + (INITIAL_DELAY_MS * (1 shl retryAttempts)) diff --git a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/StorageService2.kt b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/StorageService2.kt index 350d927c11..c9a9ffd2d7 100644 --- a/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/StorageService2.kt +++ b/embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/storage/StorageService2.kt @@ -3,6 +3,9 @@ package io.embrace.android.embracesdk.internal.storage import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata import java.io.InputStream +/** + * TODO: merge this into [PayloadStorageService] + */ interface StorageService2 { fun getPayloadsByPriority(): List fun loadPayloadAsStream(payloadMetadata: StoredTelemetryMetadata): InputStream? diff --git a/embrace-android-delivery/src/test/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImplTest.kt b/embrace-android-delivery/src/test/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImplTest.kt index cb702c8a4c..f6720a0505 100644 --- a/embrace-android-delivery/src/test/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImplTest.kt +++ b/embrace-android-delivery/src/test/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImplTest.kt @@ -1,6 +1,5 @@ package io.embrace.android.embracesdk.internal.delivery.scheduling -import io.embrace.android.embracesdk.assertions.assertCountedDown import io.embrace.android.embracesdk.concurrency.BlockingScheduledExecutorService import io.embrace.android.embracesdk.fakes.FakeClock import io.embrace.android.embracesdk.fakes.FakeRequestExecutionService @@ -10,9 +9,12 @@ import io.embrace.android.embracesdk.fixtures.fakeLogStoredTelemetryMetadata import io.embrace.android.embracesdk.fixtures.fakeSessionStoredTelemetryMetadata import io.embrace.android.embracesdk.fixtures.fakeSessionStoredTelemetryMetadata2 import io.embrace.android.embracesdk.internal.comms.api.ApiResponse +import io.embrace.android.embracesdk.internal.comms.api.Endpoint import io.embrace.android.embracesdk.internal.delivery.scheduling.SchedulingServiceImpl.Companion.INITIAL_DELAY_MS +import io.embrace.android.embracesdk.internal.payload.SessionPayload import io.embrace.android.embracesdk.internal.worker.Worker import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue import org.junit.Before import org.junit.Test @@ -22,22 +24,25 @@ internal class SchedulingServiceImplTest { private lateinit var executionService: FakeRequestExecutionService private lateinit var schedulingExecutor: BlockingScheduledExecutorService private lateinit var deliveryExecutor: BlockingScheduledExecutorService - private lateinit var clock: FakeClock private lateinit var schedulingService: SchedulingServiceImpl + @Volatile + private lateinit var clock: FakeClock + @Before fun setup() { val workerModule = FakeWorkerThreadModule( testWorkerName = Worker.Background.IoRegWorker, anotherTestWorkerName = Worker.Background.DeliveryWorker ) - schedulingExecutor = workerModule.executor.apply { blockingMode = true } + schedulingExecutor = workerModule.executor.apply { blockingMode = false } deliveryExecutor = workerModule.anotherExecutor.apply { blockingMode = false } clock = workerModule.executorClock storageService = FakeStorageService2( - listOf(fakeSessionStoredTelemetryMetadata, fakeLogStoredTelemetryMetadata) + listOf(fakeLogStoredTelemetryMetadata, fakeSessionStoredTelemetryMetadata) ) executionService = FakeRequestExecutionService() + allSendsSucceed() schedulingService = SchedulingServiceImpl( storageService = storageService, executionService = executionService, @@ -49,77 +54,203 @@ internal class SchedulingServiceImplTest { @Test fun `new payload will trigger new delivery loop if the previous one is done`() { + schedulingExecutor.blockingMode = true schedulingService.onPayloadIntake() assertEquals(1, schedulingExecutor.submitCount) - val latch = schedulingExecutor.queueCompletionTask() - schedulingExecutor.runCurrentlyBlocked() - latch.assertCountedDown() + schedulingExecutor.awaitExecutionCompletion() schedulingService.onPayloadIntake() assertEquals(3, schedulingExecutor.submitCount) } @Test fun `new payload will not trigger new delivery loop job if one is running`() { + schedulingExecutor.blockingMode = true schedulingService.onPayloadIntake() schedulingService.onPayloadIntake() assertEquals(1, schedulingExecutor.submitCount) } @Test - fun `all payloads ready to be sent are queued up`() { - schedulingService.onPayloadIntake() - schedulingExecutor.runCurrentlyBlocked() - val latch = deliveryExecutor.queueCompletionTask() - latch.assertCountedDown() + fun `all payloads ready to be sent are sent in priority order`() { + executionService.constantResponse = success + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() assertEquals(2, executionService.sendAttempts()) + assertEquals(0, storageService.cachedPayloads.size) + assertTrue(executionService.attemptedHttpRequests.first().data is SessionPayload) } @Test - fun `payloads remaining in storage will not be resent if retry period has not ended`() { - executionService.constantResponse = ApiResponse.Failure(code = 500, emptyMap()) - schedulingExecutor.blockingMode = false - schedulingService.onPayloadIntake() - deliveryExecutor.queueCompletionTask().assertCountedDown() - schedulingService.onPayloadIntake() - deliveryExecutor.queueCompletionTask().assertCountedDown() + fun `payloads being sent will not be resent when a new payload arrives`() { + deliveryExecutor.blockingMode = true + waitForOnPayloadIntakeTaskCompletion() + storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata2) + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.blockingMode = false + deliveryExecutor.awaitExecutionCompletion() + assertEquals(3, executionService.sendAttempts()) + } + + @Test + fun `payloads that fail to send will not be ready to be resent immediately`() { + allSendsFail() + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() + assertEquals(2, executionService.sendAttempts()) + assertEquals(2, storageService.cachedPayloads.size) + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() assertEquals(2, executionService.sendAttempts()) + assertEquals(2, storageService.cachedPayloads.size) + } + + @Test + fun `payloads that fail to send will be retried with exponential back off`() { + allSendsFail() + schedulingExecutor.blockingMode = true + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() + var delay = INITIAL_DELAY_MS + repeat(10) { iteration -> + schedulingExecutor.moveForwardAndRunBlocked(delay + 1) + schedulingExecutor.awaitExecutionCompletion() + deliveryExecutor.awaitExecutionCompletion() + assertEquals( + "Send attempt ${iteration + 1} did not result in the right number of sends after $delay ms", + 2 * (iteration + 2), + executionService.sendAttempts() + ) + assertEquals("Send attempt $iteration failed", 2, storageService.cachedPayloads.size) + delay *= 2 + } } @Test fun `payloads remaining in storage will resent if retry period has ended`() { - executionService.constantResponse = ApiResponse.Failure(code = 500, emptyMap()) - schedulingExecutor.blockingMode = false - schedulingService.onPayloadIntake() - deliveryExecutor.queueCompletionTask().assertCountedDown() + allSendsFail() + schedulingExecutor.blockingMode = true + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() clock.tick(INITIAL_DELAY_MS + 1) - schedulingService.onPayloadIntake() - deliveryExecutor.queueCompletionTask().assertCountedDown() + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() assertEquals(4, executionService.sendAttempts()) } @Test - fun `new payload arrival will trigger it to be sent and not resend in progress payloads`() { + fun `new payload arrival during delivery loop will be picked up and sent without delay`() { + schedulingExecutor.blockingMode = true + deliveryExecutor.blockingMode = true schedulingService.onPayloadIntake() - schedulingExecutor.runCurrentlyBlocked() storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata2) - schedulingService.onPayloadIntake() - val scheduleLatch = schedulingExecutor.queueCompletionTask() - schedulingExecutor.runCurrentlyBlocked() - scheduleLatch.assertCountedDown() - val deliveryLatch = deliveryExecutor.queueCompletionTask() - deliveryLatch.assertCountedDown() + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.blockingMode = false + deliveryExecutor.awaitExecutionCompletion() assertEquals(3, executionService.sendAttempts()) } + @Test + fun `payloads to blocked endpoint will not be sent or retried until duration lapses`() { + val longBlockedDuration = 90_000L + storageService.cachedPayloads.clear() + storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata) + executionService.constantResponse = ApiResponse.TooManyRequests(endpoint = Endpoint.SESSIONS_V2, longBlockedDuration) + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() + assertEquals(1, executionService.sendAttempts()) + allSendsSucceed() + schedulingExecutor.moveForwardAndRunBlocked(INITIAL_DELAY_MS + 1) + deliveryExecutor.awaitExecutionCompletion() + assertEquals(1, executionService.sendAttempts()) + schedulingExecutor.moveForwardAndRunBlocked(longBlockedDuration - INITIAL_DELAY_MS) + deliveryExecutor.awaitExecutionCompletion() + assertEquals(2, executionService.sendAttempts()) + assertEquals(0, storageService.cachedPayloads.size) + } + + @Test + fun `payloads that fail to deliver because of a 429 will be retried before the default delay if endpoint is unblocked earlier`() { + storageService.cachedPayloads.clear() + storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata) + executionService.constantResponse = ApiResponse.TooManyRequests(endpoint = Endpoint.SESSIONS_V2, SHORT_BLOCKED_DURATION) + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() + allSendsSucceed() + schedulingExecutor.moveForwardAndRunBlocked(SHORT_BLOCKED_DURATION + 1) + deliveryExecutor.awaitExecutionCompletion() + assertEquals(2, executionService.sendAttempts()) + assertEquals(0, storageService.cachedPayloads.size) + } + + @Test + fun `payloads to unblocked endpoint will not affect other endpoints`() { + storageService.cachedPayloads.clear() + storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata) + executionService.constantResponse = ApiResponse.TooManyRequests(endpoint = Endpoint.SESSIONS_V2, SHORT_BLOCKED_DURATION) + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() + assertEquals(1, executionService.sendAttempts()) + executionService.constantResponse = success + storageService.cachedPayloads.add(fakeLogStoredTelemetryMetadata) + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() + assertEquals(2, executionService.sendAttempts()) + assertEquals(1, storageService.cachedPayloads.size) + } + + @Test + fun `concurrent payload sending to the same endpoint will result in only one delivery attempt`() { + storageService.cachedPayloads.clear() + storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata) + storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata2) + executionService.constantResponse = ApiResponse.TooManyRequests(endpoint = Endpoint.SESSIONS_V2, SHORT_BLOCKED_DURATION) + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() + assertEquals(1, executionService.sendAttempts()) + schedulingExecutor.moveForwardAndRunBlocked(SHORT_BLOCKED_DURATION + 1) + deliveryExecutor.awaitExecutionCompletion() + assertEquals(2, executionService.sendAttempts()) + } + + @Test + fun `payloads to already blocked endpoint will not be sent`() { + storageService.cachedPayloads.clear() + storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata) + executionService.constantResponse = ApiResponse.TooManyRequests(endpoint = Endpoint.SESSIONS_V2, SHORT_BLOCKED_DURATION) + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() + storageService.cachedPayloads.add(fakeSessionStoredTelemetryMetadata2) + waitForOnPayloadIntakeTaskCompletion() + deliveryExecutor.awaitExecutionCompletion() + assertEquals(1, executionService.sendAttempts()) + } + @Test fun `no sent attempt will be made if a payload cannot be found on disk`() { deliveryExecutor.blockingMode = true - schedulingService.onPayloadIntake() - schedulingExecutor.runCurrentlyBlocked() + waitForOnPayloadIntakeTaskCompletion() storageService.cachedPayloads.remove(fakeLogStoredTelemetryMetadata) deliveryExecutor.blockingMode = false - val deliveryLatch = deliveryExecutor.queueCompletionTask() - deliveryLatch.assertCountedDown() + deliveryExecutor.awaitExecutionCompletion() assertEquals(1, executionService.sendAttempts()) } + + private fun waitForOnPayloadIntakeTaskCompletion() { + schedulingService.onPayloadIntake() + schedulingExecutor.awaitExecutionCompletion() + } + + private fun allSendsSucceed() { + executionService.constantResponse = success + } + + private fun allSendsFail() { + executionService.constantResponse = failure + } + + private companion object { + const val SHORT_BLOCKED_DURATION = 30_000L + val success = ApiResponse.Success(body = "", headers = emptyMap()) + val failure = ApiResponse.Failure(code = 500, emptyMap()) + } } diff --git a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/assertions/TestAssertions.kt b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/assertions/TestAssertions.kt index eb626625dd..6da4fb22e6 100644 --- a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/assertions/TestAssertions.kt +++ b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/assertions/TestAssertions.kt @@ -4,7 +4,7 @@ import org.junit.Assert.assertEquals import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit -fun CountDownLatch.assertCountedDown() { - await(1, TimeUnit.SECONDS) - assertEquals(0, count) +fun CountDownLatch.assertCountedDown(waitTimeMs: Long = 1000L) { + await(waitTimeMs, TimeUnit.MILLISECONDS) + assertEquals("Operation timed out after $waitTimeMs ms", 0, count) } diff --git a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/concurrency/BlockingScheduledExecutorService.kt b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/concurrency/BlockingScheduledExecutorService.kt index de54c27f27..272e95e0b2 100644 --- a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/concurrency/BlockingScheduledExecutorService.kt +++ b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/concurrency/BlockingScheduledExecutorService.kt @@ -1,5 +1,6 @@ package io.embrace.android.embracesdk.concurrency +import io.embrace.android.embracesdk.assertions.assertCountedDown import io.embrace.android.embracesdk.fakes.FakeClock import java.util.LinkedList import java.util.concurrent.AbstractExecutorService @@ -211,9 +212,19 @@ class BlockingScheduledExecutorService( submit { latch.countDown() } + if (blockingMode) { + runCurrentlyBlocked() + } return latch } + /** + * Wait [waitTimeMs] for all the queued up jobs on this executor to finish running + */ + fun awaitExecutionCompletion(waitTimeMs: Long = 1000L) { + queueCompletionTask().assertCountedDown(waitTimeMs) + } + private fun submitOrQueue(delay: Long, futureTask: BlockedFutureScheduledTask) { if (delay <= 0L) { submit(futureTask) diff --git a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fixtures/DeliveryFixtures.kt b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fixtures/DeliveryFixtures.kt index c00f7c59dc..058d2db1c8 100644 --- a/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fixtures/DeliveryFixtures.kt +++ b/embrace-test-fakes/src/main/kotlin/io/embrace/android/embracesdk/fixtures/DeliveryFixtures.kt @@ -5,19 +5,19 @@ import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata import io.embrace.android.embracesdk.internal.delivery.SupportedEnvelopeType val fakeSessionStoredTelemetryMetadata = StoredTelemetryMetadata( - timestamp = DEFAULT_FAKE_CURRENT_TIME, + timestamp = DEFAULT_FAKE_CURRENT_TIME + 2000L, uuid = "30690ad1-6b87-4e08-b72c-7deca14451d8", envelopeType = SupportedEnvelopeType.SESSION ) val fakeSessionStoredTelemetryMetadata2 = StoredTelemetryMetadata( - timestamp = DEFAULT_FAKE_CURRENT_TIME + 1, + timestamp = DEFAULT_FAKE_CURRENT_TIME + 10_000L, uuid = "30690ad1-6b87-4e08-b72c-7deca14451d8", envelopeType = SupportedEnvelopeType.SESSION ) val fakeLogStoredTelemetryMetadata = StoredTelemetryMetadata( - timestamp = DEFAULT_FAKE_CURRENT_TIME, + timestamp = DEFAULT_FAKE_CURRENT_TIME + 500L, uuid = "6bda3896-d4fd-42ce-89f6-47bec86f1c80", envelopeType = SupportedEnvelopeType.LOG )