From 0904ca2525b196116db948d4e619d8b1a4782f40 Mon Sep 17 00:00:00 2001 From: Jan Romann Date: Fri, 24 May 2024 11:34:50 +0200 Subject: [PATCH] feat!: rework Subscription handling --- lib/src/binding_coap/coap_client.dart | 1 - lib/src/binding_coap/coap_subscription.dart | 16 ++++----- lib/src/binding_mqtt/mqtt_client.dart | 8 ++++- lib/src/binding_mqtt/mqtt_subscription.dart | 16 +++++---- .../core/implementation/consumed_thing.dart | 36 ++++++++++--------- lib/src/core/protocol_interfaces.dart | 1 + .../protocol_interfaces/subscription.dart | 19 ++++++++++ 7 files changed, 62 insertions(+), 35 deletions(-) create mode 100644 lib/src/core/protocol_interfaces/subscription.dart diff --git a/lib/src/binding_coap/coap_client.dart b/lib/src/binding_coap/coap_client.dart index 4d5e7ba5..d9830230 100644 --- a/lib/src/binding_coap/coap_client.dart +++ b/lib/src/binding_coap/coap_client.dart @@ -429,7 +429,6 @@ final class CoapClient implements ProtocolClient { if (subprotocol == CoapSubprotocol.observe) { final observeClientRelation = await coapClient.observe(request); - observeClientRelation.listen(handleResponse); return CoapSubscription(coapClient, observeClientRelation, complete); } diff --git a/lib/src/binding_coap/coap_subscription.dart b/lib/src/binding_coap/coap_subscription.dart index 9a63c012..5b837894 100644 --- a/lib/src/binding_coap/coap_subscription.dart +++ b/lib/src/binding_coap/coap_subscription.dart @@ -6,17 +6,18 @@ import "package:coap/coap.dart"; -import "../../core.dart"; +import "../core/protocol_interfaces/subscription.dart"; -/// [Subscription] to a CoAP resource, based on the observe option ([RFC 7641]). +/// [ProtocolSubscription] to a CoAP resource, based on the observe option +/// ([RFC 7641]). /// /// [RFC 7641]: https://datatracker.ietf.org/doc/html/rfc7641 -class CoapSubscription implements Subscription { +class CoapSubscription extends ProtocolSubscription { /// Constructor CoapSubscription( this._coapClient, this._observeClientRelation, - this._complete, + super._complete, ) : _active = true; final CoapClient _coapClient; @@ -28,10 +29,6 @@ class CoapSubscription implements Subscription { @override bool get active => _active; - /// Callback used to pass by the servient that is used to signal it that an - /// observation has been cancelled. - final void Function() _complete; - @override Future stop({ int? formIndex, @@ -48,6 +45,7 @@ class CoapSubscription implements Subscription { await _coapClient.cancelObserveProactive(observeClientRelation); } _coapClient.close(); - _complete(); + await super + .stop(formIndex: formIndex, uriVariables: uriVariables, data: data); } } diff --git a/lib/src/binding_mqtt/mqtt_client.dart b/lib/src/binding_mqtt/mqtt_client.dart index dec31b7d..ebef64c0 100644 --- a/lib/src/binding_mqtt/mqtt_client.dart +++ b/lib/src/binding_mqtt/mqtt_client.dart @@ -195,7 +195,13 @@ final class MqttClient implements ProtocolClient { throw MqttBindingException("Subscription to topic $topic failed"); } - return MqttSubscription(form, client, complete, next: next, error: error); + return MqttSubscription( + form, + client, + complete, + next: next, + error: error, + ); } @override diff --git a/lib/src/binding_mqtt/mqtt_subscription.dart b/lib/src/binding_mqtt/mqtt_subscription.dart index 5588c13e..258d5673 100644 --- a/lib/src/binding_mqtt/mqtt_subscription.dart +++ b/lib/src/binding_mqtt/mqtt_subscription.dart @@ -7,15 +7,17 @@ import "package:mqtt_client/mqtt_client.dart" hide Subscription; import "package:mqtt_client/mqtt_server_client.dart"; -import "../../core.dart"; +import "../core/definitions.dart"; +import "../core/implementation/content.dart"; +import "../core/protocol_interfaces/subscription.dart"; -/// [Subscription] for the MQTT protocol. -class MqttSubscription implements Subscription { +/// [ProtocolSubscription] for the MQTT protocol. +class MqttSubscription extends ProtocolSubscription { /// Constructor. MqttSubscription( this._form, this._client, - this._complete, { + super._complete, { required void Function(Content content) next, void Function(Exception error)? error, }) : _active = true { @@ -52,8 +54,6 @@ class MqttSubscription implements Subscription { bool _active = true; - final void Function() _complete; - @override bool get active => _active; @@ -63,8 +63,10 @@ class MqttSubscription implements Subscription { Map? uriVariables, Object? data, }) async { + await super + .stop(formIndex: formIndex, uriVariables: uriVariables, data: data); + _client.disconnect(); _active = false; - _complete(); } } diff --git a/lib/src/core/implementation/consumed_thing.dart b/lib/src/core/implementation/consumed_thing.dart index b7b98471..4e72fa81 100644 --- a/lib/src/core/implementation/consumed_thing.dart +++ b/lib/src/core/implementation/consumed_thing.dart @@ -29,9 +29,9 @@ class ConsumedThing implements scripting_api.ConsumedThing { /// The [title] of the Thing. final String title; - final Map _subscribedEvents = {}; + final Set _subscribedEvents = {}; - final Map _observedProperties = {}; + final Set _observedProperties = {}; /// Determines the id of this [ConsumedThing]. String get identifier => thingDescription.identifier; @@ -229,7 +229,7 @@ class ConsumedThing implements scripting_api.ConsumedThing { ); } - if (_observedProperties.containsKey(propertyName)) { + if (_observedProperties.contains(propertyName)) { throw StateError( "ConsumedThing '$title' already has a function " "subscribed to $propertyName. You can only observe once", @@ -259,15 +259,12 @@ class ConsumedThing implements scripting_api.ConsumedThing { required Map? uriVariables, }) async { final OperationType operationType; - final Map subscriptions; switch (subscriptionType) { case scripting_api.SubscriptionType.property: operationType = OperationType.observeproperty; - subscriptions = _observedProperties; case scripting_api.SubscriptionType.event: operationType = OperationType.subscribeevent; - subscriptions = _subscribedEvents; } final (client, form) = _getClientFor( @@ -288,17 +285,10 @@ class ConsumedThing implements scripting_api.ConsumedThing { onError(error); } }, - complete: () => removeSubscription(affordanceName, subscriptionType), + complete: () => _removeSubscription(affordanceName, subscriptionType), ); - switch (subscriptionType) { - case scripting_api.SubscriptionType.property: - _observedProperties[affordanceName] = subscription; - case scripting_api.SubscriptionType.event: - _subscribedEvents[affordanceName] = subscription; - } - - subscriptions[affordanceName] = subscription; + _addSubscription(affordanceName, subscriptionType); return subscription; } @@ -376,7 +366,7 @@ class ConsumedThing implements scripting_api.ConsumedThing { ); } - if (_subscribedEvents.containsKey(eventName)) { + if (_subscribedEvents.contains(eventName)) { throw DartWotException( "ConsumedThing '$title' already has a function " "subscribed to $eventName. You can only subscribe once.", @@ -407,8 +397,20 @@ class ConsumedThing implements scripting_api.ConsumedThing { ); } + void _addSubscription( + String key, + scripting_api.SubscriptionType type, + ) { + switch (type) { + case scripting_api.SubscriptionType.property: + _observedProperties.add(key); + case scripting_api.SubscriptionType.event: + _subscribedEvents.add(key); + } + } + /// Removes a subscription with a specified [key] and [type]. - void removeSubscription(String key, scripting_api.SubscriptionType type) { + void _removeSubscription(String key, scripting_api.SubscriptionType type) { switch (type) { case scripting_api.SubscriptionType.property: _observedProperties.remove(key); diff --git a/lib/src/core/protocol_interfaces.dart b/lib/src/core/protocol_interfaces.dart index a7b7fa00..64e1f031 100644 --- a/lib/src/core/protocol_interfaces.dart +++ b/lib/src/core/protocol_interfaces.dart @@ -7,3 +7,4 @@ export "protocol_interfaces/protocol_client.dart"; export "protocol_interfaces/protocol_client_factory.dart"; export "protocol_interfaces/protocol_server.dart"; +export "protocol_interfaces/subscription.dart"; diff --git a/lib/src/core/protocol_interfaces/subscription.dart b/lib/src/core/protocol_interfaces/subscription.dart new file mode 100644 index 00000000..24586f65 --- /dev/null +++ b/lib/src/core/protocol_interfaces/subscription.dart @@ -0,0 +1,19 @@ +import "package:meta/meta.dart"; + +import "../scripting_api.dart"; + +abstract class ProtocolSubscription implements Subscription { + ProtocolSubscription(this._complete); + + final void Function() _complete; + + @override + @mustCallSuper + Future stop({ + int? formIndex, + Map? uriVariables, + Object? data, + }) async { + _complete(); + } +}