Skip to content
This repository has been archived by the owner on Oct 17, 2024. It is now read-only.

Commit

Permalink
Renaming ConditionVariable -> Notifier, boundedForEach -> parallelFor…
Browse files Browse the repository at this point in the history
…Each
  • Loading branch information
jonasfj committed Aug 21, 2023
1 parent 45c6b66 commit 0561ab2
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 44 deletions.
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
## 2.12.0-wip

- Require Dart 2.19
- Add `ConditionVariable`, allowing tasks to wait for `ConditionVariable.wait`,
which is completed the next time `ConditionVariable.notify` is called.
- Added `StreamExtensions.boundedForEach(N, each)` to enable concurrently
- Add `Notifier`, allowing micro-tasks to wait for `Notifier.wait`,
which is completed the next time `Notifier.notify()` is called.
- Added `StreamExtensions.parallelForEach(N, each)` to enable concurrently
running a micro-task for each item in a stream, while never running more
than `N` micro-tasks concurrently.

Expand Down
2 changes: 1 addition & 1 deletion lib/async.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ export 'src/async_memoizer.dart';
export 'src/byte_collector.dart';
export 'src/cancelable_operation.dart';
export 'src/chunked_stream_reader.dart';
export 'src/condition_variable.dart';
export 'src/delegate/event_sink.dart';
export 'src/delegate/future.dart';
export 'src/delegate/sink.dart';
Expand All @@ -23,6 +22,7 @@ export 'src/delegate/stream_sink.dart';
export 'src/delegate/stream_subscription.dart';
export 'src/future_group.dart';
export 'src/lazy_stream.dart';
export 'src/notifier.dart';
export 'src/null_stream_sink.dart';
export 'src/restartable_timer.dart';
export 'src/result/error.dart';
Expand Down
30 changes: 16 additions & 14 deletions lib/src/condition_variable.dart → lib/src/notifier.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,23 @@ import 'dart:async';

import 'package:meta/meta.dart';

/// A [ConditionVariable] allows micro-tasks to [wait] for other micro-tasks to
/// A [Notifier] allows micro-tasks to [wait] for other micro-tasks to
/// [notify].
///
/// [ConditionVariable] is a concurrency primitive that allows one micro-task to
/// [Notifier] is a concurrency primitive that allows one micro-task to
/// wait for notification from another micro-task. The [Future] return from
/// [wait] will be completed the next time [notify] is called.
///
/// ```dart
/// var weather = 'rain';
/// final cond = ConditionVariable();
/// final notifier = Notifier();
///
/// // Create a micro task to fetch the weather
/// scheduleMicrotask(() async {
/// // Infinitely loop that just keeps the weather up-to-date
/// while (true) {
/// weather = await getWeather();
/// cond.notify();
/// notifier.notify();
///
/// // Sleep 5s before updating the weather again
/// await Future.delayed(Duration(seconds: 5));
Expand All @@ -31,31 +31,33 @@ import 'package:meta/meta.dart';
///
/// // Wait for sunny weather
/// while (weather != 'sunny') {
/// await cond.wait;
/// await notifier.wait;
/// }
/// ```
// TODO: Apply `final` when language version for this library is bumped to 3.0
@sealed
class ConditionVariable {
class Notifier {
var _completer = Completer<void>();

/// Complete all futures previously returned by [wait].
/// Notify everybody waiting for notification.
///
/// Calls to [wait] after this call, will not be resolved, until the next time
/// [notify] is called.
/// This will complete all futures previously returned by [wait].
/// Calls to [wait] after this call, will not be resolved, until the
/// next time [notify] is called.
void notify() {
if (!_completer.isCompleted) {
_completer.complete();
}
}

/// Returns a [Future] that will complete the next time [notify] is called.
/// Wait for notification.
///
/// This will always return an unresolved [Future]. Once [notify] is called
/// the future will be completed, and any new calls to [wait] will return a
/// new future. This future will also be unresolved, until [notify] is called.
/// Returns a [Future] that will complete the next time [notify] is called.
///
/// The [Future] return from this condition variable will never throw.
/// The [Future] returned will always be unresolved, and it will never throw.
/// Once [notify] is called the future will be completed, and any new calls
/// to [wait] will return a new future. This new future will also be
/// unresolved, until [notify] is called.
Future<void> get wait {
if (_completer.isCompleted) {
_completer = Completer();
Expand Down
71 changes: 45 additions & 26 deletions lib/src/stream_extensions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import 'dart:async';

import 'condition_variable.dart';
import 'notifier.dart';

/// Utility extensions on [Stream].
extension StreamExtensions<T> on Stream<T> {
Expand Down Expand Up @@ -81,38 +81,55 @@ extension StreamExtensions<T> on Stream<T> {
return controller.stream;
}

/// Invoke [each] for each item in this stream, and wait for the [Future]
/// returned by [each] to be resolved. Running no more than [concurrency]
/// number of [each] calls at the same time.
/// Call [each] for each item in this stream with [maxParallel] invocations.
///
/// This function will wait for the futures returned by [each] to be resolved
/// before completing. If any [each] invocation throws, [boundedForEach] stop
/// subsequent calls
/// This method will invoke [each] for each item in this stream, and wait for
/// all futures from [each] to be resolved. [parallelForEach] will call [each]
/// in parallel, but never more then [maxParallel].
///
/// will
/// call [onError], if [onError] throws (it does)
/// continue subsequent [each] calls, ignore additional errors and throw the
/// first error encountered.
Future<void> boundedForEach(
int concurrency,
/// If [each] throws and [onError] rethrows (default behavior), then
/// [parallelForEach] will wait for ongoing [each] invocations to finish,
/// before throw the first error.
///
/// If [onError] does not throw, then iteration will not be interrupted and
/// errors from [each] will be ignored.
///
/// ```dart
/// // Count size of all files in the current folder
/// var folderSize = 0;
/// // Use parallelForEach to read at-most 5 files at the same time.
/// await Directory.current.list().parallelForEach(5, (item) async {
/// if (item is File) {
/// final bytes = await item.readAsBytes();
/// folderSize += bytes.length;
/// }
/// });
/// print('Folder size: $folderSize');
/// ```
Future<void> parallelForEach(
int maxParallel,
FutureOr<void> Function(T item) each, {
FutureOr<void> Function(Object e, StackTrace? st) onError = _throwOnError,
FutureOr<void> Function(Object e, StackTrace? st) onError = Future.error,
}) async {
// Track the first error, so we rethrow when we're done.
Object? firstError;
StackTrace? firstStackTrace;

// Track number of running items.
var running = 0;
final wakeUp = ConditionVariable();
final itemDone = Notifier();

try {
var doBreak = false;
await for (final item in this) {
// For each item we increment [running] and call [each]
running += 1;
scheduleMicrotask(() async {
unawaited(() async {
try {
await each(item);
} catch (e, st) {
try {
// If [onError] doesn't throw, we'll just continue.
await onError(e, st);
} catch (e, st) {
doBreak = true;
Expand All @@ -122,29 +139,31 @@ extension StreamExtensions<T> on Stream<T> {
}
}
} finally {
// When [each] is done, we decrement [running] and notify
running -= 1;
wakeUp.notify();
itemDone.notify();
}
});
}());

if (running >= concurrency) {
await wakeUp.wait;
if (running >= maxParallel) {
await itemDone.wait;
}
if (doBreak) {
break;
}
}
} finally {
while (running >= concurrency) {
await wakeUp.wait;
// Wait for all items to be finished
while (running > 0) {
await itemDone.wait;
}
}

// If an error happened, then we rethrow the first one.
final firstError_ = firstError;
if (firstError_ != null) {
return Future.error(firstError_, firstStackTrace);
final firstStackTrace_ = firstStackTrace;
if (firstError_ != null && firstStackTrace_ != null) {
Error.throwWithStackTrace(firstError_, firstStackTrace_);
}
}
}

Future<void> _throwOnError(Object e, StackTrace? st) => Future.error(e, st);

0 comments on commit 0561ab2

Please sign in to comment.