Skip to content

Commit

Permalink
Save crashes synchronously and process other unbatched log requests i…
Browse files Browse the repository at this point in the history
…n background thread (#1322)

## Goal

Changed the `SendImmediately` designation to a `SendMode` enum to additionally allow logs to be saved instead of sent, and use this to save crash logs rather than send them. This ensures they will be delivered on the next app startup in a consistent matter, rather than rely on our existing, flakey mechanism to attempt to send synchronously, and only if it fails, save the payload.

Bundled with this change is the need to serialize the log payload and create the request for other unbatched logs off the calling thread. Right now, they are not being done on a background thread - only sent through it - so this could be problematic depending on where it's being invoked. Instead, we will run them on the same scheduler that we use to run the log batching job.

## Testing

Added unit tests everywhere to ensure this SendMode concept is passed down. Also added integration test to ensure that this works.
  • Loading branch information
bidetofevil authored Sep 5, 2024
1 parent f80b515 commit 9771d7f
Show file tree
Hide file tree
Showing 25 changed files with 273 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package io.embrace.android.embracesdk.internal.arch.schema
public sealed class EmbType(type: String, subtype: String?) : TelemetryType {
override val key: EmbraceAttributeKey = EmbraceAttributeKey(id = "type")
override val value: String = type + (subtype?.run { ".$this" } ?: "")
override val sendImmediately: Boolean = false
override val sendMode: SendMode = SendMode.DEFAULT

/**
* Keys that track how fast a time interval is. Only applies to spans.
Expand Down Expand Up @@ -48,7 +48,7 @@ public sealed class EmbType(type: String, subtype: String?) : TelemetryType {
*/
public sealed class System(
subtype: String,
override val sendImmediately: Boolean = false
override val sendMode: SendMode = SendMode.DEFAULT
) : EmbType("sys", subtype) {

public object Breadcrumb : System("breadcrumb")
Expand All @@ -71,19 +71,19 @@ public sealed class EmbType(type: String, subtype: String?) : TelemetryType {
public val embFlutterExceptionLibrary: EmbraceAttributeKey = EmbraceAttributeKey("exception.library")
}

public object Exit : System("exit", true)
public object Exit : System("exit", SendMode.IMMEDIATE)

public object PushNotification : System("push_notification")

public object Crash : System("android.crash", true) {
public object Crash : System("android.crash", SendMode.DEFER) {
/**
* The list of [Throwable] that caused the exception responsible for a crash
*/
public val embAndroidCrashExceptionCause: EmbraceAttributeKey =
EmbraceAttributeKey("android.crash.exception_cause")
}

public object ReactNativeCrash : System("android.react_native_crash", true) {
public object ReactNativeCrash : System("android.react_native_crash", SendMode.DEFER) {
/**
* The JavaScript unhandled exception from the ReactNative layer
*/
Expand All @@ -92,9 +92,9 @@ public sealed class EmbType(type: String, subtype: String?) : TelemetryType {
)
}

public object ReactNativeAction : System("rn_action", true)
public object ReactNativeAction : System("rn_action")

public object NativeCrash : System("android.native_crash", true) {
public object NativeCrash : System("android.native_crash", SendMode.DEFER) {
/**
* Exception coming from the native layer
*/
Expand Down Expand Up @@ -122,7 +122,7 @@ public sealed class EmbType(type: String, subtype: String?) : TelemetryType {

public object Sigquit : System("sigquit")

public object NetworkCapturedRequest : System("network_capture", true)
public object NetworkCapturedRequest : System("network_capture", SendMode.IMMEDIATE)

public object NetworkStatus : System("network_status")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.embrace.android.embracesdk.internal.arch.schema

import io.embrace.android.embracesdk.internal.opentelemetry.embSendMode
import io.embrace.android.embracesdk.internal.payload.AppExitInfoData
import io.embrace.android.embracesdk.internal.payload.NetworkCapturedCall
import io.embrace.android.embracesdk.internal.utils.toNonNullMap
Expand All @@ -23,8 +24,8 @@ public sealed class SchemaType(
protected abstract val schemaAttributes: Map<String, String>

private val commonAttributes: Map<String, String> = mutableMapOf<String, String>().apply {
if (telemetryType.sendImmediately) {
plusAssign(SendImmediately.toEmbraceKeyValuePair())
if (telemetryType.sendMode != SendMode.DEFAULT) {
plusAssign(embSendMode.name to telemetryType.sendMode.name)
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.embrace.android.embracesdk.internal.arch.schema

import io.embrace.android.embracesdk.internal.arch.schema.SendMode.DEFAULT
import io.embrace.android.embracesdk.internal.arch.schema.SendMode.DEFER
import io.embrace.android.embracesdk.internal.arch.schema.SendMode.IMMEDIATE

/**
* How a given payload should be delivered to the Embrace server
*/
public enum class SendMode {
/**
* Use the default delivery semantics - no customization required
*/
DEFAULT,

/**
* If supported for that signal/payload type, deliver this as soon as possible and do not batch
*/
IMMEDIATE,

/**
* Queue for delivery at the next convenient time. Used when the delivery environment is unstable, e.g. when an app is about to crash.
*/
DEFER
}

public fun String.toSendMode(): SendMode {
return when (lowercase()) {
"immediate" -> IMMEDIATE
"defer" -> DEFER
else -> DEFAULT
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ package io.embrace.android.embracesdk.internal.arch.schema
* backend that it can assume the data in the event follows a particular schema.
*/
public interface TelemetryType : FixedAttribute {
public val sendImmediately: Boolean
public val sendMode: SendMode
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package io.embrace.android.embracesdk.internal.envelope.log

import io.embrace.android.embracesdk.internal.logs.LogRequest
import io.embrace.android.embracesdk.internal.payload.Envelope
import io.embrace.android.embracesdk.internal.payload.LogPayload

public interface LogEnvelopeSource {
public fun getBatchedLogEnvelope(): Envelope<LogPayload>

public fun getNonbatchedEnvelope(): List<Envelope<LogPayload>>
public fun getSingleLogEnvelopes(): List<LogRequest<Envelope<LogPayload>>>
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.embrace.android.embracesdk.internal.envelope.log

import io.embrace.android.embracesdk.internal.envelope.metadata.EnvelopeMetadataSource
import io.embrace.android.embracesdk.internal.envelope.resource.EnvelopeResourceSource
import io.embrace.android.embracesdk.internal.logs.LogRequest
import io.embrace.android.embracesdk.internal.payload.Envelope
import io.embrace.android.embracesdk.internal.payload.LogPayload

Expand All @@ -13,10 +14,10 @@ internal class LogEnvelopeSourceImpl(

override fun getBatchedLogEnvelope(): Envelope<LogPayload> = getLogEnvelope(logPayloadSource.getBatchedLogPayload())

override fun getNonbatchedEnvelope(): List<Envelope<LogPayload>> {
val payloads = logPayloadSource.getNonbatchedLogPayloads()
override fun getSingleLogEnvelopes(): List<LogRequest<Envelope<LogPayload>>> {
val payloads = logPayloadSource.getSingleLogPayloads()
return if (payloads.isNotEmpty()) {
payloads.map { getLogEnvelope(it) }
payloads.map { LogRequest(payload = getLogEnvelope(it.payload), defer = it.defer) }
} else {
emptyList()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.embrace.android.embracesdk.internal.envelope.log

import io.embrace.android.embracesdk.internal.logs.LogRequest
import io.embrace.android.embracesdk.internal.payload.LogPayload

public interface LogPayloadSource {
Expand All @@ -12,5 +13,5 @@ public interface LogPayloadSource {
/**
* Returns a list of [LogPayload] that each contain a single high priority log
*/
public fun getNonbatchedLogPayloads(): List<LogPayload>
public fun getSingleLogPayloads(): List<LogRequest<LogPayload>>
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.embrace.android.embracesdk.internal.envelope.log

import io.embrace.android.embracesdk.internal.logs.LogRequest
import io.embrace.android.embracesdk.internal.logs.LogSink
import io.embrace.android.embracesdk.internal.payload.LogPayload

Expand All @@ -9,24 +10,29 @@ internal class LogPayloadSourceImpl(

override fun getBatchedLogPayload(): LogPayload {
return LogPayload(
logs = logSink.flushLogs()
logs = logSink.flushBatch()
)
}

override fun getNonbatchedLogPayloads(): List<LogPayload> {
val nonbatchedLogs = mutableListOf<LogPayload>()
var log = logSink.pollNonbatchedLog()
override fun getSingleLogPayloads(): List<LogRequest<LogPayload>> {
val logRequests = mutableListOf<LogRequest<LogPayload>>()
var logRequest = logSink.pollUnbatchedLog()

while (log != null) {
nonbatchedLogs.add(LogPayload(logs = listOf(log)))
log = if (nonbatchedLogs.size < MAX_PAYLOADS) {
logSink.pollNonbatchedLog()
while (logRequest != null) {
logRequests.add(
LogRequest(
payload = LogPayload(logs = listOf(logRequest.payload)),
defer = logRequest.defer
)
)
logRequest = if (logRequests.size < MAX_PAYLOADS) {
logSink.pollUnbatchedLog()
} else {
null
}
}

return nonbatchedLogs
return logRequests
}

private companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

internal class LogOrchestratorImpl(
private val logOrchestratorScheduledWorker: ScheduledWorker,
private val worker: ScheduledWorker,
private val clock: Clock,
private val sink: LogSink,
private val deliveryService: DeliveryService,
Expand Down Expand Up @@ -49,8 +49,14 @@ internal class LogOrchestratorImpl(
}

private fun onLogsAdded() {
logEnvelopeSource.getNonbatchedEnvelope().forEach { logEnvelope ->
deliveryService.sendLogs(logEnvelope)
logEnvelopeSource.getSingleLogEnvelopes().forEach { logRequest ->
if (logRequest.defer) {
deliveryService.saveLogs(logRequest.payload)
} else {
worker.submit {
deliveryService.sendLogs(logRequest.payload)
}
}
}

lastLogTime.set(clock.now())
Expand Down Expand Up @@ -85,15 +91,15 @@ internal class LogOrchestratorImpl(
val nextBatchCheck = MAX_BATCH_TIME - (now - firstLogInBatchTime.get())
val nextInactivityCheck = MAX_INACTIVITY_TIME - (now - lastLogTime.get())
scheduledCheckFuture?.cancel(false)
scheduledCheckFuture = logOrchestratorScheduledWorker.schedule<Unit>(
scheduledCheckFuture = worker.schedule<Unit>(
::sendLogsIfNeeded,
min(nextBatchCheck, nextInactivityCheck),
TimeUnit.MILLISECONDS
)
}

private fun isMaxLogsPerBatchReached(): Boolean =
sink.completedLogs().size >= MAX_LOGS_PER_BATCH
sink.logsForNextBatch().size >= MAX_LOGS_PER_BATCH

private fun isMaxInactivityTimeReached(now: Long): Boolean =
now - lastLogTime.get() >= MAX_INACTIVITY_TIME
Expand All @@ -103,8 +109,8 @@ internal class LogOrchestratorImpl(
return firstLogInBatchTime != 0L && now - firstLogInBatchTime >= MAX_BATCH_TIME
}

public companion object {
public const val MAX_LOGS_PER_BATCH: Int = 50
companion object {
const val MAX_LOGS_PER_BATCH: Int = 50
private const val MAX_BATCH_TIME = 5000L // In milliseconds
private const val MAX_INACTIVITY_TIME = 2000L // In milliseconds
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.embrace.android.embracesdk.internal.logs

/**
* A wrapper for a log payload that stipulates whether its delivery should be deferred or sent immediately
*/
public data class LogRequest<T>(val payload: T, val defer: Boolean = false)
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ public interface LogSink {
/**
* Returns the list of currently stored [Log] objects, waiting to be sent in the next batch
*/
public fun completedLogs(): List<Log>
public fun logsForNextBatch(): List<Log>

/**
* Returns and clears the currently stored [Log] objects, to be used when the next batch is to be sent.
* Implementations of this method must make sure the clearing and returning is atomic, i.e. logs cannot be added during this operation.
*/
public fun flushLogs(): List<Log>
public fun flushBatch(): List<Log>

/**
* Return a [Log] that is to be sent immediately rather than batched
* Return a [Log] that is to be delivered in its own request
*/
public fun pollNonbatchedLog(): Log?
public fun pollUnbatchedLog(): LogRequest<Log>?

/**
* Registers a callback to be called after new logs are stored.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,32 @@
package io.embrace.android.embracesdk.internal.logs

import io.embrace.android.embracesdk.internal.arch.schema.SendImmediately
import io.embrace.android.embracesdk.internal.arch.schema.SendMode
import io.embrace.android.embracesdk.internal.arch.schema.toSendMode
import io.embrace.android.embracesdk.internal.opentelemetry.embSendMode
import io.embrace.android.embracesdk.internal.payload.Log
import io.embrace.android.embracesdk.internal.payload.toNewPayload
import io.embrace.android.embracesdk.internal.spans.hasFixedAttribute
import io.embrace.android.embracesdk.internal.utils.threadSafeTake
import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.logs.data.LogRecordData
import java.util.concurrent.ConcurrentLinkedQueue

public class LogSinkImpl : LogSink {
private val storedLogs: ConcurrentLinkedQueue<Log> = ConcurrentLinkedQueue()
private val nonbatchedLogs: ConcurrentLinkedQueue<Log> = ConcurrentLinkedQueue()
private val logRequests: ConcurrentLinkedQueue<LogRequest<Log>> = ConcurrentLinkedQueue()
private var onLogsStored: (() -> Unit)? = null
private val flushLock = Any()

override fun storeLogs(logs: List<LogRecordData>): CompletableResultCode {
try {
logs.forEach { log ->
if (log.hasFixedAttribute(SendImmediately)) {
nonbatchedLogs.add(log.toNewPayload())
val sendMode = log.attributes[embSendMode.attributeKey]?.toSendMode() ?: SendMode.DEFAULT
if (sendMode != SendMode.DEFAULT) {
logRequests.add(
LogRequest(
payload = log.toNewPayload(),
defer = sendMode == SendMode.DEFER
)
)
} else {
storedLogs.add(log.toNewPayload())
}
Expand All @@ -32,11 +39,11 @@ public class LogSinkImpl : LogSink {
return CompletableResultCode.ofSuccess()
}

override fun completedLogs(): List<Log> {
override fun logsForNextBatch(): List<Log> {
return storedLogs.toList()
}

override fun flushLogs(): List<Log> {
override fun flushBatch(): List<Log> {
synchronized(flushLock) {
val batchSize = minOf(storedLogs.size, MAX_LOGS_PER_BATCH)
val flushedLogs = storedLogs.threadSafeTake(batchSize)
Expand All @@ -45,7 +52,7 @@ public class LogSinkImpl : LogSink {
}
}

override fun pollNonbatchedLog(): Log? = nonbatchedLogs.poll()
override fun pollUnbatchedLog(): LogRequest<Log>? = logRequests.poll()

override fun registerLogStoredCallback(onLogsStored: () -> Unit) {
this.onLogsStored = onLogsStored
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,8 @@ public val embErrorLogCount: EmbraceAttributeKey = EmbraceAttributeKey("error_lo
* Attribute name that identifies the number of free bytes on disk
*/
public val embFreeDiskBytes: EmbraceAttributeKey = EmbraceAttributeKey("disk_free_bytes")

/**
* Attribute name that identifies how a signal should be delivered to the Embrace backend
*/
public val embSendMode: EmbraceAttributeKey = EmbraceAttributeKey(id = "send_mode", isPrivate = true)
Loading

0 comments on commit 9771d7f

Please sign in to comment.