Skip to content

Commit

Permalink
feat!: rework Subscription handling
Browse files Browse the repository at this point in the history
  • Loading branch information
JKRhb committed May 24, 2024
1 parent 0f1b0ce commit 0904ca2
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 35 deletions.
1 change: 0 additions & 1 deletion lib/src/binding_coap/coap_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,6 @@ final class CoapClient implements ProtocolClient {

if (subprotocol == CoapSubprotocol.observe) {
final observeClientRelation = await coapClient.observe(request);
observeClientRelation.listen(handleResponse);
return CoapSubscription(coapClient, observeClientRelation, complete);
}

Expand Down
16 changes: 7 additions & 9 deletions lib/src/binding_coap/coap_subscription.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@

import "package:coap/coap.dart";

import "../../core.dart";
import "../core/protocol_interfaces/subscription.dart";

/// [Subscription] to a CoAP resource, based on the observe option ([RFC 7641]).
/// [ProtocolSubscription] to a CoAP resource, based on the observe option
/// ([RFC 7641]).
///
/// [RFC 7641]: https://datatracker.ietf.org/doc/html/rfc7641
class CoapSubscription implements Subscription {
class CoapSubscription extends ProtocolSubscription {
/// Constructor
CoapSubscription(
this._coapClient,
this._observeClientRelation,
this._complete,
super._complete,
) : _active = true;

final CoapClient _coapClient;
Expand All @@ -28,10 +29,6 @@ class CoapSubscription implements Subscription {
@override
bool get active => _active;

/// Callback used to pass by the servient that is used to signal it that an
/// observation has been cancelled.
final void Function() _complete;

@override
Future<void> stop({
int? formIndex,
Expand All @@ -48,6 +45,7 @@ class CoapSubscription implements Subscription {
await _coapClient.cancelObserveProactive(observeClientRelation);
}
_coapClient.close();
_complete();
await super
.stop(formIndex: formIndex, uriVariables: uriVariables, data: data);
}
}
8 changes: 7 additions & 1 deletion lib/src/binding_mqtt/mqtt_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,13 @@ final class MqttClient implements ProtocolClient {
throw MqttBindingException("Subscription to topic $topic failed");
}

return MqttSubscription(form, client, complete, next: next, error: error);
return MqttSubscription(
form,
client,
complete,
next: next,
error: error,
);
}

@override
Expand Down
16 changes: 9 additions & 7 deletions lib/src/binding_mqtt/mqtt_subscription.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@
import "package:mqtt_client/mqtt_client.dart" hide Subscription;
import "package:mqtt_client/mqtt_server_client.dart";

import "../../core.dart";
import "../core/definitions.dart";
import "../core/implementation/content.dart";
import "../core/protocol_interfaces/subscription.dart";

/// [Subscription] for the MQTT protocol.
class MqttSubscription implements Subscription {
/// [ProtocolSubscription] for the MQTT protocol.
class MqttSubscription extends ProtocolSubscription {
/// Constructor.
MqttSubscription(
this._form,
this._client,
this._complete, {
super._complete, {
required void Function(Content content) next,
void Function(Exception error)? error,
}) : _active = true {
Expand Down Expand Up @@ -52,8 +54,6 @@ class MqttSubscription implements Subscription {

bool _active = true;

final void Function() _complete;

@override
bool get active => _active;

Expand All @@ -63,8 +63,10 @@ class MqttSubscription implements Subscription {
Map<String, Object>? uriVariables,
Object? data,
}) async {
await super
.stop(formIndex: formIndex, uriVariables: uriVariables, data: data);

_client.disconnect();
_active = false;
_complete();
}
}
36 changes: 19 additions & 17 deletions lib/src/core/implementation/consumed_thing.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ class ConsumedThing implements scripting_api.ConsumedThing {
/// The [title] of the Thing.
final String title;

final Map<String, scripting_api.Subscription> _subscribedEvents = {};
final Set<String> _subscribedEvents = {};

final Map<String, scripting_api.Subscription> _observedProperties = {};
final Set<String> _observedProperties = {};

/// Determines the id of this [ConsumedThing].
String get identifier => thingDescription.identifier;
Expand Down Expand Up @@ -229,7 +229,7 @@ class ConsumedThing implements scripting_api.ConsumedThing {
);
}

if (_observedProperties.containsKey(propertyName)) {
if (_observedProperties.contains(propertyName)) {
throw StateError(
"ConsumedThing '$title' already has a function "
"subscribed to $propertyName. You can only observe once",
Expand Down Expand Up @@ -259,15 +259,12 @@ class ConsumedThing implements scripting_api.ConsumedThing {
required Map<String, Object>? uriVariables,
}) async {
final OperationType operationType;
final Map<String, scripting_api.Subscription> subscriptions;

switch (subscriptionType) {
case scripting_api.SubscriptionType.property:
operationType = OperationType.observeproperty;
subscriptions = _observedProperties;
case scripting_api.SubscriptionType.event:
operationType = OperationType.subscribeevent;
subscriptions = _subscribedEvents;
}

final (client, form) = _getClientFor(
Expand All @@ -288,17 +285,10 @@ class ConsumedThing implements scripting_api.ConsumedThing {
onError(error);
}
},
complete: () => removeSubscription(affordanceName, subscriptionType),
complete: () => _removeSubscription(affordanceName, subscriptionType),
);

switch (subscriptionType) {
case scripting_api.SubscriptionType.property:
_observedProperties[affordanceName] = subscription;
case scripting_api.SubscriptionType.event:
_subscribedEvents[affordanceName] = subscription;
}

subscriptions[affordanceName] = subscription;
_addSubscription(affordanceName, subscriptionType);

return subscription;
}
Expand Down Expand Up @@ -376,7 +366,7 @@ class ConsumedThing implements scripting_api.ConsumedThing {
);
}

if (_subscribedEvents.containsKey(eventName)) {
if (_subscribedEvents.contains(eventName)) {
throw DartWotException(
"ConsumedThing '$title' already has a function "
"subscribed to $eventName. You can only subscribe once.",
Expand Down Expand Up @@ -407,8 +397,20 @@ class ConsumedThing implements scripting_api.ConsumedThing {
);
}

void _addSubscription(
String key,
scripting_api.SubscriptionType type,
) {
switch (type) {
case scripting_api.SubscriptionType.property:
_observedProperties.add(key);
case scripting_api.SubscriptionType.event:
_subscribedEvents.add(key);
}
}

/// Removes a subscription with a specified [key] and [type].
void removeSubscription(String key, scripting_api.SubscriptionType type) {
void _removeSubscription(String key, scripting_api.SubscriptionType type) {
switch (type) {
case scripting_api.SubscriptionType.property:
_observedProperties.remove(key);
Expand Down
1 change: 1 addition & 0 deletions lib/src/core/protocol_interfaces.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
export "protocol_interfaces/protocol_client.dart";
export "protocol_interfaces/protocol_client_factory.dart";
export "protocol_interfaces/protocol_server.dart";
export "protocol_interfaces/subscription.dart";
19 changes: 19 additions & 0 deletions lib/src/core/protocol_interfaces/subscription.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import "package:meta/meta.dart";

import "../scripting_api.dart";

abstract class ProtocolSubscription implements Subscription {
ProtocolSubscription(this._complete);

final void Function() _complete;

@override
@mustCallSuper
Future<void> stop({
int? formIndex,
Map<String, Object>? uriVariables,
Object? data,
}) async {
_complete();
}
}

0 comments on commit 0904ca2

Please sign in to comment.