From 5cbbf87c3700ed3549d96d2f257117a711dba7ee Mon Sep 17 00:00:00 2001 From: Luca Kellermann Date: Sun, 11 Aug 2024 03:52:17 +0200 Subject: [PATCH] Add more debug information on consume cancellation --- .../kotlin/ratelimit/IdentifyRateLimiter.kt | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/gateway/src/commonMain/kotlin/ratelimit/IdentifyRateLimiter.kt b/gateway/src/commonMain/kotlin/ratelimit/IdentifyRateLimiter.kt index b0743bbd2ee9..e6790fee38d1 100644 --- a/gateway/src/commonMain/kotlin/ratelimit/IdentifyRateLimiter.kt +++ b/gateway/src/commonMain/kotlin/ratelimit/IdentifyRateLimiter.kt @@ -77,12 +77,17 @@ private class IdentifyRateLimiterImpl( override suspend fun consume(shardId: Int, events: SharedFlow) { require(shardId >= 0) { "shardId must be non-negative but was $shardId" } - // if the coroutine that called consume() is cancelled, the CancellableContinuation makes sure the waiting is - // stopped (the Gateway won't try to identify), so we don't need to hold the mutex and waste time for other - // calls return suspendCancellableCoroutine { continuation -> - val job = launchIdentifyWaiter(shardId, events, continuation) - continuation.invokeOnCancellation { job.cancel() } + val waiter = launchIdentifyWaiter(shardId, events, continuation) + // this will be invoked if the coroutine that called consume() is cancelled + continuation.invokeOnCancellation { cause -> + // stop the waiter, so we don't hold the mutex and waste time for other consume() calls (the Gateway + // won't try to identify if it was cancelled at this point) + waiter.cancel("Identify waiter was cancelled because consume() was cancelled", cause) + logger.debug(cause) { + "Identifying on shard $shardId with rate_limit_key ${shardId % maxConcurrency} was cancelled" + } + } } }