Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework clean up of ConsumedThing #162

Merged
merged 4 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions lib/src/binding_coap/coap_subscription.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ import "package:coap/coap.dart";

import "../../core.dart";

/// [Subscription] to a CoAP resource, based on the observe option ([RFC 7641]).
/// [ProtocolSubscription] to a CoAP resource, based on the observe option
/// ([RFC 7641]).
///
/// [RFC 7641]: https://datatracker.ietf.org/doc/html/rfc7641
class CoapSubscription implements Subscription {
final class CoapSubscription extends ProtocolSubscription {
/// Constructor
CoapSubscription(
this._coapClient,
this._observeClientRelation,
this._complete,
super._complete,
) : _active = true;

final CoapClient _coapClient;
Expand All @@ -28,10 +29,6 @@ class CoapSubscription implements Subscription {
@override
bool get active => _active;

/// Callback used to pass by the servient that is used to signal it that an
/// observation has been cancelled.
final void Function() _complete;

@override
Future<void> stop({
int? formIndex,
Expand All @@ -48,6 +45,7 @@ class CoapSubscription implements Subscription {
await _coapClient.cancelObserveProactive(observeClientRelation);
}
_coapClient.close();
_complete();
await super
.stop(formIndex: formIndex, uriVariables: uriVariables, data: data);
}
}
11 changes: 5 additions & 6 deletions lib/src/binding_mqtt/mqtt_subscription.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import "package:mqtt_client/mqtt_server_client.dart";

import "../../core.dart";

/// [Subscription] for the MQTT protocol.
class MqttSubscription implements Subscription {
/// [ProtocolSubscription] for the MQTT protocol.
final class MqttSubscription extends ProtocolSubscription {
/// Constructor.
MqttSubscription(
this._form,
this._client,
this._complete, {
super._complete, {
required void Function(Content content) next,
void Function(Exception error)? error,
}) : _active = true {
Expand Down Expand Up @@ -52,8 +52,6 @@ class MqttSubscription implements Subscription {

bool _active = true;

final void Function() _complete;

@override
bool get active => _active;

Expand All @@ -65,6 +63,7 @@ class MqttSubscription implements Subscription {
}) async {
_client.disconnect();
_active = false;
_complete();
await super
.stop(formIndex: formIndex, uriVariables: uriVariables, data: data);
}
}
54 changes: 19 additions & 35 deletions lib/src/core/implementation/consumed_thing.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ class ConsumedThing implements scripting_api.ConsumedThing {
/// The [title] of the Thing.
final String title;

final Map<String, scripting_api.Subscription> _subscribedEvents = {};
final Set<String> _subscribedEvents = {};

final Map<String, scripting_api.Subscription> _observedProperties = {};
final Set<String> _observedProperties = {};

/// Determines the id of this [ConsumedThing].
String get identifier => thingDescription.identifier;
Expand Down Expand Up @@ -229,7 +229,7 @@ class ConsumedThing implements scripting_api.ConsumedThing {
);
}

if (_observedProperties.containsKey(propertyName)) {
if (_observedProperties.contains(propertyName)) {
throw StateError(
"ConsumedThing '$title' already has a function "
"subscribed to $propertyName. You can only observe once",
Expand Down Expand Up @@ -259,15 +259,12 @@ class ConsumedThing implements scripting_api.ConsumedThing {
required Map<String, Object>? uriVariables,
}) async {
final OperationType operationType;
final Map<String, scripting_api.Subscription> subscriptions;

switch (subscriptionType) {
case scripting_api.SubscriptionType.property:
operationType = OperationType.observeproperty;
subscriptions = _observedProperties;
case scripting_api.SubscriptionType.event:
operationType = OperationType.subscribeevent;
subscriptions = _subscribedEvents;
}

final (client, form) = _getClientFor(
Expand All @@ -288,17 +285,10 @@ class ConsumedThing implements scripting_api.ConsumedThing {
onError(error);
}
},
complete: () => removeSubscription(affordanceName, subscriptionType),
complete: () => _removeSubscription(affordanceName, subscriptionType),
);

switch (subscriptionType) {
case scripting_api.SubscriptionType.property:
_observedProperties[affordanceName] = subscription;
case scripting_api.SubscriptionType.event:
_subscribedEvents[affordanceName] = subscription;
}

subscriptions[affordanceName] = subscription;
_addSubscription(affordanceName, subscriptionType);

return subscription;
}
Expand Down Expand Up @@ -376,7 +366,7 @@ class ConsumedThing implements scripting_api.ConsumedThing {
);
}

if (_subscribedEvents.containsKey(eventName)) {
if (_subscribedEvents.contains(eventName)) {
throw DartWotException(
"ConsumedThing '$title' already has a function "
"subscribed to $eventName. You can only subscribe once.",
Expand Down Expand Up @@ -407,31 +397,25 @@ class ConsumedThing implements scripting_api.ConsumedThing {
);
}

/// Removes a subscription with a specified [key] and [type].
void removeSubscription(String key, scripting_api.SubscriptionType type) {
void _addSubscription(
String key,
scripting_api.SubscriptionType type,
) {
switch (type) {
case scripting_api.SubscriptionType.property:
_observedProperties.remove(key);
_observedProperties.add(key);
case scripting_api.SubscriptionType.event:
_subscribedEvents.remove(key);
_subscribedEvents.add(key);
}
}

/// Cleans up the resources used by this [ConsumedThing].
bool destroy({bool external = true}) {
for (final observedProperty in _observedProperties.values) {
observedProperty.stop();
}
_observedProperties.clear();
for (final subscribedEvent in _subscribedEvents.values) {
subscribedEvent.stop();
}
_subscribedEvents.clear();

if (external) {
return servient.deregisterConsumedThing(this);
/// Removes a subscription with a specified [key] and [type].
void _removeSubscription(String key, scripting_api.SubscriptionType type) {
switch (type) {
case scripting_api.SubscriptionType.property:
_observedProperties.remove(key);
case scripting_api.SubscriptionType.event:
_subscribedEvents.remove(key);
}

return false;
}
}
36 changes: 2 additions & 34 deletions lib/src/core/implementation/servient.dart
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ class InternalServient implements Servient {
final List<ProtocolServer> _servers = [];
final Map<String, ProtocolClientFactory> _clientFactories = {};
final Map<String, ExposedThing> _things = {};
final Set<ConsumedThing> _consumedThings = {};

final ServerSecurityCallback? _serverSecurityCallback;

Expand Down Expand Up @@ -127,10 +126,6 @@ class InternalServient implements Servient {
clientFactory.destroy();
}
_clientFactories.clear();
for (final consumedThing in _consumedThings) {
consumedThing.destroy();
}
_consumedThings.clear();

final serverStatuses = _servers.map((server) => server.stop()).toList();
await Future.wait(serverStatuses);
Expand Down Expand Up @@ -177,29 +172,6 @@ class InternalServient implements Servient {
return true;
}

/// Removes and cleans up the resources of a [ConsumedThing].
///
/// If the [ConsumedThing] has not been registered before, `false` is
/// returned, otherwise `true`.
bool destroyConsumedThing(ConsumedThing consumedThing) {
return consumedThing.destroy(external: false);
}

/// De-registers the given [consumedThing].
///
/// If the [ConsumedThing] has not been registered before, `false` is
/// returned, otherwise `true`.
bool deregisterConsumedThing(ConsumedThing consumedThing) {
return _consumedThings.remove(consumedThing);
}

/// Adds a [ConsumedThing] to the servient if it hasn't been registered
/// before.
///
/// Returns `false` if the [thing] has already been registered, otherwise
/// `true`.
bool addConsumedThing(ConsumedThing thing) => _consumedThings.add(thing);

/// Returns an [ExposedThing] with the given [id] if it has been registered.
ExposedThing? thing(String id) => _things[id];

Expand Down Expand Up @@ -270,12 +242,8 @@ class InternalServient implements Servient {
/// Consumes a [ThingDescription] and returns a [scripting_api.ConsumedThing].
Future<scripting_api.ConsumedThing> consume(
ThingDescription thingDescription,
) async {
final newThing = ConsumedThing(this, thingDescription);
addConsumedThing(newThing);

return newThing;
}
) async =>
ConsumedThing(this, thingDescription);

/// Exposes a Thing based on an [scripting_api.ExposedThingInit].
Future<scripting_api.ExposedThing> produce(
Expand Down
1 change: 1 addition & 0 deletions lib/src/core/protocol_interfaces.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
export "protocol_interfaces/protocol_client.dart";
export "protocol_interfaces/protocol_client_factory.dart";
export "protocol_interfaces/protocol_server.dart";
export "protocol_interfaces/protocol_subscription.dart";
30 changes: 30 additions & 0 deletions lib/src/core/protocol_interfaces/protocol_subscription.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2024 Contributors to the Eclipse Foundation. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//
// SPDX-License-Identifier: BSD-3-Clause

import "package:meta/meta.dart";

import "../scripting_api.dart";

/// Base class for implementations of the [Subscription] interface.
abstract base class ProtocolSubscription implements Subscription {
/// Instantiates a new [ProtocolSubscription].
///
/// The [_complete] callback will be called when the [ProtocolSubscription]
/// has been [stop]ped (either internally or externally).
ProtocolSubscription(this._complete);

final void Function() _complete;

@override
@mustCallSuper
Future<void> stop({
int? formIndex,
Map<String, Object>? uriVariables,
Object? data,
}) async {
_complete();
}
}
Loading