Skip to content

Commit

Permalink
fixed a bug where multiple threads tried accessing the db context
Browse files Browse the repository at this point in the history
  • Loading branch information
Casper-NS committed Dec 19, 2023
1 parent 8d1420e commit 4b1eeab
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 6 deletions.
13 changes: 13 additions & 0 deletions CentralHub.Api.Tests/MockAggregatedMeasurementRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,17 @@ await RecentMeasurementGroupsMutex.Lock(recentMeasurementGroups =>
return aggregatedMeasurements.Min(m => m.StartTime);
}, cancellationToken);
}

public async Task AddAggregatedMeasurementAsync(List<AggregatedMeasurementDto> measurementDtos, CancellationToken cancellationToken)
{
await AggregatedMeasurementsMutex.Lock(stuff =>
{
stuff.AggregatedMeasurements.AddRange(measurementDtos);
measurementDtos.ForEach(am =>
{
am.AggregatedMeasurementDtoId = stuff.NextId;
stuff.NextId++;
});
}, cancellationToken);
}
}
2 changes: 1 addition & 1 deletion CentralHub.Api/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private static void Main(string[] args)
// Ignore
}
#endif
var connectionString = $"Data Source={dbPath}";
var connectionString = $"Data Source={dbPath}; Pooling=False";

builder.Services.AddDbContext<ApplicationDbContext>(options => options.UseSqlite(connectionString));
builder.Services.AddScoped<IRoomRepository, RoomRepository>();
Expand Down
2 changes: 2 additions & 0 deletions CentralHub.Api/Services/IMeasurementRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ namespace CentralHub.Api.Services;

public interface IMeasurementRepository
{
Task AddAggregatedMeasurementAsync(List<AggregatedMeasurementDto> measurementDtos, CancellationToken cancellationToken);

Task<int> AddAggregatedMeasurementAsync(AggregatedMeasurementDto measurementDto, CancellationToken cancellationToken);

Task RemoveAggregatedMeasurementAsync(AggregatedMeasurementDto measurementDto, CancellationToken cancellationToken);
Expand Down
17 changes: 12 additions & 5 deletions CentralHub.Api/Services/LocalizationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using CentralHub.Api.Model;
using CentralHub.Api.Model.Responses.Measurement;
using CentralHub.Api.Threading;
using Microsoft.Data.Sqlite;

namespace CentralHub.Api.Services;

Expand Down Expand Up @@ -44,6 +45,8 @@ public async Task AggregateMeasurementsAsync(CancellationToken stoppingToken)
var roomMeasurementGroups = await aggregatorRepository.GetRoomMeasurementGroupsAsync(stoppingToken);
var allRooms = await roomRepository.GetRoomsAsync(stoppingToken);
using var measuredRoomsMutex = new CancellableMutex<List<RoomDto>>(new List<RoomDto>());
using var aggregatedMeasurementsMutex = new CancellableMutex<List<AggregatedMeasurementDto>>(new List<AggregatedMeasurementDto>());


await Parallel.ForEachAsync(
roomMeasurementGroups,
Expand All @@ -60,21 +63,25 @@ await Parallel.ForEachAsync(

await measuredRoomsMutex.Lock(m => m.Add(room), cancellationToken);

await aggregatorRepository.AddAggregatedMeasurementAsync(
CreateAggregatedMeasurement(room, roomMeasurementGroup.Value),
cancellationToken);
var aggregatedMeasurement = CreateAggregatedMeasurement(room, roomMeasurementGroup.Value);
await aggregatedMeasurementsMutex.Lock(am => am.Add(aggregatedMeasurement), cancellationToken);
});

await measuredRoomsMutex.Lock(async m =>
{
foreach (var room in allRooms.Where(r => !m.Contains(r)))
{
await aggregatorRepository.AddAggregatedMeasurementAsync(
CreateAggregatedMeasurement(room, new List<MeasurementGroup>()),
await aggregatedMeasurementsMutex.Lock(am =>
am.Add(CreateAggregatedMeasurement(room, new List<MeasurementGroup>())),
stoppingToken);

}
},
stoppingToken);

await aggregatedMeasurementsMutex.Lock(async am =>
await aggregatorRepository.AddAggregatedMeasurementAsync(am, stoppingToken),
stoppingToken);
}

private static AggregatedMeasurementDto CreateAggregatedMeasurement(RoomDto room, IReadOnlyCollection<MeasurementGroup> measurementGroups)
Expand Down
16 changes: 16 additions & 0 deletions CentralHub.Api/Services/MeasurementRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@ public MeasurementRepository(ApplicationDbContext applicationDbContext)
_applicationDbContext.Database.EnsureCreated();
}

public async Task AddAggregatedMeasurementAsync(List<AggregatedMeasurementDto> measurementDtos, CancellationToken cancellationToken)
{
_applicationDbContext.AggregatedMeasurements.AddRange(measurementDtos);
try
{
await _applicationDbContext.SaveChangesAsync(cancellationToken);
return;
}
catch (OperationCanceledException)
{
// Remove the roomDto from the collection as the operation was cancelled.
_applicationDbContext.AggregatedMeasurements.RemoveRange(measurementDtos);
throw;
}
}

public async Task<int> AddAggregatedMeasurementAsync(AggregatedMeasurementDto measurementDto, CancellationToken cancellationToken)
{
_applicationDbContext.AggregatedMeasurements.Add(measurementDto);
Expand Down

0 comments on commit 4b1eeab

Please sign in to comment.