Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message refactoring #2079

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Globalization;
using System.IO;
using SmiServices.Common.Messages;

namespace SmiServices.Applications.DicomDirectoryProcessor
{
Expand Down Expand Up @@ -53,25 +54,25 @@ public DicomDirectoryProcessorHost(GlobalOptions globals, DicomDirectoryProcesso
Logger.Info("Creating PACS directory finder");

_ddf = new PacsDirectoryFinder(globals.FileSystemOptions!.FileSystemRoot!,
globals.FileSystemOptions.DicomSearchPattern!, MessageBroker.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false));
globals.FileSystemOptions.DicomSearchPattern!, MessageBroker.SetupProducer<AccessionDirectoryMessage>(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false));
break;
case "list":
Logger.Info("Creating accession directory lister");

_ddf = new AccessionDirectoryLister(globals.FileSystemOptions!.FileSystemRoot!,
globals.FileSystemOptions.DicomSearchPattern!, MessageBroker.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false));
globals.FileSystemOptions.DicomSearchPattern!, MessageBroker.SetupProducer<AccessionDirectoryMessage>(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false));
break;
case "default":
Logger.Info("Creating basic directory finder");

_ddf = new BasicDicomDirectoryFinder(globals.FileSystemOptions!.FileSystemRoot!,
globals.FileSystemOptions.DicomSearchPattern!, MessageBroker.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false));
globals.FileSystemOptions.DicomSearchPattern!, MessageBroker.SetupProducer<AccessionDirectoryMessage>(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false));
break;
case "zips":
Logger.Info("Creating zip directory finder");

_ddf = new ZipDicomDirectoryFinder(globals.FileSystemOptions!.FileSystemRoot!,
globals.FileSystemOptions.DicomSearchPattern!, MessageBroker.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false));
globals.FileSystemOptions.DicomSearchPattern!, MessageBroker.SetupProducer<AccessionDirectoryMessage>(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false));
break;
default:
throw new ArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.IO.Abstractions;
using System.Linq;
using System.Text.RegularExpressions;
using SmiServices.Common.Messages;

namespace SmiServices.Applications.DicomDirectoryProcessor.DirectoryFinders
{
Expand All @@ -12,10 +13,10 @@ public class AccessionDirectoryLister : DicomDirectoryFinder
// Regex that matches when we are at the yyyy\mm\dd\xxxxx directory level
private static readonly Regex _accDirectoryRegex = new(@"(20\d{2}[\\\/]\d{2}[\\\/]\d{2}[\\\/][a-zA-Z0-9._-]+[\\\/]$)");

public AccessionDirectoryLister(string fileSystemRoot, IFileSystem fileSystem, string dicomSearchPattern, IProducerModel directoriesProducerModel)
: base(fileSystemRoot, fileSystem, dicomSearchPattern, directoriesProducerModel) { }
public AccessionDirectoryLister(string fileSystemRoot, IFileSystem fileSystem, string dicomSearchPattern, IProducerModel<AccessionDirectoryMessage> directoriesProducerModel)
: base(fileSystemRoot, fileSystem, dicomSearchPattern, directoriesProducerModel) { }

public AccessionDirectoryLister(string fileSystemRoot, string dicomSearchPattern, IProducerModel directoriesProducerModel)
public AccessionDirectoryLister(string fileSystemRoot, string dicomSearchPattern, IProducerModel<AccessionDirectoryMessage> directoriesProducerModel)
: this(fileSystemRoot, new FileSystem(), dicomSearchPattern, directoriesProducerModel) { }


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.IO.Abstractions;
using System.Linq;
using System.Text;
using SmiServices.Common.Messages;

namespace SmiServices.Applications.DicomDirectoryProcessor.DirectoryFinders
{
Expand All @@ -16,10 +17,10 @@ public class BasicDicomDirectoryFinder : DicomDirectoryFinder
/// </summary>
public bool AlwaysSearchSubdirectories { get; set; }

public BasicDicomDirectoryFinder(string fileSystemRoot, IFileSystem fileSystem, string dicomSearchPattern, IProducerModel directoriesProducerModel)
public BasicDicomDirectoryFinder(string fileSystemRoot, IFileSystem fileSystem, string dicomSearchPattern, IProducerModel<AccessionDirectoryMessage> directoriesProducerModel)
: base(fileSystemRoot, fileSystem, dicomSearchPattern, directoriesProducerModel) { }

public BasicDicomDirectoryFinder(string fileSystemRoot, string dicomSearchPattern, IProducerModel directoriesProducerModel)
public BasicDicomDirectoryFinder(string fileSystemRoot, string dicomSearchPattern, IProducerModel<AccessionDirectoryMessage> directoriesProducerModel)
: this(fileSystemRoot, new FileSystem(), dicomSearchPattern, directoriesProducerModel) { }


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public abstract class DicomDirectoryFinder : IDicomDirectoryFinder
protected readonly string FileSystemRoot;
protected readonly IFileSystem FileSystem;

private readonly IProducerModel _directoriesProducerModel;
private readonly IProducerModel<AccessionDirectoryMessage> _directoriesProducerModel;
protected int TotalSent;

protected bool IsProcessing;
Expand Down Expand Up @@ -54,7 +54,7 @@ protected DicomDirectoryFinder(
string fileSystemRoot,
IFileSystem fileSystem,
string dicomSearchPattern,
IProducerModel directoriesProducerModel
IProducerModel<AccessionDirectoryMessage> directoriesProducerModel
)
{
FileSystemRoot = fileSystemRoot;
Expand Down Expand Up @@ -96,9 +96,9 @@ protected void FoundNewDicomDirectory(string dir)
{
Logger.Debug("DicomDirectoryFinder: Found " + dir);

string dirPath = Path.GetFullPath(dir).TrimEnd(Path.DirectorySeparatorChar);
var dirPath = Path.GetFullPath(dir).TrimEnd(Path.DirectorySeparatorChar);

if (dirPath.StartsWith(FileSystemRoot))
if (dirPath.StartsWith(FileSystemRoot, StringComparison.Ordinal))
dirPath = dirPath.Remove(0, FileSystemRoot.Length);

dirPath = dirPath.TrimStart(Path.DirectorySeparatorChar);
Expand All @@ -108,29 +108,28 @@ protected void FoundNewDicomDirectory(string dir)
DirectoryPath = dirPath,
};

_directoriesProducerModel.SendMessage(message, isInResponseTo: null, routingKey: null);
_directoriesProducerModel.SendMessage(message, null, null);
++TotalSent;
}

protected void LogTime(TimeLabel tl)
{
long elapsed = Stopwatch.ElapsedMilliseconds;
StringBuilder!.Append(tl + "=" + elapsed + "ms ");
Times![(int)tl].Add(elapsed);
var elapsed = Stopwatch.ElapsedMilliseconds;
StringBuilder?.Append($"{tl}={elapsed}ms ");
Times?[(int)tl].Add(elapsed);
Stopwatch.Restart();
}

protected string CalcAverages()
{
var sb = new StringBuilder();
sb.AppendLine("Averages:");
var sb = new StringBuilder("Averages:");

foreach (TimeLabel label in (TimeLabel[])Enum.GetValues(typeof(TimeLabel)))
foreach (var label in Enum.GetValues<TimeLabel>())
{
int count = Times![(int)label].Count;
long average = count == 0 ? 0 : Times[(int)label].Sum() / count;
var count = Times![(int)label].Count;
var average = count == 0 ? 0 : Times[(int)label].Sum() / count;

sb.AppendLine(label + ":\t" + average + "ms");
sb.AppendLine($"{label}:\t{average}ms");
}

return sb.ToString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.IO.Abstractions;
using System.Linq;
using System.Text.RegularExpressions;
using SmiServices.Common.Messages;

namespace SmiServices.Applications.DicomDirectoryProcessor.DirectoryFinders
{
Expand All @@ -15,10 +16,10 @@ public class PacsDirectoryFinder : DicomDirectoryFinder
private readonly Regex _accDirectoryRegex = new(@"(20\d{2}[\\\/]\d{2}[\\\/]\d{2}[\\\/][a-zA-Z0-9._-]+[\\\/]$)");


public PacsDirectoryFinder(string fileSystemRoot, IFileSystem fileSystem, string dicomSearchPattern, IProducerModel directoriesProducerModel)
public PacsDirectoryFinder(string fileSystemRoot, IFileSystem fileSystem, string dicomSearchPattern, IProducerModel<AccessionDirectoryMessage> directoriesProducerModel)
: base(fileSystemRoot, fileSystem, dicomSearchPattern, directoriesProducerModel) { }

public PacsDirectoryFinder(string fileSystemRoot, string dicomSearchPattern, IProducerModel directoriesProducerModel)
public PacsDirectoryFinder(string fileSystemRoot, string dicomSearchPattern, IProducerModel<AccessionDirectoryMessage> directoriesProducerModel)
: base(fileSystemRoot, new FileSystem(), dicomSearchPattern, directoriesProducerModel) { }


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.IO.Abstractions;
using System.Linq;
using SmiServices.Common.Messages;

namespace SmiServices.Applications.DicomDirectoryProcessor.DirectoryFinders
{
Expand All @@ -12,13 +13,13 @@ namespace SmiServices.Applications.DicomDirectoryProcessor.DirectoryFinders
/// </summary>
public class ZipDicomDirectoryFinder : BasicDicomDirectoryFinder
{
public ZipDicomDirectoryFinder(string fileSystemRoot, IFileSystem fileSystem, string dicomSearchPattern, IProducerModel directoriesProducerModel)
public ZipDicomDirectoryFinder(string fileSystemRoot, IFileSystem fileSystem, string dicomSearchPattern, IProducerModel<AccessionDirectoryMessage> directoriesProducerModel)
: base(fileSystemRoot, fileSystem, dicomSearchPattern, directoriesProducerModel)
{
AlwaysSearchSubdirectories = true;
}

public ZipDicomDirectoryFinder(string fileSystemRoot, string dicomSearchPattern, IProducerModel directoriesProducerModel)
public ZipDicomDirectoryFinder(string fileSystemRoot, string dicomSearchPattern, IProducerModel<AccessionDirectoryMessage> directoriesProducerModel)
: this(fileSystemRoot, new FileSystem(), dicomSearchPattern, directoriesProducerModel) { }


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public ExtractImagesHost(

if (extractionMessageSender == null)
{
IProducerModel extractionRequestProducer = MessageBroker.SetupProducer(options.ExtractionRequestProducerOptions!, isBatch: false);
IProducerModel extractionRequestInfoProducer = MessageBroker.SetupProducer(options.ExtractionRequestInfoProducerOptions!, isBatch: false);
var extractionRequestProducer = MessageBroker.SetupProducer<ExtractionRequestMessage>(options.ExtractionRequestProducerOptions!, isBatch: false);
var extractionRequestInfoProducer = MessageBroker.SetupProducer<ExtractionRequestInfoMessage>(options.ExtractionRequestInfoProducerOptions!, isBatch: false);

_extractionMessageSender = new ExtractionMessageSender(
options,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ public class ExtractionMessageSender : IExtractionMessageSender
{
private readonly ILogger _logger = LogManager.GetCurrentClassLogger();

private readonly IProducerModel _extractionRequestProducer;
private readonly IProducerModel _extractionRequestInfoProducer;
private readonly IProducerModel<ExtractionRequestMessage> _extractionRequestProducer;
private readonly IProducerModel<ExtractionRequestInfoMessage> _extractionRequestInfoProducer;
private readonly IFileSystem _fileSystem;
private readonly string _extractionRoot;
private readonly string _extractionDir;
Expand All @@ -36,8 +36,8 @@ public class ExtractionMessageSender : IExtractionMessageSender
public ExtractionMessageSender(
ExtractImagesOptions options,
ExtractImagesCliOptions cliOptions,
IProducerModel extractionRequestProducer,
IProducerModel extractionRequestInfoProducer,
IProducerModel<ExtractionRequestMessage> extractionRequestProducer,
IProducerModel<ExtractionRequestInfoMessage> extractionRequestInfoProducer,
IFileSystem fileSystem,
string extractionRoot,
string extractionDir,
Expand Down Expand Up @@ -71,11 +71,11 @@ public void SendMessages(ExtractionKey extractionKey, List<string> idList)
throw new ArgumentException("ID list is empty");

var jobId = Guid.NewGuid();
DateTime now = _dateTimeProvider.UtcNow();
var now = _dateTimeProvider.UtcNow();

// TODO(rkm 2021-04-01) Change this to a string[] in both messages below
string? modalitiesString = _modalities == null ? null : string.Join(',', _modalities);
string userName = Environment.UserName;
var modalitiesString = _modalities == null ? null : string.Join(',', _modalities);
var userName = Environment.UserName;

var erm = new ExtractionRequestMessage
{
Expand All @@ -95,15 +95,15 @@ public void SendMessages(ExtractionKey extractionKey, List<string> idList)
ExtractionIdentifiers = null!,
};

List<ExtractionRequestMessage> ermList =
var ermList =
idList
.Chunk(_maxIdentifiersPerMessage)
.Select(x =>
new ExtractionRequestMessage(erm)
{
ExtractionIdentifiers = [.. x]
}
).ToList();
.Chunk(_maxIdentifiersPerMessage)
.Select(x =>
new ExtractionRequestMessage(erm)
{
ExtractionIdentifiers = [.. x]
}
).ToList();

var erim = new ExtractionRequestInfoMessage
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using SmiServices.Common;
using SmiServices.Common.Execution;
using SmiServices.Common.Messages.Updating;
using SmiServices.Common.Messaging;
using SmiServices.Common.Options;

Expand All @@ -9,13 +10,13 @@ namespace SmiServices.Applications.TriggerUpdates
public class TriggerUpdatesHost : MicroserviceHost
{
private readonly ITriggerUpdatesSource _source;
private readonly IProducerModel _producer;
private readonly IProducerModel<UpdateValuesMessage> _producer;

public TriggerUpdatesHost(GlobalOptions options, ITriggerUpdatesSource source, IMessageBroker? messageBroker = null)
: base(options, messageBroker)
{
_source = source;
_producer = MessageBroker.SetupProducer(options.TriggerUpdatesOptions!, isBatch: false);
_producer = MessageBroker.SetupProducer<UpdateValuesMessage>(options.TriggerUpdatesOptions!, isBatch: false);
}

public override void Start()
Expand Down
4 changes: 2 additions & 2 deletions src/SmiServices/Common/Execution/MicroserviceHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public abstract class MicroserviceHost : IMicroserviceHost
private bool _auxConnectionsCreated;

private readonly ProducerOptions _fatalLoggingProducerOptions;
private IProducerModel? _fatalLoggingProducer;
private IProducerModel<FatalErrorMessage>? _fatalLoggingProducer;

private readonly ControlMessageConsumer _controlMessageConsumer = null!;

Expand Down Expand Up @@ -119,7 +119,7 @@ public void StartAuxConnections()
if (MessageBroker.HasConsumers)
throw new ApplicationException("Rabbit adapter has consumers before aux. connections created");

_fatalLoggingProducer = MessageBroker.SetupProducer(_fatalLoggingProducerOptions, isBatch: false);
_fatalLoggingProducer = MessageBroker.SetupProducer<FatalErrorMessage>(_fatalLoggingProducerOptions, isBatch: false);
MessageBroker.StartConsumer(_controlMessageConsumer.ControlConsumerOptions, _controlMessageConsumer, isSolo: false);
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/SmiServices/Common/IMessageBroker.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using RabbitMQ.Client;
using SmiServices.Common.Messages;
using SmiServices.Common.Messaging;
using SmiServices.Common.Options;
using System;
Expand All @@ -13,7 +14,7 @@ public interface IMessageBroker

void StopConsumer(Guid taskId, TimeSpan timeout);

IProducerModel SetupProducer(ProducerOptions producerOptions, bool isBatch);
IProducerModel<T> SetupProducer<T>(ProducerOptions producerOptions, bool isBatch) where T : IMessage;

IModel GetModel(string connectionName);

Expand Down
4 changes: 2 additions & 2 deletions src/SmiServices/Common/Messaging/BatchProducerModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace SmiServices.Common.Messaging
/// No logging of sent - handle yourself
/// Make sure to WaitForConfirms during host shutdown
/// </summary>
public class BatchProducerModel : ProducerModel
public class BatchProducerModel<T> : ProducerModel<T> where T : IMessage
{
public BatchProducerModel(
string exchangeName,
Expand All @@ -33,7 +33,7 @@ public BatchProducerModel(
/// <param name="inResponseTo"></param>
/// <param name="routingKey"></param>
/// <returns></returns>
public override IMessageHeader SendMessage(IMessage message, IMessageHeader? inResponseTo = null, string? routingKey = null)
public override IMessageHeader SendMessage(T message, IMessageHeader? inResponseTo = null, string? routingKey = null)
{
return SendMessageImpl(message, inResponseTo, routingKey);
}
Expand Down
5 changes: 2 additions & 3 deletions src/SmiServices/Common/Messaging/IProducerModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,18 @@

namespace SmiServices.Common.Messaging
{
// TODO(rkm 2021-04-01) Make this generic over the message type it's expected to send(?)
/// <summary>
/// Interface for an object which can send messages to RabbitMQ.
/// </summary>
public interface IProducerModel
public interface IProducerModel<in T> where T : IMessage
{
/// <summary>
/// Sends a <see cref="IMessage"/> to a RabbitMQ exchange with the appropriate <see cref="IMessageHeader"/>.
/// </summary>
/// <param name="message">Message object to serialise and send.</param>
/// <param name="isInResponseTo">If you are responding to a message, pass that messages header in here (otherwise pass null)</param>
/// <param name="routingKey">Routing key for the exchange to direct the message.</param>
IMessageHeader SendMessage(IMessage message, IMessageHeader? isInResponseTo, string? routingKey);
IMessageHeader SendMessage(T message, IMessageHeader? isInResponseTo, string? routingKey);

/// <summary>
/// Waits until all sent messages are confirmed by RabbitMQ
Expand Down
Loading
Loading