From 45c6b66fdd6e294883b2353a0c513fb139aa89f1 Mon Sep 17 00:00:00 2001 From: Jonas Finnemann Jensen Date: Wed, 16 Aug 2023 17:26:10 +0200 Subject: [PATCH] Renamed Condition -> ConditionVariable --- CHANGELOG.md | 4 +- lib/async.dart | 2 +- ...condition.dart => condition_variable.dart} | 14 ++-- lib/src/stream_extensions.dart | 67 ++++++++++++------- 4 files changed, 53 insertions(+), 34 deletions(-) rename lib/src/{condition.dart => condition_variable.dart} (83%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 77a44b3..bc9bb9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,8 @@ ## 2.12.0-wip - Require Dart 2.19 -- Add `Condition`, allowing tasks to wait for `Condition.wait`, which is - completed the next time `Condition.notify` is called by another task. +- Add `ConditionVariable`, allowing tasks to wait for `ConditionVariable.wait`, + which is completed the next time `ConditionVariable.notify` is called. - Added `StreamExtensions.boundedForEach(N, each)` to enable concurrently running a micro-task for each item in a stream, while never running more than `N` micro-tasks concurrently. diff --git a/lib/async.dart b/lib/async.dart index e7b3189..68ecdc9 100644 --- a/lib/async.dart +++ b/lib/async.dart @@ -13,7 +13,7 @@ export 'src/async_memoizer.dart'; export 'src/byte_collector.dart'; export 'src/cancelable_operation.dart'; export 'src/chunked_stream_reader.dart'; -export 'src/condition.dart'; +export 'src/condition_variable.dart'; export 'src/delegate/event_sink.dart'; export 'src/delegate/future.dart'; export 'src/delegate/sink.dart'; diff --git a/lib/src/condition.dart b/lib/src/condition_variable.dart similarity index 83% rename from lib/src/condition.dart rename to lib/src/condition_variable.dart index 23f92d9..b3223bd 100644 --- a/lib/src/condition.dart +++ b/lib/src/condition_variable.dart @@ -6,23 +6,23 @@ import 'dart:async'; import 'package:meta/meta.dart'; -/// A [Condition] allows micro-tasks to [wait] for other micro-tasks to +/// A [ConditionVariable] allows micro-tasks to [wait] for other micro-tasks to /// [notify]. /// -/// [Condition] is a concurrency primitive that allows one micro-task to +/// [ConditionVariable] is a concurrency primitive that allows one micro-task to /// wait for notification from another micro-task. The [Future] return from /// [wait] will be completed the next time [notify] is called. /// /// ```dart /// var weather = 'rain'; -/// final condition = Condition(); +/// final cond = ConditionVariable(); /// /// // Create a micro task to fetch the weather /// scheduleMicrotask(() async { /// // Infinitely loop that just keeps the weather up-to-date /// while (true) { /// weather = await getWeather(); -/// condition.notify(); +/// cond.notify(); /// /// // Sleep 5s before updating the weather again /// await Future.delayed(Duration(seconds: 5)); @@ -31,12 +31,12 @@ import 'package:meta/meta.dart'; /// /// // Wait for sunny weather /// while (weather != 'sunny') { -/// await condition.wait; +/// await cond.wait; /// } /// ``` // TODO: Apply `final` when language version for this library is bumped to 3.0 @sealed -class Condition { +class ConditionVariable { var _completer = Completer(); /// Complete all futures previously returned by [wait]. @@ -55,7 +55,7 @@ class Condition { /// the future will be completed, and any new calls to [wait] will return a /// new future. This future will also be unresolved, until [notify] is called. /// - /// The [Future] return from this condition will never throw. + /// The [Future] return from this condition variable will never throw. Future get wait { if (_completer.isCompleted) { _completer = Completer(); diff --git a/lib/src/stream_extensions.dart b/lib/src/stream_extensions.dart index 371a018..b2a6d64 100644 --- a/lib/src/stream_extensions.dart +++ b/lib/src/stream_extensions.dart @@ -4,7 +4,7 @@ import 'dart:async'; -import 'condition.dart'; +import 'condition_variable.dart'; /// Utility extensions on [Stream]. extension StreamExtensions on Stream { @@ -86,46 +86,65 @@ extension StreamExtensions on Stream { /// number of [each] calls at the same time. /// /// This function will wait for the futures returned by [each] to be resolved - /// before completing. If any [each] invocation throws, [boundedForEach] will + /// before completing. If any [each] invocation throws, [boundedForEach] stop + /// subsequent calls + /// + /// will + /// call [onError], if [onError] throws (it does) /// continue subsequent [each] calls, ignore additional errors and throw the /// first error encountered. Future boundedForEach( int concurrency, - FutureOr Function(T item) each, - ) async { + FutureOr Function(T item) each, { + FutureOr Function(Object e, StackTrace? st) onError = _throwOnError, + }) async { Object? firstError; StackTrace? firstStackTrace; var running = 0; - final wakeUp = Condition(); - await for (final item in this) { - running += 1; - scheduleMicrotask(() async { - try { - await each(item); - } catch (e, st) { - if (firstError == null) { - firstError = e; - firstStackTrace = st; + final wakeUp = ConditionVariable(); + + try { + var doBreak = false; + await for (final item in this) { + running += 1; + scheduleMicrotask(() async { + try { + await each(item); + } catch (e, st) { + try { + await onError(e, st); + } catch (e, st) { + doBreak = true; + if (firstError == null) { + firstError = e; + firstStackTrace = st; + } + } + } finally { + running -= 1; + wakeUp.notify(); } - } finally { - running -= 1; - wakeUp.notify(); - } - }); + }); - if (running >= concurrency) { + if (running >= concurrency) { + await wakeUp.wait; + } + if (doBreak) { + break; + } + } + } finally { + while (running >= concurrency) { await wakeUp.wait; } } - while (running >= concurrency) { - await wakeUp.wait; - } - final firstError_ = firstError; if (firstError_ != null) { return Future.error(firstError_, firstStackTrace); } } } + +Future _throwOnError(Object e, StackTrace? st) => Future.error(e, st);