diff --git a/.github/ISSUE_TEMPLATE/stream_channel.md b/.github/ISSUE_TEMPLATE/stream_channel.md new file mode 100644 index 000000000..76b599497 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/stream_channel.md @@ -0,0 +1,5 @@ +--- +name: "package:stream_channel" +about: "Create a bug or file a feature request against package:stream_channel." +labels: "package:stream_channel" +--- \ No newline at end of file diff --git a/.github/labeler.yml b/.github/labeler.yml index 25efb2a1f..64585f35a 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -116,6 +116,10 @@ - changed-files: - any-glob-to-any-file: 'pkgs/stack_trace/**' +'package:stream_channel': + - changed-files: + - any-glob-to-any-file: 'pkgs/stream_channel/**' + 'package:stream_transform': - changed-files: - any-glob-to-any-file: 'pkgs/stream_transform/**' diff --git a/.github/workflows/stream_channel.yaml b/.github/workflows/stream_channel.yaml new file mode 100644 index 000000000..c39424dc5 --- /dev/null +++ b/.github/workflows/stream_channel.yaml @@ -0,0 +1,74 @@ +name: package:stream_channel + +on: + # Run on PRs and pushes to the default branch. + push: + branches: [ main ] + paths: + - '.github/workflows/stream_channel.yaml' + - 'pkgs/stream_channel/**' + pull_request: + branches: [ main ] + paths: + - '.github/workflows/stream_channel.yaml' + - 'pkgs/stream_channel/**' + schedule: + - cron: "0 0 * * 0" + +env: + PUB_ENVIRONMENT: bot.github + +defaults: + run: + working-directory: pkgs/stream_channel/ + +jobs: + # Check code formatting and static analysis on a single OS (linux) + # against Dart dev. + analyze: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + sdk: [dev] + steps: + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 + - uses: dart-lang/setup-dart@e630b99d28a3b71860378cafdc2a067c71107f94 + with: + sdk: ${{ matrix.sdk }} + - id: install + name: Install dependencies + run: dart pub get + - name: Check formatting + run: dart format --output=none --set-exit-if-changed . + if: always() && steps.install.outcome == 'success' + - name: Analyze code + run: dart analyze --fatal-infos + if: always() && steps.install.outcome == 'success' + + # Run tests on a matrix consisting of two dimensions: + # 1. OS: ubuntu-latest, (macos-latest, windows-latest) + # 2. release channel: dev + test: + needs: analyze + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + # Add macos-latest and/or windows-latest if relevant for this package. + os: [ubuntu-latest] + sdk: [3.3, dev] + steps: + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 + - uses: dart-lang/setup-dart@e630b99d28a3b71860378cafdc2a067c71107f94 + with: + sdk: ${{ matrix.sdk }} + - id: install + name: Install dependencies + run: dart pub get + - name: Run VM tests + run: dart test --platform vm + if: always() && steps.install.outcome == 'success' + - name: Run Chrome tests + run: dart test --platform chrome + if: always() && steps.install.outcome == 'success' diff --git a/README.md b/README.md index 563f90dea..e58417bb7 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,7 @@ don't naturally belong to other topic monorepos (like | [source_span](pkgs/source_span/) | Provides a standard representation for source code locations and spans. | [![package issues](https://img.shields.io/badge/package:source_span-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Asource_span) | [![pub package](https://img.shields.io/pub/v/source_span.svg)](https://pub.dev/packages/source_span) | | [sse](pkgs/sse/) | Provides client and server functionality for setting up bi-directional communication through Server Sent Events (SSE) and corresponding POST requests. | [![package issues](https://img.shields.io/badge/package:sse-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Asse) | [![pub package](https://img.shields.io/pub/v/sse.svg)](https://pub.dev/packages/sse) | | [stack_trace](pkgs/stack_trace/) | A package for manipulating stack traces and printing them readably. | [![package issues](https://img.shields.io/badge/package:stack_trace-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Astack_trace) | [![pub package](https://img.shields.io/pub/v/stack_trace.svg)](https://pub.dev/packages/stack_trace) | +| [stream_channel](pkgs/stream_channel/) | An abstraction for two-way communication channels based on the Dart Stream class. | [![package issues](https://img.shields.io/badge/package:stream_channel-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Astream_channel) | [![pub package](https://img.shields.io/pub/v/stream_channel.svg)](https://pub.dev/packages/stream_channel) | | [stream_transform](pkgs/stream_transform/) | A collection of utilities to transform and manipulate streams. | [![package issues](https://img.shields.io/badge/package:stream_transform-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Astream_transform) | [![pub package](https://img.shields.io/pub/v/stream_transform.svg)](https://pub.dev/packages/stream_transform) | | [term_glyph](pkgs/term_glyph/) | Useful Unicode glyphs and ASCII substitutes. | [![package issues](https://img.shields.io/badge/package:term_glyph-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aterm_glyph) | [![pub package](https://img.shields.io/pub/v/term_glyph.svg)](https://pub.dev/packages/term_glyph) | | [test_reflective_loader](pkgs/test_reflective_loader/) | Support for discovering tests and test suites using reflection. | [![package issues](https://img.shields.io/badge/package:test_reflective_loader-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Atest_reflective_loader) | [![pub package](https://img.shields.io/pub/v/test_reflective_loader.svg)](https://pub.dev/packages/test_reflective_loader) | diff --git a/pkgs/stream_channel/.gitignore b/pkgs/stream_channel/.gitignore new file mode 100644 index 000000000..1447012ea --- /dev/null +++ b/pkgs/stream_channel/.gitignore @@ -0,0 +1,10 @@ +.buildlog +.dart_tool/ +.DS_Store +.idea +.pub/ +.settings/ +build/ +packages +.packages +pubspec.lock diff --git a/pkgs/stream_channel/AUTHORS b/pkgs/stream_channel/AUTHORS new file mode 100644 index 000000000..e8063a8cd --- /dev/null +++ b/pkgs/stream_channel/AUTHORS @@ -0,0 +1,6 @@ +# Below is a list of people and organizations that have contributed +# to the project. Names should be added to the list like so: +# +# Name/Organization + +Google Inc. diff --git a/pkgs/stream_channel/CHANGELOG.md b/pkgs/stream_channel/CHANGELOG.md new file mode 100644 index 000000000..30f7d32bc --- /dev/null +++ b/pkgs/stream_channel/CHANGELOG.md @@ -0,0 +1,158 @@ +## 2.1.3 + +* Require Dart 3.3 +* Move to `dart-lang/tools` monorepo. + +## 2.1.2 + +* Require Dart 2.19 +* Add an example. +* Fix a race condition in `IsolateChannel.connectReceive()` where the channel + could hang forever if its sink was closed before the connection was established. + +## 2.1.1 + +* Require Dart 2.14 +* Populate the pubspec `repository` field. +* Handle multichannel messages where the ID element is a `double` at runtime + instead of an `int`. When reading an array with `dart2wasm` numbers within the + array are parsed as `double`. + +## 2.1.0 + +* Stable release for null safety. + +## 2.0.0 + +**Breaking changes** + +* `IsolateChannel` requires a separate import + `package:stram_channel/isolate_channel.dart`. + `package:stream_channel/stream_channel.dart` will now not trigger any platform + concerns due to importing `dart:isolate`. +* Remove `JsonDocumentTransformer` class. The `jsonDocument` top level is still + available. +* Remove `StreamChannelTransformer.typed`. Use `.cast` on the transformed + channel instead. +* Change `Future` returns to `Future`. + +## 1.7.0 + +* Make `IsolateChannel` available through + `package:stream_channel/isolate_channel.dart`. This will be the required + import in the next release. +* Require `2.0.0` or newer SDK. +* Internal style changes. + +## 1.6.8 + +* Set max SDK version to `<3.0.0`, and adjust other dependencies. + +## 1.6.7+1 + +* Fix Dart 2 runtime types in `IsolateChannel`. + +## 1.6.7 + +* Update SDK version to 2.0.0-dev.17.0. +* Add a type argument to `MultiChannel`. + +## 1.6.6 + +* Fix a Dart 2 issue with inner stream transformation in `GuaranteeChannel`. + +* Fix a Dart 2 issue with `StreamChannelTransformer.fromCodec()`. + +## 1.6.5 + +* Fix an issue with `JsonDocumentTransformer.bind` where it created an internal + stream channel which didn't get a properly inferred type for its `sink`. + +## 1.6.4 + +* Fix a race condition in `MultiChannel` where messages from a remote virtual + channel could get dropped if the corresponding local channel wasn't registered + quickly enough. + +## 1.6.3 + +* Use `pumpEventQueue()` from test. + +## 1.6.2 + +* Declare support for `async` 2.0.0. + +## 1.6.1 + +* Fix the type of `StreamChannel.transform()`. This previously inverted the + generic parameters, so it only really worked with transformers where both + generic types were identical. + +## 1.6.0 + +* `Disconnector.disconnect()` now returns a future that completes when all the + inner `StreamSink.close()` futures have completed. + +## 1.5.0 + +* Add `new StreamChannel.withCloseGuarantee()` to provide the specific guarantee + that closing the sink causes the stream to close before it emits any more + events. This is the only guarantee that isn't automatically preserved when + transforming a channel. + +* `StreamChannelTransformer`s provided by the `stream_channel` package now + properly provide the guarantee that closing the sink causes the stream to + close before it emits any more events + +## 1.4.0 + +* Add `StreamChannel.cast()`, which soundly coerces the generic type of a + channel. + +* Add `StreamChannelTransformer.typed()`, which soundly coerces the generic type + of a transformer. + +## 1.3.2 + +* Fix all strong-mode errors and warnings. + +## 1.3.1 + +* Make `IsolateChannel` slightly more efficient. + +* Make `MultiChannel` follow the stream channel rules. + +## 1.3.0 + +* Add `Disconnector`, a transformer that allows the caller to disconnect the + transformed channel. + +## 1.2.0 + +* Add `new StreamChannel.withGuarantees()`, which creates a channel with extra + wrapping to ensure that it obeys the stream channel guarantees. + +* Add `StreamChannelController`, which can be used to create custom + `StreamChannel` objects. + +## 1.1.1 + +* Fix the type annotation for `StreamChannel.transform()`'s parameter. + +## 1.1.0 + +* Add `StreamChannel.transformStream()`, `StreamChannel.transformSink()`, + `StreamChannel.changeStream()`, and `StreamChannel.changeSink()` to support + changing only the stream or only the sink of a channel. + +* Be more explicit about `JsonDocumentTransformer`'s error-handling behavior. + +## 1.0.1 + +* Fix `MultiChannel`'s constructor to take a `StreamChannel`. This is + technically a breaking change, but since 1.0.0 was only released an hour ago, + we're treating it as a bug fix. + +## 1.0.0 + +* Initial version diff --git a/pkgs/stream_channel/LICENSE b/pkgs/stream_channel/LICENSE new file mode 100644 index 000000000..dbd2843a0 --- /dev/null +++ b/pkgs/stream_channel/LICENSE @@ -0,0 +1,27 @@ +Copyright 2015, the Dart project authors. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + * Neither the name of Google LLC nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/pkgs/stream_channel/README.md b/pkgs/stream_channel/README.md new file mode 100644 index 000000000..3677ccf5e --- /dev/null +++ b/pkgs/stream_channel/README.md @@ -0,0 +1,20 @@ +[![Build Status](https://github.com/dart-lang/tools/actions/workflows/stream_channel.yaml/badge.svg)](https://github.com/dart-lang/tools/actions/workflows/stream_channel.yaml) +[![pub package](https://img.shields.io/pub/v/stream_channel.svg)](https://pub.dev/packages/stream_channel) +[![package publisher](https://img.shields.io/pub/publisher/stream_channel.svg)](https://pub.dev/packages/stream_channel/publisher) + +This package exposes the `StreamChannel` interface, which represents a two-way +communication channel. Each `StreamChannel` exposes a `Stream` for receiving +data and a `StreamSink` for sending it. + +`StreamChannel` helps abstract communication logic away from the underlying +protocol. For example, the [`test`][test] package re-uses its test suite +communication protocol for both WebSocket connections to browser suites and +Isolate connections to VM tests. + +[test]: https://pub.dev/packages/test + +This package also contains utilities for dealing with `StreamChannel`s and with +two-way communications in general. For documentation of these utilities, see +[the API docs][api]. + +[api]: https://pub.dev/documentation/stream_channel/latest/ diff --git a/pkgs/stream_channel/analysis_options.yaml b/pkgs/stream_channel/analysis_options.yaml new file mode 100644 index 000000000..44cda4da2 --- /dev/null +++ b/pkgs/stream_channel/analysis_options.yaml @@ -0,0 +1,5 @@ +include: package:dart_flutter_team_lints/analysis_options.yaml + +analyzer: + language: + strict-casts: true diff --git a/pkgs/stream_channel/example/example.dart b/pkgs/stream_channel/example/example.dart new file mode 100644 index 000000000..b41d8d94a --- /dev/null +++ b/pkgs/stream_channel/example/example.dart @@ -0,0 +1,110 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; +import 'dart:isolate'; + +import 'package:stream_channel/isolate_channel.dart'; +import 'package:stream_channel/stream_channel.dart'; + +Future main() async { + // A StreamChannel, is in simplest terms, a wrapper around a Stream and + // a StreamSink. For example, you can create a channel that wraps standard + // IO: + var stdioChannel = StreamChannel(stdin, stdout); + stdioChannel.sink.add('Hello!\n'.codeUnits); + + // Like a Stream can be transformed with a StreamTransformer, a + // StreamChannel can be transformed with a StreamChannelTransformer. + // For example, we can handle standard input as strings: + var stringChannel = stdioChannel + .transform(StreamChannelTransformer.fromCodec(utf8)) + .transformStream(const LineSplitter()); + stringChannel.sink.add('world!\n'); + + // You can implement StreamChannel by extending StreamChannelMixin, but + // it's much easier to use a StreamChannelController. A controller has two + // StreamChannel members: `local` and `foreign`. The creator of a + // controller should work with the `local` channel, while the recipient should + // work with the `foreign` channel, and usually will not have direct access to + // the underlying controller. + var ctrl = StreamChannelController(); + ctrl.local.stream.listen((event) { + // Do something useful here... + }); + + // You can also pipe events from one channel to another. + ctrl + ..foreign.pipe(stringChannel) + ..local.sink.add('Piped!\n'); + await ctrl.local.sink.close(); + + // The StreamChannel interface provides several guarantees, which can be + // found here: + // https://pub.dev/documentation/stream_channel/latest/stream_channel/StreamChannel-class.html + // + // By calling `StreamChannel.withGuarantees()`, you can create a + // StreamChannel that provides all guarantees. + var dummyCtrl0 = StreamChannelController(); + var guaranteedChannel = StreamChannel.withGuarantees( + dummyCtrl0.foreign.stream, dummyCtrl0.foreign.sink); + + // To close a StreamChannel, use `sink.close()`. + await guaranteedChannel.sink.close(); + + // A MultiChannel multiplexes multiple virtual channels across a single + // underlying transport layer. For example, an application listening over + // standard I/O can still support multiple clients if it has a mechanism to + // separate events from different clients. + // + // A MultiChannel splits events into numbered channels, which are + // instances of VirtualChannel. + var dummyCtrl1 = StreamChannelController(); + var multiChannel = MultiChannel(dummyCtrl1.foreign); + var channel1 = multiChannel.virtualChannel(); + await multiChannel.sink.close(); + + // The client/peer should also create its own MultiChannel, connected to + // the underlying transport, use the corresponding ID's to handle events in + // their respective channels. It is up to you how to communicate channel ID's + // across different endpoints. + var dummyCtrl2 = StreamChannelController(); + var multiChannel2 = MultiChannel(dummyCtrl2.foreign); + var channel2 = multiChannel2.virtualChannel(channel1.id); + await channel2.sink.close(); + await multiChannel2.sink.close(); + + // Multiple instances of a Dart application can communicate easily across + // `SendPort`/`ReceivePort` pairs by means of the `IsolateChannel` class. + // Typically, one endpoint will create a `ReceivePort`, and call the + // `IsolateChannel.connectReceive` constructor. The other endpoint will be + // given the corresponding `SendPort`, and then call + // `IsolateChannel.connectSend`. + var recv = ReceivePort(); + var recvChannel = IsolateChannel.connectReceive(recv); + var sendChannel = IsolateChannel.connectSend(recv.sendPort); + + // You must manually close `IsolateChannel` sinks, however. + await recvChannel.sink.close(); + await sendChannel.sink.close(); + + // You can use the `Disconnector` transformer to cause a channel to act as + // though the remote end of its transport had disconnected. + var disconnector = Disconnector(); + var disconnectable = stringChannel.transform(disconnector); + disconnectable.sink.add('Still connected!'); + await disconnector.disconnect(); + + // Additionally: + // * The `DelegatingStreamController` class can be extended to build a + // basis for wrapping other `StreamChannel` objects. + // * The `jsonDocument` transformer converts events to/from JSON, using + // the `json` codec from `dart:convert`. + // * `package:json_rpc_2` directly builds on top of + // `package:stream_channel`, so any compatible transport can be used to + // create interactive client/server or peer-to-peer applications (i.e. + // language servers, microservices, etc. +} diff --git a/pkgs/stream_channel/lib/isolate_channel.dart b/pkgs/stream_channel/lib/isolate_channel.dart new file mode 100644 index 000000000..5d9f6e19f --- /dev/null +++ b/pkgs/stream_channel/lib/isolate_channel.dart @@ -0,0 +1,5 @@ +// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +export 'src/isolate_channel.dart' show IsolateChannel; diff --git a/pkgs/stream_channel/lib/src/close_guarantee_channel.dart b/pkgs/stream_channel/lib/src/close_guarantee_channel.dart new file mode 100644 index 000000000..13432d17d --- /dev/null +++ b/pkgs/stream_channel/lib/src/close_guarantee_channel.dart @@ -0,0 +1,91 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:async/async.dart'; + +import '../stream_channel.dart'; + +/// A [StreamChannel] that specifically enforces the stream channel guarantee +/// that closing the sink causes the stream to close before it emits any more +/// events +/// +/// This is exposed via [StreamChannel.withCloseGuarantee]. +class CloseGuaranteeChannel extends StreamChannelMixin { + @override + Stream get stream => _stream; + late final _CloseGuaranteeStream _stream; + + @override + StreamSink get sink => _sink; + late final _CloseGuaranteeSink _sink; + + /// The subscription to the inner stream. + StreamSubscription? _subscription; + + /// Whether the sink has closed, causing the underlying channel to disconnect. + bool _disconnected = false; + + CloseGuaranteeChannel(Stream innerStream, StreamSink innerSink) { + _sink = _CloseGuaranteeSink(innerSink, this); + _stream = _CloseGuaranteeStream(innerStream, this); + } +} + +/// The stream for [CloseGuaranteeChannel]. +/// +/// This wraps the inner stream to save the subscription on the channel when +/// [listen] is called. +class _CloseGuaranteeStream extends Stream { + /// The inner stream this is delegating to. + final Stream _inner; + + /// The [CloseGuaranteeChannel] this belongs to. + final CloseGuaranteeChannel _channel; + + _CloseGuaranteeStream(this._inner, this._channel); + + @override + StreamSubscription listen(void Function(T)? onData, + {Function? onError, void Function()? onDone, bool? cancelOnError}) { + // If the channel is already disconnected, we shouldn't dispatch anything + // but a done event. + if (_channel._disconnected) { + onData = null; + onError = null; + } + + var subscription = _inner.listen(onData, + onError: onError, onDone: onDone, cancelOnError: cancelOnError); + if (!_channel._disconnected) { + _channel._subscription = subscription; + } + return subscription; + } +} + +/// The sink for [CloseGuaranteeChannel]. +/// +/// This wraps the inner sink to cancel the stream subscription when the sink is +/// canceled. +class _CloseGuaranteeSink extends DelegatingStreamSink { + /// The [CloseGuaranteeChannel] this belongs to. + final CloseGuaranteeChannel _channel; + + _CloseGuaranteeSink(super.inner, this._channel); + + @override + Future close() { + var done = super.close(); + _channel._disconnected = true; + var subscription = _channel._subscription; + if (subscription != null) { + // Don't dispatch anything but a done event. + subscription.onData(null); + subscription.onError(null); + } + return done; + } +} diff --git a/pkgs/stream_channel/lib/src/delegating_stream_channel.dart b/pkgs/stream_channel/lib/src/delegating_stream_channel.dart new file mode 100644 index 000000000..4484a5989 --- /dev/null +++ b/pkgs/stream_channel/lib/src/delegating_stream_channel.dart @@ -0,0 +1,23 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import '../stream_channel.dart'; + +/// A simple delegating wrapper around [StreamChannel]. +/// +/// Subclasses can override individual methods, or use this to expose only +/// [StreamChannel] methods. +class DelegatingStreamChannel extends StreamChannelMixin { + /// The inner channel to which methods are forwarded. + final StreamChannel _inner; + + @override + Stream get stream => _inner.stream; + @override + StreamSink get sink => _inner.sink; + + DelegatingStreamChannel(this._inner); +} diff --git a/pkgs/stream_channel/lib/src/disconnector.dart b/pkgs/stream_channel/lib/src/disconnector.dart new file mode 100644 index 000000000..3414e9c77 --- /dev/null +++ b/pkgs/stream_channel/lib/src/disconnector.dart @@ -0,0 +1,153 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:async/async.dart'; + +import '../stream_channel.dart'; + +/// Allows the caller to force a channel to disconnect. +/// +/// When [disconnect] is called, the channel (or channels) transformed by this +/// transformer will act as though the remote end had disconnected—the stream +/// will emit a done event, and the sink will ignore future inputs. The inner +/// sink will also be closed to notify the remote end of the disconnection. +/// +/// If a channel is transformed after the [disconnect] has been called, it will +/// be disconnected immediately. +class Disconnector implements StreamChannelTransformer { + /// Whether [disconnect] has been called. + bool get isDisconnected => _disconnectMemo.hasRun; + + /// The sinks for transformed channels. + /// + /// Note that we assume that transformed channels provide the stream channel + /// guarantees. This allows us to only track sinks, because we know closing + /// the underlying sink will cause the stream to emit a done event. + final _sinks = <_DisconnectorSink>[]; + + /// Disconnects all channels that have been transformed. + /// + /// Returns a future that completes when all inner sinks' [StreamSink.close] + /// futures have completed. Note that a [StreamController]'s sink won't close + /// until the corresponding stream has a listener. + Future disconnect() => _disconnectMemo.runOnce(() { + var futures = _sinks.map((sink) => sink._disconnect()).toList(); + _sinks.clear(); + return Future.wait(futures, eagerError: true); + }); + final _disconnectMemo = AsyncMemoizer>(); + + @override + StreamChannel bind(StreamChannel channel) { + return channel.changeSink((innerSink) { + var sink = _DisconnectorSink(innerSink); + + if (isDisconnected) { + // Ignore errors here, because otherwise there would be no way for the + // user to handle them gracefully. + sink._disconnect().catchError((_) {}); + } else { + _sinks.add(sink); + } + + return sink; + }); + } +} + +/// A sink wrapper that can force a disconnection. +class _DisconnectorSink implements StreamSink { + /// The inner sink. + final StreamSink _inner; + + @override + Future get done => _inner.done; + + /// Whether [Disconnector.disconnect] has been called. + var _isDisconnected = false; + + /// Whether the user has called [close]. + var _closed = false; + + /// The subscription to the stream passed to [addStream], if a stream is + /// currently being added. + StreamSubscription? _addStreamSubscription; + + /// The completer for the future returned by [addStream], if a stream is + /// currently being added. + Completer? _addStreamCompleter; + + /// Whether we're currently adding a stream with [addStream]. + bool get _inAddStream => _addStreamSubscription != null; + + _DisconnectorSink(this._inner); + + @override + void add(T data) { + if (_closed) throw StateError('Cannot add event after closing.'); + if (_inAddStream) { + throw StateError('Cannot add event while adding stream.'); + } + if (_isDisconnected) return; + + _inner.add(data); + } + + @override + void addError(Object error, [StackTrace? stackTrace]) { + if (_closed) throw StateError('Cannot add event after closing.'); + if (_inAddStream) { + throw StateError('Cannot add event while adding stream.'); + } + if (_isDisconnected) return; + + _inner.addError(error, stackTrace); + } + + @override + Future addStream(Stream stream) { + if (_closed) throw StateError('Cannot add stream after closing.'); + if (_inAddStream) { + throw StateError('Cannot add stream while adding stream.'); + } + if (_isDisconnected) return Future.value(); + + _addStreamCompleter = Completer.sync(); + _addStreamSubscription = stream.listen(_inner.add, + onError: _inner.addError, onDone: _addStreamCompleter!.complete); + return _addStreamCompleter!.future.then((_) { + _addStreamCompleter = null; + _addStreamSubscription = null; + }); + } + + @override + Future close() { + if (_inAddStream) { + throw StateError('Cannot close sink while adding stream.'); + } + + _closed = true; + return _inner.close(); + } + + /// Disconnects this sink. + /// + /// This closes the underlying sink and stops forwarding events. It returns + /// the [StreamSink.close] future for the underlying sink. + Future _disconnect() { + _isDisconnected = true; + var future = _inner.close(); + + if (_inAddStream) { + _addStreamCompleter!.complete(_addStreamSubscription!.cancel()); + _addStreamCompleter = null; + _addStreamSubscription = null; + } + + return future; + } +} diff --git a/pkgs/stream_channel/lib/src/guarantee_channel.dart b/pkgs/stream_channel/lib/src/guarantee_channel.dart new file mode 100644 index 000000000..30ebe2ec6 --- /dev/null +++ b/pkgs/stream_channel/lib/src/guarantee_channel.dart @@ -0,0 +1,207 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:async/async.dart'; + +import '../stream_channel.dart'; + +/// A [StreamChannel] that enforces the stream channel guarantees. +/// +/// This is exposed via [StreamChannel.withGuarantees]. +class GuaranteeChannel extends StreamChannelMixin { + @override + Stream get stream => _streamController.stream; + + @override + StreamSink get sink => _sink; + late final _GuaranteeSink _sink; + + /// The controller for [stream]. + /// + /// This intermediate controller allows us to continue listening for a done + /// event even after the user has canceled their subscription, and to send our + /// own done event when the sink is closed. + late final StreamController _streamController; + + /// The subscription to the inner stream. + StreamSubscription? _subscription; + + /// Whether the sink has closed, causing the underlying channel to disconnect. + bool _disconnected = false; + + GuaranteeChannel(Stream innerStream, StreamSink innerSink, + {bool allowSinkErrors = true}) { + _sink = _GuaranteeSink(innerSink, this, allowErrors: allowSinkErrors); + + // Enforce the single-subscription guarantee by changing a broadcast stream + // to single-subscription. + if (innerStream.isBroadcast) { + innerStream = + innerStream.transform(SingleSubscriptionTransformer()); + } + + _streamController = StreamController( + onListen: () { + // If the sink has disconnected, we've already called + // [_streamController.close]. + if (_disconnected) return; + + _subscription = innerStream.listen(_streamController.add, + onError: _streamController.addError, onDone: () { + _sink._onStreamDisconnected(); + _streamController.close(); + }); + }, + sync: true); + } + + /// Called by [_GuaranteeSink] when the user closes it. + /// + /// The sink closing indicates that the connection is closed, so the stream + /// should stop emitting events. + void _onSinkDisconnected() { + _disconnected = true; + var subscription = _subscription; + if (subscription != null) subscription.cancel(); + _streamController.close(); + } +} + +/// The sink for [GuaranteeChannel]. +/// +/// This wraps the inner sink to ignore events and cancel any in-progress +/// [addStream] calls when the underlying channel closes. +class _GuaranteeSink implements StreamSink { + /// The inner sink being wrapped. + final StreamSink _inner; + + /// The [GuaranteeChannel] this belongs to. + final GuaranteeChannel _channel; + + @override + Future get done => _doneCompleter.future; + final _doneCompleter = Completer(); + + /// Whether connection is disconnected. + /// + /// This can happen because the stream has emitted a done event, or because + /// the user added an error when [_allowErrors] is `false`. + bool _disconnected = false; + + /// Whether the user has called [close]. + bool _closed = false; + + /// The subscription to the stream passed to [addStream], if a stream is + /// currently being added. + StreamSubscription? _addStreamSubscription; + + /// The completer for the future returned by [addStream], if a stream is + /// currently being added. + Completer? _addStreamCompleter; + + /// Whether we're currently adding a stream with [addStream]. + bool get _inAddStream => _addStreamSubscription != null; + + /// Whether errors are passed on to the underlying sink. + /// + /// If this is `false`, any error passed to the sink is piped to [done] and + /// the underlying sink is closed. + final bool _allowErrors; + + _GuaranteeSink(this._inner, this._channel, {bool allowErrors = true}) + : _allowErrors = allowErrors; + + @override + void add(T data) { + if (_closed) throw StateError('Cannot add event after closing.'); + if (_inAddStream) { + throw StateError('Cannot add event while adding stream.'); + } + if (_disconnected) return; + + _inner.add(data); + } + + @override + void addError(Object error, [StackTrace? stackTrace]) { + if (_closed) throw StateError('Cannot add event after closing.'); + if (_inAddStream) { + throw StateError('Cannot add event while adding stream.'); + } + if (_disconnected) return; + + _addError(error, stackTrace); + } + + /// Like [addError], but doesn't check to ensure that an error can be added. + /// + /// This is called from [addStream], so it shouldn't fail if a stream is being + /// added. + void _addError(Object error, [StackTrace? stackTrace]) { + if (_allowErrors) { + _inner.addError(error, stackTrace); + return; + } + + _doneCompleter.completeError(error, stackTrace); + + // Treat an error like both the stream and sink disconnecting. + _onStreamDisconnected(); + _channel._onSinkDisconnected(); + + // Ignore errors from the inner sink. We're already surfacing one error, and + // if the user handles it we don't want them to have another top-level. + _inner.close().catchError((_) {}); + } + + @override + Future addStream(Stream stream) { + if (_closed) throw StateError('Cannot add stream after closing.'); + if (_inAddStream) { + throw StateError('Cannot add stream while adding stream.'); + } + if (_disconnected) return Future.value(); + + _addStreamCompleter = Completer.sync(); + _addStreamSubscription = stream.listen(_inner.add, + onError: _addError, onDone: _addStreamCompleter!.complete); + return _addStreamCompleter!.future.then((_) { + _addStreamCompleter = null; + _addStreamSubscription = null; + }); + } + + @override + Future close() { + if (_inAddStream) { + throw StateError('Cannot close sink while adding stream.'); + } + + if (_closed) return done; + _closed = true; + + if (!_disconnected) { + _channel._onSinkDisconnected(); + _doneCompleter.complete(_inner.close()); + } + + return done; + } + + /// Called by [GuaranteeChannel] when the stream emits a done event. + /// + /// The stream being done indicates that the connection is closed, so the + /// sink should stop forwarding events. + void _onStreamDisconnected() { + _disconnected = true; + if (!_doneCompleter.isCompleted) _doneCompleter.complete(); + + if (!_inAddStream) return; + _addStreamCompleter!.complete(_addStreamSubscription!.cancel()); + _addStreamCompleter = null; + _addStreamSubscription = null; + } +} diff --git a/pkgs/stream_channel/lib/src/isolate_channel.dart b/pkgs/stream_channel/lib/src/isolate_channel.dart new file mode 100644 index 000000000..15c68a41d --- /dev/null +++ b/pkgs/stream_channel/lib/src/isolate_channel.dart @@ -0,0 +1,115 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:isolate'; + +import 'package:async/async.dart'; + +import '../stream_channel.dart'; + +/// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair, +/// presumably with another isolate. +/// +/// The remote endpoint doesn't necessarily need to be running an +/// [IsolateChannel]. This can be used with any two ports, although the +/// [StreamChannel] semantics mean that this class will treat them as being +/// paired (for example, closing the [sink] will cause the [stream] to stop +/// emitting events). +/// +/// The underlying isolate ports have no notion of closing connections. This +/// means that [stream] won't close unless [sink] is closed, and that closing +/// [sink] won't cause the remote endpoint to close. Users should take care to +/// ensure that they always close the [sink] of every [IsolateChannel] they use +/// to avoid leaving dangling [ReceivePort]s. +class IsolateChannel extends StreamChannelMixin { + @override + final Stream stream; + @override + final StreamSink sink; + + /// Connects to a remote channel that was created with + /// [IsolateChannel.connectSend]. + /// + /// These constructors establish a connection using only a single + /// [SendPort]/[ReceivePort] pair, as long as each side uses one of the + /// connect constructors. + /// + /// The connection protocol is guaranteed to remain compatible across versions + /// at least until the next major version release. If the protocol is + /// violated, the resulting channel will emit a single value on its stream and + /// then close. + factory IsolateChannel.connectReceive(ReceivePort receivePort) { + // We can't use a [StreamChannelCompleter] here because we need the return + // value to be an [IsolateChannel]. + var isCompleted = false; + var streamCompleter = StreamCompleter(); + var sinkCompleter = StreamSinkCompleter(); + + var channel = IsolateChannel._(streamCompleter.stream, sinkCompleter.sink + .transform(StreamSinkTransformer.fromHandlers(handleDone: (sink) { + if (!isCompleted) { + receivePort.close(); + streamCompleter.setSourceStream(const Stream.empty()); + sinkCompleter.setDestinationSink(NullStreamSink()); + } + sink.close(); + }))); + + // The first message across the ReceivePort should be a SendPort pointing to + // the remote end. If it's not, we'll make the stream emit an error + // complaining. + late StreamSubscription subscription; + subscription = receivePort.listen((message) { + isCompleted = true; + if (message is SendPort) { + var controller = + StreamChannelController(allowForeignErrors: false, sync: true); + SubscriptionStream(subscription).cast().pipe(controller.local.sink); + controller.local.stream + .listen((data) => message.send(data), onDone: receivePort.close); + + streamCompleter.setSourceStream(controller.foreign.stream); + sinkCompleter.setDestinationSink(controller.foreign.sink); + return; + } + + streamCompleter.setError( + StateError('Unexpected Isolate response "$message".'), + StackTrace.current); + sinkCompleter.setDestinationSink(NullStreamSink()); + subscription.cancel(); + }); + + return channel; + } + + /// Connects to a remote channel that was created with + /// [IsolateChannel.connectReceive]. + /// + /// These constructors establish a connection using only a single + /// [SendPort]/[ReceivePort] pair, as long as each side uses one of the + /// connect constructors. + /// + /// The connection protocol is guaranteed to remain compatible across versions + /// at least until the next major version release. + factory IsolateChannel.connectSend(SendPort sendPort) { + var receivePort = ReceivePort(); + sendPort.send(receivePort.sendPort); + return IsolateChannel(receivePort, sendPort); + } + + /// Creates a stream channel that receives messages from [receivePort] and + /// sends them over [sendPort]. + factory IsolateChannel(ReceivePort receivePort, SendPort sendPort) { + var controller = + StreamChannelController(allowForeignErrors: false, sync: true); + receivePort.cast().pipe(controller.local.sink); + controller.local.stream + .listen((data) => sendPort.send(data), onDone: receivePort.close); + return IsolateChannel._(controller.foreign.stream, controller.foreign.sink); + } + + IsolateChannel._(this.stream, this.sink); +} diff --git a/pkgs/stream_channel/lib/src/json_document_transformer.dart b/pkgs/stream_channel/lib/src/json_document_transformer.dart new file mode 100644 index 000000000..3feda43fd --- /dev/null +++ b/pkgs/stream_channel/lib/src/json_document_transformer.dart @@ -0,0 +1,35 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:convert'; + +import 'package:async/async.dart'; + +import '../stream_channel.dart'; + +/// A [StreamChannelTransformer] that transforms JSON documents—strings that +/// contain individual objects encoded as JSON—into decoded Dart objects. +/// +/// This decodes JSON that's emitted by the transformed channel's stream, and +/// encodes objects so that JSON is passed to the transformed channel's sink. +/// +/// If the transformed channel emits invalid JSON, this emits a +/// [FormatException]. If an unencodable object is added to the sink, it +/// synchronously throws a [JsonUnsupportedObjectError]. +final StreamChannelTransformer jsonDocument = + const _JsonDocument(); + +class _JsonDocument implements StreamChannelTransformer { + const _JsonDocument(); + + @override + StreamChannel bind(StreamChannel channel) { + var stream = channel.stream.map(jsonDecode); + var sink = StreamSinkTransformer.fromHandlers( + handleData: (data, sink) { + sink.add(jsonEncode(data)); + }).bind(channel.sink); + return StreamChannel.withCloseGuarantee(stream, sink); + } +} diff --git a/pkgs/stream_channel/lib/src/multi_channel.dart b/pkgs/stream_channel/lib/src/multi_channel.dart new file mode 100644 index 000000000..48942392e --- /dev/null +++ b/pkgs/stream_channel/lib/src/multi_channel.dart @@ -0,0 +1,274 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:async/async.dart'; + +import '../stream_channel.dart'; + +/// A class that multiplexes multiple virtual channels across a single +/// underlying transport layer. +/// +/// This should be connected to another [MultiChannel] on the other end of the +/// underlying channel. It starts with a single default virtual channel, +/// accessible via [stream] and [sink]. Additional virtual channels can be +/// created with [virtualChannel]. +/// +/// When a virtual channel is created by one endpoint, the other must connect to +/// it before messages may be sent through it. The first endpoint passes its +/// [VirtualChannel.id] to the second, which then creates a channel from that id +/// also using [virtualChannel]. For example: +/// +/// ```dart +/// // First endpoint +/// var virtual = multiChannel.virtualChannel(); +/// multiChannel.sink.add({ +/// "channel": virtual.id +/// }); +/// +/// // Second endpoint +/// multiChannel.stream.listen((message) { +/// var virtual = multiChannel.virtualChannel(message["channel"]); +/// // ... +/// }); +/// ``` +/// +/// Sending errors across a [MultiChannel] is not supported. Any errors from the +/// underlying stream will be reported only via the default +/// [MultiChannel.stream]. +/// +/// Each virtual channel may be closed individually. When all of them are +/// closed, the underlying [StreamSink] is closed automatically. +abstract class MultiChannel implements StreamChannel { + /// The default input stream. + /// + /// This connects to the remote [sink]. + @override + Stream get stream; + + /// The default output stream. + /// + /// This connects to the remote [stream]. If this is closed, the remote + /// [stream] will close, but other virtual channels will remain open and new + /// virtual channels may be opened. + @override + StreamSink get sink; + + /// Creates a new [MultiChannel] that sends and receives messages over + /// [inner]. + /// + /// The inner channel must take JSON-like objects. + factory MultiChannel(StreamChannel inner) => _MultiChannel(inner); + + /// Creates a new virtual channel. + /// + /// If [id] is not passed, this creates a virtual channel from scratch. Before + /// it's used, its [VirtualChannel.id] must be sent to the remote endpoint + /// where [virtualChannel] should be called with that id. + /// + /// If [id] is passed, this creates a virtual channel corresponding to the + /// channel with that id on the remote channel. + /// + /// Throws an [ArgumentError] if a virtual channel already exists for [id]. + /// Throws a [StateError] if the underlying channel is closed. + VirtualChannel virtualChannel([int? id]); +} + +/// The implementation of [MultiChannel]. +/// +/// This is private so that [VirtualChannel] can inherit from [MultiChannel] +/// without having to implement all the private members. +class _MultiChannel extends StreamChannelMixin + implements MultiChannel { + /// The inner channel over which all communication is conducted. + /// + /// This will be `null` if the underlying communication channel is closed. + StreamChannel? _inner; + + /// The subscription to [_inner].stream. + StreamSubscription? _innerStreamSubscription; + + @override + Stream get stream => _mainController.foreign.stream; + @override + StreamSink get sink => _mainController.foreign.sink; + + /// The controller for this channel. + final _mainController = StreamChannelController(sync: true); + + /// A map from input IDs to [StreamChannelController]s that should be used to + /// communicate over those channels. + final _controllers = >{}; + + /// Input IDs of controllers in [_controllers] that we've received messages + /// for but that have not yet had a local [virtualChannel] created. + final _pendingIds = {}; + + /// Input IDs of virtual channels that used to exist but have since been + /// closed. + final _closedIds = {}; + + /// The next id to use for a local virtual channel. + /// + /// Ids are used to identify virtual channels. Each message is tagged with an + /// id; the receiving [MultiChannel] uses this id to look up which + /// [VirtualChannel] the message should be dispatched to. + /// + /// The id scheme for virtual channels is somewhat complicated. This is + /// necessary to ensure that there are no conflicts even when both endpoints + /// have virtual channels with the same id; since both endpoints can send and + /// receive messages across each virtual channel, a naïve scheme would make it + /// impossible to tell whether a message was from a channel that originated in + /// the remote endpoint or a reply on a channel that originated in the local + /// endpoint. + /// + /// The trick is that each endpoint only uses odd ids for its own channels. + /// When sending a message over a channel that was created by the remote + /// endpoint, the channel's id plus one is used. This way each [MultiChannel] + /// knows that if an incoming message has an odd id, it's coming from a + /// channel that was originally created remotely, but if it has an even id, + /// it's coming from a channel that was originally created locally. + var _nextId = 1; + + _MultiChannel(StreamChannel inner) : _inner = inner { + // The default connection is a special case which has id 0 on both ends. + // This allows it to begin connected without having to send over an id. + _controllers[0] = _mainController; + _mainController.local.stream.listen( + (message) => _inner!.sink.add([0, message]), + onDone: () => _closeChannel(0, 0)); + + _innerStreamSubscription = _inner!.stream.cast().listen((message) { + var id = (message[0] as num).toInt(); + + // If the channel was closed before an incoming message was processed, + // ignore that message. + if (_closedIds.contains(id)) return; + + var controller = _controllers.putIfAbsent(id, () { + // If we receive a message for a controller that doesn't have a local + // counterpart yet, create a controller for it to buffer incoming + // messages for when a local connection is created. + _pendingIds.add(id); + return StreamChannelController(sync: true); + }); + + if (message.length > 1) { + controller.local.sink.add(message[1] as T); + } else { + // A message without data indicates that the channel has been closed. We + // can just close the sink here without doing any more cleanup, because + // the sink closing will cause the stream to emit a done event which + // will trigger more cleanup. + controller.local.sink.close(); + } + }, + onDone: _closeInnerChannel, + onError: _mainController.local.sink.addError); + } + + @override + VirtualChannel virtualChannel([int? id]) { + int inputId; + int outputId; + if (id != null) { + // Since the user is passing in an id, we're connected to a remote + // VirtualChannel. This means messages they send over this channel will + // have the original odd id, but our replies will have an even id. + inputId = id; + outputId = id + 1; + } else { + // Since we're generating an id, we originated this VirtualChannel. This + // means messages we send over this channel will have the original odd id, + // but the remote channel's replies will have an even id. + inputId = _nextId + 1; + outputId = _nextId; + _nextId += 2; + } + + // If the inner channel has already closed, create new virtual channels in a + // closed state. + if (_inner == null) { + return VirtualChannel._( + this, inputId, const Stream.empty(), NullStreamSink()); + } + + late StreamChannelController controller; + if (_pendingIds.remove(inputId)) { + // If we've already received messages for this channel, use the controller + // where those messages are buffered. + controller = _controllers[inputId]!; + } else if (_controllers.containsKey(inputId) || + _closedIds.contains(inputId)) { + throw ArgumentError('A virtual channel with id $id already exists.'); + } else { + controller = StreamChannelController(sync: true); + _controllers[inputId] = controller; + } + + controller.local.stream.listen( + (message) => _inner!.sink.add([outputId, message]), + onDone: () => _closeChannel(inputId, outputId)); + return VirtualChannel._( + this, outputId, controller.foreign.stream, controller.foreign.sink); + } + + /// Closes the virtual channel for which incoming messages have [inputId] and + /// outgoing messages have [outputId]. + void _closeChannel(int inputId, int outputId) { + _closedIds.add(inputId); + var controller = _controllers.remove(inputId)!; + controller.local.sink.close(); + + if (_inner == null) return; + + // A message without data indicates that the virtual channel has been + // closed. + _inner!.sink.add([outputId]); + if (_controllers.isEmpty) _closeInnerChannel(); + } + + /// Closes the underlying communication channel. + void _closeInnerChannel() { + _inner!.sink.close(); + _innerStreamSubscription!.cancel(); + _inner = null; + + // Convert this to a list because the close is dispatched synchronously, and + // that could conceivably remove a controller from [_controllers]. + for (var controller in _controllers.values.toList(growable: false)) { + controller.local.sink.close(); + } + _controllers.clear(); + } +} + +/// A virtual channel created by [MultiChannel]. +/// +/// This implements [MultiChannel] for convenience. +/// [VirtualChannel.virtualChannel] is semantically identical to the parent's +/// [MultiChannel.virtualChannel]. +class VirtualChannel extends StreamChannelMixin + implements MultiChannel { + /// The [MultiChannel] that created this. + final MultiChannel _parent; + + /// The identifier for this channel. + /// + /// This can be sent across the [MultiChannel] to provide the remote endpoint + /// a means to connect to this channel. Nothing about this is guaranteed + /// except that it will be JSON-serializable. + final int id; + + @override + final Stream stream; + @override + final StreamSink sink; + + VirtualChannel._(this._parent, this.id, this.stream, this.sink); + + @override + VirtualChannel virtualChannel([int? id]) => _parent.virtualChannel(id); +} diff --git a/pkgs/stream_channel/lib/src/stream_channel_completer.dart b/pkgs/stream_channel/lib/src/stream_channel_completer.dart new file mode 100644 index 000000000..9d007eb6c --- /dev/null +++ b/pkgs/stream_channel/lib/src/stream_channel_completer.dart @@ -0,0 +1,74 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'package:async/async.dart'; + +import '../stream_channel.dart'; + +/// A [channel] where the source and destination are provided later. +/// +/// The [channel] is a normal channel that can be listened to and that events +/// can be added to immediately, but until [setChannel] is called it won't emit +/// any events and all events added to it will be buffered. +class StreamChannelCompleter { + /// The completer for this channel's stream. + final _streamCompleter = StreamCompleter(); + + /// The completer for this channel's sink. + final _sinkCompleter = StreamSinkCompleter(); + + /// The channel for this completer. + StreamChannel get channel => _channel; + late final StreamChannel _channel; + + /// Whether [setChannel] has been called. + bool _set = false; + + /// Convert a `Future` to a `StreamChannel`. + /// + /// This creates a channel using a channel completer, and sets the source + /// channel to the result of the future when the future completes. + /// + /// If the future completes with an error, the returned channel's stream will + /// instead contain just that error. The sink will silently discard all + /// events. + static StreamChannel fromFuture(Future channelFuture) { + var completer = StreamChannelCompleter(); + channelFuture.then(completer.setChannel, onError: completer.setError); + return completer.channel; + } + + StreamChannelCompleter() { + _channel = StreamChannel(_streamCompleter.stream, _sinkCompleter.sink); + } + + /// Set a channel as the source and destination for [channel]. + /// + /// A channel may be set at most once. + /// + /// Either [setChannel] or [setError] may be called at most once. Trying to + /// call either of them again will fail. + void setChannel(StreamChannel channel) { + if (_set) throw StateError('The channel has already been set.'); + _set = true; + + _streamCompleter.setSourceStream(channel.stream); + _sinkCompleter.setDestinationSink(channel.sink); + } + + /// Indicates that there was an error connecting the channel. + /// + /// This makes the stream emit [error] and close. It makes the sink discard + /// all its events. + /// + /// Either [setChannel] or [setError] may be called at most once. Trying to + /// call either of them again will fail. + void setError(Object error, [StackTrace? stackTrace]) { + if (_set) throw StateError('The channel has already been set.'); + _set = true; + + _streamCompleter.setError(error, stackTrace); + _sinkCompleter.setDestinationSink(NullStreamSink()); + } +} diff --git a/pkgs/stream_channel/lib/src/stream_channel_controller.dart b/pkgs/stream_channel/lib/src/stream_channel_controller.dart new file mode 100644 index 000000000..25d5239b2 --- /dev/null +++ b/pkgs/stream_channel/lib/src/stream_channel_controller.dart @@ -0,0 +1,67 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +/// @docImport 'isolate_channel.dart'; +library; + +import 'dart:async'; + +import '../stream_channel.dart'; + +/// A controller for exposing a new [StreamChannel]. +/// +/// This exposes two connected [StreamChannel]s, [local] and [foreign]. The +/// user's code should use [local] to emit and receive events. Then [foreign] +/// can be returned for others to use. For example, here's a simplified version +/// of the implementation of [IsolateChannel.new]: +/// +/// ```dart +/// StreamChannel isolateChannel(ReceivePort receivePort, SendPort sendPort) { +/// var controller = new StreamChannelController(allowForeignErrors: false); +/// +/// // Pipe all events from the receive port into the local sink... +/// receivePort.pipe(controller.local.sink); +/// +/// // ...and all events from the local stream into the send port. +/// controller.local.stream.listen(sendPort.send, onDone: receivePort.close); +/// +/// // Then return the foreign controller for your users to use. +/// return controller.foreign; +/// } +/// ``` +class StreamChannelController { + /// The local channel. + /// + /// This channel should be used directly by the creator of this + /// [StreamChannelController] to send and receive events. + StreamChannel get local => _local; + late final StreamChannel _local; + + /// The foreign channel. + /// + /// This channel should be returned to external users so they can communicate + /// with [local]. + StreamChannel get foreign => _foreign; + late final StreamChannel _foreign; + + /// Creates a [StreamChannelController]. + /// + /// If [sync] is true, events added to either channel's sink are synchronously + /// dispatched to the other channel's stream. This should only be done if the + /// source of those events is already asynchronous. + /// + /// If [allowForeignErrors] is `false`, errors are not allowed to be passed to + /// the foreign channel's sink. If any are, the connection will close and the + /// error will be forwarded to the foreign channel's [StreamSink.done] future. + /// This guarantees that the local stream will never emit errors. + StreamChannelController({bool allowForeignErrors = true, bool sync = false}) { + var localToForeignController = StreamController(sync: sync); + var foreignToLocalController = StreamController(sync: sync); + _local = StreamChannel.withGuarantees( + foreignToLocalController.stream, localToForeignController.sink); + _foreign = StreamChannel.withGuarantees( + localToForeignController.stream, foreignToLocalController.sink, + allowSinkErrors: allowForeignErrors); + } +} diff --git a/pkgs/stream_channel/lib/src/stream_channel_transformer.dart b/pkgs/stream_channel/lib/src/stream_channel_transformer.dart new file mode 100644 index 000000000..cf62c76c2 --- /dev/null +++ b/pkgs/stream_channel/lib/src/stream_channel_transformer.dart @@ -0,0 +1,58 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:convert'; + +import 'package:async/async.dart'; + +import '../stream_channel.dart'; + +/// A [StreamChannelTransformer] transforms the events being passed to and +/// emitted by a [StreamChannel]. +/// +/// This works on the same principle as [StreamTransformer] and +/// [StreamSinkTransformer]. Each transformer defines a [bind] method that takes +/// in the original [StreamChannel] and returns the transformed version. +/// +/// Transformers must be able to have [bind] called multiple times. If a +/// subclass implements [bind] explicitly, it should be sure that the returned +/// stream follows the second stream channel guarantee: closing the sink causes +/// the stream to close before it emits any more events. This guarantee is +/// invalidated when an asynchronous gap is added between the original stream's +/// event dispatch and the returned stream's, for example by transforming it +/// with a [StreamTransformer]. The guarantee can be easily preserved using +/// [StreamChannel.withCloseGuarantee]. +class StreamChannelTransformer { + /// The transformer to use on the channel's stream. + final StreamTransformer _streamTransformer; + + /// The transformer to use on the channel's sink. + final StreamSinkTransformer _sinkTransformer; + + /// Creates a [StreamChannelTransformer] from existing stream and sink + /// transformers. + const StreamChannelTransformer( + this._streamTransformer, this._sinkTransformer); + + /// Creates a [StreamChannelTransformer] from a codec's encoder and decoder. + /// + /// All input to the inner channel's sink is encoded using [Codec.encoder], + /// and all output from its stream is decoded using [Codec.decoder]. + StreamChannelTransformer.fromCodec(Codec codec) + : this(codec.decoder, + StreamSinkTransformer.fromStreamTransformer(codec.encoder)); + + /// Transforms the events sent to and emitted by [channel]. + /// + /// Creates a new channel. When events are passed to the returned channel's + /// sink, the transformer will transform them and pass the transformed + /// versions to `channel.sink`. When events are emitted from the + /// `channel.straem`, the transformer will transform them and pass the + /// transformed versions to the returned channel's stream. + StreamChannel bind(StreamChannel channel) => + StreamChannel.withCloseGuarantee( + channel.stream.transform(_streamTransformer), + _sinkTransformer.bind(channel.sink)); +} diff --git a/pkgs/stream_channel/lib/stream_channel.dart b/pkgs/stream_channel/lib/stream_channel.dart new file mode 100644 index 000000000..85f9a9755 --- /dev/null +++ b/pkgs/stream_channel/lib/stream_channel.dart @@ -0,0 +1,181 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:async/async.dart'; + +import 'src/close_guarantee_channel.dart'; +import 'src/guarantee_channel.dart'; +import 'src/stream_channel_transformer.dart'; + +export 'src/delegating_stream_channel.dart'; +export 'src/disconnector.dart'; +export 'src/json_document_transformer.dart'; +export 'src/multi_channel.dart'; +export 'src/stream_channel_completer.dart'; +export 'src/stream_channel_controller.dart'; +export 'src/stream_channel_transformer.dart'; + +/// An abstract class representing a two-way communication channel. +/// +/// Users should consider the [stream] emitting a "done" event to be the +/// canonical indicator that the channel has closed. If they wish to close the +/// channel, they should close the [sink]—canceling the stream subscription is +/// not sufficient. Protocol errors may be emitted through the stream or through +/// [sink].done, depending on their underlying cause. Note that the sink may +/// silently drop events if the channel closes before [sink].close is called. +/// +/// Implementations are strongly encouraged to mix in or extend +/// [StreamChannelMixin] to get default implementations of the various instance +/// methods. Adding new methods to this interface will not be considered a +/// breaking change if implementations are also added to [StreamChannelMixin]. +/// +/// Implementations must provide the following guarantees: +/// +/// * The stream is single-subscription, and must follow all the guarantees of +/// single-subscription streams. +/// +/// * Closing the sink causes the stream to close before it emits any more +/// events. +/// +/// * After the stream closes, the sink is automatically closed. If this +/// happens, sink methods should silently drop their arguments until +/// [sink].close is called. +/// +/// * If the stream closes before it has a listener, the sink should silently +/// drop events if possible. +/// +/// * Canceling the stream's subscription has no effect on the sink. The channel +/// must still be able to respond to the other endpoint closing the channel +/// even after the subscription has been canceled. +/// +/// * The sink *either* forwards errors to the other endpoint *or* closes as +/// soon as an error is added and forwards that error to the [sink].done +/// future. +/// +/// These guarantees allow users to interact uniformly with all implementations, +/// and ensure that either endpoint closing the stream produces consistent +/// behavior. +abstract class StreamChannel { + /// The single-subscription stream that emits values from the other endpoint. + Stream get stream; + + /// The sink for sending values to the other endpoint. + StreamSink get sink; + + /// Creates a new [StreamChannel] that communicates over [stream] and [sink]. + /// + /// Note that this stream/sink pair must provide the guarantees listed in the + /// [StreamChannel] documentation. If they don't do so natively, + /// [StreamChannel.withGuarantees] should be used instead. + factory StreamChannel(Stream stream, StreamSink sink) => + _StreamChannel(stream, sink); + + /// Creates a new [StreamChannel] that communicates over [stream] and [sink]. + /// + /// Unlike [StreamChannel.new], this enforces the guarantees listed in the + /// [StreamChannel] documentation. This makes it somewhat less efficient than + /// just wrapping a stream and a sink directly, so [StreamChannel.new] should + /// be used when the guarantees are provided natively. + /// + /// If [allowSinkErrors] is `false`, errors are not allowed to be passed to + /// [sink]. If any are, the connection will close and the error will be + /// forwarded to [sink].done. + factory StreamChannel.withGuarantees(Stream stream, StreamSink sink, + {bool allowSinkErrors = true}) => + GuaranteeChannel(stream, sink, allowSinkErrors: allowSinkErrors); + + /// Creates a new [StreamChannel] that communicates over [stream] and [sink]. + /// + /// This specifically enforces the second guarantee: closing the sink causes + /// the stream to close before it emits any more events. This guarantee is + /// invalidated when an asynchronous gap is added between the original + /// stream's event dispatch and the returned stream's, for example by + /// transforming it with a [StreamTransformer]. This is a lighter-weight way + /// of preserving that guarantee in particular than + /// [StreamChannel.withGuarantees]. + factory StreamChannel.withCloseGuarantee( + Stream stream, StreamSink sink) => + CloseGuaranteeChannel(stream, sink); + + /// Connects this to [other], so that any values emitted by either are sent + /// directly to the other. + void pipe(StreamChannel other); + + /// Transforms this using [transformer]. + /// + /// This is identical to calling `transformer.bind(channel)`. + StreamChannel transform(StreamChannelTransformer transformer); + + /// Transforms only the [stream] component of this using [transformer]. + StreamChannel transformStream(StreamTransformer transformer); + + /// Transforms only the [sink] component of this using [transformer]. + StreamChannel transformSink(StreamSinkTransformer transformer); + + /// Returns a copy of this with [stream] replaced by [change]'s return + /// value. + StreamChannel changeStream(Stream Function(Stream) change); + + /// Returns a copy of this with [sink] replaced by [change]'s return + /// value. + StreamChannel changeSink(StreamSink Function(StreamSink) change); + + /// Returns a copy of this with the generic type coerced to [S]. + /// + /// If any events emitted by [stream] aren't of type [S], they're converted + /// into [TypeError] events (`CastError` on some SDK versions). Similarly, if + /// any events are added to [sink] that aren't of type [S], a [TypeError] is + /// thrown. + StreamChannel cast(); +} + +/// An implementation of [StreamChannel] that simply takes a stream and a sink +/// as parameters. +/// +/// This is distinct from [StreamChannel] so that it can use +/// [StreamChannelMixin]. +class _StreamChannel extends StreamChannelMixin { + @override + final Stream stream; + @override + final StreamSink sink; + + _StreamChannel(this.stream, this.sink); +} + +/// A mixin that implements the instance methods of [StreamChannel] in terms of +/// [stream] and [sink]. +abstract class StreamChannelMixin implements StreamChannel { + @override + void pipe(StreamChannel other) { + stream.pipe(other.sink); + other.stream.pipe(sink); + } + + @override + StreamChannel transform(StreamChannelTransformer transformer) => + transformer.bind(this); + + @override + StreamChannel transformStream(StreamTransformer transformer) => + changeStream(transformer.bind); + + @override + StreamChannel transformSink(StreamSinkTransformer transformer) => + changeSink(transformer.bind); + + @override + StreamChannel changeStream(Stream Function(Stream) change) => + StreamChannel.withCloseGuarantee(change(stream), sink); + + @override + StreamChannel changeSink(StreamSink Function(StreamSink) change) => + StreamChannel.withCloseGuarantee(stream, change(sink)); + + @override + StreamChannel cast() => StreamChannel( + stream.cast(), StreamController(sync: true)..stream.cast().pipe(sink)); +} diff --git a/pkgs/stream_channel/pubspec.yaml b/pkgs/stream_channel/pubspec.yaml new file mode 100644 index 000000000..eec8c1ba5 --- /dev/null +++ b/pkgs/stream_channel/pubspec.yaml @@ -0,0 +1,16 @@ +name: stream_channel +version: 2.1.3 +description: >- + An abstraction for two-way communication channels based on the Dart Stream + class. +repository: https://github.com/dart-lang/tools/tree/main/pkgs/stream_channel + +environment: + sdk: ^3.3.0 + +dependencies: + async: ^2.5.0 + +dev_dependencies: + dart_flutter_team_lints: ^3.0.0 + test: ^1.16.6 diff --git a/pkgs/stream_channel/test/disconnector_test.dart b/pkgs/stream_channel/test/disconnector_test.dart new file mode 100644 index 000000000..bbba568ed --- /dev/null +++ b/pkgs/stream_channel/test/disconnector_test.dart @@ -0,0 +1,152 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:async/async.dart'; +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; + +void main() { + late StreamController streamController; + late StreamController sinkController; + late Disconnector disconnector; + late StreamChannel channel; + setUp(() { + streamController = StreamController(); + sinkController = StreamController(); + disconnector = Disconnector(); + channel = StreamChannel.withGuarantees( + streamController.stream, sinkController.sink) + .transform(disconnector); + }); + + group('before disconnection', () { + test('forwards events from the sink as normal', () { + channel.sink.add(1); + channel.sink.add(2); + channel.sink.add(3); + channel.sink.close(); + + expect(sinkController.stream.toList(), completion(equals([1, 2, 3]))); + }); + + test('forwards events to the stream as normal', () { + streamController.add(1); + streamController.add(2); + streamController.add(3); + streamController.close(); + + expect(channel.stream.toList(), completion(equals([1, 2, 3]))); + }); + + test("events can't be added when the sink is explicitly closed", () { + sinkController.stream.listen(null); // Work around sdk#19095. + + expect(channel.sink.close(), completes); + expect(() => channel.sink.add(1), throwsStateError); + expect(() => channel.sink.addError('oh no'), throwsStateError); + expect(() => channel.sink.addStream(Stream.fromIterable([])), + throwsStateError); + }); + + test("events can't be added while a stream is being added", () { + var controller = StreamController(); + channel.sink.addStream(controller.stream); + + expect(() => channel.sink.add(1), throwsStateError); + expect(() => channel.sink.addError('oh no'), throwsStateError); + expect(() => channel.sink.addStream(Stream.fromIterable([])), + throwsStateError); + expect(() => channel.sink.close(), throwsStateError); + + controller.close(); + }); + }); + + test('cancels addStream when disconnected', () async { + var canceled = false; + var controller = StreamController(onCancel: () { + canceled = true; + }); + expect(channel.sink.addStream(controller.stream), completes); + unawaited(disconnector.disconnect()); + + await pumpEventQueue(); + expect(canceled, isTrue); + }); + + test('disconnect() returns the close future from the inner sink', () async { + var streamController = StreamController(); + var sinkController = StreamController(); + var disconnector = Disconnector(); + var sink = _CloseCompleterSink(sinkController.sink); + StreamChannel.withGuarantees(streamController.stream, sink) + .transform(disconnector); + + var disconnectFutureFired = false; + expect( + disconnector.disconnect().then((_) { + disconnectFutureFired = true; + }), + completes); + + // Give the future time to fire early if it's going to. + await pumpEventQueue(); + expect(disconnectFutureFired, isFalse); + + // When the inner sink's close future completes, so should the + // disconnector's. + sink.completer.complete(); + await pumpEventQueue(); + expect(disconnectFutureFired, isTrue); + }); + + group('after disconnection', () { + setUp(() { + disconnector.disconnect(); + }); + + test('closes the inner sink and ignores events to the outer sink', () { + channel.sink.add(1); + channel.sink.add(2); + channel.sink.add(3); + channel.sink.close(); + + expect(sinkController.stream.toList(), completion(isEmpty)); + }); + + test('closes the stream', () { + expect(channel.stream.toList(), completion(isEmpty)); + }); + + test('completes done', () { + sinkController.stream.listen(null); // Work around sdk#19095. + expect(channel.sink.done, completes); + }); + + test('still emits state errors after explicit close', () { + sinkController.stream.listen(null); // Work around sdk#19095. + expect(channel.sink.close(), completes); + + expect(() => channel.sink.add(1), throwsStateError); + expect(() => channel.sink.addError('oh no'), throwsStateError); + }); + }); +} + +/// A [StreamSink] wrapper that adds the ability to manually complete the Future +/// returned by [close] using [completer]. +class _CloseCompleterSink extends DelegatingStreamSink { + /// The completer for the future returned by [close]. + final completer = Completer(); + + _CloseCompleterSink(super.inner); + + @override + Future close() { + super.close(); + return completer.future; + } +} diff --git a/pkgs/stream_channel/test/isolate_channel_test.dart b/pkgs/stream_channel/test/isolate_channel_test.dart new file mode 100644 index 000000000..3a8b42e9b --- /dev/null +++ b/pkgs/stream_channel/test/isolate_channel_test.dart @@ -0,0 +1,174 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +@TestOn('vm') +library; + +import 'dart:async'; +import 'dart:isolate'; + +import 'package:stream_channel/isolate_channel.dart'; +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; + +void main() { + late ReceivePort receivePort; + late SendPort sendPort; + late StreamChannel channel; + setUp(() { + receivePort = ReceivePort(); + var receivePortForSend = ReceivePort(); + sendPort = receivePortForSend.sendPort; + channel = IsolateChannel(receivePortForSend, receivePort.sendPort); + }); + + tearDown(() { + receivePort.close(); + channel.sink.close(); + }); + + test('the channel can send messages', () { + channel.sink.add(1); + channel.sink.add(2); + channel.sink.add(3); + + expect(receivePort.take(3).toList(), completion(equals([1, 2, 3]))); + }); + + test('the channel can receive messages', () { + sendPort.send(1); + sendPort.send(2); + sendPort.send(3); + + expect(channel.stream.take(3).toList(), completion(equals([1, 2, 3]))); + }); + + test("events can't be added to an explicitly-closed sink", () { + expect(channel.sink.close(), completes); + expect(() => channel.sink.add(1), throwsStateError); + expect(() => channel.sink.addError('oh no'), throwsStateError); + expect(() => channel.sink.addStream(Stream.fromIterable([])), + throwsStateError); + }); + + test("events can't be added while a stream is being added", () { + var controller = StreamController(); + channel.sink.addStream(controller.stream); + + expect(() => channel.sink.add(1), throwsStateError); + expect(() => channel.sink.addError('oh no'), throwsStateError); + expect(() => channel.sink.addStream(Stream.fromIterable([])), + throwsStateError); + expect(() => channel.sink.close(), throwsStateError); + + controller.close(); + }); + + group('stream channel rules', () { + test( + 'closing the sink causes the stream to close before it emits any more ' + 'events', () { + sendPort.send(1); + sendPort.send(2); + sendPort.send(3); + sendPort.send(4); + sendPort.send(5); + + channel.stream.listen(expectAsync1((message) { + expect(message, equals(1)); + channel.sink.close(); + }, count: 1)); + }); + + test("cancelling the stream's subscription has no effect on the sink", + () async { + unawaited(channel.stream.listen(null).cancel()); + await pumpEventQueue(); + + channel.sink.add(1); + channel.sink.add(2); + channel.sink.add(3); + expect(receivePort.take(3).toList(), completion(equals([1, 2, 3]))); + }); + + test('the sink closes as soon as an error is added', () async { + channel.sink.addError('oh no'); + channel.sink.add(1); + expect(channel.sink.done, throwsA('oh no')); + + // Since the sink is closed, the stream should also be closed. + expect(channel.stream.isEmpty, completion(isTrue)); + + // The other end shouldn't receive the next event, since the sink was + // closed. Pump the event queue to give it a chance to. + receivePort.listen(expectAsync1((_) {}, count: 0)); + await pumpEventQueue(); + }); + + test('the sink closes as soon as an error is added via addStream', + () async { + var canceled = false; + var controller = StreamController(onCancel: () { + canceled = true; + }); + + // This future shouldn't get the error, because it's sent to [Sink.done]. + expect(channel.sink.addStream(controller.stream), completes); + + controller.addError('oh no'); + expect(channel.sink.done, throwsA('oh no')); + await pumpEventQueue(); + expect(canceled, isTrue); + + // Even though the sink is closed, this shouldn't throw an error because + // the user didn't explicitly close it. + channel.sink.add(1); + }); + }); + + group('connect constructors', () { + late ReceivePort connectPort; + setUp(() { + connectPort = ReceivePort(); + }); + + tearDown(() { + connectPort.close(); + }); + + test('create a connected pair of channels', () async { + var channel1 = IsolateChannel.connectReceive(connectPort); + var channel2 = IsolateChannel.connectSend(connectPort.sendPort); + + channel1.sink.add(1); + channel1.sink.add(2); + channel1.sink.add(3); + expect(await channel2.stream.take(3).toList(), equals([1, 2, 3])); + + channel2.sink.add(4); + channel2.sink.add(5); + channel2.sink.add(6); + expect(await channel1.stream.take(3).toList(), equals([4, 5, 6])); + + await channel2.sink.close(); + }); + + test('the receiving channel produces an error if it gets the wrong message', + () { + var connectedChannel = IsolateChannel.connectReceive(connectPort); + connectPort.sendPort.send('wrong value'); + + expect(connectedChannel.stream.toList(), throwsStateError); + expect(connectedChannel.sink.done, completes); + }); + + test('the receiving channel closes gracefully without a connection', + () async { + var connectedChannel = IsolateChannel.connectReceive(connectPort); + await connectedChannel.sink.close(); + await expectLater(connectedChannel.stream.toList(), completion(isEmpty)); + await expectLater(connectedChannel.sink.done, completes); + }); + }); +} diff --git a/pkgs/stream_channel/test/json_document_transformer_test.dart b/pkgs/stream_channel/test/json_document_transformer_test.dart new file mode 100644 index 000000000..290c4e2c3 --- /dev/null +++ b/pkgs/stream_channel/test/json_document_transformer_test.dart @@ -0,0 +1,46 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:convert'; + +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; + +void main() { + late StreamController streamController; + late StreamController sinkController; + late StreamChannel channel; + setUp(() { + streamController = StreamController(); + sinkController = StreamController(); + channel = + StreamChannel(streamController.stream, sinkController.sink); + }); + + test('decodes JSON emitted by the channel', () { + var transformed = channel.transform(jsonDocument); + streamController.add('{"foo": "bar"}'); + expect(transformed.stream.first, completion(equals({'foo': 'bar'}))); + }); + + test('encodes objects added to the channel', () { + var transformed = channel.transform(jsonDocument); + transformed.sink.add({'foo': 'bar'}); + expect(sinkController.stream.first, + completion(equals(jsonEncode({'foo': 'bar'})))); + }); + + test('emits a stream error when incoming JSON is malformed', () { + var transformed = channel.transform(jsonDocument); + streamController.add('{invalid'); + expect(transformed.stream.first, throwsFormatException); + }); + + test('synchronously throws if an unencodable object is added', () { + var transformed = channel.transform(jsonDocument); + expect(() => transformed.sink.add(Object()), + throwsA(const TypeMatcher())); + }); +} diff --git a/pkgs/stream_channel/test/multi_channel_test.dart b/pkgs/stream_channel/test/multi_channel_test.dart new file mode 100644 index 000000000..ee6f8d2a1 --- /dev/null +++ b/pkgs/stream_channel/test/multi_channel_test.dart @@ -0,0 +1,478 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; + +void main() { + late StreamChannelController controller; + late MultiChannel channel1; + late MultiChannel channel2; + setUp(() { + controller = StreamChannelController(); + channel1 = MultiChannel(controller.local); + channel2 = MultiChannel(controller.foreign); + }); + + group('the default virtual channel', () { + test('begins connected', () { + var first = true; + channel2.stream.listen(expectAsync1((message) { + if (first) { + expect(message, equals(1)); + first = false; + } else { + expect(message, equals(2)); + } + }, count: 2)); + + channel1.sink.add(1); + channel1.sink.add(2); + }); + + test('closes the remote virtual channel when it closes', () { + expect(channel2.stream.toList(), completion(isEmpty)); + expect(channel2.sink.done, completes); + + channel1.sink.close(); + }); + + test('closes the local virtual channel when it closes', () { + expect(channel1.stream.toList(), completion(isEmpty)); + expect(channel1.sink.done, completes); + + channel1.sink.close(); + }); + + test( + "doesn't closes the local virtual channel when the stream " + 'subscription is canceled', () { + channel1.sink.done.then(expectAsync1((_) {}, count: 0)); + + channel1.stream.listen((_) {}).cancel(); + + // Ensure that there's enough time for the channel to close if it's going + // to. + return pumpEventQueue(); + }); + + test( + 'closes the underlying channel when it closes without any other ' + 'virtual channels', () { + expect(controller.local.sink.done, completes); + expect(controller.foreign.sink.done, completes); + + channel1.sink.close(); + }); + + test( + "doesn't close the underlying channel when it closes with other " + 'virtual channels', () { + controller.local.sink.done.then(expectAsync1((_) {}, count: 0)); + controller.foreign.sink.done.then(expectAsync1((_) {}, count: 0)); + + // Establish another virtual connection which should keep the underlying + // connection open. + channel2.virtualChannel(channel1.virtualChannel().id); + channel1.sink.close(); + + // Ensure that there's enough time for the underlying channel to complete + // if it's going to. + return pumpEventQueue(); + }); + }); + + group('a locally-created virtual channel', () { + late VirtualChannel virtual1; + late VirtualChannel virtual2; + setUp(() { + virtual1 = channel1.virtualChannel(); + virtual2 = channel2.virtualChannel(virtual1.id); + }); + + test('sends messages only to the other virtual channel', () { + var first = true; + virtual2.stream.listen(expectAsync1((message) { + if (first) { + expect(message, equals(1)); + first = false; + } else { + expect(message, equals(2)); + } + }, count: 2)); + + // No other virtual channels should receive the message. + for (var i = 0; i < 10; i++) { + var virtual = channel2.virtualChannel(channel1.virtualChannel().id); + virtual.stream.listen(expectAsync1((_) {}, count: 0)); + } + channel2.stream.listen(expectAsync1((_) {}, count: 0)); + + virtual1.sink.add(1); + virtual1.sink.add(2); + }); + + test('closes the remote virtual channel when it closes', () { + expect(virtual2.stream.toList(), completion(isEmpty)); + expect(virtual2.sink.done, completes); + + virtual1.sink.close(); + }); + + test('closes the local virtual channel when it closes', () { + expect(virtual1.stream.toList(), completion(isEmpty)); + expect(virtual1.sink.done, completes); + + virtual1.sink.close(); + }); + + test( + "doesn't closes the local virtual channel when the stream " + 'subscription is canceled', () { + virtual1.sink.done.then(expectAsync1((_) {}, count: 0)); + virtual1.stream.listen((_) {}).cancel(); + + // Ensure that there's enough time for the channel to close if it's going + // to. + return pumpEventQueue(); + }); + + test( + 'closes the underlying channel when it closes without any other ' + 'virtual channels', () async { + // First close the default channel so we can test the new channel as the + // last living virtual channel. + unawaited(channel1.sink.close()); + + await channel2.stream.toList(); + expect(controller.local.sink.done, completes); + expect(controller.foreign.sink.done, completes); + + unawaited(virtual1.sink.close()); + }); + + test( + "doesn't close the underlying channel when it closes with other " + 'virtual channels', () { + controller.local.sink.done.then(expectAsync1((_) {}, count: 0)); + controller.foreign.sink.done.then(expectAsync1((_) {}, count: 0)); + + virtual1.sink.close(); + + // Ensure that there's enough time for the underlying channel to complete + // if it's going to. + return pumpEventQueue(); + }); + + test("doesn't conflict with a remote virtual channel", () { + var virtual3 = channel2.virtualChannel(); + var virtual4 = channel1.virtualChannel(virtual3.id); + + // This is an implementation detail, but we assert it here to make sure + // we're properly testing two channels with the same id. + expect(virtual1.id, equals(virtual3.id)); + + virtual2.stream + .listen(expectAsync1((message) => expect(message, equals(1)))); + virtual4.stream + .listen(expectAsync1((message) => expect(message, equals(2)))); + + virtual1.sink.add(1); + virtual3.sink.add(2); + }); + }); + + group('a remotely-created virtual channel', () { + late VirtualChannel virtual1; + late VirtualChannel virtual2; + setUp(() { + virtual1 = channel1.virtualChannel(); + virtual2 = channel2.virtualChannel(virtual1.id); + }); + + test('sends messages only to the other virtual channel', () { + var first = true; + virtual1.stream.listen(expectAsync1((message) { + if (first) { + expect(message, equals(1)); + first = false; + } else { + expect(message, equals(2)); + } + }, count: 2)); + + // No other virtual channels should receive the message. + for (var i = 0; i < 10; i++) { + var virtual = channel2.virtualChannel(channel1.virtualChannel().id); + virtual.stream.listen(expectAsync1((_) {}, count: 0)); + } + channel1.stream.listen(expectAsync1((_) {}, count: 0)); + + virtual2.sink.add(1); + virtual2.sink.add(2); + }); + + test('closes the remote virtual channel when it closes', () { + expect(virtual1.stream.toList(), completion(isEmpty)); + expect(virtual1.sink.done, completes); + + virtual2.sink.close(); + }); + + test('closes the local virtual channel when it closes', () { + expect(virtual2.stream.toList(), completion(isEmpty)); + expect(virtual2.sink.done, completes); + + virtual2.sink.close(); + }); + + test( + "doesn't closes the local virtual channel when the stream " + 'subscription is canceled', () { + virtual2.sink.done.then(expectAsync1((_) {}, count: 0)); + virtual2.stream.listen((_) {}).cancel(); + + // Ensure that there's enough time for the channel to close if it's going + // to. + return pumpEventQueue(); + }); + + test( + 'closes the underlying channel when it closes without any other ' + 'virtual channels', () async { + // First close the default channel so we can test the new channel as the + // last living virtual channel. + unawaited(channel2.sink.close()); + + await channel1.stream.toList(); + expect(controller.local.sink.done, completes); + expect(controller.foreign.sink.done, completes); + + unawaited(virtual2.sink.close()); + }); + + test( + "doesn't close the underlying channel when it closes with other " + 'virtual channels', () { + controller.local.sink.done.then(expectAsync1((_) {}, count: 0)); + controller.foreign.sink.done.then(expectAsync1((_) {}, count: 0)); + + virtual2.sink.close(); + + // Ensure that there's enough time for the underlying channel to complete + // if it's going to. + return pumpEventQueue(); + }); + + test("doesn't allow another virtual channel with the same id", () { + expect(() => channel2.virtualChannel(virtual1.id), throwsArgumentError); + }); + + test('dispatches events received before the virtual channel is created', + () async { + virtual1 = channel1.virtualChannel(); + + virtual1.sink.add(1); + await pumpEventQueue(); + + virtual1.sink.add(2); + await pumpEventQueue(); + + expect(channel2.virtualChannel(virtual1.id).stream, emitsInOrder([1, 2])); + }); + + test( + 'dispatches close events received before the virtual channel is ' + 'created', () async { + virtual1 = channel1.virtualChannel(); + + unawaited(virtual1.sink.close()); + await pumpEventQueue(); + + expect(channel2.virtualChannel(virtual1.id).stream.toList(), + completion(isEmpty)); + }); + }); + + group('when the underlying stream', () { + late VirtualChannel virtual1; + late VirtualChannel virtual2; + setUp(() { + virtual1 = channel1.virtualChannel(); + virtual2 = channel2.virtualChannel(virtual1.id); + }); + + test('closes, all virtual channels close', () { + expect(channel1.stream.toList(), completion(isEmpty)); + expect(channel1.sink.done, completes); + expect(channel2.stream.toList(), completion(isEmpty)); + expect(channel2.sink.done, completes); + expect(virtual1.stream.toList(), completion(isEmpty)); + expect(virtual1.sink.done, completes); + expect(virtual2.stream.toList(), completion(isEmpty)); + expect(virtual2.sink.done, completes); + + controller.local.sink.close(); + }); + + test('closes, more virtual channels are created closed', () async { + unawaited(channel2.sink.close()); + unawaited(virtual2.sink.close()); + + // Wait for the existing channels to emit done events. + await channel1.stream.toList(); + await virtual1.stream.toList(); + + var virtual = channel1.virtualChannel(); + expect(virtual.stream.toList(), completion(isEmpty)); + expect(virtual.sink.done, completes); + + virtual = channel1.virtualChannel(); + expect(virtual.stream.toList(), completion(isEmpty)); + expect(virtual.sink.done, completes); + }); + + test('emits an error, the error is sent only to the default channel', () { + channel1.stream.listen(expectAsync1((_) {}, count: 0), + onError: expectAsync1((error) => expect(error, equals('oh no')))); + virtual1.stream.listen(expectAsync1((_) {}, count: 0), + onError: expectAsync1((_) {}, count: 0)); + + controller.foreign.sink.addError('oh no'); + }); + }); + + group('stream channel rules', () { + group('for the main stream:', () { + test( + 'closing the sink causes the stream to close before it emits any ' + 'more events', () { + channel1.sink.add(1); + channel1.sink.add(2); + channel1.sink.add(3); + + channel2.stream.listen(expectAsync1((message) { + expect(message, equals(1)); + channel2.sink.close(); + }, count: 1)); + }); + + test('after the stream closes, the sink ignores events', () async { + unawaited(channel1.sink.close()); + + // Wait for the done event to be delivered. + await channel2.stream.toList(); + channel2.sink.add(1); + channel2.sink.add(2); + channel2.sink.add(3); + unawaited(channel2.sink.close()); + + // None of our channel.sink additions should make it to the other + // endpoint. + channel1.stream.listen(expectAsync1((_) {}, count: 0)); + await pumpEventQueue(); + }); + + test("canceling the stream's subscription has no effect on the sink", + () async { + unawaited(channel1.stream.listen(null).cancel()); + await pumpEventQueue(); + + channel1.sink.add(1); + channel1.sink.add(2); + channel1.sink.add(3); + unawaited(channel1.sink.close()); + expect(channel2.stream.toList(), completion(equals([1, 2, 3]))); + }); + + test("canceling the stream's subscription doesn't stop a done event", + () async { + unawaited(channel1.stream.listen(null).cancel()); + await pumpEventQueue(); + + unawaited(channel2.sink.close()); + await pumpEventQueue(); + + channel1.sink.add(1); + channel1.sink.add(2); + channel1.sink.add(3); + unawaited(channel1.sink.close()); + + // The sink should be ignoring events because the channel closed. + channel2.stream.listen(expectAsync1((_) {}, count: 0)); + await pumpEventQueue(); + }); + }); + + group('for a virtual channel:', () { + late VirtualChannel virtual1; + late VirtualChannel virtual2; + setUp(() { + virtual1 = channel1.virtualChannel(); + virtual2 = channel2.virtualChannel(virtual1.id); + }); + + test( + 'closing the sink causes the stream to close before it emits any ' + 'more events', () { + virtual1.sink.add(1); + virtual1.sink.add(2); + virtual1.sink.add(3); + + virtual2.stream.listen(expectAsync1((message) { + expect(message, equals(1)); + virtual2.sink.close(); + }, count: 1)); + }); + + test('after the stream closes, the sink ignores events', () async { + unawaited(virtual1.sink.close()); + + // Wait for the done event to be delivered. + await virtual2.stream.toList(); + virtual2.sink.add(1); + virtual2.sink.add(2); + virtual2.sink.add(3); + unawaited(virtual2.sink.close()); + + // None of our virtual.sink additions should make it to the other + // endpoint. + virtual1.stream.listen(expectAsync1((_) {}, count: 0)); + await pumpEventQueue(); + }); + + test("canceling the stream's subscription has no effect on the sink", + () async { + unawaited(virtual1.stream.listen(null).cancel()); + await pumpEventQueue(); + + virtual1.sink.add(1); + virtual1.sink.add(2); + virtual1.sink.add(3); + unawaited(virtual1.sink.close()); + expect(virtual2.stream.toList(), completion(equals([1, 2, 3]))); + }); + + test("canceling the stream's subscription doesn't stop a done event", + () async { + unawaited(virtual1.stream.listen(null).cancel()); + await pumpEventQueue(); + + unawaited(virtual2.sink.close()); + await pumpEventQueue(); + + virtual1.sink.add(1); + virtual1.sink.add(2); + virtual1.sink.add(3); + unawaited(virtual1.sink.close()); + + // The sink should be ignoring events because the stream closed. + virtual2.stream.listen(expectAsync1((_) {}, count: 0)); + await pumpEventQueue(); + }); + }); + }); +} diff --git a/pkgs/stream_channel/test/stream_channel_completer_test.dart b/pkgs/stream_channel/test/stream_channel_completer_test.dart new file mode 100644 index 000000000..c6fddc011 --- /dev/null +++ b/pkgs/stream_channel/test/stream_channel_completer_test.dart @@ -0,0 +1,120 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; + +void main() { + late StreamChannelCompleter completer; + late StreamController streamController; + late StreamController sinkController; + late StreamChannel innerChannel; + setUp(() { + completer = StreamChannelCompleter(); + streamController = StreamController(); + sinkController = StreamController(); + innerChannel = StreamChannel(streamController.stream, sinkController.sink); + }); + + group('when a channel is set before accessing', () { + test('forwards events through the stream', () { + completer.setChannel(innerChannel); + expect(completer.channel.stream.toList(), completion(equals([1, 2, 3]))); + + streamController.add(1); + streamController.add(2); + streamController.add(3); + streamController.close(); + }); + + test('forwards events through the sink', () { + completer.setChannel(innerChannel); + expect(sinkController.stream.toList(), completion(equals([1, 2, 3]))); + + completer.channel.sink.add(1); + completer.channel.sink.add(2); + completer.channel.sink.add(3); + completer.channel.sink.close(); + }); + + test('forwards an error through the stream', () { + completer.setError('oh no'); + expect(completer.channel.stream.first, throwsA('oh no')); + }); + + test('drops sink events', () { + completer.setError('oh no'); + expect(completer.channel.sink.done, completes); + completer.channel.sink.add(1); + completer.channel.sink.addError('oh no'); + }); + }); + + group('when a channel is set after accessing', () { + test('forwards events through the stream', () async { + expect(completer.channel.stream.toList(), completion(equals([1, 2, 3]))); + await pumpEventQueue(); + + completer.setChannel(innerChannel); + streamController.add(1); + streamController.add(2); + streamController.add(3); + unawaited(streamController.close()); + }); + + test('forwards events through the sink', () async { + completer.channel.sink.add(1); + completer.channel.sink.add(2); + completer.channel.sink.add(3); + unawaited(completer.channel.sink.close()); + await pumpEventQueue(); + + completer.setChannel(innerChannel); + expect(sinkController.stream.toList(), completion(equals([1, 2, 3]))); + }); + + test('forwards an error through the stream', () async { + expect(completer.channel.stream.first, throwsA('oh no')); + await pumpEventQueue(); + + completer.setError('oh no'); + }); + + test('drops sink events', () async { + expect(completer.channel.sink.done, completes); + completer.channel.sink.add(1); + completer.channel.sink.addError('oh no'); + await pumpEventQueue(); + + completer.setError('oh no'); + }); + }); + + group('forFuture', () { + test('forwards a StreamChannel', () { + var channel = + StreamChannelCompleter.fromFuture(Future.value(innerChannel)); + channel.sink.add(1); + channel.sink.close(); + streamController.sink.add(2); + streamController.sink.close(); + + expect(sinkController.stream.toList(), completion(equals([1]))); + expect(channel.stream.toList(), completion(equals([2]))); + }); + + test('forwards an error', () { + var channel = StreamChannelCompleter.fromFuture(Future.error('oh no')); + expect(channel.stream.toList(), throwsA('oh no')); + }); + }); + + test("doesn't allow the channel to be set multiple times", () { + completer.setChannel(innerChannel); + expect(() => completer.setChannel(innerChannel), throwsStateError); + expect(() => completer.setChannel(innerChannel), throwsStateError); + }); +} diff --git a/pkgs/stream_channel/test/stream_channel_controller_test.dart b/pkgs/stream_channel/test/stream_channel_controller_test.dart new file mode 100644 index 000000000..3d661e302 --- /dev/null +++ b/pkgs/stream_channel/test/stream_channel_controller_test.dart @@ -0,0 +1,104 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; + +void main() { + group('asynchronously', () { + late StreamChannelController controller; + setUp(() { + controller = StreamChannelController(); + }); + + test('forwards events from the local sink to the foreign stream', () { + controller.local.sink + ..add(1) + ..add(2) + ..add(3) + ..close(); + expect(controller.foreign.stream.toList(), completion(equals([1, 2, 3]))); + }); + + test('forwards events from the foreign sink to the local stream', () { + controller.foreign.sink + ..add(1) + ..add(2) + ..add(3) + ..close(); + expect(controller.local.stream.toList(), completion(equals([1, 2, 3]))); + }); + + test( + 'with allowForeignErrors: false, shuts down the connection if an ' + 'error is added to the foreign channel', () { + controller = StreamChannelController(allowForeignErrors: false); + + controller.foreign.sink.addError('oh no'); + expect(controller.foreign.sink.done, throwsA('oh no')); + expect(controller.foreign.stream.toList(), completion(isEmpty)); + expect(controller.local.sink.done, completes); + expect(controller.local.stream.toList(), completion(isEmpty)); + }); + }); + + group('synchronously', () { + late StreamChannelController controller; + setUp(() { + controller = StreamChannelController(sync: true); + }); + + test( + 'synchronously forwards events from the local sink to the foreign ' + 'stream', () { + var receivedEvent = false; + var receivedError = false; + var receivedDone = false; + controller.foreign.stream.listen(expectAsync1((event) { + expect(event, equals(1)); + receivedEvent = true; + }), onError: expectAsync1((error) { + expect(error, equals('oh no')); + receivedError = true; + }), onDone: expectAsync0(() { + receivedDone = true; + })); + + controller.local.sink.add(1); + expect(receivedEvent, isTrue); + + controller.local.sink.addError('oh no'); + expect(receivedError, isTrue); + + controller.local.sink.close(); + expect(receivedDone, isTrue); + }); + + test( + 'synchronously forwards events from the foreign sink to the local ' + 'stream', () { + var receivedEvent = false; + var receivedError = false; + var receivedDone = false; + controller.local.stream.listen(expectAsync1((event) { + expect(event, equals(1)); + receivedEvent = true; + }), onError: expectAsync1((error) { + expect(error, equals('oh no')); + receivedError = true; + }), onDone: expectAsync0(() { + receivedDone = true; + })); + + controller.foreign.sink.add(1); + expect(receivedEvent, isTrue); + + controller.foreign.sink.addError('oh no'); + expect(receivedError, isTrue); + + controller.foreign.sink.close(); + expect(receivedDone, isTrue); + }); + }); +} diff --git a/pkgs/stream_channel/test/stream_channel_test.dart b/pkgs/stream_channel/test/stream_channel_test.dart new file mode 100644 index 000000000..c44b6ab6f --- /dev/null +++ b/pkgs/stream_channel/test/stream_channel_test.dart @@ -0,0 +1,138 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:convert'; + +import 'package:async/async.dart'; +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; + +void main() { + test("pipe() pipes data from each channel's stream into the other's sink", + () { + var otherStreamController = StreamController(); + var otherSinkController = StreamController(); + var otherChannel = + StreamChannel(otherStreamController.stream, otherSinkController.sink); + + var streamController = StreamController(); + var sinkController = StreamController(); + var channel = StreamChannel(streamController.stream, sinkController.sink); + + channel.pipe(otherChannel); + + streamController.add(1); + streamController.add(2); + streamController.add(3); + streamController.close(); + expect(otherSinkController.stream.toList(), completion(equals([1, 2, 3]))); + + otherStreamController.add(4); + otherStreamController.add(5); + otherStreamController.add(6); + otherStreamController.close(); + expect(sinkController.stream.toList(), completion(equals([4, 5, 6]))); + }); + + test('transform() transforms the channel', () async { + var streamController = StreamController>(); + var sinkController = StreamController>(); + var channel = StreamChannel(streamController.stream, sinkController.sink); + + var transformed = channel + .cast>() + .transform(StreamChannelTransformer.fromCodec(utf8)); + + streamController.add([102, 111, 111, 98, 97, 114]); + unawaited(streamController.close()); + expect(await transformed.stream.toList(), equals(['foobar'])); + + transformed.sink.add('fblthp'); + unawaited(transformed.sink.close()); + expect( + sinkController.stream.toList(), + completion(equals([ + [102, 98, 108, 116, 104, 112] + ]))); + }); + + test('transformStream() transforms only the stream', () async { + var streamController = StreamController(); + var sinkController = StreamController(); + var channel = StreamChannel(streamController.stream, sinkController.sink); + + var transformed = + channel.cast().transformStream(const LineSplitter()); + + streamController.add('hello world'); + streamController.add(' what\nis'); + streamController.add('\nup'); + unawaited(streamController.close()); + expect(await transformed.stream.toList(), + equals(['hello world what', 'is', 'up'])); + + transformed.sink.add('fbl\nthp'); + unawaited(transformed.sink.close()); + expect(sinkController.stream.toList(), completion(equals(['fbl\nthp']))); + }); + + test('transformSink() transforms only the sink', () async { + var streamController = StreamController(); + var sinkController = StreamController(); + var channel = StreamChannel(streamController.stream, sinkController.sink); + + var transformed = channel.cast().transformSink( + const StreamSinkTransformer.fromStreamTransformer(LineSplitter())); + + streamController.add('fbl\nthp'); + unawaited(streamController.close()); + expect(await transformed.stream.toList(), equals(['fbl\nthp'])); + + transformed.sink.add('hello world'); + transformed.sink.add(' what\nis'); + transformed.sink.add('\nup'); + unawaited(transformed.sink.close()); + expect(sinkController.stream.toList(), + completion(equals(['hello world what', 'is', 'up']))); + }); + + test('changeStream() changes the stream', () { + var streamController = StreamController(); + var sinkController = StreamController(); + var channel = StreamChannel(streamController.stream, sinkController.sink); + + var newController = StreamController(); + var changed = channel.changeStream((stream) { + expect(stream, equals(channel.stream)); + return newController.stream; + }); + + newController.add(10); + newController.close(); + + streamController.add(20); + streamController.close(); + + expect(changed.stream.toList(), completion(equals([10]))); + }); + + test('changeSink() changes the sink', () { + var streamController = StreamController(); + var sinkController = StreamController(); + var channel = StreamChannel(streamController.stream, sinkController.sink); + + var newController = StreamController(); + var changed = channel.changeSink((sink) { + expect(sink, equals(channel.sink)); + return newController.sink; + }); + + expect(newController.stream.toList(), completion(equals([10]))); + streamController.stream.listen(expectAsync1((_) {}, count: 0)); + + changed.sink.add(10); + changed.sink.close(); + }); +} diff --git a/pkgs/stream_channel/test/with_close_guarantee_test.dart b/pkgs/stream_channel/test/with_close_guarantee_test.dart new file mode 100644 index 000000000..9c0b72998 --- /dev/null +++ b/pkgs/stream_channel/test/with_close_guarantee_test.dart @@ -0,0 +1,69 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:async/async.dart'; +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; + +final _delayTransformer = StreamTransformer.fromHandlers( + handleData: (data, sink) => Future.microtask(() => sink.add(data)), + handleDone: (sink) => Future.microtask(() => sink.close())); + +final _delaySinkTransformer = + StreamSinkTransformer.fromStreamTransformer(_delayTransformer); + +void main() { + late StreamChannelController controller; + late StreamChannel channel; + setUp(() { + controller = StreamChannelController(); + + // Add a bunch of layers of asynchronous dispatch between the channel and + // the underlying controllers. + var stream = controller.foreign.stream; + var sink = controller.foreign.sink; + for (var i = 0; i < 10; i++) { + stream = stream.transform(_delayTransformer); + sink = _delaySinkTransformer.bind(sink); + } + + channel = StreamChannel.withCloseGuarantee(stream, sink); + }); + + test( + 'closing the event sink causes the stream to close before it emits any ' + 'more events', () async { + controller.local.sink.add(1); + controller.local.sink.add(2); + controller.local.sink.add(3); + + expect( + channel.stream + .listen(expectAsync1((event) { + if (event == 2) channel.sink.close(); + }, count: 2)) + .asFuture(), + completes); + + await pumpEventQueue(); + }); + + test( + 'closing the event sink before events are emitted causes the stream to ' + 'close immediately', () async { + unawaited(channel.sink.close()); + channel.stream.listen(expectAsync1((_) {}, count: 0), + onError: expectAsync2((_, __) {}, count: 0), + onDone: expectAsync0(() {})); + + controller.local.sink.add(1); + controller.local.sink.add(2); + controller.local.sink.add(3); + unawaited(controller.local.sink.close()); + + await pumpEventQueue(); + }); +} diff --git a/pkgs/stream_channel/test/with_guarantees_test.dart b/pkgs/stream_channel/test/with_guarantees_test.dart new file mode 100644 index 000000000..f026079fb --- /dev/null +++ b/pkgs/stream_channel/test/with_guarantees_test.dart @@ -0,0 +1,200 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; + +void main() { + late StreamController streamController; + late StreamController sinkController; + late StreamChannel channel; + setUp(() { + streamController = StreamController(); + sinkController = StreamController(); + channel = StreamChannel.withGuarantees( + streamController.stream, sinkController.sink); + }); + + group('with a broadcast stream', () { + setUp(() { + streamController = StreamController.broadcast(); + channel = StreamChannel.withGuarantees( + streamController.stream, sinkController.sink); + }); + + test('buffers events', () async { + streamController.add(1); + streamController.add(2); + streamController.add(3); + await pumpEventQueue(); + + expect(channel.stream.toList(), completion(equals([1, 2, 3]))); + unawaited(streamController.close()); + }); + + test('only allows a single subscription', () { + channel.stream.listen(null); + expect(() => channel.stream.listen(null), throwsStateError); + }); + }); + + test( + 'closing the event sink causes the stream to close before it emits any ' + 'more events', () { + streamController.add(1); + streamController.add(2); + streamController.add(3); + + expect( + channel.stream + .listen(expectAsync1((event) { + if (event == 2) channel.sink.close(); + }, count: 2)) + .asFuture(), + completes); + }); + + test('after the stream closes, the sink ignores events', () async { + unawaited(streamController.close()); + + // Wait for the done event to be delivered. + await channel.stream.toList(); + channel.sink.add(1); + channel.sink.add(2); + channel.sink.add(3); + unawaited(channel.sink.close()); + + // None of our channel.sink additions should make it to the other endpoint. + sinkController.stream.listen(expectAsync1((_) {}, count: 0), + onDone: expectAsync0(() {}, count: 0)); + await pumpEventQueue(); + }); + + test("canceling the stream's subscription has no effect on the sink", + () async { + unawaited(channel.stream.listen(null).cancel()); + await pumpEventQueue(); + + channel.sink.add(1); + channel.sink.add(2); + channel.sink.add(3); + unawaited(channel.sink.close()); + expect(sinkController.stream.toList(), completion(equals([1, 2, 3]))); + }); + + test("canceling the stream's subscription doesn't stop a done event", + () async { + unawaited(channel.stream.listen(null).cancel()); + await pumpEventQueue(); + + unawaited(streamController.close()); + await pumpEventQueue(); + + channel.sink.add(1); + channel.sink.add(2); + channel.sink.add(3); + unawaited(channel.sink.close()); + + // The sink should be ignoring events because the stream closed. + sinkController.stream.listen(expectAsync1((_) {}, count: 0), + onDone: expectAsync0(() {}, count: 0)); + await pumpEventQueue(); + }); + + test('forwards errors to the other endpoint', () { + channel.sink.addError('error'); + expect(sinkController.stream.first, throwsA('error')); + }); + + test('Sink.done completes once the stream is done', () { + channel.stream.listen(null); + expect(channel.sink.done, completes); + streamController.close(); + }); + + test("events can't be added to an explicitly-closed sink", () { + sinkController.stream.listen(null); // Work around sdk#19095. + + expect(channel.sink.close(), completes); + expect(() => channel.sink.add(1), throwsStateError); + expect(() => channel.sink.addError('oh no'), throwsStateError); + expect(() => channel.sink.addStream(Stream.fromIterable([])), + throwsStateError); + }); + + test("events can't be added while a stream is being added", () { + var controller = StreamController(); + channel.sink.addStream(controller.stream); + + expect(() => channel.sink.add(1), throwsStateError); + expect(() => channel.sink.addError('oh no'), throwsStateError); + expect(() => channel.sink.addStream(Stream.fromIterable([])), + throwsStateError); + expect(() => channel.sink.close(), throwsStateError); + + controller.close(); + }); + + group('with allowSinkErrors: false', () { + setUp(() { + streamController = StreamController(); + sinkController = StreamController(); + channel = StreamChannel.withGuarantees( + streamController.stream, sinkController.sink, + allowSinkErrors: false); + }); + + test('forwards errors to Sink.done but not the stream', () { + channel.sink.addError('oh no'); + expect(channel.sink.done, throwsA('oh no')); + sinkController.stream + .listen(null, onError: expectAsync1((dynamic _) {}, count: 0)); + }); + + test('adding an error causes the stream to emit a done event', () { + expect(channel.sink.done, throwsA('oh no')); + + streamController.add(1); + streamController.add(2); + streamController.add(3); + + expect( + channel.stream + .listen(expectAsync1((event) { + if (event == 2) channel.sink.addError('oh no'); + }, count: 2)) + .asFuture(), + completes); + }); + + test('adding an error closes the inner sink', () { + channel.sink.addError('oh no'); + expect(channel.sink.done, throwsA('oh no')); + expect(sinkController.stream.toList(), completion(isEmpty)); + }); + + test( + 'adding an error via via addStream causes the stream to emit a done ' + 'event', () async { + var canceled = false; + var controller = StreamController(onCancel: () { + canceled = true; + }); + + // This future shouldn't get the error, because it's sent to [Sink.done]. + expect(channel.sink.addStream(controller.stream), completes); + + controller.addError('oh no'); + expect(channel.sink.done, throwsA('oh no')); + await pumpEventQueue(); + expect(canceled, isTrue); + + // Even though the sink is closed, this shouldn't throw an error because + // the user didn't explicitly close it. + channel.sink.add(1); + }); + }); +}