diff --git a/CHANGELOG.md b/CHANGELOG.md index bc9bb9f..e8a452d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,9 @@ ## 2.12.0-wip - Require Dart 2.19 -- Add `ConditionVariable`, allowing tasks to wait for `ConditionVariable.wait`, - which is completed the next time `ConditionVariable.notify` is called. -- Added `StreamExtensions.boundedForEach(N, each)` to enable concurrently +- Add `Notifier`, allowing micro-tasks to wait for `Notifier.wait`, + which is completed the next time `Notifier.notify()` is called. +- Added `StreamExtensions.parallelForEach(N, each)` to enable concurrently running a micro-task for each item in a stream, while never running more than `N` micro-tasks concurrently. diff --git a/lib/async.dart b/lib/async.dart index 68ecdc9..1b40fcc 100644 --- a/lib/async.dart +++ b/lib/async.dart @@ -13,7 +13,6 @@ export 'src/async_memoizer.dart'; export 'src/byte_collector.dart'; export 'src/cancelable_operation.dart'; export 'src/chunked_stream_reader.dart'; -export 'src/condition_variable.dart'; export 'src/delegate/event_sink.dart'; export 'src/delegate/future.dart'; export 'src/delegate/sink.dart'; @@ -23,6 +22,7 @@ export 'src/delegate/stream_sink.dart'; export 'src/delegate/stream_subscription.dart'; export 'src/future_group.dart'; export 'src/lazy_stream.dart'; +export 'src/notifier.dart'; export 'src/null_stream_sink.dart'; export 'src/restartable_timer.dart'; export 'src/result/error.dart'; diff --git a/lib/src/condition_variable.dart b/lib/src/notifier.dart similarity index 66% rename from lib/src/condition_variable.dart rename to lib/src/notifier.dart index b3223bd..1aad9be 100644 --- a/lib/src/condition_variable.dart +++ b/lib/src/notifier.dart @@ -6,23 +6,23 @@ import 'dart:async'; import 'package:meta/meta.dart'; -/// A [ConditionVariable] allows micro-tasks to [wait] for other micro-tasks to +/// A [Notifier] allows micro-tasks to [wait] for other micro-tasks to /// [notify]. /// -/// [ConditionVariable] is a concurrency primitive that allows one micro-task to +/// [Notifier] is a concurrency primitive that allows one micro-task to /// wait for notification from another micro-task. The [Future] return from /// [wait] will be completed the next time [notify] is called. /// /// ```dart /// var weather = 'rain'; -/// final cond = ConditionVariable(); +/// final notifier = Notifier(); /// /// // Create a micro task to fetch the weather /// scheduleMicrotask(() async { /// // Infinitely loop that just keeps the weather up-to-date /// while (true) { /// weather = await getWeather(); -/// cond.notify(); +/// notifier.notify(); /// /// // Sleep 5s before updating the weather again /// await Future.delayed(Duration(seconds: 5)); @@ -31,31 +31,33 @@ import 'package:meta/meta.dart'; /// /// // Wait for sunny weather /// while (weather != 'sunny') { -/// await cond.wait; +/// await notifier.wait; /// } /// ``` // TODO: Apply `final` when language version for this library is bumped to 3.0 @sealed -class ConditionVariable { +class Notifier { var _completer = Completer(); - /// Complete all futures previously returned by [wait]. + /// Notify everybody waiting for notification. /// - /// Calls to [wait] after this call, will not be resolved, until the next time - /// [notify] is called. + /// This will complete all futures previously returned by [wait]. + /// Calls to [wait] after this call, will not be resolved, until the + /// next time [notify] is called. void notify() { if (!_completer.isCompleted) { _completer.complete(); } } - /// Returns a [Future] that will complete the next time [notify] is called. + /// Wait for notification. /// - /// This will always return an unresolved [Future]. Once [notify] is called - /// the future will be completed, and any new calls to [wait] will return a - /// new future. This future will also be unresolved, until [notify] is called. + /// Returns a [Future] that will complete the next time [notify] is called. /// - /// The [Future] return from this condition variable will never throw. + /// The [Future] returned will always be unresolved, and it will never throw. + /// Once [notify] is called the future will be completed, and any new calls + /// to [wait] will return a new future. This new future will also be + /// unresolved, until [notify] is called. Future get wait { if (_completer.isCompleted) { _completer = Completer(); diff --git a/lib/src/stream_extensions.dart b/lib/src/stream_extensions.dart index b2a6d64..30593bb 100644 --- a/lib/src/stream_extensions.dart +++ b/lib/src/stream_extensions.dart @@ -4,7 +4,7 @@ import 'dart:async'; -import 'condition_variable.dart'; +import 'notifier.dart'; /// Utility extensions on [Stream]. extension StreamExtensions on Stream { @@ -81,38 +81,55 @@ extension StreamExtensions on Stream { return controller.stream; } - /// Invoke [each] for each item in this stream, and wait for the [Future] - /// returned by [each] to be resolved. Running no more than [concurrency] - /// number of [each] calls at the same time. + /// Call [each] for each item in this stream with [maxParallel] invocations. /// - /// This function will wait for the futures returned by [each] to be resolved - /// before completing. If any [each] invocation throws, [boundedForEach] stop - /// subsequent calls + /// This method will invoke [each] for each item in this stream, and wait for + /// all futures from [each] to be resolved. [parallelForEach] will call [each] + /// in parallel, but never more then [maxParallel]. /// - /// will - /// call [onError], if [onError] throws (it does) - /// continue subsequent [each] calls, ignore additional errors and throw the - /// first error encountered. - Future boundedForEach( - int concurrency, + /// If [each] throws and [onError] rethrows (default behavior), then + /// [parallelForEach] will wait for ongoing [each] invocations to finish, + /// before throw the first error. + /// + /// If [onError] does not throw, then iteration will not be interrupted and + /// errors from [each] will be ignored. + /// + /// ```dart + /// // Count size of all files in the current folder + /// var folderSize = 0; + /// // Use parallelForEach to read at-most 5 files at the same time. + /// await Directory.current.list().parallelForEach(5, (item) async { + /// if (item is File) { + /// final bytes = await item.readAsBytes(); + /// folderSize += bytes.length; + /// } + /// }); + /// print('Folder size: $folderSize'); + /// ``` + Future parallelForEach( + int maxParallel, FutureOr Function(T item) each, { - FutureOr Function(Object e, StackTrace? st) onError = _throwOnError, + FutureOr Function(Object e, StackTrace? st) onError = Future.error, }) async { + // Track the first error, so we rethrow when we're done. Object? firstError; StackTrace? firstStackTrace; + // Track number of running items. var running = 0; - final wakeUp = ConditionVariable(); + final itemDone = Notifier(); try { var doBreak = false; await for (final item in this) { + // For each item we increment [running] and call [each] running += 1; - scheduleMicrotask(() async { + unawaited(() async { try { await each(item); } catch (e, st) { try { + // If [onError] doesn't throw, we'll just continue. await onError(e, st); } catch (e, st) { doBreak = true; @@ -122,29 +139,31 @@ extension StreamExtensions on Stream { } } } finally { + // When [each] is done, we decrement [running] and notify running -= 1; - wakeUp.notify(); + itemDone.notify(); } - }); + }()); - if (running >= concurrency) { - await wakeUp.wait; + if (running >= maxParallel) { + await itemDone.wait; } if (doBreak) { break; } } } finally { - while (running >= concurrency) { - await wakeUp.wait; + // Wait for all items to be finished + while (running > 0) { + await itemDone.wait; } } + // If an error happened, then we rethrow the first one. final firstError_ = firstError; - if (firstError_ != null) { - return Future.error(firstError_, firstStackTrace); + final firstStackTrace_ = firstStackTrace; + if (firstError_ != null && firstStackTrace_ != null) { + Error.throwWithStackTrace(firstError_, firstStackTrace_); } } } - -Future _throwOnError(Object e, StackTrace? st) => Future.error(e, st);