Skip to content

Commit

Permalink
qos and retain added
Browse files Browse the repository at this point in the history
  • Loading branch information
aaart committed Oct 19, 2024
1 parent 1807f12 commit 35ecc33
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 2 deletions.
2 changes: 2 additions & 0 deletions Sod.Model/Events/Outgoing/Mqtt/MqttOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ public class MqttOptions
public string User { get; init; } = null!;
public string Password { get; init; } = null!;
public string? CrtPath { get; init; } = null;
public bool Retain { get; init; } = true;
public int QoS { get; init; } = 1;
}
17 changes: 17 additions & 0 deletions Sod.Model/Events/Outgoing/Mqtt/MqttOutgoingEventPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Client.Publishing;
using MQTTnet.Protocol;
using Sod.Infrastructure.Capabilities;
using Sod.Model.CommonTypes;

Expand All @@ -14,13 +15,27 @@ public class MqttOutgoingEventPublisher : LoggingCapability, IOutgoingEventPubli
{
private readonly IApplicationMessagePublisher _publisher;
private readonly OutgoingEventMappings _mappings;
private bool _retain = false;
private int _qos = 1;

public MqttOutgoingEventPublisher(IApplicationMessagePublisher publisher, OutgoingEventMappings mappings)
{
_publisher = publisher;
_mappings = mappings;
}

public MqttOutgoingEventPublisher Retain(bool retain)
{
_retain = retain;
return this;
}

public MqttOutgoingEventPublisher QoS(int qos)
{
_qos = qos;
return this;
}

public async Task<IEnumerable<FailedOutgoingEvent>> PublishAsync(OutgoingEvent evnt)
{
var failed = new List<FailedOutgoingEvent>();
Expand All @@ -30,6 +45,8 @@ public async Task<IEnumerable<FailedOutgoingEvent>> PublishAsync(OutgoingEvent e
var msg = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(evnt.Value)
.WithRetainFlag(_retain)
.WithQualityOfServiceLevel((MqttQualityOfServiceLevel)_qos)
.Build();
var publishResult = await _publisher.PublishAsync(msg, CancellationToken.None);
if (publishResult.ReasonCode != MqttClientPublishReasonCode.Success) failed.Add(new FailedOutgoingEvent(evnt, FailedOutgoingEventReason.CommunicationError));
Expand Down
10 changes: 9 additions & 1 deletion Sod.Worker/Modules/InfrastructureModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,15 @@ protected override void Load(ContainerBuilder builder)
client.UseApplicationMessageReceivedHandler(x => { broker.Process(x.ApplicationMessage.Topic, Encoding.UTF8.GetString(x.ApplicationMessage.Payload)); });
});

builder.RegisterType<MqttOutgoingEventPublisher>().As<IOutgoingEventPublisher>().SingleInstance();
builder
.RegisterType<MqttOutgoingEventPublisher>()
.OnActivated(activatedEventArgs =>
{
var opt = activatedEventArgs.Context.Resolve<MqttOptions>();
activatedEventArgs.Instance.Retain(opt.Retain).QoS(opt.QoS);
})
.As<IOutgoingEventPublisher>()
.SingleInstance();

builder.RegisterType<InMemoryTaskQueue>().As<ITaskQueue>().SingleInstance();
builder.RegisterType<TaskPlanner>().As<ITaskPlanner>().SingleInstance();
Expand Down
4 changes: 3 additions & 1 deletion Sod.Worker/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
"Port": 0,
"User": "",
"Password": "",
"CrtPath": null
"CrtPath": null,
"Retain": true,
"Qos": 1
},
"Satel": {
"Address": "",
Expand Down

0 comments on commit 35ecc33

Please sign in to comment.