Skip to content
This repository has been archived by the owner on Oct 17, 2024. It is now read-only.

Commit

Permalink
Renamed Condition -> ConditionVariable
Browse files Browse the repository at this point in the history
  • Loading branch information
jonasfj committed Aug 16, 2023
1 parent 06a0e66 commit 45c6b66
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 34 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
## 2.12.0-wip

- Require Dart 2.19
- Add `Condition`, allowing tasks to wait for `Condition.wait`, which is
completed the next time `Condition.notify` is called by another task.
- Add `ConditionVariable`, allowing tasks to wait for `ConditionVariable.wait`,
which is completed the next time `ConditionVariable.notify` is called.
- Added `StreamExtensions.boundedForEach(N, each)` to enable concurrently
running a micro-task for each item in a stream, while never running more
than `N` micro-tasks concurrently.
Expand Down
2 changes: 1 addition & 1 deletion lib/async.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export 'src/async_memoizer.dart';
export 'src/byte_collector.dart';
export 'src/cancelable_operation.dart';
export 'src/chunked_stream_reader.dart';
export 'src/condition.dart';
export 'src/condition_variable.dart';
export 'src/delegate/event_sink.dart';
export 'src/delegate/future.dart';
export 'src/delegate/sink.dart';
Expand Down
14 changes: 7 additions & 7 deletions lib/src/condition.dart → lib/src/condition_variable.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,23 @@ import 'dart:async';

import 'package:meta/meta.dart';

/// A [Condition] allows micro-tasks to [wait] for other micro-tasks to
/// A [ConditionVariable] allows micro-tasks to [wait] for other micro-tasks to
/// [notify].
///
/// [Condition] is a concurrency primitive that allows one micro-task to
/// [ConditionVariable] is a concurrency primitive that allows one micro-task to
/// wait for notification from another micro-task. The [Future] return from
/// [wait] will be completed the next time [notify] is called.
///
/// ```dart
/// var weather = 'rain';
/// final condition = Condition();
/// final cond = ConditionVariable();
///
/// // Create a micro task to fetch the weather
/// scheduleMicrotask(() async {
/// // Infinitely loop that just keeps the weather up-to-date
/// while (true) {
/// weather = await getWeather();
/// condition.notify();
/// cond.notify();
///
/// // Sleep 5s before updating the weather again
/// await Future.delayed(Duration(seconds: 5));
Expand All @@ -31,12 +31,12 @@ import 'package:meta/meta.dart';
///
/// // Wait for sunny weather
/// while (weather != 'sunny') {
/// await condition.wait;
/// await cond.wait;
/// }
/// ```
// TODO: Apply `final` when language version for this library is bumped to 3.0
@sealed
class Condition {
class ConditionVariable {
var _completer = Completer<void>();

/// Complete all futures previously returned by [wait].
Expand All @@ -55,7 +55,7 @@ class Condition {
/// the future will be completed, and any new calls to [wait] will return a
/// new future. This future will also be unresolved, until [notify] is called.
///
/// The [Future] return from this condition will never throw.
/// The [Future] return from this condition variable will never throw.
Future<void> get wait {
if (_completer.isCompleted) {
_completer = Completer();
Expand Down
67 changes: 43 additions & 24 deletions lib/src/stream_extensions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import 'dart:async';

import 'condition.dart';
import 'condition_variable.dart';

/// Utility extensions on [Stream].
extension StreamExtensions<T> on Stream<T> {
Expand Down Expand Up @@ -86,46 +86,65 @@ extension StreamExtensions<T> on Stream<T> {
/// number of [each] calls at the same time.
///
/// This function will wait for the futures returned by [each] to be resolved
/// before completing. If any [each] invocation throws, [boundedForEach] will
/// before completing. If any [each] invocation throws, [boundedForEach] stop
/// subsequent calls
///
/// will
/// call [onError], if [onError] throws (it does)
/// continue subsequent [each] calls, ignore additional errors and throw the
/// first error encountered.
Future<void> boundedForEach(
int concurrency,
FutureOr<void> Function(T item) each,
) async {
FutureOr<void> Function(T item) each, {
FutureOr<void> Function(Object e, StackTrace? st) onError = _throwOnError,
}) async {
Object? firstError;
StackTrace? firstStackTrace;

var running = 0;
final wakeUp = Condition();
await for (final item in this) {
running += 1;
scheduleMicrotask(() async {
try {
await each(item);
} catch (e, st) {
if (firstError == null) {
firstError = e;
firstStackTrace = st;
final wakeUp = ConditionVariable();

try {
var doBreak = false;
await for (final item in this) {
running += 1;
scheduleMicrotask(() async {
try {
await each(item);
} catch (e, st) {
try {
await onError(e, st);
} catch (e, st) {
doBreak = true;
if (firstError == null) {
firstError = e;
firstStackTrace = st;
}
}
} finally {
running -= 1;
wakeUp.notify();
}
} finally {
running -= 1;
wakeUp.notify();
}
});
});

if (running >= concurrency) {
if (running >= concurrency) {
await wakeUp.wait;
}
if (doBreak) {
break;
}
}
} finally {
while (running >= concurrency) {
await wakeUp.wait;
}
}

while (running >= concurrency) {
await wakeUp.wait;
}

final firstError_ = firstError;
if (firstError_ != null) {
return Future.error(firstError_, firstStackTrace);
}
}
}

Future<void> _throwOnError(Object e, StackTrace? st) => Future.error(e, st);

0 comments on commit 45c6b66

Please sign in to comment.