Skip to content

Commit

Permalink
Partially implemented and tested SchedulignService (#1426)
Browse files Browse the repository at this point in the history
## Goal

Flesh out internal design of implementation of `SchedulingService`. Implemented a portion of the logic and added a portion of test for the implementation. More logic and tests will be added later, but I want to get some feedback on a partially working implementation

## Testing
Some unit tests were added that leverage new added or tweaked fakes.
  • Loading branch information
bidetofevil authored Sep 27, 2024
2 parents fd05972 + 76ea535 commit 9132bb9
Show file tree
Hide file tree
Showing 15 changed files with 565 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ sealed class ApiResponse {
*/
data class Incomplete(val exception: Throwable) : ApiResponse()

/**
* API call not executed because payload was not found
*/
object NoPayload : ApiResponse()

/**
* No response was received
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ internal class EmbraceApiService(
null
}

is ApiResponse.Failure, ApiResponse.None -> {
is ApiResponse.Failure, ApiResponse.None, ApiResponse.NoPayload -> {
null
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,12 @@ sealed class Worker(internal val threadName: String) {
* Monitor thread that checks the main thread for ANRs.
*/
object AnrWatchdogWorker : Background("anr-watchdog")

/**
* Delivery Worker
*
* TODO: Make this a PriorityWorker
*/
object DeliveryWorker : Background("delivery")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ internal class PeriodicBackgroundActivityCacherTest {
fun `do not save more than once if delay time has not elapsed`() {
queueScheduleSave()
queueScheduleSave()
val latch = executor.queueCompletionTask()
executor.blockingMode = false
latch.assertCountedDown()
executor.awaitExecutionCompletion()
assertEquals(1, executionCount.get())
}

Expand All @@ -56,8 +54,7 @@ internal class PeriodicBackgroundActivityCacherTest {
val latch2 = queueScheduleSave()
assertEquals(1, executor.scheduledTasksCount())
executor.moveForwardAndRunBlocked(1999)
executor.blockingMode = false
executor.queueCompletionTask().assertCountedDown()
executor.awaitExecutionCompletion()
assertEquals(1, latch2.count)
assertEquals(1, executionCount.get())
executor.moveForwardAndRunBlocked(2)
Expand All @@ -69,9 +66,7 @@ internal class PeriodicBackgroundActivityCacherTest {
fun `stopping cacher prevents execution of the pending scheduled save`() {
queueScheduleSave()
cacher.stop()
val latch = executor.queueCompletionTask()
executor.blockingMode = false
latch.assertCountedDown()
executor.awaitExecutionCompletion()
assertEquals(0, executionCount.get())
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.embrace.android.embracesdk.internal.delivery

import io.embrace.android.embracesdk.internal.comms.api.Endpoint
import io.embrace.android.embracesdk.internal.payload.Envelope
import java.lang.reflect.Type

Expand All @@ -8,13 +9,14 @@ import java.lang.reflect.Type
*/
enum class SupportedEnvelopeType(
val serializedType: Type,
val description: String
val description: String,
val endpoint: Endpoint,
) {

CRASH(Envelope.logEnvelopeType, "crash"),
SESSION(Envelope.sessionEnvelopeType, "session"),
LOG(Envelope.logEnvelopeType, "log"),
NETWORK(Envelope.logEnvelopeType, "network");
CRASH(Envelope.logEnvelopeType, "crash", Endpoint.LOGS),
SESSION(Envelope.sessionEnvelopeType, "session", Endpoint.SESSIONS_V2),
LOG(Envelope.logEnvelopeType, "log", Endpoint.LOGS),
NETWORK(Envelope.logEnvelopeType, "network", Endpoint.LOGS);

companion object {
private val valueMap =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,9 @@ interface SchedulingService : CrashTeardownHandler {
*/
fun onPayloadIntake()
}

class NoopSchedulingService : SchedulingService {
override fun onPayloadIntake() { }

override fun handleCrash(crashId: String) { }
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,187 @@
@file:Suppress("FunctionOnlyReturningConstant")

package io.embrace.android.embracesdk.internal.delivery.scheduling

import io.embrace.android.embracesdk.internal.clock.Clock
import io.embrace.android.embracesdk.internal.comms.api.ApiResponse
import io.embrace.android.embracesdk.internal.comms.api.Endpoint
import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata
import io.embrace.android.embracesdk.internal.delivery.execution.RequestExecutionService
import io.embrace.android.embracesdk.internal.delivery.storedTelemetryComparator
import io.embrace.android.embracesdk.internal.storage.StorageService2
import io.embrace.android.embracesdk.internal.worker.BackgroundWorker
import java.io.InputStream
import java.util.Collections
import java.util.LinkedList
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

internal class SchedulingServiceImpl(
@Suppress("unused") private val requestExecutionService: RequestExecutionService
private val storageService: StorageService2,
private val executionService: RequestExecutionService,
private val schedulingWorker: BackgroundWorker,
private val deliveryWorker: BackgroundWorker,
private val clock: Clock
) : SchedulingService {

private val blockedEndpoints: MutableMap<Endpoint, Long> = ConcurrentHashMap()
private val sendLoopActive = AtomicBoolean(false)
private val queryForPayloads = AtomicBoolean(true)
private val activeSends: MutableSet<StoredTelemetryMetadata> = Collections.newSetFromMap(ConcurrentHashMap())
private val payloadsToRetry: MutableMap<StoredTelemetryMetadata, RetryInstance> = ConcurrentHashMap()

override fun onPayloadIntake() {
queryForPayloads.set(true)
startDeliveryLoop()
}

override fun handleCrash(crashId: String) {
// TODO: get ready to die
}

override fun onPayloadIntake() {
private fun startDeliveryLoop() {
// When a payload arrives, check to see if there's already an active job try to deliver payloads
// If not, schedule job. If so, do nothing.
if (sendLoopActive.compareAndSet(false, true)) {
schedulingWorker.submit {
deliveryLoop()
}
}
}

/**
* Loop through the payloads ready to be sent by priority and queue for delivery
*/
private fun deliveryLoop() {
try {
var deliveryQueue = createPayloadQueue()
while (deliveryQueue.isNotEmpty() && readyToSend()) {
deliveryQueue.poll()?.let { payload ->
if (payload.shouldSendPayload()) {
payload.envelopeType.endpoint.updateBlockedEndpoint()
queueDelivery(payload)
}
}

if (queryForPayloads.compareAndSet(true, false) || deliveryQueue.isEmpty()) {
deliveryQueue = createPayloadQueue()
}
}
} finally {
sendLoopActive.set(false)
scheduleNextDeliveryLoop()
}
}

private fun createPayloadQueue() = LinkedList(
storageService.getPayloadsByPriority()
.filter { it.shouldSendPayload() }
.sortedWith(storedTelemetryComparator)
)

private fun queueDelivery(payload: StoredTelemetryMetadata): Future<ApiResponse> {
activeSends.add(payload)
return deliveryWorker.submit<ApiResponse> {
try {
val payloadStream = payload.toStream()
if (payloadStream != null) {
executionService.attemptHttpRequest(
payloadStream = { payloadStream },
envelopeType = payload.envelopeType
).apply {
if (!shouldRetry) {
// If the response is such that we should not ever retry the delivery of this payload,
// delete it from both the in memory retry payloads map and on disk
payloadsToRetry.remove(payload)
storageService.deletePayload(payload)
} else {
// If delivery of this payload should be retried, add or replace the entry in the retry map
// with the new values for how many times it has failed, and when the next retry should happen
val retryAttempts = payloadsToRetry[payload]?.failedAttempts ?: 0
val nextRetryTimeMs = if (this is ApiResponse.TooManyRequests && retryAfter != null) {
val unblockedTimestampMs = clock.now() + retryAfter as Long
blockedEndpoints[endpoint] = unblockedTimestampMs
unblockedTimestampMs
} else {
calculateNextRetryTime(retryAttempts = retryAttempts)
}

payloadsToRetry[payload] = RetryInstance(
failedAttempts = retryAttempts + 1,
nextRetryTimeMs = nextRetryTimeMs
)
}
}
} else {
// Could not find payload. Do not retry.
ApiResponse.NoPayload
}
} finally {
activeSends.remove(payload)
}
}
}

private fun scheduleNextDeliveryLoop() {
payloadsToRetry.map { it.value.nextRetryTimeMs }.minOrNull()?.let { timestampMs ->
if (timestampMs <= clock.now()) {
startDeliveryLoop()
} else if (timestampMs != Long.MAX_VALUE) {
schedulingWorker.schedule<Unit>(
::startDeliveryLoop,
calculateDelay(timestampMs),
TimeUnit.MILLISECONDS
)
}
}
}

private fun readyToSend(): Boolean {
// TODO: determine if the SDK is in a state where it's ready to send payloads, e.g. have network connection, etc.
return true
}

private fun StoredTelemetryMetadata.shouldSendPayload(): Boolean {
// determine if the given payload is eligible to be sent
// i.e. not already being sent, endpoint not blocked by 429, and isn't waiting to be retried
return if (activeSends.contains(this)) {
false
} else if (isEndpointBlocked()) {
false
} else {
payloadsToRetry[this]?.run {
clock.now() >= nextRetryTimeMs
} ?: true
}
}

private fun StoredTelemetryMetadata.toStream(): InputStream? = storageService.loadPayloadAsStream(this)

private fun StoredTelemetryMetadata.isEndpointBlocked(): Boolean =
blockedEndpoints[envelopeType.endpoint]?.let { timestampMs ->
timestampMs > clock.now()
} ?: false

private fun Endpoint.updateBlockedEndpoint() {
blockedEndpoints[this]?.let {
if (it <= clock.now()) {
blockedEndpoints.remove(this)
}
}
}

private fun calculateDelay(nextRetryTimeMs: Long): Long = nextRetryTimeMs - clock.now()

private fun calculateNextRetryTime(retryAttempts: Int): Long = clock.now() + (INITIAL_DELAY_MS * (1 shl retryAttempts))

private data class RetryInstance(
val failedAttempts: Int,
val nextRetryTimeMs: Long
)

companion object {
const val INITIAL_DELAY_MS = 60_000L
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import io.embrace.android.embracesdk.internal.delivery.intake.IntakeService
import io.embrace.android.embracesdk.internal.delivery.intake.IntakeServiceImpl
import io.embrace.android.embracesdk.internal.delivery.resurrection.PayloadResurrectionService
import io.embrace.android.embracesdk.internal.delivery.resurrection.PayloadResurrectionServiceImpl
import io.embrace.android.embracesdk.internal.delivery.scheduling.NoopSchedulingService
import io.embrace.android.embracesdk.internal.delivery.scheduling.SchedulingService
import io.embrace.android.embracesdk.internal.delivery.scheduling.SchedulingServiceImpl
import io.embrace.android.embracesdk.internal.delivery.storage.PayloadStorageService
import io.embrace.android.embracesdk.internal.delivery.storage.PayloadStorageServiceImpl
import io.embrace.android.embracesdk.internal.worker.Worker
Expand Down Expand Up @@ -73,6 +73,6 @@ internal class DeliveryModule2Impl(
if (configModule.configService.isOnlyUsingOtelExporters()) {
return@singleton null
}
SchedulingServiceImpl(checkNotNull(requestExecutionService))
NoopSchedulingService()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.embrace.android.embracesdk.internal.storage

import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata
import java.io.InputStream

/**
* TODO: merge this into [PayloadStorageService]
*/
interface StorageService2 {
fun getPayloadsByPriority(): List<StoredTelemetryMetadata>
fun loadPayloadAsStream(payloadMetadata: StoredTelemetryMetadata): InputStream?
fun deletePayload(payloadMetadata: StoredTelemetryMetadata)
}
Loading

0 comments on commit 9132bb9

Please sign in to comment.