Skip to content

Commit

Permalink
Pass exitCode via message and eagerly exit
Browse files Browse the repository at this point in the history
  • Loading branch information
ntkme committed Nov 16, 2024
1 parent 98b6ff2 commit d3c7314
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 48 deletions.
5 changes: 3 additions & 2 deletions lib/src/embedded/compilation_dispatcher.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// https://opensource.org/licenses/MIT.

import 'dart:convert';
import 'dart:io' if (dart.library.js) 'js/io.dart';
import 'dart:isolate' if (dart.library.js) 'js/isolate.dart';
import 'dart:typed_data';

Expand Down Expand Up @@ -366,14 +367,14 @@ final class CompilationDispatcher {
message.writeToCodedBufferWriter(protobufWriter);

// Add one additional byte to the beginning to indicate whether or not the
// compilation has finished (1) or encountered a fatal error (2), so the
// compilation has finished (1) or encountered a fatal error (exitCode), so the
// [IsolateDispatcher] knows whether to treat this isolate as inactive or
// close out entirely.
var packet = Uint8List(
1 + _compilationIdVarint.length + protobufWriter.lengthInBytes);
packet[0] = switch (message.whichMessage()) {
OutboundMessage_Message.compileResponse => 1,
OutboundMessage_Message.error => 2,
OutboundMessage_Message.error => exitCode,
_ => 0
};
packet.setAll(1, _compilationIdVarint);
Expand Down
3 changes: 1 addition & 2 deletions lib/src/embedded/executable.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ void main(List<String> args) {
if (parseOptions(args)) {
IsolateDispatcher(
StreamChannel.withGuarantees(stdin, stdout, allowSinkErrors: false)
.transform(lengthDelimited),
gracefulShutdown: false)
.transform(lengthDelimited))
.listen();
}
}
41 changes: 6 additions & 35 deletions lib/src/embedded/isolate_dispatcher.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,6 @@ class IsolateDispatcher {
/// The channel of encoded protocol buffers, connected to the host.
final StreamChannel<Uint8List> _channel;

/// Whether to wait for all worker isolates to exit before exiting the main
/// isolate or not.
final bool _gracefulShutdown;

/// All isolates that have been spawned to dispatch to.
///
/// Only used for cleaning up the process when the underlying channel closes.
final _allIsolates = StreamController<ReusableIsolate>(sync: true);

/// The isolates that aren't currently running compilations
final _inactiveIsolates = <ReusableIsolate>{};

Expand All @@ -42,11 +33,7 @@ class IsolateDispatcher {
/// may be live at once.
final _isolatePool = Pool(concurrencyLimit);

/// Whether [_channel] has been closed or not.
var _closed = false;

IsolateDispatcher(this._channel, {bool gracefulShutdown = true})
: _gracefulShutdown = gracefulShutdown;
IsolateDispatcher(this._channel);

void listen() {
_channel.stream.listen((packet) async {
Expand All @@ -60,9 +47,6 @@ class IsolateDispatcher {
var isolate = await _activeIsolates.putIfAbsent(
compilationId, () => _getIsolate(compilationId!));

// The shutdown may have started by the time the isolate is spawned
if (_closed) return;

try {
isolate.send(packet);
return;
Expand Down Expand Up @@ -95,12 +79,7 @@ class IsolateDispatcher {
}, onError: (Object error, StackTrace stackTrace) {
_handleError(error, stackTrace);
}, onDone: () {
if (_gracefulShutdown) {
_closed = true;
_allIsolates.stream.listen((isolate) => isolate.kill());
} else {
exit(exitCode);
}
exit(exitCode);
});
}

Expand All @@ -120,7 +99,6 @@ class IsolateDispatcher {
_handleError(error, stackTrace);
});
isolate = await future;
_allIsolates.add(isolate);
}

isolate.borrow((message) {
Expand All @@ -145,13 +123,10 @@ class IsolateDispatcher {
_inactiveIsolates.add(isolate);
resource.release();
_channel.sink.add(packet);
case 2:
default:
_channel.sink.add(packet);
if (_gracefulShutdown) {
_channel.sink.close();
} else {
exit(exitCode);
}
exitCode = category;
exit(exitCode);
}
});

Expand All @@ -175,11 +150,7 @@ class IsolateDispatcher {
{int? compilationId, int? messageId}) {
sendError(compilationId ?? errorId,
handleError(error, stackTrace, messageId: messageId));
if (_gracefulShutdown) {
_channel.sink.close();
} else {
exit(exitCode);
}
exit(exitCode);
}

/// Sends [message] to the host.
Expand Down
9 changes: 0 additions & 9 deletions lib/src/embedded/js/reusable_isolate.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import 'dart:async';
import 'dart:js_interop';
import 'dart:typed_data';

import 'io.dart';
import 'js.dart';
import 'sync_message_port.dart';
import 'worker_threads.dart';
Expand Down Expand Up @@ -46,14 +45,6 @@ class ReusableIsolate {
workerData: channel.port2,
transferList: [channel.port2].toJS,
argv: argv));
worker.once(
'exit',
(int code) {
// Worker exit code 1 means it is killed by worker.terminate()
if (code > exitCode && code != 1) {
exitCode = code;
}
}.toJS);
var controller = StreamController<dynamic>(sync: true);
var sendPort = SyncMessagePort(channel.port1);
var receivePort = channel.port1;
Expand Down

0 comments on commit d3c7314

Please sign in to comment.