diff --git a/CentralHub.Api.Tests/MockAggregatedMeasurementRepository.cs b/CentralHub.Api.Tests/MockAggregatedMeasurementRepository.cs index eb62605..615ab52 100644 --- a/CentralHub.Api.Tests/MockAggregatedMeasurementRepository.cs +++ b/CentralHub.Api.Tests/MockAggregatedMeasurementRepository.cs @@ -105,4 +105,17 @@ await RecentMeasurementGroupsMutex.Lock(recentMeasurementGroups => return aggregatedMeasurements.Min(m => m.StartTime); }, cancellationToken); } + + public async Task AddAggregatedMeasurementAsync(List measurementDtos, CancellationToken cancellationToken) + { + await AggregatedMeasurementsMutex.Lock(stuff => + { + stuff.AggregatedMeasurements.AddRange(measurementDtos); + measurementDtos.ForEach(am => + { + am.AggregatedMeasurementDtoId = stuff.NextId; + stuff.NextId++; + }); + }, cancellationToken); + } } \ No newline at end of file diff --git a/CentralHub.Api/Program.cs b/CentralHub.Api/Program.cs index 5caca14..6c3ccaf 100644 --- a/CentralHub.Api/Program.cs +++ b/CentralHub.Api/Program.cs @@ -38,7 +38,7 @@ private static void Main(string[] args) // Ignore } #endif - var connectionString = $"Data Source={dbPath}"; + var connectionString = $"Data Source={dbPath}; Pooling=False"; builder.Services.AddDbContext(options => options.UseSqlite(connectionString)); builder.Services.AddScoped(); diff --git a/CentralHub.Api/Services/IMeasurementRepository.cs b/CentralHub.Api/Services/IMeasurementRepository.cs index 9ce77a4..2f1d802 100644 --- a/CentralHub.Api/Services/IMeasurementRepository.cs +++ b/CentralHub.Api/Services/IMeasurementRepository.cs @@ -6,6 +6,8 @@ namespace CentralHub.Api.Services; public interface IMeasurementRepository { + Task AddAggregatedMeasurementAsync(List measurementDtos, CancellationToken cancellationToken); + Task AddAggregatedMeasurementAsync(AggregatedMeasurementDto measurementDto, CancellationToken cancellationToken); Task RemoveAggregatedMeasurementAsync(AggregatedMeasurementDto measurementDto, CancellationToken cancellationToken); diff --git a/CentralHub.Api/Services/LocalizationService.cs b/CentralHub.Api/Services/LocalizationService.cs index 06beea5..62a07af 100644 --- a/CentralHub.Api/Services/LocalizationService.cs +++ b/CentralHub.Api/Services/LocalizationService.cs @@ -3,6 +3,7 @@ using CentralHub.Api.Model; using CentralHub.Api.Model.Responses.Measurement; using CentralHub.Api.Threading; +using Microsoft.Data.Sqlite; namespace CentralHub.Api.Services; @@ -44,6 +45,8 @@ public async Task AggregateMeasurementsAsync(CancellationToken stoppingToken) var roomMeasurementGroups = await aggregatorRepository.GetRoomMeasurementGroupsAsync(stoppingToken); var allRooms = await roomRepository.GetRoomsAsync(stoppingToken); using var measuredRoomsMutex = new CancellableMutex>(new List()); + using var aggregatedMeasurementsMutex = new CancellableMutex>(new List()); + await Parallel.ForEachAsync( roomMeasurementGroups, @@ -60,21 +63,25 @@ await Parallel.ForEachAsync( await measuredRoomsMutex.Lock(m => m.Add(room), cancellationToken); - await aggregatorRepository.AddAggregatedMeasurementAsync( - CreateAggregatedMeasurement(room, roomMeasurementGroup.Value), - cancellationToken); + var aggregatedMeasurement = CreateAggregatedMeasurement(room, roomMeasurementGroup.Value); + await aggregatedMeasurementsMutex.Lock(am => am.Add(aggregatedMeasurement), cancellationToken); }); await measuredRoomsMutex.Lock(async m => { foreach (var room in allRooms.Where(r => !m.Contains(r))) { - await aggregatorRepository.AddAggregatedMeasurementAsync( - CreateAggregatedMeasurement(room, new List()), + await aggregatedMeasurementsMutex.Lock(am => + am.Add(CreateAggregatedMeasurement(room, new List())), stoppingToken); + } }, stoppingToken); + + await aggregatedMeasurementsMutex.Lock(async am => + await aggregatorRepository.AddAggregatedMeasurementAsync(am, stoppingToken), + stoppingToken); } private static AggregatedMeasurementDto CreateAggregatedMeasurement(RoomDto room, IReadOnlyCollection measurementGroups) diff --git a/CentralHub.Api/Services/MeasurementRepository.cs b/CentralHub.Api/Services/MeasurementRepository.cs index b61b607..b25cca8 100644 --- a/CentralHub.Api/Services/MeasurementRepository.cs +++ b/CentralHub.Api/Services/MeasurementRepository.cs @@ -22,6 +22,22 @@ public MeasurementRepository(ApplicationDbContext applicationDbContext) _applicationDbContext.Database.EnsureCreated(); } + public async Task AddAggregatedMeasurementAsync(List measurementDtos, CancellationToken cancellationToken) + { + _applicationDbContext.AggregatedMeasurements.AddRange(measurementDtos); + try + { + await _applicationDbContext.SaveChangesAsync(cancellationToken); + return; + } + catch (OperationCanceledException) + { + // Remove the roomDto from the collection as the operation was cancelled. + _applicationDbContext.AggregatedMeasurements.RemoveRange(measurementDtos); + throw; + } + } + public async Task AddAggregatedMeasurementAsync(AggregatedMeasurementDto measurementDto, CancellationToken cancellationToken) { _applicationDbContext.AggregatedMeasurements.Add(measurementDto);