diff --git a/Sod.Model/Events/Outgoing/Mqtt/MqttOptions.cs b/Sod.Model/Events/Outgoing/Mqtt/MqttOptions.cs index 1a1331e..0a4be4b 100644 --- a/Sod.Model/Events/Outgoing/Mqtt/MqttOptions.cs +++ b/Sod.Model/Events/Outgoing/Mqtt/MqttOptions.cs @@ -7,4 +7,6 @@ public class MqttOptions public string User { get; init; } = null!; public string Password { get; init; } = null!; public string? CrtPath { get; init; } = null; + public bool Retain { get; init; } = true; + public int QoS { get; init; } = 1; } \ No newline at end of file diff --git a/Sod.Model/Events/Outgoing/Mqtt/MqttOutgoingEventPublisher.cs b/Sod.Model/Events/Outgoing/Mqtt/MqttOutgoingEventPublisher.cs index 93efeab..03f754a 100644 --- a/Sod.Model/Events/Outgoing/Mqtt/MqttOutgoingEventPublisher.cs +++ b/Sod.Model/Events/Outgoing/Mqtt/MqttOutgoingEventPublisher.cs @@ -5,6 +5,7 @@ using Microsoft.Extensions.Logging; using MQTTnet; using MQTTnet.Client.Publishing; +using MQTTnet.Protocol; using Sod.Infrastructure.Capabilities; using Sod.Model.CommonTypes; @@ -14,6 +15,8 @@ public class MqttOutgoingEventPublisher : LoggingCapability, IOutgoingEventPubli { private readonly IApplicationMessagePublisher _publisher; private readonly OutgoingEventMappings _mappings; + private bool _retain = false; + private int _qos = 1; public MqttOutgoingEventPublisher(IApplicationMessagePublisher publisher, OutgoingEventMappings mappings) { @@ -21,6 +24,18 @@ public MqttOutgoingEventPublisher(IApplicationMessagePublisher publisher, Outgoi _mappings = mappings; } + public MqttOutgoingEventPublisher Retain(bool retain) + { + _retain = retain; + return this; + } + + public MqttOutgoingEventPublisher QoS(int qos) + { + _qos = qos; + return this; + } + public async Task> PublishAsync(OutgoingEvent evnt) { var failed = new List(); @@ -30,6 +45,8 @@ public async Task> PublishAsync(OutgoingEvent e var msg = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(evnt.Value) + .WithRetainFlag(_retain) + .WithQualityOfServiceLevel((MqttQualityOfServiceLevel)_qos) .Build(); var publishResult = await _publisher.PublishAsync(msg, CancellationToken.None); if (publishResult.ReasonCode != MqttClientPublishReasonCode.Success) failed.Add(new FailedOutgoingEvent(evnt, FailedOutgoingEventReason.CommunicationError)); diff --git a/Sod.Worker/Modules/InfrastructureModule.cs b/Sod.Worker/Modules/InfrastructureModule.cs index 7c0bb5b..278bc58 100644 --- a/Sod.Worker/Modules/InfrastructureModule.cs +++ b/Sod.Worker/Modules/InfrastructureModule.cs @@ -112,7 +112,15 @@ protected override void Load(ContainerBuilder builder) client.UseApplicationMessageReceivedHandler(x => { broker.Process(x.ApplicationMessage.Topic, Encoding.UTF8.GetString(x.ApplicationMessage.Payload)); }); }); - builder.RegisterType().As().SingleInstance(); + builder + .RegisterType() + .OnActivated(activatedEventArgs => + { + var opt = activatedEventArgs.Context.Resolve(); + activatedEventArgs.Instance.Retain(opt.Retain).QoS(opt.QoS); + }) + .As() + .SingleInstance(); builder.RegisterType().As().SingleInstance(); builder.RegisterType().As().SingleInstance(); diff --git a/Sod.Worker/appsettings.json b/Sod.Worker/appsettings.json index de17695..9c00478 100644 --- a/Sod.Worker/appsettings.json +++ b/Sod.Worker/appsettings.json @@ -9,7 +9,9 @@ "Port": 0, "User": "", "Password": "", - "CrtPath": null + "CrtPath": null, + "Retain": true, + "Qos": 1 }, "Satel": { "Address": "",