From 0255a4464b17f0f9cd2b69efad8bf051a12a7a92 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 23 Oct 2024 13:58:48 +0200 Subject: [PATCH] Use Duration as the ground truth for communicating durations Historically, the library evolved using "a Long of milliseconds" as the standard of denoting durations. Since then, `kotlin.time.Duration` appeared, encompassing a number of useful conversions. There are several consequences to this change. - Before, `delay(Long)` and `delay(Duration)` were not easily expressed via one another. For example, `delay(Long.MAX_VALUE / 2 + 1)` (up until `Long.MAX_VALUE`) used to be considered a valid delay, but it was not expressible in `delay(Duration)`. Therefore, `delay(Long)` was the more fundamental implementation. However, `delay(Duration)` could not just be expressed as `delay(inWholeMilliseconds)`, as we need to round the durations up when delaying events, and this required complex logic. With this change, `delay(Duration)` is taken as the standard implementation, and `delay(Long)` is just `delay(timeMillis.milliseconds)`, simplifying the conceptual space. - The same goes for other APIs accepting either a duration or some Long number of milliseconds. - In several platform APIs, we are actually able to pass nanoseconds as the duration to wait for. We can now accurately do that. This precision is unlikely to be important in practice, but it is still nice that we are not losing any information in transit. - On Android's main thread, it's no longer possible to wait for `Long.MAX_VALUE / 2` milliseconds: it's considered an infinite duration. `Long.MAX_VALUE / 2 - 1` is still fine. - In `kotlinx-coroutines-test`, before, it was possible to observe correct behavior for up to `Long.MAX_VALUE` milliseconds. Now, this value is drastically reduced, to be able to test the nanosecond precision. - In `kotlinx-coroutines-test`, we now fail with an `IllegalStateException` if we enter the representable ceiling of time during the test. Before, we used to continue the test execution, only using the order in which tasks arrived but not their virtual time values. --- docs/topics/cancellation-and-timeouts.md | 2 +- .../api/kotlinx-coroutines-core.api | 11 +- .../api/kotlinx-coroutines-core.klib.api | 7 +- kotlinx-coroutines-core/common/src/Delay.kt | 57 +++---- .../common/src/EventLoop.common.kt | 21 +-- kotlinx-coroutines-core/common/src/Timeout.kt | 70 +++++---- .../common/src/Unconfined.kt | 2 +- .../common/src/flow/SharingStarted.kt | 19 +-- .../common/src/flow/operators/Delay.kt | 147 +++++++++--------- .../common/src/selects/OnTimeout.kt | 13 +- .../common/test/WithTimeoutDurationTest.kt | 2 +- .../common/test/WithTimeoutTest.kt | 2 +- .../common/test/flow/VirtualTime.kt | 62 +++++--- .../test/flow/sharing/SharingStartedTest.kt | 27 ++-- kotlinx-coroutines-core/jdk8/src/time/Time.kt | 37 +---- .../src/internal/JSDispatcher.kt | 28 ++-- .../jvm/src/DefaultExecutor.kt | 5 +- kotlinx-coroutines-core/jvm/src/EventLoop.kt | 3 - kotlinx-coroutines-core/jvm/src/Executors.kt | 17 +- .../jvm/src/channels/TickerChannels.kt | 35 +++-- .../jvm/src/internal/MainDispatchers.kt | 5 +- ...edFromLexicalBlockWhenTriggeredByChild.txt | 5 +- ...acktraceIsRecoveredFromSuspensionPoint.txt | 5 +- ...sRecoveredFromSuspensionPointWithChild.txt | 5 +- .../ExecutorAsCoroutineDispatcherDelayTest.kt | 5 +- .../jvm/test/examples/test/FlowDelayTest.kt | 24 +-- .../jvm/test/flow/SharingStressTest.kt | 2 +- .../test/guide/test/CancellationGuideTest.kt | 2 +- ...CoroutineSchedulerInternalApiStressTest.kt | 4 +- .../native/src/CoroutineContext.kt | 9 +- .../native/src/EventLoop.kt | 12 +- .../native/src/MultithreadedDispatchers.kt | 12 +- .../nativeDarwin/src/Dispatchers.kt | 14 +- .../wasmWasi/src/EventLoop.kt | 5 +- .../api/kotlinx-coroutines-test.api | 7 +- .../api/kotlinx-coroutines-test.klib.api | 11 +- .../common/src/TestCoroutineDispatchers.kt | 3 +- .../common/src/TestCoroutineScheduler.kt | 58 ++++--- .../common/src/TestDispatcher.kt | 10 +- .../common/src/internal/TestMainDispatcher.kt | 9 +- .../src/migration/TestCoroutineDispatcher.kt | 3 +- .../migration/TestRunBlockingOrderTest.kt | 2 +- .../api/kotlinx-coroutines-reactor.api | 6 +- .../src/Scheduler.kt | 9 +- .../api/kotlinx-coroutines-rx2.api | 6 +- .../kotlinx-coroutines-rx2/src/RxScheduler.kt | 21 +-- .../api/kotlinx-coroutines-rx3.api | 6 +- .../kotlinx-coroutines-rx3/src/RxScheduler.kt | 21 +-- .../test/ordered/tests/TestComponent.kt | 2 +- .../api/kotlinx-coroutines-android.api | 4 +- .../src/HandlerDispatcher.kt | 13 +- .../test/DisabledHandlerTest.kt | 2 +- .../api/kotlinx-coroutines-javafx.api | 6 +- .../src/JavaFxDispatcher.kt | 16 +- .../api/kotlinx-coroutines-swing.api | 6 +- .../src/SwingDispatcher.kt | 14 +- 56 files changed, 461 insertions(+), 450 deletions(-) diff --git a/docs/topics/cancellation-and-timeouts.md b/docs/topics/cancellation-and-timeouts.md index c574e12624..18a377e5f1 100644 --- a/docs/topics/cancellation-and-timeouts.md +++ b/docs/topics/cancellation-and-timeouts.md @@ -321,7 +321,7 @@ It produces the following output: I'm sleeping 0 ... I'm sleeping 1 ... I'm sleeping 2 ... -Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms +Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1.3s ``` diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 91cccb7bd1..5249b11a18 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -297,20 +297,21 @@ public final class kotlinx/coroutines/Deferred$DefaultImpls { } public abstract interface class kotlinx/coroutines/Delay { - public abstract fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; - public abstract fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; - public abstract fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V + public abstract fun invokeOnTimeout-KLykuaI (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public abstract fun scheduleResumeAfterDelay-VtjQ1oo (JLkotlinx/coroutines/CancellableContinuation;)V + public abstract fun timeoutMessage-LRDsOJo (J)Ljava/lang/String; } public final class kotlinx/coroutines/Delay$DefaultImpls { - public static fun delay (Lkotlinx/coroutines/Delay;JLkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public static fun invokeOnTimeout-KLykuaI (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public static fun timeoutMessage-LRDsOJo (Lkotlinx/coroutines/Delay;J)Ljava/lang/String; } public final class kotlinx/coroutines/DelayKt { public static final fun awaitCancellation (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun delay-VtjQ1oo (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun toDelayMillis-LRDsOJo (J)J } public abstract interface annotation class kotlinx/coroutines/DelicateCoroutinesApi : java/lang/annotation/Annotation { diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api index 4cdf5983e7..48104ec82f 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api @@ -289,9 +289,9 @@ abstract interface kotlinx.coroutines/CoroutineScope { // kotlinx.coroutines/Cor } abstract interface kotlinx.coroutines/Delay { // kotlinx.coroutines/Delay|null[0] - abstract fun scheduleResumeAfterDelay(kotlin/Long, kotlinx.coroutines/CancellableContinuation) // kotlinx.coroutines/Delay.scheduleResumeAfterDelay|scheduleResumeAfterDelay(kotlin.Long;kotlinx.coroutines.CancellableContinuation){}[0] - open fun invokeOnTimeout(kotlin/Long, kotlinx.coroutines/Runnable, kotlin.coroutines/CoroutineContext): kotlinx.coroutines/DisposableHandle // kotlinx.coroutines/Delay.invokeOnTimeout|invokeOnTimeout(kotlin.Long;kotlinx.coroutines.Runnable;kotlin.coroutines.CoroutineContext){}[0] - open suspend fun delay(kotlin/Long) // kotlinx.coroutines/Delay.delay|delay(kotlin.Long){}[0] + abstract fun scheduleResumeAfterDelay(kotlin.time/Duration, kotlinx.coroutines/CancellableContinuation) // kotlinx.coroutines/Delay.scheduleResumeAfterDelay|scheduleResumeAfterDelay(kotlin.time.Duration;kotlinx.coroutines.CancellableContinuation){}[0] + open fun invokeOnTimeout(kotlin.time/Duration, kotlinx.coroutines/Runnable, kotlin.coroutines/CoroutineContext): kotlinx.coroutines/DisposableHandle // kotlinx.coroutines/Delay.invokeOnTimeout|invokeOnTimeout(kotlin.time.Duration;kotlinx.coroutines.Runnable;kotlin.coroutines.CoroutineContext){}[0] + open fun timeoutMessage(kotlin.time/Duration): kotlin/String // kotlinx.coroutines/Delay.timeoutMessage|timeoutMessage(kotlin.time.Duration){}[0] } abstract interface kotlinx.coroutines/Job : kotlin.coroutines/CoroutineContext.Element { // kotlinx.coroutines/Job|null[0] @@ -758,6 +758,7 @@ final fun (kotlin.coroutines/CoroutineContext).kotlinx.coroutines/ensureActive() final fun (kotlin.coroutines/CoroutineContext).kotlinx.coroutines/newCoroutineContext(kotlin.coroutines/CoroutineContext): kotlin.coroutines/CoroutineContext // kotlinx.coroutines/newCoroutineContext|newCoroutineContext@kotlin.coroutines.CoroutineContext(kotlin.coroutines.CoroutineContext){}[0] final fun (kotlin.ranges/IntRange).kotlinx.coroutines.flow/asFlow(): kotlinx.coroutines.flow/Flow // kotlinx.coroutines.flow/asFlow|asFlow@kotlin.ranges.IntRange(){}[0] final fun (kotlin.ranges/LongRange).kotlinx.coroutines.flow/asFlow(): kotlinx.coroutines.flow/Flow // kotlinx.coroutines.flow/asFlow|asFlow@kotlin.ranges.LongRange(){}[0] +final fun (kotlin.time/Duration).kotlinx.coroutines/toDelayMillis(): kotlin/Long // kotlinx.coroutines/toDelayMillis|toDelayMillis@kotlin.time.Duration(){}[0] final fun (kotlin/IntArray).kotlinx.coroutines.flow/asFlow(): kotlinx.coroutines.flow/Flow // kotlinx.coroutines.flow/asFlow|asFlow@kotlin.IntArray(){}[0] final fun (kotlin/LongArray).kotlinx.coroutines.flow/asFlow(): kotlinx.coroutines.flow/Flow // kotlinx.coroutines.flow/asFlow|asFlow@kotlin.LongArray(){}[0] final fun (kotlinx.coroutines.channels/ReceiveChannel<*>).kotlinx.coroutines.channels/cancelConsumed(kotlin/Throwable?) // kotlinx.coroutines.channels/cancelConsumed|cancelConsumed@kotlinx.coroutines.channels.ReceiveChannel<*>(kotlin.Throwable?){}[0] diff --git a/kotlinx-coroutines-core/common/src/Delay.kt b/kotlinx-coroutines-core/common/src/Delay.kt index 67d3d16bb1..845c642de5 100644 --- a/kotlinx-coroutines-core/common/src/Delay.kt +++ b/kotlinx-coroutines-core/common/src/Delay.kt @@ -4,6 +4,7 @@ import kotlinx.coroutines.selects.* import kotlin.coroutines.* import kotlin.time.* import kotlin.time.Duration.Companion.nanoseconds +import kotlin.time.Duration.Companion.milliseconds /** * This dispatcher _feature_ is implemented by [CoroutineDispatcher] implementations that natively support @@ -16,19 +17,8 @@ import kotlin.time.Duration.Companion.nanoseconds */ @InternalCoroutinesApi public interface Delay { - - /** @suppress **/ - @Deprecated( - message = "Deprecated without replacement as an internal method never intended for public use", - level = DeprecationLevel.ERROR - ) // Error since 1.6.0 - public suspend fun delay(time: Long) { - if (time <= 0) return // don't delay - return suspendCancellableCoroutine { scheduleResumeAfterDelay(time, it) } - } - /** - * Schedules resume of a specified [continuation] after a specified delay [timeMillis]. + * Schedules resume of a specified [continuation] after a specified delay [time]. * * Continuation **must be scheduled** to resume even if it is already cancelled, because a cancellation is just * an exception that the coroutine that used `delay` might wanted to catch and process. It might @@ -42,28 +32,20 @@ public interface Delay { * with(continuation) { resumeUndispatchedWith(Unit) } * ``` */ - public fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) + public fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) /** - * Schedules invocation of a specified [block] after a specified delay [timeMillis]. + * Schedules invocation of a specified [block] after a specified delay [timeout]. * The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] of this invocation * request if it is not needed anymore. */ - public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - DefaultDelay.invokeOnTimeout(timeMillis, block, context) -} + public fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = + DefaultDelay.invokeOnTimeout(timeout, block, context) -/** - * Enhanced [Delay] interface that provides additional diagnostics for [withTimeout]. - * Is going to be removed once there is proper JVM-default support. - * Then we'll be able put this function into [Delay] without breaking binary compatibility. - */ -@InternalCoroutinesApi -internal interface DelayWithTimeoutDiagnostics : Delay { /** * Returns a string that explains that the timeout has occurred, and explains what can be done about it. */ - fun timeoutMessage(timeout: Duration): String + fun timeoutMessage(timeout: Duration): String = "Timed out waiting for $timeout" } /** @@ -103,8 +85,8 @@ internal interface DelayWithTimeoutDiagnostics : Delay { public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {} /** - * Delays coroutine for at least the given time without blocking a thread and resumes it after a specified time. - * If the given [timeMillis] is non-positive, this function returns immediately. + * Delays coroutine for at least the given [duration] without blocking a thread and resumes it after the specified time. + * If the given [duration] is non-positive, this function returns immediately. * * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this * suspending function is waiting, this function immediately resumes with [CancellationException]. @@ -116,21 +98,20 @@ public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {} * Note that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause. * * Implementation note: how exactly time is tracked is an implementation detail of [CoroutineDispatcher] in the context. - * @param timeMillis time in milliseconds. */ -public suspend fun delay(timeMillis: Long) { - if (timeMillis <= 0) return // don't delay +public suspend fun delay(duration: Duration) { + if (duration <= Duration.ZERO) return // don't delay return suspendCancellableCoroutine sc@ { cont: CancellableContinuation -> - // if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule. - if (timeMillis < Long.MAX_VALUE) { - cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont) + // instead of actually waiting for an infinite time, just wait forever like awaitCancellation, don't schedule. + if (duration.isFinite()) { + cont.context.delay.scheduleResumeAfterDelay(duration, cont) } } } /** - * Delays coroutine for at least the given [duration] without blocking a thread and resumes it after the specified time. - * If the given [duration] is non-positive, this function returns immediately. + * Delays coroutine for at least the given time without blocking a thread and resumes it after a specified time. + * If the given [timeMillis] is non-positive, this function returns immediately. * * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this * suspending function is waiting, this function immediately resumes with [CancellationException]. @@ -142,8 +123,11 @@ public suspend fun delay(timeMillis: Long) { * Note that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause. * * Implementation note: how exactly time is tracked is an implementation detail of [CoroutineDispatcher] in the context. + * @param timeMillis time in milliseconds. */ -public suspend fun delay(duration: Duration): Unit = delay(duration.toDelayMillis()) +public suspend fun delay(timeMillis: Long) { + delay(timeMillis.milliseconds) +} /** Returns [Delay] implementation of the given context */ internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay @@ -152,6 +136,7 @@ internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) * Convert this duration to its millisecond value. Durations which have a nanosecond component less than * a single millisecond will be rounded up to the next largest millisecond. */ +@PublishedApi internal fun Duration.toDelayMillis(): Long = when (isPositive()) { true -> plus(999_999L.nanoseconds).inWholeMilliseconds false -> 0L diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index 84291a1b69..da55f940b6 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -5,6 +5,7 @@ import kotlinx.coroutines.internal.* import kotlin.concurrent.Volatile import kotlin.coroutines.* import kotlin.jvm.* +import kotlin.time.Duration /** * Extended by [CoroutineDispatcher] implementations that have event loop inside and can @@ -144,24 +145,12 @@ private const val SCHEDULE_OK = 0 private const val SCHEDULE_COMPLETED = 1 private const val SCHEDULE_DISPOSED = 2 -private const val MS_TO_NS = 1_000_000L -private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS - /** * First-line overflow protection -- limit maximal delay. * Delays longer than this one (~146 years) are considered to be delayed "forever". */ private const val MAX_DELAY_NS = Long.MAX_VALUE / 2 -internal fun delayToNanos(timeMillis: Long): Long = when { - timeMillis <= 0 -> 0L - timeMillis >= MAX_MS -> Long.MAX_VALUE - else -> timeMillis * MS_TO_NS -} - -internal fun delayNanosToMillis(timeNanos: Long): Long = - timeNanos / MS_TO_NS - private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY") private typealias Queue = LockFreeTaskQueueCore @@ -224,8 +213,8 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { rescheduleAllDelayed() } - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - val timeNanos = delayToNanos(timeMillis) + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { + val timeNanos = time.inWholeNanoseconds if (timeNanos < MAX_DELAY_NS) { val now = nanoTime() DelayedResumeTask(now + timeNanos, continuation).also { task -> @@ -240,8 +229,8 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { } } - protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { - val timeNanos = delayToNanos(timeMillis) + protected fun scheduleInvokeOnTimeout(timeout: Duration, block: Runnable): DisposableHandle { + val timeNanos = timeout.inWholeNanoseconds return if (timeNanos < MAX_DELAY_NS) { val now = nanoTime() DelayedRunnableTask(now + timeNanos, block).also { task -> diff --git a/kotlinx-coroutines-core/common/src/Timeout.kt b/kotlinx-coroutines-core/common/src/Timeout.kt index c63136575b..a96a14b297 100644 --- a/kotlinx-coroutines-core/common/src/Timeout.kt +++ b/kotlinx-coroutines-core/common/src/Timeout.kt @@ -13,9 +13,9 @@ import kotlin.time.* import kotlin.time.Duration.Companion.milliseconds /** - * Runs a given suspending [block] of code inside a coroutine with a specified [timeout][timeMillis] and throws + * Runs a given suspending [block] of code inside a coroutine with the specified [timeout] and throws * a [TimeoutCancellationException] if the timeout was exceeded. - * If the given [timeMillis] is non-positive, [TimeoutCancellationException] is thrown immediately. + * If the given [timeout] is non-positive, [TimeoutCancellationException] is thrown immediately. * * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of * the cancellable suspending function inside the block throws a [TimeoutCancellationException]. @@ -31,23 +31,21 @@ import kotlin.time.Duration.Companion.milliseconds * section of the coroutines guide for details. * * > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher]. - * - * @param timeMillis timeout time in milliseconds. */ -public suspend fun withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T { +public suspend fun withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T { contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } - if (timeMillis <= 0L) throw TimeoutCancellationException("Timed out immediately") + if (timeout <= Duration.ZERO) throw TimeoutCancellationException("Timed out immediately") return suspendCoroutineUninterceptedOrReturn { uCont -> - setupTimeout(TimeoutCoroutine(timeMillis, uCont), block) + setupTimeout(TimeoutCoroutine(timeout, uCont), block) } } /** - * Runs a given suspending [block] of code inside a coroutine with the specified [timeout] and throws + * Runs a given suspending [block] of code inside a coroutine with a specified [timeout][timeMillis] and throws * a [TimeoutCancellationException] if the timeout was exceeded. - * If the given [timeout] is non-positive, [TimeoutCancellationException] is thrown immediately. + * If the given [timeMillis] is non-positive, [TimeoutCancellationException] is thrown immediately. * * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of * the cancellable suspending function inside the block throws a [TimeoutCancellationException]. @@ -63,18 +61,20 @@ public suspend fun withTimeout(timeMillis: Long, block: suspend CoroutineSco * section of the coroutines guide for details. * * > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher]. + * + * @param timeMillis timeout time in milliseconds. */ -public suspend fun withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T { +public suspend fun withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T { contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } - return withTimeout(timeout.toDelayMillis(), block) + return withTimeout(timeMillis.milliseconds, block) } /** - * Runs a given suspending block of code inside a coroutine with a specified [timeout][timeMillis] and returns + * Runs a given suspending block of code inside a coroutine with the specified [timeout] and returns * `null` if this timeout was exceeded. - * If the given [timeMillis] is non-positive, `null` is returned immediately. + * If the given [timeout] is non-positive, `null` is returned immediately. * * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of * cancellable suspending function inside the block throws a [TimeoutCancellationException]. @@ -90,16 +90,14 @@ public suspend fun withTimeout(timeout: Duration, block: suspend CoroutineSc * section of the coroutines guide for details. * * > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher]. - * - * @param timeMillis timeout time in milliseconds. */ -public suspend fun withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T? { - if (timeMillis <= 0L) return null +public suspend fun withTimeoutOrNull(timeout: Duration, block: suspend CoroutineScope.() -> T): T? { + if (timeout <= Duration.ZERO) return null var coroutine: TimeoutCoroutine? = null try { return suspendCoroutineUninterceptedOrReturn { uCont -> - val timeoutCoroutine = TimeoutCoroutine(timeMillis, uCont) + val timeoutCoroutine = TimeoutCoroutine(timeout, uCont) coroutine = timeoutCoroutine setupTimeout(timeoutCoroutine, block) } @@ -113,9 +111,9 @@ public suspend fun withTimeoutOrNull(timeMillis: Long, block: suspend Corout } /** - * Runs a given suspending block of code inside a coroutine with the specified [timeout] and returns + * Runs a given suspending block of code inside a coroutine with a specified [timeout][timeMillis] and returns * `null` if this timeout was exceeded. - * If the given [timeout] is non-positive, `null` is returned immediately. + * If the given [timeMillis] is non-positive, `null` is returned immediately. * * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of * cancellable suspending function inside the block throws a [TimeoutCancellationException]. @@ -131,9 +129,11 @@ public suspend fun withTimeoutOrNull(timeMillis: Long, block: suspend Corout * section of the coroutines guide for details. * * > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher]. + * + * @param timeMillis timeout time in milliseconds. */ -public suspend fun withTimeoutOrNull(timeout: Duration, block: suspend CoroutineScope.() -> T): T? = - withTimeoutOrNull(timeout.toDelayMillis(), block) +public suspend fun withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T? = + withTimeoutOrNull(timeMillis.milliseconds, block) private fun setupTimeout( coroutine: TimeoutCoroutine, @@ -149,11 +149,11 @@ private fun setupTimeout( } private class TimeoutCoroutine( - @JvmField val time: Long, + val time: Duration, uCont: Continuation // unintercepted continuation ) : ScopeCoroutine(uCont.context, uCont), Runnable { override fun run() { - cancelCoroutine(TimeoutCancellationException(time, context.delay, this)) + cancelCoroutine(TimeoutCancellationException(context.delay.timeoutMessage(time), this)) } override fun nameString(): String = @@ -162,6 +162,18 @@ private class TimeoutCoroutine( /** * This exception is thrown by [withTimeout] to indicate timeout. + * + * **Pitfall**: This exception is an instance of [CancellationException] and inherits its behavior. + * In particular, if this exception is not caught, it cancels the coroutine it's thrown from. + * ``` + * // This coroutine will simply be cancelled, without any errors being printed + * launch { + * withTimeout(1.seconds) { + * delay(100.seconds) + * } // will throw TimeoutCancellationException + * error("Will not be printed") + * } + * ``` */ public class TimeoutCancellationException internal constructor( message: String, @@ -177,13 +189,3 @@ public class TimeoutCancellationException internal constructor( override fun createCopy(): TimeoutCancellationException = TimeoutCancellationException(message ?: "", coroutine).also { it.initCause(this) } } - -internal fun TimeoutCancellationException( - time: Long, - delay: Delay, - coroutine: Job -) : TimeoutCancellationException { - val message = (delay as? DelayWithTimeoutDiagnostics)?.timeoutMessage(time.milliseconds) - ?: "Timed out waiting for $time ms" - return TimeoutCancellationException(message, coroutine) -} diff --git a/kotlinx-coroutines-core/common/src/Unconfined.kt b/kotlinx-coroutines-core/common/src/Unconfined.kt index 2e16f951b8..d5def2082a 100644 --- a/kotlinx-coroutines-core/common/src/Unconfined.kt +++ b/kotlinx-coroutines-core/common/src/Unconfined.kt @@ -33,7 +33,7 @@ internal object Unconfined : CoroutineDispatcher() { /** * Used to detect calls to [Unconfined.dispatch] from [yield] function. */ -@PublishedApi +@PublishedApi // for `kotlinx-coroutines-test` internal class YieldContext : AbstractCoroutineContextElement(Key) { companion object Key : CoroutineContext.Key diff --git a/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt b/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt index b9b73603c4..d1abe2df26 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt @@ -3,6 +3,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlinx.coroutines.internal.IgnoreJreRequirement import kotlin.time.* +import kotlin.time.Duration.Companion.milliseconds /** * A command emitted by [SharingStarted] implementations to control the sharing coroutine in @@ -102,7 +103,7 @@ public fun interface SharingStarted { stopTimeoutMillis: Long = 0, replayExpirationMillis: Long = Long.MAX_VALUE ): SharingStarted = - StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis) + StartedWhileSubscribed(stopTimeoutMillis.milliseconds, replayExpirationMillis.milliseconds) } /** @@ -135,7 +136,7 @@ public fun SharingStarted.Companion.WhileSubscribed( stopTimeout: Duration = Duration.ZERO, replayExpiration: Duration = Duration.INFINITE ): SharingStarted = - StartedWhileSubscribed(stopTimeout.inWholeMilliseconds, replayExpiration.inWholeMilliseconds) + StartedWhileSubscribed(stopTimeout, replayExpiration) // -------------------------------- implementation -------------------------------- @@ -160,12 +161,12 @@ private class StartedLazily : SharingStarted { } private class StartedWhileSubscribed( - private val stopTimeout: Long, - private val replayExpiration: Long + private val stopTimeout: Duration, + private val replayExpiration: Duration, ) : SharingStarted { init { - require(stopTimeout >= 0) { "stopTimeout($stopTimeout ms) cannot be negative" } - require(replayExpiration >= 0) { "replayExpiration($replayExpiration ms) cannot be negative" } + require(stopTimeout >= Duration.ZERO) { "stopTimeout($stopTimeout) cannot be negative" } + require(replayExpiration >= Duration.ZERO) { "replayExpiration($replayExpiration) cannot be negative" } } override fun command(subscriptionCount: StateFlow): Flow = subscriptionCount @@ -174,7 +175,7 @@ private class StartedWhileSubscribed( emit(SharingCommand.START) } else { delay(stopTimeout) - if (replayExpiration > 0) { + if (replayExpiration > Duration.ZERO) { emit(SharingCommand.STOP) delay(replayExpiration) } @@ -187,8 +188,8 @@ private class StartedWhileSubscribed( @OptIn(ExperimentalStdlibApi::class) override fun toString(): String { val params = buildList(2) { - if (stopTimeout > 0) add("stopTimeout=${stopTimeout}ms") - if (replayExpiration < Long.MAX_VALUE) add("replayExpiration=${replayExpiration}ms") + if (stopTimeout > Duration.ZERO) add("stopTimeout=${stopTimeout}ms") + if (replayExpiration < Duration.INFINITE) add("replayExpiration=${replayExpiration}ms") } return "SharingStarted.WhileSubscribed(${params.joinToString()})" } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 2a701c0c12..b832f6feb5 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.flow.internal.* import kotlinx.coroutines.selects.* import kotlin.jvm.* import kotlin.time.* +import kotlin.time.Duration.Companion.milliseconds /* Scaffolding for Knit code examples @@ -27,7 +28,7 @@ fun main() = runBlocking { /** * Returns a flow that mirrors the original flow, but filters out values - * that are followed by the newer values within the given [timeout][timeoutMillis]. + * that are followed by the newer values within the given [timeout]. * The latest value is always emitted. * * Example: @@ -35,17 +36,17 @@ fun main() = runBlocking { * ```kotlin * flow { * emit(1) - * delay(90) + * delay(90.milliseconds) * emit(2) - * delay(90) + * delay(90.milliseconds) * emit(3) - * delay(1010) + * delay(1010.milliseconds) * emit(4) - * delay(1010) + * delay(1010.milliseconds) * emit(5) - * }.debounce(1000) + * }.debounce(1000.milliseconds) * ``` - * + * * * produces the following emissions * @@ -55,18 +56,18 @@ fun main() = runBlocking { * * * Note that the resulting flow does not emit anything as long as the original flow emits - * items faster than every [timeoutMillis] milliseconds. + * items faster than every [timeout] milliseconds. */ @FlowPreview -public fun Flow.debounce(timeoutMillis: Long): Flow { - require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" } - if (timeoutMillis == 0L) return this - return debounceInternal { timeoutMillis } +public fun Flow.debounce(timeout: Duration): Flow { + require(!timeout.isNegative()) { "Debounce timeout should not be negative" } + if (timeout == Duration.ZERO) return this + return debounceInternal { timeout } } /** * Returns a flow that mirrors the original flow, but filters out values - * that are followed by the newer values within the given [timeout][timeoutMillis]. + * that are followed by the newer values within the given [timeout]. * The latest value is always emitted. * * A variation of [debounce] that allows specifying the timeout value dynamically. @@ -76,23 +77,23 @@ public fun Flow.debounce(timeoutMillis: Long): Flow { * ```kotlin * flow { * emit(1) - * delay(90) + * delay(90.milliseconds) * emit(2) - * delay(90) + * delay(90.milliseconds) * emit(3) - * delay(1010) + * delay(1010.milliseconds) * emit(4) - * delay(1010) + * delay(1010.milliseconds) * emit(5) * }.debounce { * if (it == 1) { - * 0L + * 0.milliseconds * } else { - * 1000L + * 1000.milliseconds * } * } * ``` - * + * * * produces the following emissions * @@ -102,18 +103,19 @@ public fun Flow.debounce(timeoutMillis: Long): Flow { * * * Note that the resulting flow does not emit anything as long as the original flow emits - * items faster than every [timeoutMillis] milliseconds. + * items faster than every [timeout] unit. * - * @param timeoutMillis [T] is the emitted value and the return value is timeout in milliseconds. + * @param timeout [T] is the emitted value and the return value is timeout in [Duration]. */ @FlowPreview +@JvmName("debounceDuration") @OverloadResolutionByLambdaReturnType -public fun Flow.debounce(timeoutMillis: (T) -> Long): Flow = - debounceInternal(timeoutMillis) +public fun Flow.debounce(timeout: (T) -> Duration): Flow = + debounceInternal(timeout) /** * Returns a flow that mirrors the original flow, but filters out values - * that are followed by the newer values within the given [timeout]. + * that are followed by the newer values within the given [timeout][timeoutMillis]. * The latest value is always emitted. * * Example: @@ -121,17 +123,17 @@ public fun Flow.debounce(timeoutMillis: (T) -> Long): Flow = * ```kotlin * flow { * emit(1) - * delay(90.milliseconds) + * delay(90) * emit(2) - * delay(90.milliseconds) + * delay(90) * emit(3) - * delay(1010.milliseconds) + * delay(1010) * emit(4) - * delay(1010.milliseconds) + * delay(1010) * emit(5) - * }.debounce(1000.milliseconds) + * }.debounce(1000) * ``` - * + * * * produces the following emissions * @@ -141,15 +143,15 @@ public fun Flow.debounce(timeoutMillis: (T) -> Long): Flow = * * * Note that the resulting flow does not emit anything as long as the original flow emits - * items faster than every [timeout] milliseconds. + * items faster than every [timeoutMillis] milliseconds. */ @FlowPreview -public fun Flow.debounce(timeout: Duration): Flow = - debounce(timeout.toDelayMillis()) +public fun Flow.debounce(timeoutMillis: Long): Flow = + debounce(timeoutMillis.milliseconds) /** * Returns a flow that mirrors the original flow, but filters out values - * that are followed by the newer values within the given [timeout]. + * that are followed by the newer values within the given [timeout][timeoutMillis]. * The latest value is always emitted. * * A variation of [debounce] that allows specifying the timeout value dynamically. @@ -159,23 +161,23 @@ public fun Flow.debounce(timeout: Duration): Flow = * ```kotlin * flow { * emit(1) - * delay(90.milliseconds) + * delay(90) * emit(2) - * delay(90.milliseconds) + * delay(90) * emit(3) - * delay(1010.milliseconds) + * delay(1010) * emit(4) - * delay(1010.milliseconds) + * delay(1010) * emit(5) * }.debounce { * if (it == 1) { - * 0.milliseconds + * 0L * } else { - * 1000.milliseconds + * 1000L * } * } * ``` - * + * * * produces the following emissions * @@ -185,19 +187,16 @@ public fun Flow.debounce(timeout: Duration): Flow = * * * Note that the resulting flow does not emit anything as long as the original flow emits - * items faster than every [timeout] unit. + * items faster than every [timeoutMillis] milliseconds. * - * @param timeout [T] is the emitted value and the return value is timeout in [Duration]. + * @param timeoutMillis [T] is the emitted value and the return value is timeout in milliseconds. */ @FlowPreview -@JvmName("debounceDuration") @OverloadResolutionByLambdaReturnType -public fun Flow.debounce(timeout: (T) -> Duration): Flow = - debounceInternal { emittedItem -> - timeout(emittedItem).toDelayMillis() - } +public fun Flow.debounce(timeoutMillis: (T) -> Long): Flow = + debounceInternal { emittedItem -> timeoutMillis(emittedItem).milliseconds } -private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long): Flow = +private fun Flow.debounceInternal(timeoutSelector: (T) -> Duration): Flow = scopedFlow { downstream -> // Produce the values using the default (rendezvous) channel val values = produce { @@ -206,23 +205,23 @@ private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long): Fl // Now consume the values var lastValue: Any? = null while (lastValue !== DONE) { - var timeoutMillis = 0L // will be always computed when lastValue != null + var timeout = Duration.ZERO // will be always computed when lastValue != null // Compute timeout for this value if (lastValue != null) { - timeoutMillis = timeoutMillisSelector(NULL.unbox(lastValue)) - require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" } - if (timeoutMillis == 0L) { + timeout = timeoutSelector(NULL.unbox(lastValue)) + require(!timeout.isNegative()) { "Debounce timeout should not be negative" } + if (timeout == Duration.ZERO) { downstream.emit(NULL.unbox(lastValue)) lastValue = null // Consume the value } } // assert invariant: lastValue != null implies timeoutMillis > 0 - assert { lastValue == null || timeoutMillis > 0 } + assert { lastValue == null || timeout.isPositive() } // wait for the next value with timeout select { // Set timeout when lastValue exists and is not consumed yet if (lastValue != null) { - onTimeout(timeoutMillis) { + onTimeout(timeout) { downstream.emit(NULL.unbox(lastValue)) lastValue = null // Consume the value } @@ -242,7 +241,7 @@ private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long): Fl } /** - * Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis]. + * Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period]. * * Example: * @@ -250,11 +249,11 @@ private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long): Fl * flow { * repeat(10) { * emit(it) - * delay(110) + * delay(110.milliseconds) * } - * }.sample(200) + * }.sample(200.milliseconds) * ``` - * + * * * produces the following emissions * @@ -266,14 +265,14 @@ private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long): Fl * Note that the latest element is not emitted if it does not fit into the sampling window. */ @FlowPreview -public fun Flow.sample(periodMillis: Long): Flow { - require(periodMillis > 0) { "Sample period should be positive" } +public fun Flow.sample(period: Duration): Flow { + require(period.isPositive()) { "Sample period should be positive" } return scopedFlow { downstream -> val values = produce(capacity = Channel.CONFLATED) { collect { value -> send(value ?: NULL) } } var lastValue: Any? = null - val ticker = fixedPeriodTicker(periodMillis) + val ticker = fixedPeriodTicker(period) while (lastValue !== DONE) { select { values.onReceiveCatching { result -> @@ -301,19 +300,19 @@ public fun Flow.sample(periodMillis: Long): Flow { * TODO this design (and design of the corresponding operator) depends on #540 */ internal fun CoroutineScope.fixedPeriodTicker( - delayMillis: Long, + period: Duration, ): ReceiveChannel { return produce(capacity = 0) { - delay(delayMillis) + delay(period) while (true) { channel.send(Unit) - delay(delayMillis) + delay(period) } } } /** - * Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period]. + * Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis]. * * Example: * @@ -321,11 +320,11 @@ internal fun CoroutineScope.fixedPeriodTicker( * flow { * repeat(10) { * emit(it) - * delay(110.milliseconds) + * delay(110) * } - * }.sample(200.milliseconds) + * }.sample(200) * ``` - * + * * * produces the following emissions * @@ -337,7 +336,7 @@ internal fun CoroutineScope.fixedPeriodTicker( * Note that the latest element is not emitted if it does not fit into the sampling window. */ @FlowPreview -public fun Flow.sample(period: Duration): Flow = sample(period.toDelayMillis()) +public fun Flow.sample(periodMillis: Long): Flow = sample(periodMillis.milliseconds) /** * Returns a flow that will emit a [TimeoutCancellationException] if the upstream doesn't emit an item within the given time. @@ -391,13 +390,11 @@ private fun Flow.timeoutInternal( val values = buffer(Channel.RENDEZVOUS).produceIn(this) whileSelect { values.onReceiveCatching { value -> - value.onSuccess { + !value.onSuccess { downStream.emit(it) }.onClosed { it?.let { throw it } - return@onReceiveCatching false - } - return@onReceiveCatching true + }.isClosed } onTimeout(timeout) { throw TimeoutCancellationException("Timed out waiting for $timeout") diff --git a/kotlinx-coroutines-core/common/src/selects/OnTimeout.kt b/kotlinx-coroutines-core/common/src/selects/OnTimeout.kt index 449972648d..8b4e50b3c9 100644 --- a/kotlinx-coroutines-core/common/src/selects/OnTimeout.kt +++ b/kotlinx-coroutines-core/common/src/selects/OnTimeout.kt @@ -2,6 +2,7 @@ package kotlinx.coroutines.selects import kotlinx.coroutines.* import kotlin.time.* +import kotlin.time.Duration.Companion.milliseconds /** * Clause that selects the given [block] after a specified timeout passes. @@ -14,7 +15,7 @@ import kotlin.time.* @ExperimentalCoroutinesApi @Suppress("EXTENSION_SHADOWED_BY_MEMBER") public fun SelectBuilder.onTimeout(timeMillis: Long, block: suspend () -> R): Unit = - OnTimeout(timeMillis).selectClause.invoke(block) + onTimeout(timeMillis.milliseconds, block) /** * Clause that selects the given [block] after the specified [timeout] passes. @@ -24,15 +25,15 @@ public fun SelectBuilder.onTimeout(timeMillis: Long, block: suspend () -> */ @ExperimentalCoroutinesApi public fun SelectBuilder.onTimeout(timeout: Duration, block: suspend () -> R): Unit = - onTimeout(timeout.toDelayMillis(), block) + OnTimeout(timeout).selectClause.invoke(block) /** * We implement [SelectBuilder.onTimeout] as a clause, so each invocation creates * an instance of [OnTimeout] that specifies the registration part according to - * the [timeout][timeMillis] parameter. + * the [timeout] parameter. */ private class OnTimeout( - private val timeMillis: Long + private val timeout: Duration ) { @Suppress("UNCHECKED_CAST") val selectClause: SelectClause0 @@ -44,7 +45,7 @@ private class OnTimeout( @Suppress("UNUSED_PARAMETER") private fun register(select: SelectInstance<*>, ignoredParam: Any?) { // Should this clause complete immediately? - if (timeMillis <= 0) { + if (timeout <= Duration.ZERO) { select.selectInRegistrationPhase(Unit) return } @@ -54,7 +55,7 @@ private class OnTimeout( } select as SelectImplementation<*> val context = select.context - val disposableHandle = context.delay.invokeOnTimeout(timeMillis, action, context) + val disposableHandle = context.delay.invokeOnTimeout(timeout, action, context) // Do not forget to clean-up when this `select` is completed or cancelled. select.disposeOnCompletion(disposableHandle) } diff --git a/kotlinx-coroutines-core/common/test/WithTimeoutDurationTest.kt b/kotlinx-coroutines-core/common/test/WithTimeoutDurationTest.kt index 855b00f2c7..e7f9fdd330 100644 --- a/kotlinx-coroutines-core/common/test/WithTimeoutDurationTest.kt +++ b/kotlinx-coroutines-core/common/test/WithTimeoutDurationTest.kt @@ -123,7 +123,7 @@ class WithTimeoutDurationTest : TestBase() { "OK" } } catch (e: CancellationException) { - assertEquals("Timed out waiting for 100 ms", e.message) + assertEquals("Timed out waiting for 100ms", e.message) finish(3) } } diff --git a/kotlinx-coroutines-core/common/test/WithTimeoutTest.kt b/kotlinx-coroutines-core/common/test/WithTimeoutTest.kt index 5f2690c198..5e00e31327 100644 --- a/kotlinx-coroutines-core/common/test/WithTimeoutTest.kt +++ b/kotlinx-coroutines-core/common/test/WithTimeoutTest.kt @@ -115,7 +115,7 @@ class WithTimeoutTest : TestBase() { "OK" } } catch (e: CancellationException) { - assertEquals("Timed out waiting for 100 ms", e.message) + assertEquals("Timed out waiting for 100ms", e.message) finish(3) } } diff --git a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt index 771768e008..6dc03217c7 100644 --- a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt +++ b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt @@ -1,15 +1,25 @@ package kotlinx.coroutines +import kotlinx.atomicfu.atomic +import kotlinx.coroutines.internal.ThreadSafeHeap +import kotlinx.coroutines.internal.ThreadSafeHeapNode import kotlinx.coroutines.testing.* import kotlin.coroutines.* import kotlin.jvm.* +import kotlin.time.* +import kotlin.time.Duration.Companion.nanoseconds internal class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineDispatcher(), Delay { private val originalDispatcher = enclosingScope.coroutineContext[ContinuationInterceptor] as CoroutineDispatcher - private val heap = ArrayList() // TODO use MPP heap/ordered set implementation (commonize ThreadSafeHeap) + private val heap = ThreadSafeHeap() - var currentTime = 0L - private set + /** This counter establishes some order on the events that happen at the same virtual time. */ + private val count = atomic(0) + + private val timeSource = TestTimeSource() + + val currentTime: ComparableTimeMark + get() = timeSource.markNow() init { /* @@ -24,22 +34,20 @@ internal class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : Coroutine if (delayNanos <= 0) continue if (delayNanos > 0 && delayNanos != Long.MAX_VALUE) { if (usesSharedEventLoop) { - val targetTime = currentTime + delayNanos + val targetTime = currentTime + delayNanos.nanoseconds while (currentTime < targetTime) { - val nextTask = heap.minByOrNull { it.deadline } ?: break - if (nextTask.deadline > targetTime) break - heap.remove(nextTask) - currentTime = nextTask.deadline + val nextTask = heap.removeFirstIf { it.deadline <= targetTime } ?: break + timeSource += nextTask.deadline - currentTime nextTask.run() } - currentTime = maxOf(currentTime, targetTime) + if (targetTime > currentTime) timeSource += targetTime - currentTime } else { error("Unexpected external delay: $delayNanos") } } - val nextTask = heap.minByOrNull { it.deadline } ?: return@launch + val nextTask = heap.removeFirstOrNull() ?: return@launch heap.remove(nextTask) - currentTime = nextTask.deadline + timeSource += nextTask.deadline - currentTime nextTask.run() } } @@ -47,11 +55,17 @@ internal class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : Coroutine private inner class TimedTask( private val runnable: Runnable, - @JvmField val deadline: Long - ) : DisposableHandle, Runnable by runnable { + val deadline: ComparableTimeMark, + @JvmField val count: Int, + ) : DisposableHandle, Runnable by runnable, Comparable, ThreadSafeHeapNode { + override var heap: ThreadSafeHeap<*>? = null + override var index: Int = 0 + + override fun compareTo(other: TimedTask): Int = + compareValuesBy(this, other, TimedTask::deadline, TimedTask::count) override fun dispose() { - heap.remove(this) + this@VirtualTimeDispatcher.heap.remove(this) } } @@ -61,20 +75,18 @@ internal class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : Coroutine override fun isDispatchNeeded(context: CoroutineContext): Boolean = originalDispatcher.isDispatchNeeded(context) - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val task = TimedTask(block, deadline(timeMillis)) - heap += task - return task - } + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = + schedule(timeout, block) - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - val task = TimedTask(Runnable { with(continuation) { resumeUndispatched(Unit) } }, deadline(timeMillis)) - heap += task - continuation.invokeOnCancellation { task.dispose() } + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { + schedule(time, Runnable { + with(continuation) { resumeUndispatched(Unit) } + }).also { continuation.disposeOnCancellation(it) } } - private fun deadline(timeMillis: Long) = - if (timeMillis == Long.MAX_VALUE) Long.MAX_VALUE else currentTime + timeMillis + private fun schedule(time: Duration, block: Runnable): DisposableHandle = + TimedTask(block, currentTime + time, count.getAndIncrement()).also { heap.addLast(it) } + } /** diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/SharingStartedTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/SharingStartedTest.kt index 09450a1c50..67a17eee84 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/SharingStartedTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/SharingStartedTest.kt @@ -4,6 +4,8 @@ import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import kotlin.coroutines.* import kotlin.test.* +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds /** * Functional tests for [SharingStarted] using [withVirtualTime] and a DSL to describe @@ -55,7 +57,7 @@ class SharingStartedTest : TestBase() { subscriptions(1) // resubscribe again rampUpAndDown() subscriptions(0) - afterTime(100, SharingCommand.STOP) + afterTime(100.milliseconds, SharingCommand.STOP) delay(100) } @@ -69,7 +71,7 @@ class SharingStartedTest : TestBase() { subscriptions(1, SharingCommand.START) rampUpAndDown() subscriptions(0, SharingCommand.STOP) - afterTime(200, SharingCommand.STOP_AND_RESET_REPLAY_CACHE) + afterTime(200.milliseconds, SharingCommand.STOP_AND_RESET_REPLAY_CACHE) } @Test @@ -82,13 +84,13 @@ class SharingStartedTest : TestBase() { subscriptions(1) rampUpAndDown() subscriptions(0) - afterTime(400, SharingCommand.STOP) + afterTime(400.milliseconds, SharingCommand.STOP) delay(250) // don't give it time to reset cache subscriptions(1, SharingCommand.START) rampUpAndDown() subscriptions(0) - afterTime(400, SharingCommand.STOP) - afterTime(300, SharingCommand.STOP_AND_RESET_REPLAY_CACHE) + afterTime(400.milliseconds, SharingCommand.STOP) + afterTime(300.milliseconds, SharingCommand.STOP_AND_RESET_REPLAY_CACHE) delay(100) } @@ -129,9 +131,8 @@ class SharingStartedTest : TestBase() { val subscriptionCount = MutableStateFlow(0) var previousCommand: SharingCommand? = null var expectedCommand: SharingCommand? = initialCommand - var expectedTime = 0L - val dispatcher = coroutineContext[ContinuationInterceptor] as VirtualTimeDispatcher + var expectedTime = dispatcher.currentTime val scope = CoroutineScope(coroutineContext + Job()) suspend fun launch() { @@ -153,28 +154,28 @@ class SharingStartedTest : TestBase() { expectedTime = dispatcher.currentTime subscriptionCount.value = count if (command != null) { - afterTime(0, command) + afterTime(Duration.ZERO, command) } else { letItRun() } } - suspend fun afterTime(time: Long = 0, command: SharingCommand) { + suspend fun afterTime(time: Duration, command: SharingCommand) { expectedCommand = command - val remaining = (time - 1).coerceAtLeast(0) // previous letItRun delayed 1ms + val remaining = (time - 1.milliseconds).coerceAtLeast(Duration.ZERO) // previous letItRun delayed 1ms expectedTime += remaining delay(remaining) letItRun() } private suspend fun letItRun() { - delay(1) + delay(1.milliseconds) assertEquals(expectedCommand, previousCommand) // make sure expected command was emitted - expectedTime++ // make one more time tick we've delayed + expectedTime += 1.milliseconds // make one more time tick we've delayed } fun stop() { scope.cancel() } } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jdk8/src/time/Time.kt b/kotlinx-coroutines-core/jdk8/src/time/Time.kt index 3bb1d054e6..a604848a21 100644 --- a/kotlinx-coroutines-core/jdk8/src/time/Time.kt +++ b/kotlinx-coroutines-core/jdk8/src/time/Time.kt @@ -8,29 +8,30 @@ import kotlinx.coroutines.selects.* import java.time.* import java.time.temporal.* import kotlin.contracts.* +import kotlin.time.toKotlinDuration /** * "java.time" adapter method for [kotlinx.coroutines.delay]. */ -public suspend fun delay(duration: Duration): Unit = delay(duration.coerceToMillis()) +public suspend fun delay(duration: Duration): Unit = delay(duration.toKotlinDuration()) /** * "java.time" adapter method for [kotlinx.coroutines.flow.debounce]. */ @FlowPreview -public fun Flow.debounce(timeout: Duration): Flow = debounce(timeout.coerceToMillis()) +public fun Flow.debounce(timeout: Duration): Flow = debounce(timeout.toKotlinDuration()) /** * "java.time" adapter method for [kotlinx.coroutines.flow.sample]. */ @FlowPreview -public fun Flow.sample(period: Duration): Flow = sample(period.coerceToMillis()) +public fun Flow.sample(period: Duration): Flow = sample(period.toKotlinDuration()) /** * "java.time" adapter method for [SelectBuilder.onTimeout]. */ public fun SelectBuilder.onTimeout(duration: Duration, block: suspend () -> R): Unit = - onTimeout(duration.coerceToMillis(), block) + onTimeout(duration.toKotlinDuration(), block) /** * "java.time" adapter method for [kotlinx.coroutines.withTimeout]. @@ -39,35 +40,11 @@ public suspend fun withTimeout(duration: Duration, block: suspend CoroutineS contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } - return kotlinx.coroutines.withTimeout(duration.coerceToMillis(), block) + return withTimeout(duration.toKotlinDuration(), block) } /** * "java.time" adapter method for [kotlinx.coroutines.withTimeoutOrNull]. */ public suspend fun withTimeoutOrNull(duration: Duration, block: suspend CoroutineScope.() -> T): T? = - kotlinx.coroutines.withTimeoutOrNull(duration.coerceToMillis(), block) - -/** - * Coerces the given [Duration] to a millisecond delay. - * Negative values are coerced to zero, values that cannot - * be represented in milliseconds as long ("infinite" duration) are coerced to [Long.MAX_VALUE] - * and durations lesser than a millisecond are coerced to 1 millisecond. - * - * The rationale of coercion: - * 1) Too large durations typically indicate infinity and Long.MAX_VALUE is the - * best approximation of infinity we can provide. - * 2) Coercing too small durations to 1 instead of 0 is crucial for two patterns: - * - Programming with deadlines and delays - * - Non-suspending fast-paths (e.g. `withTimeout(1 nanosecond) { 42 }` should not throw) - */ -private fun Duration.coerceToMillis(): Long { - if (this <= Duration.ZERO) return 0 - if (this <= ChronoUnit.MILLIS.duration) return 1 - - // Maximum scalar values of Duration.ofMillis(Long.MAX_VALUE) - val maxSeconds = 9223372036854775 - val maxNanos = 807000000 - return if (seconds < maxSeconds || seconds == maxSeconds && nano < maxNanos) toMillis() - else Long.MAX_VALUE -} + withTimeoutOrNull(duration.toKotlinDuration(), block) diff --git a/kotlinx-coroutines-core/jsAndWasmJsShared/src/internal/JSDispatcher.kt b/kotlinx-coroutines-core/jsAndWasmJsShared/src/internal/JSDispatcher.kt index c98a0672c9..5da69764ca 100644 --- a/kotlinx-coroutines-core/jsAndWasmJsShared/src/internal/JSDispatcher.kt +++ b/kotlinx-coroutines-core/jsAndWasmJsShared/src/internal/JSDispatcher.kt @@ -2,6 +2,8 @@ package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlin.coroutines.* +import kotlin.time.Duration +import kotlin.time.DurationUnit internal expect abstract class W3CWindow internal expect fun w3cSetTimeout(window: W3CWindow, handler: () -> Unit, timeout: Int): Int @@ -20,11 +22,6 @@ internal expect class WindowMessageQueue(window: W3CWindow) : MessageQueue { override fun reschedule() } -private const val MAX_DELAY = Int.MAX_VALUE.toLong() - -private fun delayToInt(timeMillis: Long): Int = - timeMillis.coerceIn(0, MAX_DELAY).toInt() - internal abstract class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay { internal val messageQueue = ScheduledMessageQueue(this) @@ -39,13 +36,14 @@ internal abstract class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay messageQueue.enqueue(block) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val handle = w3cSetTimeout({ block.run() }, delayToInt(timeMillis)) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { + val handle = w3cSetTimeout({ block.run() }, timeout.toInt(DurationUnit.MILLISECONDS)) return ClearTimeout(handle) } - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - val handle = w3cSetTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis)) + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { + val handle = + w3cSetTimeout({ with(continuation) { resumeUndispatched(Unit) } }, time.toInt(DurationUnit.MILLISECONDS)) continuation.invokeOnCancellation(handler = ClearTimeout(handle)) } } @@ -55,13 +53,17 @@ internal class WindowDispatcher(private val window: W3CWindow) : CoroutineDispat override fun dispatch(context: CoroutineContext, block: Runnable) = queue.enqueue(block) - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - val handle = w3cSetTimeout(window, { with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis)) + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { + val handle = w3cSetTimeout( + window, + { with(continuation) { resumeUndispatched(Unit) } }, + time.toInt(DurationUnit.MILLISECONDS) + ) continuation.invokeOnCancellation(handler = WindowClearTimeout(handle)) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val handle = w3cSetTimeout(window, block::run, delayToInt(timeMillis)) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { + val handle = w3cSetTimeout(window, block::run, timeout.toInt(DurationUnit.MILLISECONDS)) return WindowClearTimeout(handle) } diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt index 3ce7e0d333..e8372089ca 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt @@ -3,6 +3,7 @@ package kotlinx.coroutines import kotlinx.coroutines.internal.* import java.util.concurrent.* import kotlin.coroutines.* +import kotlin.time.Duration private val defaultMainDelayOptIn = systemProp("kotlinx.coroutines.main.delay", false) @@ -91,8 +92,8 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { * Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]), * but it's not exposed as public API. */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - scheduleInvokeOnTimeout(timeMillis, block) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = + scheduleInvokeOnTimeout(timeout, block) override fun run() { ThreadLocalEventLoop.setEventLoop(this) diff --git a/kotlinx-coroutines-core/jvm/src/EventLoop.kt b/kotlinx-coroutines-core/jvm/src/EventLoop.kt index 15d4ab5c85..75f38ccbad 100644 --- a/kotlinx-coroutines-core/jvm/src/EventLoop.kt +++ b/kotlinx-coroutines-core/jvm/src/EventLoop.kt @@ -1,7 +1,5 @@ package kotlinx.coroutines -import kotlinx.coroutines.Runnable -import kotlinx.coroutines.scheduling.* import kotlinx.coroutines.scheduling.CoroutineScheduler internal actual abstract class EventLoopImplPlatform: EventLoop() { @@ -122,4 +120,3 @@ internal fun Thread.isIoDispatcherThread(): Boolean { if (this !is CoroutineScheduler.Worker) return false return isIo() } - diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index bdfbe6dbbc..c7eef2f13c 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -5,6 +5,7 @@ import java.io.Closeable import java.util.concurrent.* import kotlin.coroutines.* import kotlin.AutoCloseable +import kotlin.time.Duration /** * [CoroutineDispatcher] that has underlying [Executor] for dispatching tasks. @@ -135,11 +136,11 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) } } - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { val future = (executor as? ScheduledExecutorService)?.scheduleBlock( ResumeUndispatchedRunnable(this, continuation), continuation.context, - timeMillis + time ) // If everything went fine and the scheduling attempt was not rejected -- use it if (future != null) { @@ -147,20 +148,20 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) return } // Otherwise fallback to default executor - DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation) + DefaultExecutor.scheduleResumeAfterDelay(time, continuation) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { + val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeout) return when { future != null -> DisposableFutureHandle(future) - else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context) + else -> DefaultExecutor.invokeOnTimeout(timeout, block, context) } } - private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { + private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, time: Duration): ScheduledFuture<*>? { return try { - schedule(block, timeMillis, TimeUnit.MILLISECONDS) + schedule(block, time.inWholeNanoseconds, TimeUnit.NANOSECONDS) } catch (e: RejectedExecutionException) { cancelJobOnRejection(context, e) null diff --git a/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt b/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt index c07e68dc4e..f30e2b2f3d 100644 --- a/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt +++ b/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt @@ -2,6 +2,9 @@ package kotlinx.coroutines.channels import kotlinx.coroutines.* import kotlin.coroutines.* +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.nanoseconds /** * Mode for [ticker] function. @@ -61,24 +64,26 @@ public fun ticker( context: CoroutineContext = EmptyCoroutineContext, mode: TickerMode = TickerMode.FIXED_PERIOD ): ReceiveChannel { - require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" } - require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" } + val delay = delayMillis.milliseconds + val initialDelay = initialDelayMillis.milliseconds + require(delayMillis >= 0) { "Expected non-negative delay, but got $delay" } + require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but got $initialDelay" } return GlobalScope.produce(Dispatchers.Unconfined + context, capacity = 0) { when (mode) { - TickerMode.FIXED_PERIOD -> fixedPeriodTicker(delayMillis, initialDelayMillis, channel) - TickerMode.FIXED_DELAY -> fixedDelayTicker(delayMillis, initialDelayMillis, channel) + TickerMode.FIXED_PERIOD -> fixedPeriodTicker(delay, initialDelay, channel) + TickerMode.FIXED_DELAY -> fixedDelayTicker(delay, initialDelay, channel) } } } private suspend fun fixedPeriodTicker( - delayMillis: Long, - initialDelayMillis: Long, + delay: Duration, + initialDelay: Duration, channel: SendChannel ) { - var deadline = nanoTime() + delayToNanos(initialDelayMillis) - delay(initialDelayMillis) - val delayNs = delayToNanos(delayMillis) + var deadline = nanoTime() + initialDelay.inWholeNanoseconds + delay(initialDelay) + val delayNs = delay.inWholeNanoseconds while (true) { deadline += delayNs channel.send(Unit) @@ -87,21 +92,21 @@ private suspend fun fixedPeriodTicker( if (nextDelay == 0L && delayNs != 0L) { val adjustedDelay = delayNs - (now - deadline) % delayNs deadline = now + adjustedDelay - delay(delayNanosToMillis(adjustedDelay)) + delay(adjustedDelay.nanoseconds) } else { - delay(delayNanosToMillis(nextDelay)) + delay(nextDelay.nanoseconds) } } } private suspend fun fixedDelayTicker( - delayMillis: Long, - initialDelayMillis: Long, + delay: Duration, + initialDelay: Duration, channel: SendChannel ) { - delay(initialDelayMillis) + delay(initialDelay) while (true) { channel.send(Unit) - delay(delayMillis) + delay(delay) } } diff --git a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt index 0cace58cf9..1cf273ed99 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt @@ -3,6 +3,7 @@ package kotlinx.coroutines.internal import kotlinx.coroutines.* import java.util.* import kotlin.coroutines.* +import kotlin.time.Duration /** * Name of the boolean property that enables using of [FastServiceLoader]. @@ -94,13 +95,13 @@ private class MissingMainCoroutineDispatcher( override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher = missing() - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = missing() override fun dispatch(context: CoroutineContext, block: Runnable) = missing() - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) = + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) = missing() private fun missing(): Nothing { diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromLexicalBlockWhenTriggeredByChild.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromLexicalBlockWhenTriggeredByChild.txt index ac40dc152b..b00d67c793 100644 --- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromLexicalBlockWhenTriggeredByChild.txt +++ b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromLexicalBlockWhenTriggeredByChild.txt @@ -1,7 +1,6 @@ -kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200 ms +kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200ms at _COROUTINE._BOUNDARY._(CoroutineDebugging.kt) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest.outerChildWithTimeout(StackTraceRecoveryWithTimeoutTest.kt:48) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest$testStacktraceIsRecoveredFromLexicalBlockWhenTriggeredByChild$1.invokeSuspend(StackTraceRecoveryWithTimeoutTest.kt:40) -Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200 ms - at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:116) +Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200ms at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:86) diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromSuspensionPoint.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromSuspensionPoint.txt index 9d5ddb6621..5e53e59f08 100644 --- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromSuspensionPoint.txt +++ b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromSuspensionPoint.txt @@ -1,10 +1,9 @@ -kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200 ms +kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200ms at _COROUTINE._BOUNDARY._(CoroutineDebugging.kt) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest.suspendForever(StackTraceRecoveryWithTimeoutTest.kt:42) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest$outerWithTimeout$2.invokeSuspend(StackTraceRecoveryWithTimeoutTest.kt:32) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest.outerWithTimeout(StackTraceRecoveryWithTimeoutTest.kt:31) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest$testStacktraceIsRecoveredFromSuspensionPoint$1.invokeSuspend(StackTraceRecoveryWithTimeoutTest.kt:19) -Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200 ms - at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:116) +Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200ms at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:86) at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:492) diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromSuspensionPointWithChild.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromSuspensionPointWithChild.txt index 6f21cc6b30..5155f4890a 100644 --- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromSuspensionPointWithChild.txt +++ b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromSuspensionPointWithChild.txt @@ -1,9 +1,8 @@ -kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200 ms +kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200ms at _COROUTINE._BOUNDARY._(CoroutineDebugging.kt) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest.suspendForever(StackTraceRecoveryWithTimeoutTest.kt:92) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest$outerChild$2.invokeSuspend(StackTraceRecoveryWithTimeoutTest.kt:78) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest.outerChild(StackTraceRecoveryWithTimeoutTest.kt:74) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest$testStacktraceIsRecoveredFromSuspensionPointWithChild$1.invokeSuspend(StackTraceRecoveryWithTimeoutTest.kt:66) -Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200 ms - at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:116) +Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200ms at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:86) diff --git a/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt index 819b05e9ed..6fc6028c9a 100644 --- a/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt @@ -5,6 +5,7 @@ import org.junit.Test import java.lang.Runnable import java.util.concurrent.* import kotlin.test.* +import kotlin.time.Duration.Companion.seconds class ExecutorAsCoroutineDispatcherDelayTest : TestBase() { @@ -45,13 +46,13 @@ class ExecutorAsCoroutineDispatcherDelayTest : TestBase() { launch(start = CoroutineStart.UNDISPATCHED) { suspendCancellableCoroutine { cont -> expect(1) - (executor.asCoroutineDispatcher() as Delay).scheduleResumeAfterDelay(1_000_000, cont) + (executor.asCoroutineDispatcher() as Delay).scheduleResumeAfterDelay(1_000.seconds, cont) cont.cancel() expect(2) } } expect(3) - assertTrue(executor.getQueue().isEmpty()) + assertTrue(executor.queue.isEmpty()) executor.shutdown() finish(4) } diff --git a/kotlinx-coroutines-core/jvm/test/examples/test/FlowDelayTest.kt b/kotlinx-coroutines-core/jvm/test/examples/test/FlowDelayTest.kt index b0bc05fc8b..31ae908cda 100644 --- a/kotlinx-coroutines-core/jvm/test/examples/test/FlowDelayTest.kt +++ b/kotlinx-coroutines-core/jvm/test/examples/test/FlowDelayTest.kt @@ -6,43 +6,43 @@ import org.junit.Test class FlowDelayTest { @Test - fun testExampleDelay01() { - test("ExampleDelay01") { kotlinx.coroutines.examples.exampleDelay01.main() }.verifyLines( + fun testExampleDelayDuration01() { + test("ExampleDelayDuration01") { kotlinx.coroutines.examples.exampleDelayDuration01.main() }.verifyLines( "3, 4, 5" ) } @Test - fun testExampleDelay02() { - test("ExampleDelay02") { kotlinx.coroutines.examples.exampleDelay02.main() }.verifyLines( + fun testExampleDelayDuration02() { + test("ExampleDelayDuration02") { kotlinx.coroutines.examples.exampleDelayDuration02.main() }.verifyLines( "1, 3, 4, 5" ) } @Test - fun testExampleDelayDuration01() { - test("ExampleDelayDuration01") { kotlinx.coroutines.examples.exampleDelayDuration01.main() }.verifyLines( + fun testExampleDelay01() { + test("ExampleDelay01") { kotlinx.coroutines.examples.exampleDelay01.main() }.verifyLines( "3, 4, 5" ) } @Test - fun testExampleDelayDuration02() { - test("ExampleDelayDuration02") { kotlinx.coroutines.examples.exampleDelayDuration02.main() }.verifyLines( + fun testExampleDelay02() { + test("ExampleDelay02") { kotlinx.coroutines.examples.exampleDelay02.main() }.verifyLines( "1, 3, 4, 5" ) } @Test - fun testExampleDelay03() { - test("ExampleDelay03") { kotlinx.coroutines.examples.exampleDelay03.main() }.verifyLines( + fun testExampleDelayDuration03() { + test("ExampleDelayDuration03") { kotlinx.coroutines.examples.exampleDelayDuration03.main() }.verifyLines( "1, 3, 5, 7, 9" ) } @Test - fun testExampleDelayDuration03() { - test("ExampleDelayDuration03") { kotlinx.coroutines.examples.exampleDelayDuration03.main() }.verifyLines( + fun testExampleDelay03() { + test("ExampleDelay03") { kotlinx.coroutines.examples.exampleDelay03.main() }.verifyLines( "1, 3, 5, 7, 9" ) } diff --git a/kotlinx-coroutines-core/jvm/test/flow/SharingStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/SharingStressTest.kt index 0d160e6a70..7ccbcdd494 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/SharingStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/SharingStressTest.kt @@ -185,7 +185,7 @@ class SharingStressTest : TestBase() { var count = 0L } - private fun log(msg: String) = println("${testStarted.elapsedNow().inWholeMilliseconds} ms: $msg") + private fun log(msg: String) = println("${testStarted.elapsedNow()}: $msg") private fun MutableStateFlow.increment(delta: Int) { update { it + delta } diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/CancellationGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/CancellationGuideTest.kt index f09b762288..796cbb9669 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/CancellationGuideTest.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/CancellationGuideTest.kt @@ -71,7 +71,7 @@ class CancellationGuideTest { "I'm sleeping 0 ...", "I'm sleeping 1 ...", "I'm sleeping 2 ...", - "Exception in thread \"main\" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms" + "Exception in thread \"main\" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1.3s" ) } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt index dbc6609101..b933f628d2 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt @@ -13,6 +13,7 @@ import kotlin.random.* import kotlin.random.Random import kotlin.test.* import kotlin.time.* +import kotlin.time.Duration.Companion.nanoseconds class CoroutineSchedulerInternalApiStressTest : TestBase() { @@ -71,7 +72,7 @@ class CoroutineSchedulerInternalApiStressTest : TestBase() { ++timesHelped continue } else if (result >= 0L) { - Thread.sleep(result.toDuration(DurationUnit.NANOSECONDS).toDelayMillis()) + Thread.sleep(result.nanoseconds.toDelayMillis()) } else { Thread.sleep(10) } @@ -83,4 +84,3 @@ class CoroutineSchedulerInternalApiStressTest : TestBase() { } } } - diff --git a/kotlinx-coroutines-core/native/src/CoroutineContext.kt b/kotlinx-coroutines-core/native/src/CoroutineContext.kt index 3f4c8d9a01..d84c45396f 100644 --- a/kotlinx-coroutines-core/native/src/CoroutineContext.kt +++ b/kotlinx-coroutines-core/native/src/CoroutineContext.kt @@ -2,6 +2,7 @@ package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlin.coroutines.* +import kotlin.time.Duration internal actual object DefaultExecutor : CoroutineDispatcher(), Delay { @@ -11,12 +12,12 @@ internal actual object DefaultExecutor : CoroutineDispatcher(), Delay { delegate.dispatch(context, block) } - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - delegate.scheduleResumeAfterDelay(timeMillis, continuation) + override fun scheduleResumeAfterDelay(timeout: Duration, continuation: CancellableContinuation) { + delegate.scheduleResumeAfterDelay(timeout, continuation) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - return delegate.invokeOnTimeout(timeMillis, block, context) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { + return delegate.invokeOnTimeout(timeout, block, context) } actual fun enqueue(task: Runnable): Unit { diff --git a/kotlinx-coroutines-core/native/src/EventLoop.kt b/kotlinx-coroutines-core/native/src/EventLoop.kt index 58128d52fd..f7996d0c2b 100644 --- a/kotlinx-coroutines-core/native/src/EventLoop.kt +++ b/kotlinx-coroutines-core/native/src/EventLoop.kt @@ -4,7 +4,9 @@ package kotlinx.coroutines import kotlin.coroutines.* import kotlin.native.concurrent.* -import kotlin.time.* +import kotlin.time.Duration +import kotlin.time.Duration.Companion.nanoseconds +import kotlin.time.TimeSource internal actual abstract class EventLoopImplPlatform : EventLoop() { @@ -15,14 +17,14 @@ internal actual abstract class EventLoopImplPlatform : EventLoop() { } protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) { - val delayTimeMillis = delayNanosToMillis(delayedTask.nanoTime - now) - DefaultExecutor.invokeOnTimeout(delayTimeMillis, delayedTask, EmptyCoroutineContext) + // TODO: protect against overflow + DefaultExecutor.invokeOnTimeout((delayedTask.nanoTime - now).nanoseconds, delayedTask, EmptyCoroutineContext) } } internal class EventLoopImpl: EventLoopImplBase() { - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - DefaultDelay.invokeOnTimeout(timeMillis, block, context) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = + DefaultDelay.invokeOnTimeout(timeout, block, context) } internal actual fun createEventLoop(): EventLoop = EventLoopImpl() diff --git a/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt b/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt index 7968ffdcef..f49909a7b3 100644 --- a/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt +++ b/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt @@ -24,17 +24,17 @@ internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(), worker.executeAfter(0L) { block.run() } } - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - val handle = schedule(timeMillis, Runnable { + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { + val handle = schedule(time, Runnable { with(continuation) { resumeUndispatched(Unit) } }) continuation.disposeOnCancellation(handle) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - schedule(timeMillis, block) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = + schedule(timeout, block) - private fun schedule(timeMillis: Long, block: Runnable): DisposableHandle { + private fun schedule(time: Duration, block: Runnable): DisposableHandle { // Workers don't have an API to cancel sent "executeAfter" block, but we are trying // to control the damage and reduce reachable objects by nulling out `block` // that may retain a lot of references, and leaving only an empty shell after a timely disposal @@ -65,7 +65,7 @@ internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(), } val disposableBlock = DisposableBlock(block) - val targetMoment = TimeSource.Monotonic.markNow() + timeMillis.milliseconds + val targetMoment = TimeSource.Monotonic.markNow() + time worker.runAfterDelay(disposableBlock, targetMoment) return disposableBlock } diff --git a/kotlinx-coroutines-core/nativeDarwin/src/Dispatchers.kt b/kotlinx-coroutines-core/nativeDarwin/src/Dispatchers.kt index 786f0f215d..9d0f5244ad 100644 --- a/kotlinx-coroutines-core/nativeDarwin/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/nativeDarwin/src/Dispatchers.kt @@ -8,6 +8,8 @@ import platform.darwin.* import kotlin.coroutines.* import kotlin.concurrent.* import kotlin.native.internal.NativePtr +import kotlin.time.Duration +import kotlin.time.DurationUnit internal fun isMainThread(): Boolean = CFRunLoopGetCurrent() == CFRunLoopGetMain() @@ -42,23 +44,23 @@ private class DarwinMainDispatcher( } } - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { + override fun scheduleResumeAfterDelay(timeout: Duration, continuation: CancellableContinuation) { val timer = Timer() val timerBlock: TimerBlock = { timer.dispose() continuation.resume(Unit) } - timer.start(timeMillis, timerBlock) + timer.start(timeout, timerBlock) continuation.disposeOnCancellation(timer) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { val timer = Timer() val timerBlock: TimerBlock = { timer.dispose() block.run() } - timer.start(timeMillis, timerBlock) + timer.start(timeout, timerBlock) return timer } @@ -74,8 +76,8 @@ private val TIMER_DISPOSED = NativePtr.NULL.plus(1) private class Timer : DisposableHandle { private val ref = AtomicNativePtr(TIMER_NEW) - fun start(timeMillis: Long, timerBlock: TimerBlock) { - val fireDate = CFAbsoluteTimeGetCurrent() + timeMillis / 1000.0 + fun start(timeout: Duration, timerBlock: TimerBlock) { + val fireDate = CFAbsoluteTimeGetCurrent() + timeout.toDouble(DurationUnit.SECONDS) val timer = CFRunLoopTimerCreateWithHandler(null, fireDate, 0.0, 0u, 0, timerBlock) CFRunLoopAddTimer(CFRunLoopGetMain(), timer, kCFRunLoopCommonModes) if (!ref.compareAndSet(TIMER_NEW, timer.rawValue)) { diff --git a/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt b/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt index a0f392e5b0..a132f1863f 100644 --- a/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt +++ b/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines import kotlin.coroutines.CoroutineContext import kotlin.wasm.* import kotlin.wasm.unsafe.* +import kotlin.time.Duration @WasmImport("wasi_snapshot_preview1", "poll_oneoff") private external fun wasiPollOneOff(ptrToSubscription: Int, eventPtr: Int, nsubscriptions: Int, resultPtr: Int): Int @@ -50,8 +51,8 @@ internal actual object DefaultExecutor : EventLoopImplBase() { // don't do anything: on WASI, the event loop is the default executor, we can't shut it down } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - scheduleInvokeOnTimeout(timeMillis, block) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = + scheduleInvokeOnTimeout(timeout, block) } internal actual abstract class EventLoopImplPlatform : EventLoop() { diff --git a/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api b/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api index 77cc854cd6..8a127fbffb 100644 --- a/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api +++ b/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api @@ -70,11 +70,10 @@ public final class kotlinx/coroutines/test/TestCoroutineScopeKt { public static final fun runCurrent (Lkotlinx/coroutines/test/TestCoroutineScope;)V } -public abstract class kotlinx/coroutines/test/TestDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay, kotlinx/coroutines/DelayWithTimeoutDiagnostics { - public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; +public abstract class kotlinx/coroutines/test/TestDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay { public abstract fun getScheduler ()Lkotlinx/coroutines/test/TestCoroutineScheduler; - public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; - public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V + public fun invokeOnTimeout-KLykuaI (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public fun scheduleResumeAfterDelay-VtjQ1oo (JLkotlinx/coroutines/CancellableContinuation;)V public synthetic fun timeoutMessage-LRDsOJo (J)Ljava/lang/String; } diff --git a/kotlinx-coroutines-test/api/kotlinx-coroutines-test.klib.api b/kotlinx-coroutines-test/api/kotlinx-coroutines-test.klib.api index 38dfad99c6..ff63c2dd2b 100644 --- a/kotlinx-coroutines-test/api/kotlinx-coroutines-test.klib.api +++ b/kotlinx-coroutines-test/api/kotlinx-coroutines-test.klib.api @@ -14,24 +14,23 @@ sealed interface kotlinx.coroutines.test/TestScope : kotlinx.coroutines/Coroutin abstract fun (): kotlinx.coroutines.test/TestCoroutineScheduler // kotlinx.coroutines.test/TestScope.testScheduler.|(){}[0] } -abstract class kotlinx.coroutines.test/TestDispatcher : kotlinx.coroutines/CoroutineDispatcher, kotlinx.coroutines/Delay, kotlinx.coroutines/DelayWithTimeoutDiagnostics { // kotlinx.coroutines.test/TestDispatcher|null[0] +abstract class kotlinx.coroutines.test/TestDispatcher : kotlinx.coroutines/CoroutineDispatcher, kotlinx.coroutines/Delay { // kotlinx.coroutines.test/TestDispatcher|null[0] abstract val scheduler // kotlinx.coroutines.test/TestDispatcher.scheduler|{}scheduler[0] abstract fun (): kotlinx.coroutines.test/TestCoroutineScheduler // kotlinx.coroutines.test/TestDispatcher.scheduler.|(){}[0] - open fun invokeOnTimeout(kotlin/Long, kotlinx.coroutines/Runnable, kotlin.coroutines/CoroutineContext): kotlinx.coroutines/DisposableHandle // kotlinx.coroutines.test/TestDispatcher.invokeOnTimeout|invokeOnTimeout(kotlin.Long;kotlinx.coroutines.Runnable;kotlin.coroutines.CoroutineContext){}[0] - open fun scheduleResumeAfterDelay(kotlin/Long, kotlinx.coroutines/CancellableContinuation) // kotlinx.coroutines.test/TestDispatcher.scheduleResumeAfterDelay|scheduleResumeAfterDelay(kotlin.Long;kotlinx.coroutines.CancellableContinuation){}[0] + open fun invokeOnTimeout(kotlin.time/Duration, kotlinx.coroutines/Runnable, kotlin.coroutines/CoroutineContext): kotlinx.coroutines/DisposableHandle // kotlinx.coroutines.test/TestDispatcher.invokeOnTimeout|invokeOnTimeout(kotlin.time.Duration;kotlinx.coroutines.Runnable;kotlin.coroutines.CoroutineContext){}[0] + open fun scheduleResumeAfterDelay(kotlin.time/Duration, kotlinx.coroutines/CancellableContinuation) // kotlinx.coroutines.test/TestDispatcher.scheduleResumeAfterDelay|scheduleResumeAfterDelay(kotlin.time.Duration;kotlinx.coroutines.CancellableContinuation){}[0] open fun timeoutMessage(kotlin.time/Duration): kotlin/String // kotlinx.coroutines.test/TestDispatcher.timeoutMessage|timeoutMessage(kotlin.time.Duration){}[0] } final class kotlinx.coroutines.test/TestCoroutineScheduler : kotlin.coroutines/AbstractCoroutineContextElement, kotlin.coroutines/CoroutineContext.Element { // kotlinx.coroutines.test/TestCoroutineScheduler|null[0] constructor () // kotlinx.coroutines.test/TestCoroutineScheduler.|(){}[0] + final val currentTime // kotlinx.coroutines.test/TestCoroutineScheduler.currentTime|{}currentTime[0] + final fun (): kotlin/Long // kotlinx.coroutines.test/TestCoroutineScheduler.currentTime.|(){}[0] final val timeSource // kotlinx.coroutines.test/TestCoroutineScheduler.timeSource|{}timeSource[0] final fun (): kotlin.time/TimeSource.WithComparableMarks // kotlinx.coroutines.test/TestCoroutineScheduler.timeSource.|(){}[0] - final var currentTime // kotlinx.coroutines.test/TestCoroutineScheduler.currentTime|{}currentTime[0] - final fun (): kotlin/Long // kotlinx.coroutines.test/TestCoroutineScheduler.currentTime.|(){}[0] - final fun advanceTimeBy(kotlin.time/Duration) // kotlinx.coroutines.test/TestCoroutineScheduler.advanceTimeBy|advanceTimeBy(kotlin.time.Duration){}[0] final fun advanceTimeBy(kotlin/Long) // kotlinx.coroutines.test/TestCoroutineScheduler.advanceTimeBy|advanceTimeBy(kotlin.Long){}[0] final fun advanceUntilIdle() // kotlinx.coroutines.test/TestCoroutineScheduler.advanceUntilIdle|advanceUntilIdle(){}[0] diff --git a/kotlinx-coroutines-test/common/src/TestCoroutineDispatchers.kt b/kotlinx-coroutines-test/common/src/TestCoroutineDispatchers.kt index bf1b62a171..071c8a5d2f 100644 --- a/kotlinx-coroutines-test/common/src/TestCoroutineDispatchers.kt +++ b/kotlinx-coroutines-test/common/src/TestCoroutineDispatchers.kt @@ -5,6 +5,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.test.internal.TestMainDispatcher import kotlin.coroutines.* +import kotlin.time.Duration /** * Creates an instance of an unconfined [TestDispatcher]. @@ -147,7 +148,7 @@ private class StandardTestDispatcherImpl( ) : TestDispatcher() { override fun dispatch(context: CoroutineContext, block: Runnable) { - scheduler.registerEvent(this, 0, block, context) { false } + scheduler.registerEvent(this, Duration.ZERO, block, context) { false } } override fun toString(): String = "${name ?: "StandardTestDispatcher"}[scheduler=$scheduler]" diff --git a/kotlinx-coroutines-test/common/src/TestCoroutineScheduler.kt b/kotlinx-coroutines-test/common/src/TestCoroutineScheduler.kt index 8c70fa8e05..fc216b677b 100644 --- a/kotlinx-coroutines-test/common/src/TestCoroutineScheduler.kt +++ b/kotlinx-coroutines-test/common/src/TestCoroutineScheduler.kt @@ -39,11 +39,29 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout /** This counter establishes some order on the events that happen at the same virtual time. */ private val count = atomic(0L) + private val _timeSource = TestTimeSource() + private val startMark = _timeSource.markNow() + + // may only be called under the lock + private fun setVirtualTime(time: ComparableTimeMark) { + val toWait = time - _timeSource.markNow() + when { + toWait < Duration.ZERO -> currentTimeAheadOfEvents() + toWait > Duration.ZERO -> try { + _timeSource += toWait + } catch (_: IllegalStateException) { + throw IllegalStateException( + "The test scheduler encountered too large a value: at ${timeSource.markNow()}, " + + "we tried to advance by $toWait, but the maximum value is ${Long.MAX_VALUE / 2} ms." + ) + } + } + } + /** The current virtual time in milliseconds. */ @ExperimentalCoroutinesApi - public var currentTime: Long = 0 - get() = synchronized(lock) { field } - private set + public val currentTime: Long + get() = synchronized(lock) { startMark.elapsedNow().inWholeMilliseconds } /** A channel for notifying about the fact that a foreground work dispatch recently happened. */ private val dispatchEventsForeground: Channel = Channel(CONFLATED) @@ -52,25 +70,28 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout private val dispatchEvents: Channel = Channel(CONFLATED) /** - * Registers a request for the scheduler to notify [dispatcher] at a virtual moment [timeDeltaMillis] milliseconds + * Registers a request for the scheduler to notify [dispatcher] at a virtual moment [timeDelta] milliseconds * later via [TestDispatcher.processEvent], which will be called with the provided [marker] object. * * Returns the handler which can be used to cancel the registration. */ internal fun registerEvent( dispatcher: TestDispatcher, - timeDeltaMillis: Long, + timeDelta: Duration, marker: T, context: CoroutineContext, isCancelled: (T) -> Boolean ): DisposableHandle { - require(timeDeltaMillis >= 0) { "Attempted scheduling an event earlier in time (with the time delta $timeDeltaMillis)" } + require(timeDelta >= Duration.ZERO) { + "Attempted scheduling an event earlier in time (with the time delta $timeDelta)" + } checkSchedulerInContext(this, context) val count = count.getAndIncrement() val isForeground = context[BackgroundWork] === null return synchronized(lock) { - val time = addClamping(currentTime, timeDeltaMillis) - val event = TestDispatchEvent(dispatcher, count, time, marker as Any, isForeground) { isCancelled(marker) } + val event = TestDispatchEvent( + dispatcher, count, _timeSource.markNow() + timeDelta, marker as Any, isForeground + ) { isCancelled(marker) } events.addLast(event) /** can't be moved above: otherwise, [onDispatchEventForeground] or [onDispatchEvent] could consume the * token sent here before there's actually anything in the event queue. */ @@ -91,9 +112,7 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout val event = synchronized(lock) { if (condition()) return false val event = events.removeFirstOrNull() ?: return false - if (currentTime > event.time) - currentTimeAheadOfEvents() - currentTime = event.time + setVirtualTime(event.time) event } event.dispatcher.processEvent(event.marker) @@ -124,7 +143,7 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout * Runs the tasks that are scheduled to execute at this moment of virtual time. */ public fun runCurrent() { - val timeMark = synchronized(lock) { currentTime } + val timeMark = synchronized(lock) { _timeSource.markNow() } while (true) { val event = synchronized(lock) { events.removeFirstIf { it.time <= timeMark } ?: return @@ -160,20 +179,19 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout */ public fun advanceTimeBy(delayTime: Duration) { require(!delayTime.isNegative()) { "Can not advance time by a negative delay: $delayTime" } - val startingTime = currentTime - val targetTime = addClamping(startingTime, delayTime.inWholeMilliseconds) + val targetTime = synchronized(lock) { _timeSource.markNow() } + delayTime while (true) { val event = synchronized(lock) { - val timeMark = currentTime + val timeMark = _timeSource.markNow() val event = events.removeFirstIf { targetTime > it.time } when { event == null -> { - currentTime = targetTime + setVirtualTime(targetTime) return } timeMark > event.time -> currentTimeAheadOfEvents() else -> { - currentTime = event.time + setVirtualTime(event.time) event } } @@ -219,6 +237,7 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout /** * Returns the [TimeSource] representation of the virtual time of this scheduler. */ + // this is a wrapper around `_timeSource` to prevent external modifications public val timeSource: TimeSource.WithComparableMarks = object : AbstractLongTimeSource(DurationUnit.MILLISECONDS) { override fun read(): Long = currentTime } @@ -234,7 +253,7 @@ private fun invalidSchedulerState(): Nothing = private class TestDispatchEvent( @JvmField val dispatcher: TestDispatcher, private val count: Long, - @JvmField val time: Long, + @JvmField val time: ComparableTimeMark, @JvmField val marker: T, @JvmField val isForeground: Boolean, // TODO: remove once the deprecated API is gone @@ -249,9 +268,6 @@ private class TestDispatchEvent( override fun toString() = "TestDispatchEvent(time=$time, dispatcher=$dispatcher${if (isForeground) "" else ", background"})" } -// works with positive `a`, `b` -private fun addClamping(a: Long, b: Long): Long = (a + b).let { if (it >= 0) it else Long.MAX_VALUE } - internal fun checkSchedulerInContext(scheduler: TestCoroutineScheduler, context: CoroutineContext) { context[TestCoroutineScheduler]?.let { check(it === scheduler) { diff --git a/kotlinx-coroutines-test/common/src/TestDispatcher.kt b/kotlinx-coroutines-test/common/src/TestDispatcher.kt index a4427a1a6f..a30176e101 100644 --- a/kotlinx-coroutines-test/common/src/TestDispatcher.kt +++ b/kotlinx-coroutines-test/common/src/TestDispatcher.kt @@ -14,7 +14,7 @@ import kotlin.time.* * the virtual time. */ @Suppress("INVISIBLE_REFERENCE") -public abstract class TestDispatcher internal constructor() : CoroutineDispatcher(), Delay, DelayWithTimeoutDiagnostics { +public abstract class TestDispatcher internal constructor() : CoroutineDispatcher(), Delay { /** The scheduler that this dispatcher is linked to. */ public abstract val scheduler: TestCoroutineScheduler @@ -25,11 +25,11 @@ public abstract class TestDispatcher internal constructor() : CoroutineDispatche } /** @suppress */ - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { val timedRunnable = CancellableContinuationRunnable(continuation, this) val handle = scheduler.registerEvent( this, - timeMillis, + time, timedRunnable, continuation.context, ::cancellableRunnableIsCancelled @@ -38,8 +38,8 @@ public abstract class TestDispatcher internal constructor() : CoroutineDispatche } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - scheduler.registerEvent(this, timeMillis, block, context) { false } + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = + scheduler.registerEvent(this, timeout, block, context) { false } /** @suppress */ @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER") diff --git a/kotlinx-coroutines-test/common/src/internal/TestMainDispatcher.kt b/kotlinx-coroutines-test/common/src/internal/TestMainDispatcher.kt index b94e261f7b..89fbb2e8e6 100644 --- a/kotlinx-coroutines-test/common/src/internal/TestMainDispatcher.kt +++ b/kotlinx-coroutines-test/common/src/internal/TestMainDispatcher.kt @@ -4,6 +4,7 @@ import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.test.* import kotlin.coroutines.* +import kotlin.time.Duration /** * The testable main dispatcher used by kotlinx-coroutines-test. @@ -36,11 +37,11 @@ internal class TestMainDispatcher(delegate: CoroutineDispatcher): delegate.value = mainDispatcher } - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) = - delay.scheduleResumeAfterDelay(timeMillis, continuation) + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) = + delay.scheduleResumeAfterDelay(time, continuation) - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - delay.invokeOnTimeout(timeMillis, block, context) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = + delay.invokeOnTimeout(timeout, block, context) companion object { internal val currentTestDispatcher diff --git a/kotlinx-coroutines-test/jvm/src/migration/TestCoroutineDispatcher.kt b/kotlinx-coroutines-test/jvm/src/migration/TestCoroutineDispatcher.kt index 7f8062257d..cb2af3b4be 100644 --- a/kotlinx-coroutines-test/jvm/src/migration/TestCoroutineDispatcher.kt +++ b/kotlinx-coroutines-test/jvm/src/migration/TestCoroutineDispatcher.kt @@ -2,6 +2,7 @@ package kotlinx.coroutines.test import kotlinx.coroutines.* import kotlin.coroutines.* +import kotlin.time.Duration /** * @suppress @@ -43,7 +44,7 @@ public class TestCoroutineDispatcher(public override val scheduler: TestCoroutin override fun toString(): String = "TestCoroutineDispatcher[scheduler=$scheduler]" private fun post(block: Runnable, context: CoroutineContext) = - scheduler.registerEvent(this, 0, block, context) { false } + scheduler.registerEvent(this, Duration.ZERO, block, context) { false } val currentTime: Long get() = scheduler.currentTime diff --git a/kotlinx-coroutines-test/jvm/test/migration/TestRunBlockingOrderTest.kt b/kotlinx-coroutines-test/jvm/test/migration/TestRunBlockingOrderTest.kt index 5ba1216909..271d29af14 100644 --- a/kotlinx-coroutines-test/jvm/test/migration/TestRunBlockingOrderTest.kt +++ b/kotlinx-coroutines-test/jvm/test/migration/TestRunBlockingOrderTest.kt @@ -56,7 +56,7 @@ class TestRunBlockingOrderTest: OrderedExecutionTestBase() { expect(1) delay(100) // move time forward a bit some that naive time + delay gives an overflow launch { - delay(Long.MAX_VALUE / 2) // very long delay + delay(Long.MAX_VALUE / 3_000_000) // very long delay finish(4) } launch { diff --git a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api index 5a881a128e..3f67331e63 100644 --- a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api +++ b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api @@ -54,13 +54,13 @@ public final class kotlinx/coroutines/reactor/ReactorFlowKt { public final class kotlinx/coroutines/reactor/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay { public fun (Lreactor/core/scheduler/Scheduler;)V - public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V public fun equals (Ljava/lang/Object;)Z public final fun getScheduler ()Lreactor/core/scheduler/Scheduler; public fun hashCode ()I - public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; - public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V + public fun invokeOnTimeout-KLykuaI (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public fun scheduleResumeAfterDelay-VtjQ1oo (JLkotlinx/coroutines/CancellableContinuation;)V + public fun timeoutMessage-LRDsOJo (J)Ljava/lang/String; public fun toString ()Ljava/lang/String; } diff --git a/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt b/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt index 5371ff39d7..89cf633e8a 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt @@ -5,6 +5,7 @@ import reactor.core.Disposable import reactor.core.scheduler.Scheduler import java.util.concurrent.TimeUnit import kotlin.coroutines.CoroutineContext +import kotlin.time.Duration /** * Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher]. @@ -27,16 +28,16 @@ public class SchedulerCoroutineDispatcher( } /** @suppress */ - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { val disposable = scheduler.schedule({ with(continuation) { resumeUndispatched(Unit) } - }, timeMillis, TimeUnit.MILLISECONDS) + }, time.inWholeNanoseconds, TimeUnit.NANOSECONDS) continuation.disposeOnCancellation(disposable.asDisposableHandle()) } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - scheduler.schedule(block, timeMillis, TimeUnit.MILLISECONDS).asDisposableHandle() + override fun invokeOnTimeout(time: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = + scheduler.schedule(block, time.inWholeNanoseconds, TimeUnit.NANOSECONDS).asDisposableHandle() /** @suppress */ override fun toString(): String = scheduler.toString() diff --git a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api index 803ac90564..e93b40c07a 100644 --- a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api +++ b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api @@ -83,13 +83,13 @@ public final class kotlinx/coroutines/rx2/RxSingleKt { public final class kotlinx/coroutines/rx2/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay { public fun (Lio/reactivex/Scheduler;)V - public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V public fun equals (Ljava/lang/Object;)Z public final fun getScheduler ()Lio/reactivex/Scheduler; public fun hashCode ()I - public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; - public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V + public fun invokeOnTimeout-KLykuaI (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public fun scheduleResumeAfterDelay-VtjQ1oo (JLkotlinx/coroutines/CancellableContinuation;)V + public fun timeoutMessage-LRDsOJo (J)Ljava/lang/String; public fun toString ()Ljava/lang/String; } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt index bac20210f6..20b537779d 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt @@ -8,6 +8,8 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import java.util.concurrent.* import kotlin.coroutines.* +import kotlin.time.Duration +import kotlin.time.Duration.Companion.nanoseconds /** * Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher] @@ -52,7 +54,7 @@ private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) private val workerCounter = atomic(1L) override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable = - scope.scheduleTask(block, unit.toMillis(delay)) { task -> + scope.scheduleTask(block, delay, unit) { task -> Runnable { scope.launch { task() } } } @@ -81,7 +83,7 @@ private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) } override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable = - workerScope.scheduleTask(block, unit.toMillis(delay)) { task -> + workerScope.scheduleTask(block, delay, unit) { task -> Runnable { blockChannel.trySend(task) } } @@ -106,7 +108,8 @@ private typealias Task = suspend () -> Unit */ private fun CoroutineScope.scheduleTask( block: Runnable, - delayMillis: Long, + delay: Long, + unit: TimeUnit, adaptForScheduling: (Task) -> Runnable ): Disposable { val ctx = coroutineContext @@ -129,11 +132,11 @@ private fun CoroutineScope.scheduleTask( val toSchedule = adaptForScheduling(::task) if (!isActive) return Disposables.disposed() - if (delayMillis <= 0) { + if (delay <= 0) { toSchedule.run() } else { @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") // do not remove the INVISIBLE_REFERENCE suppression: required in K2 - ctx.delay.invokeOnTimeout(delayMillis, toSchedule, ctx).let { handle = it } + ctx.delay.invokeOnTimeout(unit.toNanos(delay).nanoseconds, toSchedule, ctx).let { handle = it } } return disposable } @@ -153,16 +156,16 @@ public class SchedulerCoroutineDispatcher( } /** @suppress */ - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { val disposable = scheduler.scheduleDirect({ with(continuation) { resumeUndispatched(Unit) } - }, timeMillis, TimeUnit.MILLISECONDS) + }, time.inWholeNanoseconds, TimeUnit.NANOSECONDS) continuation.disposeOnCancellation(disposable) } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val disposable = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { + val disposable = scheduler.scheduleDirect(block, timeout.inWholeNanoseconds, TimeUnit.NANOSECONDS) return DisposableHandle { disposable.dispose() } } diff --git a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api index f86276e195..61694f4f5b 100644 --- a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api +++ b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api @@ -70,13 +70,13 @@ public final class kotlinx/coroutines/rx3/RxSingleKt { public final class kotlinx/coroutines/rx3/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay { public fun (Lio/reactivex/rxjava3/core/Scheduler;)V - public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V public fun equals (Ljava/lang/Object;)Z public final fun getScheduler ()Lio/reactivex/rxjava3/core/Scheduler; public fun hashCode ()I - public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; - public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V + public fun invokeOnTimeout-KLykuaI (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public fun scheduleResumeAfterDelay-VtjQ1oo (JLkotlinx/coroutines/CancellableContinuation;)V + public fun timeoutMessage-LRDsOJo (J)Ljava/lang/String; public fun toString ()Ljava/lang/String; } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt index 8f77d4c867..8916f435e4 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt @@ -8,6 +8,8 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import java.util.concurrent.* import kotlin.coroutines.* +import kotlin.time.Duration +import kotlin.time.Duration.Companion.nanoseconds /** * Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher] @@ -52,7 +54,7 @@ private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) private val workerCounter = atomic(1L) override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable = - scope.scheduleTask(block, unit.toMillis(delay)) { task -> + scope.scheduleTask(block, delay, unit) { task -> Runnable { scope.launch { task() } } } @@ -81,7 +83,7 @@ private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) } override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable = - workerScope.scheduleTask(block, unit.toMillis(delay)) { task -> + workerScope.scheduleTask(block, delay, unit) { task -> Runnable { blockChannel.trySend(task) } } @@ -106,7 +108,8 @@ private typealias Task = suspend () -> Unit */ private fun CoroutineScope.scheduleTask( block: Runnable, - delayMillis: Long, + delay: Long, + unit: TimeUnit, adaptForScheduling: (Task) -> Runnable ): Disposable { val ctx = coroutineContext @@ -129,11 +132,11 @@ private fun CoroutineScope.scheduleTask( val toSchedule = adaptForScheduling(::task) if (!isActive) return Disposable.disposed() - if (delayMillis <= 0) { + if (delay <= 0) { toSchedule.run() } else { @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") // do not remove the INVISIBLE_REFERENCE suppression: required in K2 - ctx.delay.invokeOnTimeout(delayMillis, toSchedule, ctx).let { handle = it } + ctx.delay.invokeOnTimeout(unit.toNanos(delay).nanoseconds, toSchedule, ctx).let { handle = it } } return disposable } @@ -153,16 +156,16 @@ public class SchedulerCoroutineDispatcher( } /** @suppress */ - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { val disposable = scheduler.scheduleDirect({ with(continuation) { resumeUndispatched(Unit) } - }, timeMillis, TimeUnit.MILLISECONDS) + }, time.inWholeNanoseconds, TimeUnit.NANOSECONDS) continuation.disposeOnCancellation(disposable) } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val disposable = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { + val disposable = scheduler.scheduleDirect(block, timeout.inWholeNanoseconds, TimeUnit.NANOSECONDS) return DisposableHandle { disposable.dispose() } } diff --git a/ui/kotlinx-coroutines-android/android-unit-tests/test/ordered/tests/TestComponent.kt b/ui/kotlinx-coroutines-android/android-unit-tests/test/ordered/tests/TestComponent.kt index fbd6d8135a..8ca9bdfe44 100644 --- a/ui/kotlinx-coroutines-android/android-unit-tests/test/ordered/tests/TestComponent.kt +++ b/ui/kotlinx-coroutines-android/android-unit-tests/test/ordered/tests/TestComponent.kt @@ -17,7 +17,7 @@ public class TestComponent { fun launchDelayed() { scope.launch { - delay(Long.MAX_VALUE / 2) + delay(Long.MAX_VALUE / 2 - 1) delayedLaunchCompleted = true } } diff --git a/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api b/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api index 090c14e09c..d07d985bd0 100644 --- a/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api +++ b/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api @@ -1,7 +1,7 @@ public abstract class kotlinx/coroutines/android/HandlerDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay { - public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun getImmediate ()Lkotlinx/coroutines/android/HandlerDispatcher; - public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout-KLykuaI (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public fun timeoutMessage-LRDsOJo (J)Ljava/lang/String; } public final class kotlinx/coroutines/android/HandlerDispatcherKt { diff --git a/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt b/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt index 9261b2e1c7..069f422c9c 100644 --- a/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt +++ b/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import java.lang.reflect.* import kotlin.coroutines.* +import kotlin.time.Duration /** * Dispatches execution onto Android [Handler]. @@ -136,25 +137,29 @@ internal class HandlerContext private constructor( } } - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { val block = Runnable { with(continuation) { resumeUndispatched(Unit) } } - if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) { + if (schedule(time, block)) { continuation.invokeOnCancellation { handler.removeCallbacks(block) } } else { cancelOnRejection(continuation.context, block) } } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) { + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { + if (schedule(timeout, block)) { return DisposableHandle { handler.removeCallbacks(block) } } cancelOnRejection(context, block) return NonDisposableHandle } + @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") // do not remove the INVISIBLE_REFERENCE suppression: required in K2 + private fun schedule(time: Duration, block: Runnable): Boolean = + handler.postDelayed(block, time.toDelayMillis().coerceAtMost(MAX_DELAY)) + private fun cancelOnRejection(context: CoroutineContext, block: Runnable) { context.cancel(CancellationException("The task was rejected, the handler underlying the dispatcher '${toString()}' was closed")) Dispatchers.IO.dispatch(context, block) diff --git a/ui/kotlinx-coroutines-android/test/DisabledHandlerTest.kt b/ui/kotlinx-coroutines-android/test/DisabledHandlerTest.kt index e5a877e305..28a52304ed 100644 --- a/ui/kotlinx-coroutines-android/test/DisabledHandlerTest.kt +++ b/ui/kotlinx-coroutines-android/test/DisabledHandlerTest.kt @@ -51,7 +51,7 @@ class DisabledHandlerTest : TestBase() { withContext(disabledDispatcher) { expect(1) delegateToSuper = false - delay(Long.MAX_VALUE - 1) + delay(Long.MAX_VALUE / 2 - 1) expectUnreached() } expectUnreached() diff --git a/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api b/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api index e2c3b8f326..6f6d2c2ec4 100644 --- a/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api +++ b/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api @@ -3,10 +3,10 @@ public final class kotlinx/coroutines/javafx/JavaFxConvertKt { } public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay { - public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V - public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; - public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V + public fun invokeOnTimeout-KLykuaI (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public fun scheduleResumeAfterDelay-VtjQ1oo (JLkotlinx/coroutines/CancellableContinuation;)V + public fun timeoutMessage-LRDsOJo (J)Ljava/lang/String; } public final class kotlinx/coroutines/javafx/JavaFxDispatcherKt { diff --git a/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt b/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt index 941f458282..54e80aa2c1 100644 --- a/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt +++ b/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt @@ -3,13 +3,15 @@ package kotlinx.coroutines.javafx import javafx.animation.* import javafx.application.* import javafx.event.* -import javafx.util.* +import javafx.util.Duration as jfxDuration +import kotlin.time.Duration import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import java.lang.UnsupportedOperationException import java.lang.reflect.* import java.util.concurrent.* import kotlin.coroutines.* +import kotlin.time.DurationUnit /** * Dispatches execution onto JavaFx application thread and provides native [delay] support. @@ -29,23 +31,23 @@ public sealed class JavaFxDispatcher : MainCoroutineDispatcher(), Delay { override fun dispatch(context: CoroutineContext, block: Runnable): Unit = Platform.runLater(block) /** @suppress */ - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - val timeline = schedule(timeMillis) { + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { + val timeline = schedule(time) { with(continuation) { resumeUndispatched(Unit) } } continuation.invokeOnCancellation { timeline.stop() } } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val timeline = schedule(timeMillis) { + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { + val timeline = schedule(timeout) { block.run() } return DisposableHandle { timeline.stop() } } - private fun schedule(timeMillis: Long, handler: EventHandler): Timeline = - Timeline(KeyFrame(Duration.millis(timeMillis.toDouble()), handler)).apply { play() } + private fun schedule(time: Duration, handler: EventHandler): Timeline = + Timeline(KeyFrame(jfxDuration.millis(time.toDouble(DurationUnit.MILLISECONDS)), handler)).apply { play() } } internal class JavaFxDispatcherFactory : MainDispatcherFactory { diff --git a/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api b/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api index d33191fd96..873e515ffb 100644 --- a/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api +++ b/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api @@ -1,8 +1,8 @@ public abstract class kotlinx/coroutines/swing/SwingDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay { - public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V - public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; - public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V + public fun invokeOnTimeout-KLykuaI (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public fun scheduleResumeAfterDelay-VtjQ1oo (JLkotlinx/coroutines/CancellableContinuation;)V + public fun timeoutMessage-LRDsOJo (J)Ljava/lang/String; } public final class kotlinx/coroutines/swing/SwingDispatcherKt { diff --git a/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt b/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt index aa378be12d..de5328530a 100644 --- a/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt +++ b/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt @@ -5,6 +5,8 @@ import kotlinx.coroutines.internal.* import java.awt.event.* import javax.swing.* import kotlin.coroutines.* +import kotlin.time.Duration +import kotlin.time.DurationUnit /** * Dispatches execution onto Swing event dispatching thread and provides native [delay] support. @@ -23,23 +25,23 @@ public sealed class SwingDispatcher : MainCoroutineDispatcher(), Delay { override fun dispatch(context: CoroutineContext, block: Runnable): Unit = SwingUtilities.invokeLater(block) /** @suppress */ - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - val timer = schedule(timeMillis) { + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { + val timer = schedule(time) { with(continuation) { resumeUndispatched(Unit) } } continuation.invokeOnCancellation { timer.stop() } } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val timer = schedule(timeMillis) { + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { + val timer = schedule(timeout) { block.run() } return DisposableHandle { timer.stop() } } - private fun schedule(timeMillis: Long, action: ActionListener): Timer = - Timer(timeMillis.coerceAtMost(Int.MAX_VALUE.toLong()).toInt(), action).apply { + private fun schedule(time: Duration, action: ActionListener): Timer = + Timer(time.toInt(DurationUnit.MILLISECONDS), action).apply { isRepeats = false start() }