diff --git a/.vscode/settings.json b/.vscode/settings.json index 05c8f928..7940f672 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -6,5 +6,6 @@ "python.testing.pytestEnabled": true, "python.analysis.extraPaths": [ "src/clients/python-client" - ] + ], + "dotnet.defaultSolution": "Nexus.sln" } \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index d874d9bf..4f6059da 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## v2.0.0-beta.16 - 2023-09-18 + +### Bugs fixed: +- Fixed a multithreading bug affecting the aggregation calculations. This bug very likely caused incorrect aggregation data over a long period of time for datasets with many NaN values. + ## v2.0.0-beta.15 - 2023-07-13 ### Bugs fixed: diff --git a/notes/system-resources.md b/notes/system-resources.md new file mode 100644 index 00000000..39408f5c --- /dev/null +++ b/notes/system-resources.md @@ -0,0 +1,24 @@ +# How to avoid issues with Nexus being killed by the Linux OOM killer? + +- The value of `GcInfo.TotalAvailableMemoryBytes` is equal to the available physical memory when the container the process is running in has no specific memory limit. If it has a limit, the value of `GcInfo.TotalAvailableMemoryBytes` will be equal to `75%` of that value as long as `System.GC.HeapHardLimitPercent` is not set [[learn.microsoft.com]](https://learn.microsoft.com/en-us/dotnet/core/runtime-config/garbage-collector#heap-limit). This behavior has been confirmed by multiple tests using a simple memory allocation application and Docker Compose. + +- When the application allocates memory which is not used immediately, the OS (Linux) commits the memory but only allocates it when the associated memory page is accessed for the first time. This behavior has the advantage that an underutilization of physical RAM is avoided, but the disadvantage is that the system may suddenly run out of memory when the committed memory is actually being used. Since this happens when the memory page is accessed for the first time, it is hard to determine which line of code will have a high chance of triggering the Linux OOM killer. + +- There is also the `GcInfo.HighMemoryLoadThresholdBytes` setting, which sits at 90% of the physical memory. I don't know exactly why this value is not adapted to the value of `System.GC.HeapHardLimitPercent` [[github.com]](https://github.com/dotnet/runtime/issues/58974). Microsoft writes the following about the limit: *[...] for the dominant process on a machine with 64GB of memory, it's reasonable for GC to start reacting when there's 10% of memory available.* [[learn.microsoft.com]](https://learn.microsoft.com/en-us/dotnet/core/runtime-config/garbage-collector#high-memory-percent). This may explain why Nexus is not always OOM-killed, but only when there is more than 10% of the memory available and Nexus tries to allocate an array > 10% of the available memory. This would prevent the GC from becoming active, so the OS runs the OOM killer instead. + +- On Unix-based OSes, there is currently no low-memory notification [[github.com]](https://github.com/dotnet/runtime/issues/6051). + +- With the above-mentioned memory limits set to a value less than the available physical memory (e.g. by using the `DOTNET_GCHeapHardLimit` variable), either the GC runs in time and the allocation succeeds, or we get an `OutOfMemoryException` (I don't know why the GC is not always able to free enough memory), but the application now stays alive reliably (a sketch of this pattern follows below). 
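A minimal sketch of that last point, assuming a hard heap limit is in place (via `DOTNET_GCHeapHardLimit`, `System.GC.HeapHardLimitPercent` or a container memory limit) so that a failed allocation surfaces as a catchable `OutOfMemoryException` instead of an OOM kill. The helper name is made up and the code is not part of the Nexus code base:

```csharp
using System;

public static class LargeAllocationSketch
{
    // Try a large allocation once; if the heap hard limit is hit, force a full
    // blocking, compacting collection and retry exactly once. A second failure
    // propagates to the caller instead of the OS killing the process.
    public static double[] AllocateWithRetry(int elementCount)
    {
        try
        {
            return new double[elementCount];
        }
        catch (OutOfMemoryException)
        {
            GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, blocking: true, compacting: true);
            GC.WaitForPendingFinalizers();

            return new double[elementCount];
        }
    }
}
```

This is the same strategy as the third mitigation listed below (catch `OutOfMemoryException`, run the GC, retry once).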
+ +- So the simplest solution to avoid OOM issues might be to + - configure Docker Compose to apply a memory limit to the container (*Docker resource limits are built on top of cgroups, which is a Linux kernel capability* [[devblogs.microsoft.com/]](https://devblogs.microsoft.com/dotnet/using-net-and-docker-together-dockercon-2019-update/)), + - use `MemoryPool<T>.Shared.Rent` wherever possible to avoid allocations and + - catch `OutOfMemoryException` around potentially large array allocations to run the GC and retry once + +More resources +- **.NET Core application running in docker gets OOMKilled if swapping is disabled** [[github.com]](https://github.com/dotnet/runtime/issues/851) + +- **net5.0 console apps on linux don't show OutOfMemoryExceptions before being OOM-killed** - *It is not possible for .NET runtime to reliably throw OutOfMemoryException on Linux, unless you disable oom killer. +Note that average .NET process uses number of unmanaged libraries. .NET runtime does not have control over allocations +done by these libraries. If the library happens to make an allocation that overshoots the memory killer limit, +the Linux OS will happily make this allocation succeed, only to kill the process later.* [[github.com]](https://github.com/dotnet/runtime/issues/46147#issuecomment-747471498) \ No newline at end of file diff --git a/src/Nexus/API/JobsController.cs b/src/Nexus/API/JobsController.cs index deda01a8..e2906472 100644 --- a/src/Nexus/API/JobsController.cs +++ b/src/Nexus/API/JobsController.cs @@ -234,7 +234,7 @@ public async Task> ExportAsync( catch (Exception ex) { _logger.LogError(ex, "Unable to export the requested data."); - throw new Exception("Unable to export the requested data.", ex); + throw; } }); @@ -267,7 +267,7 @@ public ActionResult RefreshDatabase() catch (Exception ex) { _logger.LogError(ex, "Unable to reload extensions and reset the resource catalog."); - throw new Exception("Unable to reload extensions and reset the resource catalog.", ex); + throw; } }); @@ -307,7 +307,7 @@ public async Task> ClearCacheAsync( catch (Exception ex) { _logger.LogError(ex, "Unable to clear the cache."); - throw new Exception("Unable to clear the cache.", ex); + throw; } }); diff --git a/src/Nexus/Core/NexusClaims.cs b/src/Nexus/Core/NexusClaims.cs index b1d89ef5..59a1c0bc 100644 --- a/src/Nexus/Core/NexusClaims.cs +++ b/src/Nexus/Core/NexusClaims.cs @@ -7,4 +7,4 @@ internal static class NexusClaims public const string CAN_READ_CATALOG_GROUP = "CanReadCatalogGroup"; public const string CAN_WRITE_CATALOG_GROUP = "CanWriteCatalogGroup"; } -} +} \ No newline at end of file diff --git a/src/Nexus/Extensibility/DataSource/DataSourceController.cs b/src/Nexus/Extensibility/DataSource/DataSourceController.cs index e55bfe02..937f16d1 100644 --- a/src/Nexus/Extensibility/DataSource/DataSourceController.cs +++ b/src/Nexus/Extensibility/DataSource/DataSourceController.cs @@ -1,6 +1,7 @@ using System.Buffers; using System.Collections.Concurrent; using System.ComponentModel.DataAnnotations; +using System.Diagnostics; using System.Reflection; using System.Text.Json; using System.Text.Json.Nodes; @@ -485,6 +486,10 @@ await DataSource.ReadAsync( progress, cancellationToken); } + catch (OutOfMemoryException) + { + throw; + } catch (Exception ex) { Logger.LogError(ex, "Read original data period {Begin} to {End} failed", begin, end); @@ -647,6 +652,10 @@ await _cacheService.UpdateAsync( cancellationToken); } } + catch (OutOfMemoryException) + { + throw; + } catch (Exception ex) { Logger.LogError(ex, "Read 
aggregation data period {Begin} to {End} failed", begin, end); @@ -743,6 +752,10 @@ await DataSource.ReadAsync( blockSize, offset); } + catch (OutOfMemoryException) + { + throw; + } catch (Exception ex) { Logger.LogError(ex, "Read resampling data period {Begin} to {End} failed", roundedBegin, roundedEnd); @@ -1006,6 +1019,10 @@ await controller.ReadAsync( dataSourceProgress, cancellationToken); } + catch (OutOfMemoryException) + { + throw; + } catch (Exception ex) { logger.LogError(ex, "Process period {Begin} to {End} failed", currentBegin, currentEnd); diff --git a/src/Nexus/Nexus.csproj b/src/Nexus/Nexus.csproj index 6038c421..8ef6599b 100644 --- a/src/Nexus/Nexus.csproj +++ b/src/Nexus/Nexus.csproj @@ -5,7 +5,6 @@ - diff --git a/src/Nexus/Program.cs b/src/Nexus/Program.cs index 9e020f81..270b087f 100644 --- a/src/Nexus/Program.cs +++ b/src/Nexus/Program.cs @@ -54,10 +54,16 @@ // architecture if (!BitConverter.IsLittleEndian) { - Log.Information("This software runs on little-endian systems."); + Log.Information("This software runs only on little-endian systems."); return; } + // memory info + var memoryInfo = GC.GetGCMemoryInfo(); + + Console.WriteLine($"GC: Total available memory: {memoryInfo.TotalAvailableMemoryBytes / 1024 / 1024} MB"); + Console.WriteLine($"GC: High memory load threshold: {memoryInfo.HighMemoryLoadThresholdBytes / 1024 / 1024} MB"); + Log.Information("Start host"); var builder = WebApplication.CreateBuilder(args); diff --git a/src/Nexus/Services/DataService.cs b/src/Nexus/Services/DataService.cs index 791ac21a..f8566788 100644 --- a/src/Nexus/Services/DataService.cs +++ b/src/Nexus/Services/DataService.cs @@ -1,8 +1,8 @@ -using System.ComponentModel.DataAnnotations; +using System.Buffers; +using System.ComponentModel.DataAnnotations; using System.IO.Compression; using System.IO.Pipelines; using System.Security.Claims; -using Microsoft.Extensions.Options; using Nexus.Core; using Nexus.Extensibility; using Nexus.Utilities; @@ -140,7 +140,9 @@ public async Task> ReadAsDoubleArrayAsync( end, cancellationToken); - var result = new double[stream.Length / 8]; + var elementCount = (int)(stream.Length / 8); // TODO is this cast safe? + using var memoryOwner = MemoryPool.Shared.Rent(elementCount); + var result = memoryOwner.Memory.Slice(0, elementCount); var byteBuffer = new CastMemoryManager(result).Memory; int bytesRead; diff --git a/src/Nexus/Services/MemoryTracker.cs b/src/Nexus/Services/MemoryTracker.cs index 3301fc83..fc9fea6d 100644 --- a/src/Nexus/Services/MemoryTracker.cs +++ b/src/Nexus/Services/MemoryTracker.cs @@ -43,8 +43,12 @@ public MemoryTracker(IOptions dataOptions, ILogger { _dataOptions = dataOptions.Value; _logger = logger; + + _ = Task.Run(MonitorFullGC); } + internal int Factor { get; set; } = 8; + public async Task RegisterAllocationAsync(long minimumByteCount, long maximumByteCount, CancellationToken cancellationToken) { if (minimumByteCount > _dataOptions.TotalBufferMemoryConsumption) @@ -58,26 +62,26 @@ public async Task RegisterAllocationAsync(long minimumBy // get exclusive access to _consumedBytes and _retrySemaphores lock (this) { - var halfOfRemainingBytes = _consumedBytes >= _dataOptions.TotalBufferMemoryConsumption + var fractionOfRemainingBytes = _consumedBytes >= _dataOptions.TotalBufferMemoryConsumption ? 
0 - : (_dataOptions.TotalBufferMemoryConsumption - _consumedBytes) / 2; + : (_dataOptions.TotalBufferMemoryConsumption - _consumedBytes) / Factor /* normal = 8, tests = 2 */; long actualByteCount = 0; - if (halfOfRemainingBytes >= maximumByteCount) + if (fractionOfRemainingBytes >= maximumByteCount) actualByteCount = maximumByteCount; - else if (halfOfRemainingBytes >= minimumByteCount) - actualByteCount = halfOfRemainingBytes; + else if (fractionOfRemainingBytes >= minimumByteCount) + actualByteCount = fractionOfRemainingBytes; // success - if (actualByteCount > 0) + if (actualByteCount >= minimumByteCount) { // remove semaphore from list if (myRetrySemaphore is not null) _retrySemaphores.Remove(myRetrySemaphore); - _logger.LogTrace("Allocate {ByteCount} bytes", actualByteCount); + _logger.LogTrace("Allocate {ByteCount} bytes ({MegaByteCount} MB)", actualByteCount, actualByteCount / 1024 / 1024); SetConsumedBytesAndTriggerWaitingTasks(actualByteCount); return new AllocationRegistration(this, actualByteCount); @@ -96,7 +100,7 @@ public async Task RegisterAllocationAsync(long minimumBy } // wait until _consumedBytes changes - _logger.LogTrace("Wait until {ByteCount} bytes are available", minimumByteCount); + _logger.LogTrace("Wait until {ByteCount} bytes ({MegaByteCount} MB) are available", minimumByteCount, minimumByteCount / 1024 / 1024); await myRetrySemaphore.WaitAsync(timeout: TimeSpan.FromMinutes(1), cancellationToken); } } @@ -106,7 +110,7 @@ public void UnregisterAllocation(AllocationRegistration allocationRegistration) // get exclusive access to _consumedBytes and _retrySemaphores lock (this) { - _logger.LogTrace("Release {ByteCount} bytes", allocationRegistration.ActualByteCount); + _logger.LogTrace("Release {ByteCount} bytes ({MegaByteCount} MB)", allocationRegistration.ActualByteCount, allocationRegistration.ActualByteCount / 1024 / 1024); SetConsumedBytesAndTriggerWaitingTasks(-allocationRegistration.ActualByteCount); } } @@ -122,7 +126,26 @@ private void SetConsumedBytesAndTriggerWaitingTasks(long difference) retrySemaphore.Release(); } - _logger.LogTrace("{ByteCount} bytes are currently in use", _consumedBytes); + _logger.LogTrace("{ByteCount} bytes ({MegaByteCount} MB) are currently in use", _consumedBytes, _consumedBytes / 1024 / 1024); + } + + private void MonitorFullGC() + { + _logger.LogDebug("Register for full GC notifications"); + GC.RegisterForFullGCNotification(1, 1); + + while (true) + { + var status = GC.WaitForFullGCApproach(); + + if (status == GCNotificationStatus.Succeeded) + _logger.LogDebug("Full GC is approaching"); + + status = GC.WaitForFullGCComplete(); + + if (status == GCNotificationStatus.Succeeded) + _logger.LogDebug("Full GC has completed"); + } } } } diff --git a/src/Nexus/Services/ProcessingService.cs b/src/Nexus/Services/ProcessingService.cs index f28d2003..90f51762 100644 --- a/src/Nexus/Services/ProcessingService.cs +++ b/src/Nexus/Services/ProcessingService.cs @@ -1,155 +1,180 @@ -using MathNet.Numerics.LinearAlgebra; -using MathNet.Numerics.Statistics; -using Microsoft.Extensions.Options; +using Microsoft.Extensions.Options; +using MudBlazor; using Nexus.Core; using Nexus.DataModel; using Nexus.Utilities; +using System.Buffers; using System.Reflection; +using System.Runtime.CompilerServices; -namespace Nexus.Services +namespace Nexus.Services; + +internal interface IProcessingService +{ + void Resample( + NexusDataType dataType, + ReadOnlyMemory data, + ReadOnlyMemory status, + Memory targetBuffer, + int blockSize, + int offset); + + void 
Aggregate( + NexusDataType dataType, + RepresentationKind kind, + Memory data, + ReadOnlyMemory status, + Memory targetBuffer, + int blockSize); +} + +internal class ProcessingService : IProcessingService { - internal interface IProcessingService + private readonly double _nanThreshold; + + public ProcessingService(IOptions dataOptions) { - void Resample( - NexusDataType dataType, - ReadOnlyMemory data, - ReadOnlyMemory status, - Memory targetBuffer, - int blockSize, - int offset); - - void Aggregate( - NexusDataType dataType, - RepresentationKind kind, - Memory data, - ReadOnlyMemory status, - Memory targetBuffer, - int blockSize); + _nanThreshold = dataOptions.Value.AggregationNaNThreshold; } - internal class ProcessingService : IProcessingService + public void Resample( + NexusDataType dataType, + ReadOnlyMemory data, + ReadOnlyMemory status, + Memory targetBuffer, + int blockSize, + int offset) { - private readonly double _nanThreshold; - - public ProcessingService(IOptions dataOptions) - { - _nanThreshold = dataOptions.Value.AggregationNaNThreshold; - } - - public void Resample( - NexusDataType dataType, - ReadOnlyMemory data, - ReadOnlyMemory status, - Memory targetBuffer, - int blockSize, - int offset) - { - var doubleData = new double[status.Length]; - - BufferUtilities.ApplyRepresentationStatusByDataType( - dataType, - data, - status, - target: doubleData); + using var memoryOwner = MemoryPool.Shared.Rent(status.Length); + var doubleData = memoryOwner.Memory.Slice(0, status.Length); - var targetBufferSpan = targetBuffer.Span; - var length = targetBuffer.Length; + BufferUtilities.ApplyRepresentationStatusByDataType( + dataType, + data, + status, + target: doubleData); - for (int i = 0; i < length; i++) - { - targetBufferSpan[i] = doubleData[(i + offset) / blockSize]; - } - } + var sourceBufferSpan = doubleData.Span; + var targetBufferSpan = targetBuffer.Span; + var length = targetBuffer.Length; - public void Aggregate( - NexusDataType dataType, - RepresentationKind kind, - Memory data, - ReadOnlyMemory status, - Memory targetBuffer, - int blockSize) + for (int i = 0; i < length; i++) { - var targetType = NexusUtilities.GetTypeFromNexusDataType(dataType); + targetBufferSpan[i] = sourceBufferSpan[(i + offset) / blockSize]; + } + } - var method = typeof(ProcessingService) - .GetMethod(nameof(ProcessingService.GenericProcess), BindingFlags.Instance | BindingFlags.NonPublic)! - .MakeGenericMethod(targetType); + public void Aggregate( + NexusDataType dataType, + RepresentationKind kind, + Memory data, + ReadOnlyMemory status, + Memory targetBuffer, + int blockSize) + { + var targetType = NexusUtilities.GetTypeFromNexusDataType(dataType); - method.Invoke(this, new object[] { kind, data, status, targetBuffer, blockSize }); - } + // TODO: cache + var method = typeof(ProcessingService) + .GetMethod(nameof(GenericProcess), BindingFlags.Instance | BindingFlags.NonPublic)! 
+ .MakeGenericMethod(targetType); - private void GenericProcess( - RepresentationKind kind, - Memory data, - ReadOnlyMemory status, - Memory targetBuffer, - int blockSize) where T : unmanaged - { - var Tdata = new CastMemoryManager(data).Memory; + method.Invoke(this, new object[] { kind, data, status, targetBuffer, blockSize }); + } - switch (kind) - { - case RepresentationKind.Mean: - case RepresentationKind.MeanPolarDeg: - case RepresentationKind.Min: - case RepresentationKind.Max: - case RepresentationKind.Std: - case RepresentationKind.Rms: - case RepresentationKind.Sum: + private void GenericProcess( + RepresentationKind kind, + Memory data, + ReadOnlyMemory status, + Memory targetBuffer, + int blockSize) where T : unmanaged + { + var Tdata = new CastMemoryManager(data).Memory; - var doubleData2 = new double[Tdata.Length]; + switch (kind) + { + case RepresentationKind.Mean: + case RepresentationKind.MeanPolarDeg: + case RepresentationKind.Min: + case RepresentationKind.Max: + case RepresentationKind.Std: + case RepresentationKind.Rms: + case RepresentationKind.Sum: + + using (var memoryOwner = MemoryPool.Shared.Rent(Tdata.Length)) + { + var doubleData2 = memoryOwner.Memory.Slice(0, Tdata.Length); BufferUtilities.ApplyRepresentationStatus(Tdata, status, target: doubleData2); ApplyAggregationFunction(kind, blockSize, doubleData2, targetBuffer); + } - break; + break; - case RepresentationKind.MinBitwise: - case RepresentationKind.MaxBitwise: + case RepresentationKind.MinBitwise: + case RepresentationKind.MaxBitwise: - ApplyAggregationFunction(kind, blockSize, Tdata, status, targetBuffer); + ApplyAggregationFunction(kind, blockSize, Tdata, status, targetBuffer); - break; + break; - default: - throw new Exception($"The representation kind {kind} is not supported."); - } + default: + throw new Exception($"The representation kind {kind} is not supported."); } + } - private void ApplyAggregationFunction( - RepresentationKind kind, - int blockSize, - Memory data, - Memory targetBuffer) + private void ApplyAggregationFunction( + RepresentationKind kind, + int blockSize, + Memory data, + Memory targetBuffer) + { + switch (kind) { - switch (kind) - { - case RepresentationKind.Mean: + case RepresentationKind.Mean: - Parallel.For(0, targetBuffer.Length, x => - { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).ToArray(); - var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; + Parallel.For(0, targetBuffer.Length, x => + { + using var nanFreeDataOwner = MemoryPool.Shared.Rent(blockSize); + var nanFreeData = nanFreeDataOwner.Memory; - if (isHighQuality) - targetBuffer.Span[x] = ArrayStatistics.Mean(chunkData); + var chunkData = GetNaNFreeData( + source: data.Slice(x * blockSize, blockSize), + target: nanFreeData).Span; - else - targetBuffer.Span[x] = double.NaN; - }); + var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; - break; + if (isHighQuality) + targetBuffer.Span[x] = Mean(chunkData); - case RepresentationKind.MeanPolarDeg: + else + targetBuffer.Span[x] = double.NaN; + }); - var sin = new double[targetBuffer.Length]; - var cos = new double[targetBuffer.Length]; + break; + + case RepresentationKind.MeanPolarDeg: + + using (var memoryOwner_sin = MemoryPool.Shared.Rent(targetBuffer.Length)) + using (var memoryOwner_cos = MemoryPool.Shared.Rent(targetBuffer.Length)) + { + var sinBuffer = memoryOwner_sin.Memory.Slice(0, targetBuffer.Length); + var cosBuffer = memoryOwner_cos.Memory.Slice(0, targetBuffer.Length); var limit = 360; 
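// Note for the MeanPolarDeg branch: the samples are angles in degrees, so they are
// averaged on the unit circle via their sine/cosine components (accumulated in the
// buffers rented above) rather than arithmetically; 'factor' below converts degrees to radians.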
var factor = 2 * Math.PI / limit; Parallel.For(0, targetBuffer.Length, x => { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).ToArray(); + var sin = sinBuffer.Span; + var cos = cosBuffer.Span; + + using var nanFreeDataOwner = MemoryPool.Shared.Rent(blockSize); + var nanFreeData = nanFreeDataOwner.Memory; + + var chunkData = GetNaNFreeData( + source: data.Slice(x * blockSize, blockSize), + target: nanFreeData).Span; + var length = chunkData.Length; var isHighQuality = (length / (double)blockSize) >= _nanThreshold; @@ -171,214 +196,373 @@ private void ApplyAggregationFunction( targetBuffer.Span[x] = double.NaN; } }); + } - break; + break; - case RepresentationKind.Min: + case RepresentationKind.Min: - Parallel.For(0, targetBuffer.Length, x => - { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).ToArray(); - var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; + Parallel.For(0, targetBuffer.Length, x => + { + using var nanFreeDataOwner = MemoryPool.Shared.Rent(blockSize); + var nanFreeData = nanFreeDataOwner.Memory; - if (isHighQuality) - targetBuffer.Span[x] = ArrayStatistics.Minimum(chunkData); + var chunkData = GetNaNFreeData( + source: data.Slice(x * blockSize, blockSize), + target: nanFreeData).Span; - else - targetBuffer.Span[x] = double.NaN; - }); + var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; - break; + if (isHighQuality) + targetBuffer.Span[x] = Minimum(chunkData); - case RepresentationKind.Max: + else + targetBuffer.Span[x] = double.NaN; + }); - Parallel.For(0, targetBuffer.Length, x => - { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).ToArray(); - var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; + break; - if (isHighQuality) - targetBuffer.Span[x] = ArrayStatistics.Maximum(chunkData); + case RepresentationKind.Max: - else - targetBuffer.Span[x] = double.NaN; - }); + Parallel.For(0, targetBuffer.Length, x => + { + using var nanFreeDataOwner = MemoryPool.Shared.Rent(blockSize); + var nanFreeData = nanFreeDataOwner.Memory; - break; + var chunkData = GetNaNFreeData( + source: data.Slice(x * blockSize, blockSize), + target: nanFreeData).Span; - case RepresentationKind.Std: + var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; - Parallel.For(0, targetBuffer.Length, x => - { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).ToArray(); - var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; + if (isHighQuality) + targetBuffer.Span[x] = Maximum(chunkData); - if (isHighQuality) - targetBuffer.Span[x] = ArrayStatistics.StandardDeviation(chunkData); + else + targetBuffer.Span[x] = double.NaN; + }); - else - targetBuffer.Span[x] = double.NaN; - }); + break; - break; + case RepresentationKind.Std: - case RepresentationKind.Rms: + Parallel.For(0, targetBuffer.Length, x => + { + using var nanFreeDataOwner = MemoryPool.Shared.Rent(blockSize); + var nanFreeData = nanFreeDataOwner.Memory; - Parallel.For(0, targetBuffer.Length, x => - { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).ToArray(); - var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; + var chunkData = GetNaNFreeData( + source: data.Slice(x * blockSize, blockSize), + target: nanFreeData).Span; - if (isHighQuality) - targetBuffer.Span[x] = ArrayStatistics.RootMeanSquare(chunkData); + var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; - else - 
targetBuffer.Span[x] = double.NaN; - }); + if (isHighQuality) + targetBuffer.Span[x] = StandardDeviation(chunkData); - break; + else + targetBuffer.Span[x] = double.NaN; + }); - case RepresentationKind.Sum: + break; - Parallel.For(0, targetBuffer.Length, x => - { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).ToArray(); - var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; + case RepresentationKind.Rms: - if (isHighQuality) - targetBuffer.Span[x] = Vector.Build.Dense(chunkData).Sum(); + Parallel.For(0, targetBuffer.Length, x => + { + using var nanFreeDataOwner = MemoryPool.Shared.Rent(blockSize); + var nanFreeData = nanFreeDataOwner.Memory; + + var chunkData = GetNaNFreeData( + source: data.Slice(x * blockSize, blockSize), + target: nanFreeData).Span; - else - targetBuffer.Span[x] = double.NaN; - }); + var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; - break; + if (isHighQuality) + targetBuffer.Span[x] = RootMeanSquare(chunkData); - default: - throw new Exception($"The representation kind {kind} is not supported."); + else + targetBuffer.Span[x] = double.NaN; + }); + + break; + + case RepresentationKind.Sum: + + Parallel.For(0, targetBuffer.Length, x => + { + using var nanFreeDataOwner = MemoryPool.Shared.Rent(blockSize); + var nanFreeData = nanFreeDataOwner.Memory; + + var chunkData = GetNaNFreeData( + source: data.Slice(x * blockSize, blockSize), + target: nanFreeData).Span; + + var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; + + if (isHighQuality) + targetBuffer.Span[x] = Sum(chunkData); + + else + targetBuffer.Span[x] = double.NaN; + }); + + break; + + default: + throw new Exception($"The representation kind {kind} is not supported."); - } } + } - private void ApplyAggregationFunction( - RepresentationKind kind, - int blockSize, - Memory data, - ReadOnlyMemory status, - Memory targetBuffer) where T : unmanaged + private void ApplyAggregationFunction( + RepresentationKind kind, + int blockSize, + Memory data, + ReadOnlyMemory status, + Memory targetBuffer) where T : unmanaged + { + switch (kind) { - switch (kind) - { - case RepresentationKind.MinBitwise: + case RepresentationKind.MinBitwise: - T[] bitField_and = new T[targetBuffer.Length]; + T[] bitField_and = new T[targetBuffer.Length]; - Parallel.For(0, targetBuffer.Length, x => - { - var chunkData = GetNaNFreeData( - data.Slice(x * blockSize, blockSize), - status.Slice(x * blockSize, blockSize)).Span; + Parallel.For(0, targetBuffer.Length, x => + { + using var nanFreeDataOwner = MemoryPool.Shared.Rent(blockSize); + var nanFreeData = nanFreeDataOwner.Memory; - var targetBufferSpan = targetBuffer.Span; - var length = chunkData.Length; - var isHighQuality = (length / (double)blockSize) >= _nanThreshold; + var chunkData = GetNaNFreeData( + source: data.Slice(x * blockSize, blockSize), + status: status.Slice(x * blockSize, blockSize), + target: nanFreeData).Span; - if (isHighQuality) - { - for (int i = 0; i < length; i++) - { - if (i == 0) - bitField_and[x] = GenericBitOr.BitOr(bitField_and[x], chunkData[i]); + var targetBufferSpan = targetBuffer.Span; + var length = chunkData.Length; + var isHighQuality = (length / (double)blockSize) >= _nanThreshold; - else - bitField_and[x] = GenericBitAnd.BitAnd(bitField_and[x], chunkData[i]); - } + if (isHighQuality) + { + for (int i = 0; i < length; i++) + { + if (i == 0) + bitField_and[x] = GenericBitOr.BitOr(bitField_and[x], chunkData[i]); - targetBuffer.Span[x] = 
Convert.ToDouble(bitField_and[x]); + else + bitField_and[x] = GenericBitAnd.BitAnd(bitField_and[x], chunkData[i]); } - else - { - targetBuffer.Span[x] = double.NaN; - } - }); + targetBuffer.Span[x] = Convert.ToDouble(bitField_and[x]); + } - break; + else + { + targetBuffer.Span[x] = double.NaN; + } + }); - case RepresentationKind.MaxBitwise: + break; - T[] bitField_or = new T[targetBuffer.Length]; + case RepresentationKind.MaxBitwise: - Parallel.For(0, targetBuffer.Length, x => - { - var chunkData = GetNaNFreeData(data - .Slice(x * blockSize, blockSize), status - .Slice(x * blockSize, blockSize)).Span; + T[] bitField_or = new T[targetBuffer.Length]; - var length = chunkData.Length; - var isHighQuality = (length / (double)blockSize) >= _nanThreshold; + Parallel.For(0, targetBuffer.Length, x => + { + using var nanFreeDataOwner = MemoryPool.Shared.Rent(blockSize); + var nanFreeData = nanFreeDataOwner.Memory; - if (isHighQuality) - { - for (int i = 0; i < length; i++) - { - bitField_or[x] = GenericBitOr.BitOr(bitField_or[x], chunkData[i]); - } + var chunkData = GetNaNFreeData( + source: data.Slice(x * blockSize, blockSize), + status: status.Slice(x * blockSize, blockSize), + target: nanFreeData).Span; - targetBuffer.Span[x] = Convert.ToDouble(bitField_or[x]); - } + var length = chunkData.Length; + var isHighQuality = (length / (double)blockSize) >= _nanThreshold; - else + if (isHighQuality) + { + for (int i = 0; i < length; i++) { - targetBuffer.Span[x] = double.NaN; + bitField_or[x] = GenericBitOr.BitOr(bitField_or[x], chunkData[i]); } - }); - break; + targetBuffer.Span[x] = Convert.ToDouble(bitField_or[x]); + } - default: - throw new Exception($"The representation kind {kind} is not supported."); + else + { + targetBuffer.Span[x] = double.NaN; + } + }); + + break; + + default: + throw new Exception($"The representation kind {kind} is not supported."); + + } + } + private static Memory GetNaNFreeData( + ReadOnlyMemory source, + ReadOnlyMemory status, + Memory target) where T : unmanaged + { + var targetLength = 0; + var sourceLength = source.Length; + var sourceSpan = source.Span; + var targetSpan = target.Span; + var statusSpan = status.Span; + + for (int i = 0; i < sourceLength; i++) + { + if (statusSpan[i] == 1) + { + targetSpan[targetLength] = sourceSpan[i]; + targetLength++; } } - private static Memory GetNaNFreeData(Memory data, ReadOnlyMemory status) where T : unmanaged + return target[..targetLength]; + } + + private static Memory GetNaNFreeData( + ReadOnlyMemory source, + Memory target) + { + var targetLength = 0; + var sourceLength = source.Length; + var sourceSpan = source.Span; + var targetSpan = target.Span; + + for (int i = 0; i < sourceLength; i++) { - var targetLength = 0; - var sourceLength = data.Length; - var spanData = data.Span; - var spanStatus = status.Span; + var value = sourceSpan[i]; - for (int i = 0; i < sourceLength; i++) + if (!double.IsNaN(value)) { - if (spanStatus[i] == 1) - { - spanData[targetLength] = spanData[i]; - targetLength++; - } + targetSpan[targetLength] = value; + targetLength++; } + } - return data[..targetLength]; + return target[..targetLength]; + } + + // TODO: vectorize + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static double Sum(Span data) + { + if (data.Length == 0) + return double.NaN; + + var sum = 0.0; + + for (int i = 0; i < data.Length; i++) + { + sum += data[i]; } - private static Memory GetNaNFreeData(Memory data) + return sum; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static double Mean(Span 
data) + { + if (data.Length == 0) + return double.NaN; + + var mean = 0.0; + var m = 0UL; + + for (int i = 0; i < data.Length; i++) { - var targetLength = 0; - var sourceLength = data.Length; - var spanData = data.Span; + mean += (data[i] - mean)/++m; + } + + return mean; + } - for (int i = 0; i < sourceLength; i++) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static double Minimum(Span data) + { + if (data.Length == 0) + return double.NaN; + + var min = double.PositiveInfinity; + + for (int i = 0; i < data.Length; i++) + { + if (data[i] < min || double.IsNaN(data[i])) { - var value = spanData[i]; + min = data[i]; + } + } - if (!double.IsNaN(value)) - { - spanData[targetLength] = value; - targetLength++; - } + return min; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static double Maximum(Span data) + { + if (data.Length == 0) + return double.NaN; + + var max = double.NegativeInfinity; + + for (int i = 0; i < data.Length; i++) + { + if (data[i] > max || double.IsNaN(data[i])) + { + max = data[i]; } + } + + return max; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static double StandardDeviation(Span samples) + { + return Math.Sqrt(Variance(samples)); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static double Variance(Span samples) + { + if (samples.Length <= 1) + return double.NaN; + + var variance = 0.0; + var t = samples[0]; - return data[..targetLength]; + for (int i = 1; i < samples.Length; i++) + { + t += samples[i]; + var diff = ((i + 1)*samples[i]) - t; + variance += diff * diff / ((i + 1.0) * i); } + + return variance/(samples.Length - 1); } -} + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static double RootMeanSquare(Span data) + { + if (data.Length == 0) + return double.NaN; + + var mean = 0.0; + var m = 0UL; + + for (int i = 0; i < data.Length; i++) + { + mean += (data[i]*data[i] - mean)/++m; + } + + return Math.Sqrt(mean); + } +} \ No newline at end of file diff --git a/src/Nexus/Utilities/NexusUtilities.cs b/src/Nexus/Utilities/NexusUtilities.cs index 78a8020e..7de4cd3f 100644 --- a/src/Nexus/Utilities/NexusUtilities.cs +++ b/src/Nexus/Utilities/NexusUtilities.cs @@ -1,7 +1,7 @@ using Nexus.Core; using Nexus.DataModel; -using System.Reflection; using System.Runtime.CompilerServices; +using System.Runtime.ExceptionServices; namespace Nexus.Utilities { @@ -88,7 +88,7 @@ public static async Task WhenAllFailFastAsync(List tasks, CancellationToke .ThrowIfCancellationRequested(); if (task.Exception is not null) - throw task.Exception.InnerException ?? task.Exception; + ExceptionDispatchInfo.Capture(task.Exception.InnerException ?? 
task.Exception).Throw(); tasks.Remove(task); } diff --git a/tests/Nexus.Tests/Services/MemoryTrackerTests.cs b/tests/Nexus.Tests/Services/MemoryTrackerTests.cs index a0d06e48..d40a1f10 100644 --- a/tests/Nexus.Tests/Services/MemoryTrackerTests.cs +++ b/tests/Nexus.Tests/Services/MemoryTrackerTests.cs @@ -14,7 +14,13 @@ public async Task CanHandleMultipleRequests() // Arrange var weAreWaiting = new AutoResetEvent(initialState: false); var dataOptions = new DataOptions() { TotalBufferMemoryConsumption = 200 }; - var memoryTracker = new MemoryTracker(Options.Create(dataOptions), NullLogger.Instance); + + var memoryTracker = new MemoryTracker(Options.Create(dataOptions), NullLogger.Instance) + { + // TODO: remove this property and test with factor 8 + Factor = 2 + }; + var firstRegistration = default(AllocationRegistration); var secondRegistration = default(AllocationRegistration); diff --git a/version.json b/version.json index 73c135e6..b143d82d 100644 --- a/version.json +++ b/version.json @@ -1,4 +1,4 @@ { "version": "2.0.0", - "suffix": "beta.15" + "suffix": "beta.16" } \ No newline at end of file
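The hand-written statistics helpers added to `ProcessingService` above (`Sum`, `Mean`, `Minimum`, `Maximum`, `StandardDeviation`/`Variance`, `RootMeanSquare`) replace the previous MathNet.Numerics calls. A hypothetical xunit sketch (not part of this diff) of how they could be cross-checked against naive two-pass formulas; it assumes the test project can access the internal `ProcessingService` type, as the existing `MemoryTrackerTests` suggest via `InternalsVisibleTo`:

```csharp
using System;
using Nexus.Services;
using Xunit;

public class StatisticsSanityTests
{
    [Fact]
    public void StreamingHelpersMatchNaiveFormulas()
    {
        // Small, NaN-free sample; the helpers are only called on NaN-free chunks anyway.
        Span<double> data = stackalloc double[] { 1.5, 2.0, -3.25, 4.0, 0.5 };

        // Naive two-pass reference values
        var n = data.Length;
        var sum = 0.0;
        foreach (var value in data) sum += value;
        var mean = sum / n;

        var squaredDeviations = 0.0;
        var sumOfSquares = 0.0;
        foreach (var value in data)
        {
            squaredDeviations += (value - mean) * (value - mean);
            sumOfSquares += value * value;
        }

        // Assumes ProcessingService is visible to the test assembly (it is declared internal).
        Assert.Equal(mean, ProcessingService.Mean(data), precision: 10);
        Assert.Equal(squaredDeviations / (n - 1), ProcessingService.Variance(data), precision: 10);
        Assert.Equal(Math.Sqrt(sumOfSquares / n), ProcessingService.RootMeanSquare(data), precision: 10);
    }
}
```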
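Regarding the `NexusUtilities.WhenAllFailFastAsync` change above: `ExceptionDispatchInfo.Capture(...).Throw()` rethrows the faulted task's inner exception with its original stack trace, whereas the previous `throw task.Exception.InnerException` reset the trace to the rethrow site; a bare `throw;` is not an option there because the rethrown exception is not the one currently being handled. A small standalone illustration with made-up names:

```csharp
using System;
using System.Runtime.ExceptionServices;

public static class RethrowDemo
{
    private static void Fail() => throw new InvalidOperationException("boom");

    public static void Main()
    {
        try
        {
            try
            {
                Fail();
            }
            catch (Exception ex)
            {
                // 'throw ex;' would reset the stack trace to this line;
                // Capture(ex).Throw() rethrows with the original frames intact.
                ExceptionDispatchInfo.Capture(ex).Throw();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.StackTrace); // still shows Fail() as the origin
        }
    }
}
```

In `WhenAllFailFastAsync` this keeps the faulted task's original failure site visible to callers and in the logs.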