Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor message Ack/Nack using events #2068

Merged
merged 5 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions news/2068-feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor message Ack/Nack using events
19 changes: 19 additions & 0 deletions src/SmiServices/Common/Events/MessageResultHandlers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using RabbitMQ.Client.Events;
using SmiServices.Common.Messages;

namespace SmiServices.Common.Events;

public delegate void AckEventHandler(object sender, BasicAckEventArgs args);

public delegate void NackEventHandler(object sender, BasicNackEventArgs args);

public delegate void SmiAckEventHandler(object sender, SmiAckEventArgs args);

/// <summary>
/// Subclass of <see cref="BasicAckEventArgs"/> including the relevant <see cref="IMessageHeader"/>
/// </summary>
/// <param name="header"></param>
public class SmiAckEventArgs(IMessageHeader header) : BasicAckEventArgs
{
public IMessageHeader Header { get; set; } = header;
}
25 changes: 10 additions & 15 deletions src/SmiServices/Common/Messaging/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public abstract class Consumer<TMessage> : IConsumer where TMessage : IMessage
/// Event raised when Fatal method called
/// </summary>
public event ConsumerFatalHandler? OnFatal;

public event AckEventHandler? OnAck;
public event NackEventHandler? OnNack;

protected readonly ILogger Logger;

Expand Down Expand Up @@ -191,29 +192,22 @@ protected bool SafeDeserializeToMessage<T>(IMessageHeader header, BasicDeliverEv
/// Instructs RabbitMQ to discard a single message and not requeue it
/// </summary>
/// <param name="tag"></param>
protected void DiscardSingleMessage(ulong tag)
private void DiscardSingleMessage(ulong tag)
{
Model!.BasicNack(tag, multiple: false, requeue: false);
OnNack?.Invoke(this, new BasicNackEventArgs { DeliveryTag = tag, Multiple = false, Requeue = false });
NackCount++;
}

protected virtual void ErrorAndNack(IMessageHeader header, ulong tag, string message, Exception exception)
{
header?.Log(Logger, LogLevel.Error, message, exception);

header.Log(Logger, LogLevel.Error, message, exception);
DiscardSingleMessage(tag);
}

/// <summary>
///
/// </summary>
/// <param name="header"></param>
/// <param name="tag"></param>
protected void Ack(IMessageHeader header, ulong tag)
protected void Ack(IMessageHeader header, ulong deliveryTag)
{
header?.Log(Logger, LogLevel.Trace, $"Acknowledged {header.MessageGuid}");

Model!.BasicAck(tag, false);
OnAck?.Invoke(this, new BasicAckEventArgs { DeliveryTag = deliveryTag, Multiple = false });
header.Log(Logger, LogLevel.Trace, $"Acknowledged {header.MessageGuid}");
AckCount++;
}

Expand All @@ -228,8 +222,9 @@ protected void Ack(IList<IMessageHeader> batchHeaders, ulong latestDeliveryTag)
foreach (IMessageHeader header in batchHeaders)
header.Log(Logger, LogLevel.Trace, "Acknowledged");

Model!.BasicAck(latestDeliveryTag, true);
AckCount += batchHeaders.Count;

OnAck?.Invoke(this, new BasicAckEventArgs { DeliveryTag = latestDeliveryTag, Multiple = true });
}

/// <summary>
Expand Down
10 changes: 10 additions & 0 deletions src/SmiServices/Common/Messaging/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ public interface IConsumer
/// <param name="basicDeliverEventArgs">The message and all associated information.</param>
void ProcessMessage(BasicDeliverEventArgs basicDeliverEventArgs);

/// <summary>
/// Callback raised when Ack-ing a message
/// </summary>
event AckEventHandler OnAck;

/// <summary>
/// Callback raised when Nack-ing a message
/// </summary>
event NackEventHandler OnNack;

/// <summary>
///
/// </summary>
Expand Down
3 changes: 3 additions & 0 deletions src/SmiServices/Common/Messaging/RabbitMQBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ void shutdown(object? o, ShutdownEventArgs a)
if (consumerOptions.HoldUnprocessableMessages && !consumerOptions.AutoAck)
consumer.HoldUnprocessableMessages = true;

consumer.OnAck += (_, a) => { ebc.Model.BasicAck(a.DeliveryTag, a.Multiple); };
consumer.OnNack += (_, a) => { ebc.Model.BasicNack(a.DeliveryTag, a.Multiple, a.Requeue); };

model.BasicConsume(ebc, consumerOptions.QueueName, consumerOptions.AutoAck);
_logger.Debug($"Consumer task started [QueueName={consumerOptions?.QueueName}]");
return taskId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public MongoDbPopulatorMessageConsumer(MongoDbOptions mongoDbOptions, MongoDbPop

ConsumerOptions = consumerOptions;
Logger.Debug(_messageTypePrefix + "Constructed for " + typeof(T).Name);

Processor.OnAck += (o, a) => { Ack(a.Header, a.DeliveryTag); };
}

private void ExceptionCallback(Exception e)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using RabbitMQ.Client;
using SmiServices.Common.Events;
using SmiServices.Common.Messages;

namespace SmiServices.Microservices.MongoDBPopulator.Processing
Expand Down Expand Up @@ -30,6 +31,8 @@ public interface IMessageProcessor
/// </summary>
IModel? Model { get; set; }

event SmiAckEventHandler? OnAck;

/// <summary>
/// Count of the total number of acknowledged messages during this processors lifetime
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public ImageMessageProcessor(MongoDbPopulatorOptions options, IMongoDbAdapter mo
: base(options, mongoDbAdapter, maxQueueSize, exceptionCallback) { }


public override void AddToWriteQueue(DicomFileMessage message, IMessageHeader? header, ulong deliveryTag)
public override void AddToWriteQueue(DicomFileMessage message, IMessageHeader header, ulong deliveryTag)
{
// Only time we are not processing is if we are shutting down anyway
if (IsStopping)
Expand Down Expand Up @@ -80,7 +80,7 @@ public override void AddToWriteQueue(DicomFileMessage message, IMessageHeader? h

lock (LockObj)
{
ToProcess.Enqueue(new Tuple<BsonDocument, ulong>(document, deliveryTag));
ToProcess.Enqueue(new Tuple<BsonDocument, IMessageHeader, ulong>(document, header, deliveryTag));

if (ToProcess.Count >= MaxQueueSize)
forceProcess = true;
Expand Down Expand Up @@ -123,11 +123,12 @@ protected override void ProcessQueue()
Logger.Debug($"Wrote {modalityDocs.Count} documents successfully, sending ACKs");

// Hopefully this uses ReferenceEquals, otherwise will be slow...
foreach (ulong deliveryTag in ToProcess
.Where(x => modalityDocs.Contains(x.Item1))
.Select(x => x.Item2))
foreach (
var (_, header, deliveryTag) in
ToProcess.Where(x => modalityDocs.Contains(x.Item1))
)
{
Model.BasicAck(deliveryTag, false);
Ack(header, deliveryTag);
}

AckCount += modalityDocs.Count;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using MongoDB.Bson;
using NLog;
using RabbitMQ.Client;
using SmiServices.Common.Events;
using SmiServices.Common.Messages;
using SmiServices.Common.Options;
using System;
Expand Down Expand Up @@ -40,6 +41,8 @@ public abstract class MessageProcessor<T> : IMessageProcessor<T> where T : IMess
/// </summary>
public IModel? Model { get; set; }

public event SmiAckEventHandler? OnAck;

/// <inheritdoc />
/// <summary>
/// Indicates if the object is actively processing messages
Expand All @@ -61,7 +64,7 @@ public abstract class MessageProcessor<T> : IMessageProcessor<T> where T : IMess
protected int FailedWriteAttempts;
protected readonly int FailedWriteLimit;

protected readonly Queue<Tuple<BsonDocument, ulong>> ToProcess = new();
protected readonly Queue<Tuple<BsonDocument, IMessageHeader, ulong>> ToProcess = new();
protected readonly int MaxQueueSize;
protected readonly object LockObj = new();
private readonly SysTimers.Timer _processTimer;
Expand Down Expand Up @@ -120,6 +123,11 @@ protected void StopProcessing()
Logger.Debug("Lock released, no more messages will be processed");
}

protected void Ack(IMessageHeader header, ulong deliveryTag)
{
OnAck?.Invoke(this, new SmiAckEventArgs(header) { DeliveryTag = deliveryTag, Multiple = false });
}

#endregion
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public override void AddToWriteQueue(SeriesMessage message, IMessageHeader heade

lock (LockObj)
{
ToProcess.Enqueue(new Tuple<BsonDocument, ulong>(document, deliveryTag));
ToProcess.Enqueue(new Tuple<BsonDocument, IMessageHeader, ulong>(document, header, deliveryTag));

if (ToProcess.Count >= MaxQueueSize)
forceProcess = true;
Expand Down Expand Up @@ -114,8 +114,8 @@ protected override void ProcessQueue()
{
Logger.Debug("SeriesMessageProcessor: Wrote " + ToProcess.Count + " messages successfully, sending ACKs");

foreach (ulong deliveryTag in ToProcess.Select(t => t.Item2))
Model.BasicAck(deliveryTag, false);
foreach (var (_, header, deliveryTag) in ToProcess)
Ack(header, deliveryTag);

AckCount += ToProcess.Count;
ToProcess.Clear();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using SmiServices.Common.Options;
using SmiServices.Microservices.MongoDBPopulator;
using SmiServices.Microservices.MongoDBPopulator.Processing;
using SmiServices.UnitTests.Common;
using SmiServices.UnitTests.Microservices.MongoDbPopulator;
using System;
using System.Threading;
Expand Down Expand Up @@ -64,7 +63,7 @@ public TestMessageProcessor(MongoDbPopulatorOptions options, IMongoDbAdapter mon

public override void AddToWriteQueue(SeriesMessage message, IMessageHeader header, ulong deliveryTag)
{
ToProcess.Enqueue(new Tuple<BsonDocument, ulong>(new BsonDocument { { "hello", "world" } }, deliveryTag));
ToProcess.Enqueue(new Tuple<BsonDocument, IMessageHeader, ulong>(new BsonDocument { { "hello", "world" } }, new MessageHeader(), deliveryTag));
}

public override void StopProcessing(string reason)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,13 @@ private void AssertMessagePublishedWithSpecifiedKey(GlobalOptions globals, bool

var mockModel = new Mock<IModel>(MockBehavior.Strict);
mockModel.Setup(x => x.IsClosed).Returns(false);
mockModel.Setup(x => x.BasicAck(It.IsAny<ulong>(), It.IsAny<bool>())).Verifiable();

consumer.SetModel(mockModel.Object);
consumer.TestMessage(msg);

Thread.Sleep(500); // Fatal call is race-y
Assert.That(fatalCalled, Is.False, $"Fatal was called with {fatalErrorEventArgs}");
mockModel.Verify(x => x.BasicAck(It.IsAny<ulong>(), It.IsAny<bool>()), Times.Once);
Assert.That(consumer.AckCount, Is.EqualTo(1));
Assert.That(fileMessageRoutingKey, Is.EqualTo(expectedRoutingKey));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using SmiServices.Common.Messages.Extraction;
using SmiServices.Microservices.CohortPackager;
using SmiServices.Microservices.CohortPackager.ExtractJobStorage;
using SmiServices.UnitTests.Common;
using System;
using System.Collections.Concurrent;
using System.Threading;
Expand Down Expand Up @@ -52,7 +51,7 @@ public void SetUp()
{
while (_writeQueueCount > 0)
{
_processedList.Enqueue(new Tuple<IMessageHeader, ulong>(null!, 0));
_processedList.Enqueue(new Tuple<IMessageHeader, ulong>(new MessageHeader(), 0));
--_writeQueueCount;
}
});
Expand All @@ -64,7 +63,9 @@ public void TearDown() { }
private AnonVerificationMessageConsumer NewConsumer(bool processBatches, int maxUnacknowledgedMessages, TimeSpan verificationMessageQueueFlushTime)
{
var consumer = new AnonVerificationMessageConsumer(_mockJobStore.Object, processBatches, maxUnacknowledgedMessages, verificationMessageQueueFlushTime);
consumer.SetModel(new Mock<IModel>(MockBehavior.Loose).Object);
var mockModel = new Mock<IModel>(MockBehavior.Strict);
mockModel.Setup(x => x.IsClosed).Returns(false);
consumer.SetModel(mockModel.Object);
return consumer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using SmiServices.Common.Options;
using SmiServices.Microservices.MongoDBPopulator;
using SmiServices.Microservices.MongoDBPopulator.Processing;
using SmiServices.UnitTests.Common;
using System;
using System.Collections.Generic;
using System.Linq;
Expand Down Expand Up @@ -64,7 +63,7 @@ public void ImageProcessor_FailInModalityBatch_AcksWrittenDocuments()
string modality = testModalities[i];
ds.AddOrUpdate(DicomTag.Modality, modality);
msg.DicomDataset = DicomTypeTranslater.SerializeDatasetToJson(ds);
processor.AddToWriteQueue(msg, null, (ulong)i);
processor.AddToWriteQueue(msg, new MessageHeader(), (ulong)i);
}

ds.AddOrUpdate(DicomTag.Modality, "CT");
Expand Down
Loading