Skip to content
This repository has been archived by the owner on Oct 17, 2024. It is now read-only.

Commit

Permalink
Added Condition and boundedForEach
Browse files Browse the repository at this point in the history
  • Loading branch information
jonasfj committed Aug 16, 2023
1 parent b65622a commit 06a0e66
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 0 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
## 2.12.0-wip

- Require Dart 2.19
- Add `Condition`, allowing tasks to wait for `Condition.wait`, which is
completed the next time `Condition.notify` is called by another task.
- Added `StreamExtensions.boundedForEach(N, each)` to enable concurrently
running a micro-task for each item in a stream, while never running more
than `N` micro-tasks concurrently.

## 2.11.0

Expand Down
1 change: 1 addition & 0 deletions lib/async.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export 'src/async_memoizer.dart';
export 'src/byte_collector.dart';
export 'src/cancelable_operation.dart';
export 'src/chunked_stream_reader.dart';
export 'src/condition.dart';
export 'src/delegate/event_sink.dart';
export 'src/delegate/future.dart';
export 'src/delegate/sink.dart';
Expand Down
65 changes: 65 additions & 0 deletions lib/src/condition.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:meta/meta.dart';

/// A [Condition] allows micro-tasks to [wait] for other micro-tasks to
/// [notify].
///
/// [Condition] is a concurrency primitive that allows one micro-task to
/// wait for notification from another micro-task. The [Future] return from
/// [wait] will be completed the next time [notify] is called.
///
/// ```dart
/// var weather = 'rain';
/// final condition = Condition();
///
/// // Create a micro task to fetch the weather
/// scheduleMicrotask(() async {
/// // Infinitely loop that just keeps the weather up-to-date
/// while (true) {
/// weather = await getWeather();
/// condition.notify();
///
/// // Sleep 5s before updating the weather again
/// await Future.delayed(Duration(seconds: 5));
/// }
/// });
///
/// // Wait for sunny weather
/// while (weather != 'sunny') {
/// await condition.wait;
/// }
/// ```
// TODO: Apply `final` when language version for this library is bumped to 3.0
@sealed
class Condition {
var _completer = Completer<void>();

/// Complete all futures previously returned by [wait].
///
/// Calls to [wait] after this call, will not be resolved, until the next time
/// [notify] is called.
void notify() {
if (!_completer.isCompleted) {
_completer.complete();
}
}

/// Returns a [Future] that will complete the next time [notify] is called.
///
/// This will always return an unresolved [Future]. Once [notify] is called
/// the future will be completed, and any new calls to [wait] will return a
/// new future. This future will also be unresolved, until [notify] is called.
///
/// The [Future] return from this condition will never throw.
Future<void> get wait {
if (_completer.isCompleted) {
_completer = Completer();
}
return _completer.future;
}
}
50 changes: 50 additions & 0 deletions lib/src/stream_extensions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import 'dart:async';

import 'condition.dart';

/// Utility extensions on [Stream].
extension StreamExtensions<T> on Stream<T> {
/// Creates a stream whose elements are contiguous slices of `this`.
Expand Down Expand Up @@ -78,4 +80,52 @@ extension StreamExtensions<T> on Stream<T> {
..onCancel = subscription.cancel;
return controller.stream;
}

/// Invoke [each] for each item in this stream, and wait for the [Future]
/// returned by [each] to be resolved. Running no more than [concurrency]
/// number of [each] calls at the same time.
///
/// This function will wait for the futures returned by [each] to be resolved
/// before completing. If any [each] invocation throws, [boundedForEach] will
/// continue subsequent [each] calls, ignore additional errors and throw the
/// first error encountered.
Future<void> boundedForEach(
int concurrency,
FutureOr<void> Function(T item) each,
) async {
Object? firstError;
StackTrace? firstStackTrace;

var running = 0;
final wakeUp = Condition();
await for (final item in this) {
running += 1;
scheduleMicrotask(() async {
try {
await each(item);
} catch (e, st) {
if (firstError == null) {
firstError = e;
firstStackTrace = st;
}
} finally {
running -= 1;
wakeUp.notify();
}
});

if (running >= concurrency) {
await wakeUp.wait;
}
}

while (running >= concurrency) {
await wakeUp.wait;
}

final firstError_ = firstError;
if (firstError_ != null) {
return Future.error(firstError_, firstStackTrace);
}
}
}

0 comments on commit 06a0e66

Please sign in to comment.