Skip to content

Commit

Permalink
Fully test implemented Scheduling Service functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
bidetofevil committed Sep 27, 2024
1 parent e3079c6 commit 76ea535
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ internal class PeriodicBackgroundActivityCacherTest {
fun `do not save more than once if delay time has not elapsed`() {
queueScheduleSave()
queueScheduleSave()
val latch = executor.queueCompletionTask()
executor.blockingMode = false
latch.assertCountedDown()
executor.awaitExecutionCompletion()
assertEquals(1, executionCount.get())
}

Expand All @@ -56,8 +54,7 @@ internal class PeriodicBackgroundActivityCacherTest {
val latch2 = queueScheduleSave()
assertEquals(1, executor.scheduledTasksCount())
executor.moveForwardAndRunBlocked(1999)
executor.blockingMode = false
executor.queueCompletionTask().assertCountedDown()
executor.awaitExecutionCompletion()
assertEquals(1, latch2.count)
assertEquals(1, executionCount.get())
executor.moveForwardAndRunBlocked(2)
Expand All @@ -69,9 +66,7 @@ internal class PeriodicBackgroundActivityCacherTest {
fun `stopping cacher prevents execution of the pending scheduled save`() {
queueScheduleSave()
cacher.stop()
val latch = executor.queueCompletionTask()
executor.blockingMode = false
latch.assertCountedDown()
executor.awaitExecutionCompletion()
assertEquals(0, executionCount.get())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,48 +56,30 @@ internal class SchedulingServiceImpl(
*/
private fun deliveryLoop() {
try {
var deliveryQueue = getDeliveryQueue()
var deliveryQueue = createPayloadQueue()
while (deliveryQueue.isNotEmpty() && readyToSend()) {
deliveryQueue.poll()?.let { payload ->
queueDelivery(payload)
}

if (queryForPayloads.compareAndSet(true, false)) {
deliveryQueue = getDeliveryQueue()
if (payload.shouldSendPayload()) {
payload.envelopeType.endpoint.updateBlockedEndpoint()
queueDelivery(payload)
}
}

if (deliveryQueue.isEmpty()) {
deliveryQueue = getDeliveryQueue()
if (queryForPayloads.compareAndSet(true, false) || deliveryQueue.isEmpty()) {
deliveryQueue = createPayloadQueue()
}
}
} finally {
sendLoopActive.set(false)
scheduleNextCheck()
scheduleNextDeliveryLoop()
}
}

private fun getDeliveryQueue() = LinkedList(getReadyPayloads())

private fun getReadyPayloads(): List<StoredTelemetryMetadata> =
private fun createPayloadQueue() = LinkedList(
storageService.getPayloadsByPriority()
.filter { it.shouldSendPayload() }
.sortedWith(storedTelemetryComparator)

private fun StoredTelemetryMetadata.shouldSendPayload(): Boolean {
// determine if the given payload is eligible to be sent
// i.e. not already being sent, endpoint not blocked by 429, and isn't waiting to be retried
updateBlockedEndpoint(envelopeType.endpoint)

return if (activeSends.contains(this)) {
false
} else if (blockedEndpoints.containsKey(envelopeType.endpoint)) {
false
} else {
payloadsToRetry[this]?.run {
clock.now() >= nextRetryTimeMs
} ?: true
}
}
)

private fun queueDelivery(payload: StoredTelemetryMetadata): Future<ApiResponse> {
activeSends.add(payload)
Expand All @@ -118,18 +100,19 @@ internal class SchedulingServiceImpl(
// If delivery of this payload should be retried, add or replace the entry in the retry map
// with the new values for how many times it has failed, and when the next retry should happen
val retryAttempts = payloadsToRetry[payload]?.failedAttempts ?: 0
val nextRetryTimeMs = calculateNextRetryTime(retryAttempts = retryAttempts)
val nextRetryTimeMs = if (this is ApiResponse.TooManyRequests && retryAfter != null) {
val unblockedTimestampMs = clock.now() + retryAfter as Long
blockedEndpoints[endpoint] = unblockedTimestampMs
unblockedTimestampMs
} else {
calculateNextRetryTime(retryAttempts = retryAttempts)
}

payloadsToRetry[payload] = RetryInstance(
failedAttempts = retryAttempts + 1,
nextRetryTimeMs = nextRetryTimeMs
)
}

if (this is ApiResponse.TooManyRequests) {
retryAfter?.let { delayMs ->
blockedEndpoints[endpoint] = clock.now() + delayMs
}
}
}
} else {
// Could not find payload. Do not retry.
Expand All @@ -141,7 +124,7 @@ internal class SchedulingServiceImpl(
}
}

private fun scheduleNextCheck() {
private fun scheduleNextDeliveryLoop() {
payloadsToRetry.map { it.value.nextRetryTimeMs }.minOrNull()?.let { timestampMs ->
if (timestampMs <= clock.now()) {
startDeliveryLoop()
Expand All @@ -156,20 +139,39 @@ internal class SchedulingServiceImpl(
}

private fun readyToSend(): Boolean {
// TODO: determine if the SDK is in a state where it's ready to send payloads, i.e. have network connection and free thread
// TODO: determine if the SDK is in a state where it's ready to send payloads, e.g. have network connection, etc.
return true
}

private fun updateBlockedEndpoint(endpoint: Endpoint) {
blockedEndpoints[endpoint]?.let {
if (it <= clock.now()) {
blockedEndpoints.remove(endpoint)
}
private fun StoredTelemetryMetadata.shouldSendPayload(): Boolean {
// determine if the given payload is eligible to be sent
// i.e. not already being sent, endpoint not blocked by 429, and isn't waiting to be retried
return if (activeSends.contains(this)) {
false
} else if (isEndpointBlocked()) {
false
} else {
payloadsToRetry[this]?.run {
clock.now() >= nextRetryTimeMs
} ?: true
}
}

private fun StoredTelemetryMetadata.toStream(): InputStream? = storageService.loadPayloadAsStream(this)

private fun StoredTelemetryMetadata.isEndpointBlocked(): Boolean =
blockedEndpoints[envelopeType.endpoint]?.let { timestampMs ->
timestampMs > clock.now()
} ?: false

private fun Endpoint.updateBlockedEndpoint() {
blockedEndpoints[this]?.let {
if (it <= clock.now()) {
blockedEndpoints.remove(this)
}
}
}

private fun calculateDelay(nextRetryTimeMs: Long): Long = nextRetryTimeMs - clock.now()

private fun calculateNextRetryTime(retryAttempts: Int): Long = clock.now() + (INITIAL_DELAY_MS * (1 shl retryAttempts))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package io.embrace.android.embracesdk.internal.storage
import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata
import java.io.InputStream

/**
* TODO: merge this into [PayloadStorageService]
*/
interface StorageService2 {
fun getPayloadsByPriority(): List<StoredTelemetryMetadata>
fun loadPayloadAsStream(payloadMetadata: StoredTelemetryMetadata): InputStream?
Expand Down
Loading

0 comments on commit 76ea535

Please sign in to comment.