diff --git a/packages/celest/lib/src/runtime/sse/sse_handler.dart b/packages/celest/lib/src/runtime/sse/sse_handler.dart index 48181a86..167d2b95 100644 --- a/packages/celest/lib/src/runtime/sse/sse_handler.dart +++ b/packages/celest/lib/src/runtime/sse/sse_handler.dart @@ -5,6 +5,7 @@ library; import 'dart:async'; import 'dart:convert'; import 'dart:io'; +import 'dart:typed_data'; import 'package:async/async.dart'; import 'package:celest_core/_internal.dart'; @@ -205,6 +206,9 @@ final class _SseHandler { }); } + static final _jsonUtf8Decoder = + utf8.decoder.fuse(json.decoder).cast(); + Future _handleIncomingMessage(Request request) async { final clientId = request.url.queryParameters['sseClientId']; if (clientId == null) { @@ -222,11 +226,7 @@ final class _SseHandler { request.url.queryParameters['messageId'] ?? '0', ); try { - final message = await request - .read() - .transform(utf8.decoder) - .transform(json.decoder) - .single; + final message = await request.read().transform(_jsonUtf8Decoder).first; connection._handleIncoming(messageId, message); return Response(HttpStatus.accepted); } on Object catch (e, st) { diff --git a/packages/celest/lib/src/runtime/targets.dart b/packages/celest/lib/src/runtime/targets.dart index 1e3d74ab..9e31951f 100644 --- a/packages/celest/lib/src/runtime/targets.dart +++ b/packages/celest/lib/src/runtime/targets.dart @@ -79,7 +79,7 @@ abstract base class CloudEventSourceTarget extends CloudFunctionTarget { StreamChannelTransformer( StreamTransformer.fromHandlers( handleData: (data, sink) { - sink.add(JsonUtf8.decodeMap(data)); + sink.add(JsonUtf8.decodeAny(data)); }, ), StreamSinkTransformer.fromHandlers( diff --git a/packages/celest_core/lib/src/base/base_protocol.dart b/packages/celest_core/lib/src/base/base_protocol.dart index 96c3cabe..8434454c 100644 --- a/packages/celest_core/lib/src/base/base_protocol.dart +++ b/packages/celest_core/lib/src/base/base_protocol.dart @@ -55,7 +55,7 @@ mixin BaseProtocol { }; } - Stream> connect( + Stream connect( String path, { required Map payload, }) { @@ -84,10 +84,10 @@ mixin BaseProtocol { ); } final json = jsonDecode(response.body) as Map; - final error = json['error'] as Map?; + final error = json['@error'] as Map?; throw createError( - error?['message'] as String?, - details: switch (error?['details']) { + error?['message'] as String? ?? json['message'] as String?, + details: switch (json['details']) { null => null, final Object details => JsonValue(details), }, diff --git a/packages/celest_core/lib/src/events/event_channel.dart b/packages/celest_core/lib/src/events/event_channel.dart index 5d0b2aa8..33b797ac 100644 --- a/packages/celest_core/lib/src/events/event_channel.dart +++ b/packages/celest_core/lib/src/events/event_channel.dart @@ -4,7 +4,7 @@ import 'package:celest_core/src/events/event_channel.vm.dart' import 'package:http/http.dart' as http; import 'package:stream_channel/stream_channel.dart'; -abstract class EventChannel with StreamChannelMixin> { +abstract class EventChannel with StreamChannelMixin { EventChannel(); factory EventChannel.connect( Uri uri, { diff --git a/packages/celest_core/lib/src/events/event_channel.vm.dart b/packages/celest_core/lib/src/events/event_channel.vm.dart index 21e665cc..9b0f1492 100644 --- a/packages/celest_core/lib/src/events/event_channel.vm.dart +++ b/packages/celest_core/lib/src/events/event_channel.vm.dart @@ -31,8 +31,7 @@ final class EventChannelPlatform extends EventChannel { } final WebSocketChannel _ws; - late final StreamSink> _wsSink = - _ws.sink.rejectErrors().transform( + late final StreamSink _wsSink = _ws.sink.rejectErrors().transform( StreamSinkTransformer.fromHandlers( handleData: (data, sink) { sink.add(jsonEncode(data)); @@ -41,10 +40,10 @@ final class EventChannelPlatform extends EventChannel { ); @override - Stream> get stream => _ws.stream.map(JsonUtf8.decodeMap); + Stream get stream => _ws.stream.map(JsonUtf8.decodeAny); @override - StreamSink> get sink => _wsSink; + StreamSink get sink => _wsSink; @override void close() { diff --git a/packages/celest_core/lib/src/events/event_channel.web.dart b/packages/celest_core/lib/src/events/event_channel.web.dart index db7fccd6..8c323302 100644 --- a/packages/celest_core/lib/src/events/event_channel.web.dart +++ b/packages/celest_core/lib/src/events/event_channel.web.dart @@ -5,8 +5,7 @@ import 'package:celest_core/src/events/event_channel.dart'; import 'package:http/http.dart' as http; import 'package:stream_channel/stream_channel.dart'; -final class EventChannelPlatform - extends DelegatingStreamChannel> +final class EventChannelPlatform extends DelegatingStreamChannel implements EventChannel { EventChannelPlatform._(super._inner); @@ -16,7 +15,7 @@ final class EventChannelPlatform http.Client? httpClient, }) { final sseClient = SseClient(serverUri: uri, httpClient: httpClient); - final completer = StreamChannelCompleter>(); + final completer = StreamChannelCompleter(); sseClient.onConnected .then((_) => completer.setChannel(sseClient)) .onError((e, st) { diff --git a/packages/celest_core/lib/src/events/sse/sse_client.dart b/packages/celest_core/lib/src/events/sse/sse_client.dart index be925f1c..89da6f7d 100644 --- a/packages/celest_core/lib/src/events/sse/sse_client.dart +++ b/packages/celest_core/lib/src/events/sse/sse_client.dart @@ -7,7 +7,7 @@ import 'package:stream_channel/stream_channel.dart'; /// {@template celest.runtime.sse_client} /// A Server-Sent Events (SSE) client. /// {@endtemplate} -abstract class SseClient with StreamChannelMixin> { +abstract class SseClient with StreamChannelMixin { /// Creates a new [SseClient] connected to the server at [serverUri]. factory SseClient({ required Uri serverUri, diff --git a/packages/celest_core/lib/src/events/sse/sse_client.web.dart b/packages/celest_core/lib/src/events/sse/sse_client.web.dart index daa95e10..5801a01a 100644 --- a/packages/celest_core/lib/src/events/sse/sse_client.web.dart +++ b/packages/celest_core/lib/src/events/sse/sse_client.web.dart @@ -56,18 +56,16 @@ final class SseClientPlatform extends SseClient { var _lastMessageId = -1; - late final StreamController> _incomingController = + late final StreamController _incomingController = StreamController(onCancel: close); @override - Stream> get stream => _incomingController.stream; + Stream get stream => _incomingController.stream; - late final StreamController> _outgoingController = - StreamController(); + late final StreamController _outgoingController = StreamController(); @override - late final StreamSink> sink = - _outgoingController.sink.transform( + late final StreamSink sink = _outgoingController.sink.transform( StreamSinkTransformer.fromHandlers( handleError: (error, stackTrace, sink) { _closeWithError(error, stackTrace); @@ -118,9 +116,6 @@ final class SseClientPlatform extends SseClient { _logger.finest('Message event: $data'); try { final message = jsonDecode(data); - if (message is! Map) { - throw FormatException('Expected a Map, got ${message.runtimeType}'); - } _incomingController.add(message); } on Object catch (e, st) { _logger.severe('Invalid message: $data', e, st); @@ -131,7 +126,7 @@ final class SseClientPlatform extends SseClient { } } - Future _onOutgoingMessage(Map message) async { + Future _onOutgoingMessage(Object? message) async { final uri = serverUri.replace( queryParameters: { ...serverUri.queryParametersAll, diff --git a/packages/celest_core/lib/src/util/json.dart b/packages/celest_core/lib/src/util/json.dart index ebe947c3..8b20d018 100644 --- a/packages/celest_core/lib/src/util/json.dart +++ b/packages/celest_core/lib/src/util/json.dart @@ -27,6 +27,23 @@ extension JsonUtf8 on Object { ); } + /// Decodes a JSON map [body] from a `List` or [String]. + static Object? decodeAny(Object? body) { + return switch (body) { + List() => body.isEmpty ? const {} : decode(body), + String() => body.isEmpty + ? const {} + : () { + try { + return jsonDecode(body); + } on Object { + return body; + } + }(), + _ => body, + }; + } + /// Decodes a JSON map [body] from a `List` or [String]. static Map decodeMap(Object? body) { Object? decoded; @@ -36,7 +53,11 @@ extension JsonUtf8 on Object { decoded = decode(body); case String(): if (body.isEmpty) return const {}; - decoded = jsonDecode(body); + try { + decoded = jsonDecode(body); + } on Object { + decoded = body; + } default: decoded = body; }