From df7e97da1a824056f299767da4414783fb59def5 Mon Sep 17 00:00:00 2001 From: Apollo3zehn Date: Thu, 7 Sep 2023 20:22:50 +0200 Subject: [PATCH 1/4] Improve memory management --- .vscode/settings.json | 3 +- notes/system-resources.md | 24 +++++++++ src/Nexus/Core/NexusClaims.cs | 2 +- src/Nexus/Program.cs | 8 ++- src/Nexus/Services/DataService.cs | 8 +-- src/Nexus/Services/MemoryTracker.cs | 29 ++++++++-- src/Nexus/Services/ProcessingService.cs | 72 ++++++++++++++----------- src/Nexus/Utilities/NexusUtilities.cs | 4 +- 8 files changed, 108 insertions(+), 42 deletions(-) create mode 100644 notes/system-resources.md diff --git a/.vscode/settings.json b/.vscode/settings.json index 05c8f928..7940f672 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -6,5 +6,6 @@ "python.testing.pytestEnabled": true, "python.analysis.extraPaths": [ "src/clients/python-client" - ] + ], + "dotnet.defaultSolution": "Nexus.sln" } \ No newline at end of file diff --git a/notes/system-resources.md b/notes/system-resources.md new file mode 100644 index 00000000..39408f5c --- /dev/null +++ b/notes/system-resources.md @@ -0,0 +1,24 @@ +# How to avoid issues with Nexus being killed by the Linux OOM killer? + +- The value of `GcInfo.TotalAvailableMemoryBytes` is equal to the available physical memory when the current container the process is running inside has no specific memory limit. If it has a limit, the value of `GcInfo.TotalAvailableMemoryBytes` will be equal to `75%` of that value as long as `System.GC.HeapHardLimitPercent` is not set [[learn.microsoft.com]](https://learn.microsoft.com/en-us/dotnet/core/runtime-config/garbage-collector#heap-limit). This behavior has been confirmed by multiple tests using a simple memory allocation application and Docker Compose. + +- When the application allocates memory which is not used immediately, the OS (Linux) commits the memory but does allocate it only when the associated memory page is accessed for the first time. 
This behavior has the advantage that an underutilization of physical RAM is avoided but the disadvantage is that the system may suddenly run out of memory when the committed memory is actually being used. Since this happens when the memory page is accessed for the first time, it is hard to determine which line of code will have a high chance of triggering the OOM killer of Linux. + +- There is also the `GcInfo.HighMemoryLoadThresholdBytes` setting which is at 90 % of the physical memory. I don't know exactly why this value is not adapted to the value of `System.GC.HeapHardLimitPercent` [[github.com]](https://github.com/dotnet/runtime/issues/58974). Microsoft writes the following about the limit: *[...] for the dominant process on a machine with 64GB of memory, it's reasonable for GC to start reacting when there's 10% of memory available.* [[learn.microsoft.com]](https://learn.microsoft.com/en-us/dotnet/core/runtime-config/garbage-collector#high-memory-percent). This may be an explanation why Nexus does not always OOM but only when there is more than 10% of the memory available and Nexus tries to allocate an array > 10 % of the available memory. This would prevent the GC from becoming active and the OS runs the OOM killer. + +- On Unix-based OS, there is currently no low memory notification [[github.com]](https://github.com/dotnet/runtime/issues/6051). + +- With the above mentioned memory limits set to a value less than the available physical memory (e.g. by using the `DOTNET_GCHeapHardLimit` variable), either the GC runs in time and the allocation succeeds or we get an `OutOfMemoryException` (I don't know why the GC is not always able to free enough memory) but the application stays alive reliably now. 
+ +- So the simplest solution to avoid OOM issues might be to + - configure Docker Compose to apply a memory limit to the container (*Docker resource limits are built on top of cgroups, which is a Linux kernel capability* [[devblogs.microsoft.com/]](https://devblogs.microsoft.com/dotnet/using-net-and-docker-together-dockercon-2019-update/)), + - use `MemoryPool.Shared.Rent` wherever possible to avoid allocations and + - catch `OutOfMemoryException` in potentially large array allocations to run the GC and retry once + +More resources +- **.NET Core application running in docker gets OOMKilled if swapping is disabled** [[github.com]](https://github.com/dotnet/runtime/issues/851) + +- **net5.0 console apps on linux don't show OutOfMemoryExceptions before being OOM-killed** - *It is not possible for .NET runtime to reliably throw OutOfMemoryException on Linux, unless you disable oom killer. +Note that average .NET process uses number of unmanaged libraries. .NET runtime does not have control over allocations +done by these libraries. 
If the library happens to make an allocation that overshoots the memory killer limit, +the Linux OS will happily make this allocation succeed, only to kill the process later.* [[github.com]](https://github.com/dotnet/runtime/issues/46147#issuecomment-747471498) \ No newline at end of file diff --git a/src/Nexus/Core/NexusClaims.cs b/src/Nexus/Core/NexusClaims.cs index b1d89ef5..59a1c0bc 100644 --- a/src/Nexus/Core/NexusClaims.cs +++ b/src/Nexus/Core/NexusClaims.cs @@ -7,4 +7,4 @@ internal static class NexusClaims public const string CAN_READ_CATALOG_GROUP = "CanReadCatalogGroup"; public const string CAN_WRITE_CATALOG_GROUP = "CanWriteCatalogGroup"; } -} +} \ No newline at end of file diff --git a/src/Nexus/Program.cs b/src/Nexus/Program.cs index 9e020f81..270b087f 100644 --- a/src/Nexus/Program.cs +++ b/src/Nexus/Program.cs @@ -54,10 +54,16 @@ // architecture if (!BitConverter.IsLittleEndian) { - Log.Information("This software runs on little-endian systems."); + Log.Information("This software runs only on little-endian systems."); return; } + // memory info + var memoryInfo = GC.GetGCMemoryInfo(); + + Console.WriteLine($"GC: Total available memory: {memoryInfo.TotalAvailableMemoryBytes / 1024 / 1024} MB"); + Console.WriteLine($"GC: High memory load threshold: {memoryInfo.HighMemoryLoadThresholdBytes / 1024 / 1024} MB"); + Log.Information("Start host"); var builder = WebApplication.CreateBuilder(args); diff --git a/src/Nexus/Services/DataService.cs b/src/Nexus/Services/DataService.cs index 791ac21a..f8566788 100644 --- a/src/Nexus/Services/DataService.cs +++ b/src/Nexus/Services/DataService.cs @@ -1,8 +1,8 @@ -using System.ComponentModel.DataAnnotations; +using System.Buffers; +using System.ComponentModel.DataAnnotations; using System.IO.Compression; using System.IO.Pipelines; using System.Security.Claims; -using Microsoft.Extensions.Options; using Nexus.Core; using Nexus.Extensibility; using Nexus.Utilities; @@ -140,7 +140,9 @@ public async Task> 
ReadAsDoubleArrayAsync( end, cancellationToken); - var result = new double[stream.Length / 8]; + var elementCount = (int)(stream.Length / 8); // TODO is this cast safe? + using var memoryOwner = MemoryPool.Shared.Rent(elementCount); + var result = memoryOwner.Memory.Slice(0, elementCount); var byteBuffer = new CastMemoryManager(result).Memory; int bytesRead; diff --git a/src/Nexus/Services/MemoryTracker.cs b/src/Nexus/Services/MemoryTracker.cs index 3301fc83..a1d2da80 100644 --- a/src/Nexus/Services/MemoryTracker.cs +++ b/src/Nexus/Services/MemoryTracker.cs @@ -43,6 +43,8 @@ public MemoryTracker(IOptions dataOptions, ILogger { _dataOptions = dataOptions.Value; _logger = logger; + + _ = Task.Run(MonitorFullGC); } public async Task RegisterAllocationAsync(long minimumByteCount, long maximumByteCount, CancellationToken cancellationToken) @@ -77,7 +79,7 @@ public async Task RegisterAllocationAsync(long minimumBy if (myRetrySemaphore is not null) _retrySemaphores.Remove(myRetrySemaphore); - _logger.LogTrace("Allocate {ByteCount} bytes", actualByteCount); + _logger.LogTrace("Allocate {ByteCount} bytes ({MegaByteCount} MB)", actualByteCount, actualByteCount / 1024 / 1024); SetConsumedBytesAndTriggerWaitingTasks(actualByteCount); return new AllocationRegistration(this, actualByteCount); @@ -96,7 +98,7 @@ public async Task RegisterAllocationAsync(long minimumBy } // wait until _consumedBytes changes - _logger.LogTrace("Wait until {ByteCount} bytes are available", minimumByteCount); + _logger.LogTrace("Wait until {ByteCount} bytes ({MegaByteCount} MB) are available", minimumByteCount, minimumByteCount / 1024 / 1024); await myRetrySemaphore.WaitAsync(timeout: TimeSpan.FromMinutes(1), cancellationToken); } } @@ -106,7 +108,7 @@ public void UnregisterAllocation(AllocationRegistration allocationRegistration) // get exclusive access to _consumedBytes and _retrySemaphores lock (this) { - _logger.LogTrace("Release {ByteCount} bytes", allocationRegistration.ActualByteCount); + 
_logger.LogTrace("Release {ByteCount} bytes ({MegaByteCount} MB)", allocationRegistration.ActualByteCount, allocationRegistration.ActualByteCount / 1024 / 1024); SetConsumedBytesAndTriggerWaitingTasks(-allocationRegistration.ActualByteCount); } } @@ -122,7 +124,26 @@ private void SetConsumedBytesAndTriggerWaitingTasks(long difference) retrySemaphore.Release(); } - _logger.LogTrace("{ByteCount} bytes are currently in use", _consumedBytes); + _logger.LogTrace("{ByteCount} bytes ({MegaByteCount} MB) are currently in use", _consumedBytes, _consumedBytes / 1024 / 1024); + } + + private void MonitorFullGC() + { + _logger.LogDebug("Register for full GC notifications"); + GC.RegisterForFullGCNotification(1, 1); + + while (true) + { + var status = GC.WaitForFullGCApproach(); + + if (status == GCNotificationStatus.Succeeded) + _logger.LogDebug("Full GC is approaching"); + + status = GC.WaitForFullGCComplete(); + + if (status == GCNotificationStatus.Succeeded) + _logger.LogDebug("Full GC has completed"); + } } } } diff --git a/src/Nexus/Services/ProcessingService.cs b/src/Nexus/Services/ProcessingService.cs index f28d2003..9b74f8d0 100644 --- a/src/Nexus/Services/ProcessingService.cs +++ b/src/Nexus/Services/ProcessingService.cs @@ -4,6 +4,7 @@ using Nexus.Core; using Nexus.DataModel; using Nexus.Utilities; +using System.Buffers; using System.Reflection; namespace Nexus.Services @@ -44,7 +45,8 @@ public void Resample( int blockSize, int offset) { - var doubleData = new double[status.Length]; + using var memoryOwner = MemoryPool.Shared.Rent(status.Length); + var doubleData = memoryOwner.Memory.Slice(0, status.Length); BufferUtilities.ApplyRepresentationStatusByDataType( dataType, @@ -52,12 +54,13 @@ public void Resample( status, target: doubleData); + var sourceBufferSpan = doubleData.Span; var targetBufferSpan = targetBuffer.Span; var length = targetBuffer.Length; for (int i = 0; i < length; i++) { - targetBufferSpan[i] = doubleData[(i + offset) / blockSize]; + 
targetBufferSpan[i] = sourceBufferSpan[(i + offset) / blockSize]; } } @@ -72,7 +75,7 @@ public void Aggregate( var targetType = NexusUtilities.GetTypeFromNexusDataType(dataType); var method = typeof(ProcessingService) - .GetMethod(nameof(ProcessingService.GenericProcess), BindingFlags.Instance | BindingFlags.NonPublic)! + .GetMethod(nameof(GenericProcess), BindingFlags.Instance | BindingFlags.NonPublic)! .MakeGenericMethod(targetType); method.Invoke(this, new object[] { kind, data, status, targetBuffer, blockSize }); @@ -97,17 +100,20 @@ private void GenericProcess( case RepresentationKind.Rms: case RepresentationKind.Sum: - var doubleData2 = new double[Tdata.Length]; + using (var memoryOwner = MemoryPool.Shared.Rent(Tdata.Length)) + { + var doubleData2 = memoryOwner.Memory.Slice(0, Tdata.Length); - BufferUtilities.ApplyRepresentationStatus(Tdata, status, target: doubleData2); - ApplyAggregationFunction(kind, blockSize, doubleData2, targetBuffer); + BufferUtilities.ApplyRepresentationStatus(Tdata, status, target: doubleData2); + ApplyAggregationFunction(kind, blockSize, doubleData2, targetBuffer); + } break; case RepresentationKind.MinBitwise: case RepresentationKind.MaxBitwise: - ApplyAggregationFunction(kind, blockSize, Tdata, status, targetBuffer); + ApplyAggregationFunction(kind, blockSize, Tdata, status, targetBuffer); break; @@ -142,35 +148,41 @@ private void ApplyAggregationFunction( case RepresentationKind.MeanPolarDeg: - var sin = new double[targetBuffer.Length]; - var cos = new double[targetBuffer.Length]; - var limit = 360; - var factor = 2 * Math.PI / limit; - - Parallel.For(0, targetBuffer.Length, x => + using (var memoryOwner_sin = MemoryPool.Shared.Rent(targetBuffer.Length)) + using (var memoryOwner_cos = MemoryPool.Shared.Rent(targetBuffer.Length)) { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).ToArray(); - var length = chunkData.Length; - var isHighQuality = (length / (double)blockSize) >= _nanThreshold; + var sinBuffer = 
memoryOwner_sin.Memory.Slice(0, targetBuffer.Length); + var cosBuffer = memoryOwner_cos.Memory.Slice(0, targetBuffer.Length); + var limit = 360; + var factor = 2 * Math.PI / limit; - if (isHighQuality) + Parallel.For(0, targetBuffer.Length, x => { - for (int i = 0; i < chunkData.Length; i++) + var sin = sinBuffer.Span; + var cos = cosBuffer.Span; + var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).ToArray(); + var length = chunkData.Length; + var isHighQuality = (length / (double)blockSize) >= _nanThreshold; + + if (isHighQuality) { - sin[x] += Math.Sin(chunkData[i] * factor); - cos[x] += Math.Cos(chunkData[i] * factor); - } + for (int i = 0; i < chunkData.Length; i++) + { + sin[x] += Math.Sin(chunkData[i] * factor); + cos[x] += Math.Cos(chunkData[i] * factor); + } - targetBuffer.Span[x] = Math.Atan2(sin[x], cos[x]) / factor; + targetBuffer.Span[x] = Math.Atan2(sin[x], cos[x]) / factor; - if (targetBuffer.Span[x] < 0) - targetBuffer.Span[x] += limit; - } - else - { - targetBuffer.Span[x] = double.NaN; - } - }); + if (targetBuffer.Span[x] < 0) + targetBuffer.Span[x] += limit; + } + else + { + targetBuffer.Span[x] = double.NaN; + } + }); + } break; diff --git a/src/Nexus/Utilities/NexusUtilities.cs b/src/Nexus/Utilities/NexusUtilities.cs index 78a8020e..7de4cd3f 100644 --- a/src/Nexus/Utilities/NexusUtilities.cs +++ b/src/Nexus/Utilities/NexusUtilities.cs @@ -1,7 +1,7 @@ using Nexus.Core; using Nexus.DataModel; -using System.Reflection; using System.Runtime.CompilerServices; +using System.Runtime.ExceptionServices; namespace Nexus.Utilities { @@ -88,7 +88,7 @@ public static async Task WhenAllFailFastAsync(List tasks, CancellationToke .ThrowIfCancellationRequested(); if (task.Exception is not null) - throw task.Exception.InnerException ?? task.Exception; + ExceptionDispatchInfo.Capture(task.Exception.InnerException ?? 
task.Exception).Throw(); tasks.Remove(task); } From a137e2eff63e2692b5b3df03e5e9d6691bc2664b Mon Sep 17 00:00:00 2001 From: Apollo3zehn Date: Mon, 11 Sep 2023 09:50:03 +0200 Subject: [PATCH 2/4] Improve memory management and OOM exception handling --- src/Nexus/API/JobsController.cs | 6 +- .../DataSource/DataSourceController.cs | 17 + src/Nexus/Nexus.csproj | 1 - src/Nexus/Services/MemoryTracker.cs | 12 +- src/Nexus/Services/ProcessingService.cs | 684 ++++++++++-------- 5 files changed, 425 insertions(+), 295 deletions(-) diff --git a/src/Nexus/API/JobsController.cs b/src/Nexus/API/JobsController.cs index deda01a8..e2906472 100644 --- a/src/Nexus/API/JobsController.cs +++ b/src/Nexus/API/JobsController.cs @@ -234,7 +234,7 @@ public async Task> ExportAsync( catch (Exception ex) { _logger.LogError(ex, "Unable to export the requested data."); - throw new Exception("Unable to export the requested data.", ex); + throw; } }); @@ -267,7 +267,7 @@ public ActionResult RefreshDatabase() catch (Exception ex) { _logger.LogError(ex, "Unable to reload extensions and reset the resource catalog."); - throw new Exception("Unable to reload extensions and reset the resource catalog.", ex); + throw; } }); @@ -307,7 +307,7 @@ public async Task> ClearCacheAsync( catch (Exception ex) { _logger.LogError(ex, "Unable to clear the cache."); - throw new Exception("Unable to clear the cache.", ex); + throw; } }); diff --git a/src/Nexus/Extensibility/DataSource/DataSourceController.cs b/src/Nexus/Extensibility/DataSource/DataSourceController.cs index e55bfe02..937f16d1 100644 --- a/src/Nexus/Extensibility/DataSource/DataSourceController.cs +++ b/src/Nexus/Extensibility/DataSource/DataSourceController.cs @@ -1,6 +1,7 @@ using System.Buffers; using System.Collections.Concurrent; using System.ComponentModel.DataAnnotations; +using System.Diagnostics; using System.Reflection; using System.Text.Json; using System.Text.Json.Nodes; @@ -485,6 +486,10 @@ await DataSource.ReadAsync( progress, 
cancellationToken); } + catch (OutOfMemoryException) + { + throw; + } catch (Exception ex) { Logger.LogError(ex, "Read original data period {Begin} to {End} failed", begin, end); @@ -647,6 +652,10 @@ await _cacheService.UpdateAsync( cancellationToken); } } + catch (OutOfMemoryException) + { + throw; + } catch (Exception ex) { Logger.LogError(ex, "Read aggregation data period {Begin} to {End} failed", begin, end); @@ -743,6 +752,10 @@ await DataSource.ReadAsync( blockSize, offset); } + catch (OutOfMemoryException) + { + throw; + } catch (Exception ex) { Logger.LogError(ex, "Read resampling data period {Begin} to {End} failed", roundedBegin, roundedEnd); @@ -1006,6 +1019,10 @@ await controller.ReadAsync( dataSourceProgress, cancellationToken); } + catch (OutOfMemoryException) + { + throw; + } catch (Exception ex) { logger.LogError(ex, "Process period {Begin} to {End} failed", currentBegin, currentEnd); diff --git a/src/Nexus/Nexus.csproj b/src/Nexus/Nexus.csproj index 6038c421..8ef6599b 100644 --- a/src/Nexus/Nexus.csproj +++ b/src/Nexus/Nexus.csproj @@ -5,7 +5,6 @@ - diff --git a/src/Nexus/Services/MemoryTracker.cs b/src/Nexus/Services/MemoryTracker.cs index a1d2da80..a9355e3a 100644 --- a/src/Nexus/Services/MemoryTracker.cs +++ b/src/Nexus/Services/MemoryTracker.cs @@ -60,20 +60,20 @@ public async Task RegisterAllocationAsync(long minimumBy // get exclusive access to _consumedBytes and _retrySemaphores lock (this) { - var halfOfRemainingBytes = _consumedBytes >= _dataOptions.TotalBufferMemoryConsumption + var eightsOfRemainingBytes = _consumedBytes >= _dataOptions.TotalBufferMemoryConsumption ? 
0 - : (_dataOptions.TotalBufferMemoryConsumption - _consumedBytes) / 2; + : (_dataOptions.TotalBufferMemoryConsumption - _consumedBytes) / 8; long actualByteCount = 0; - if (halfOfRemainingBytes >= maximumByteCount) + if (eightsOfRemainingBytes >= maximumByteCount) actualByteCount = maximumByteCount; - else if (halfOfRemainingBytes >= minimumByteCount) - actualByteCount = halfOfRemainingBytes; + else if (eightsOfRemainingBytes >= minimumByteCount) + actualByteCount = eightsOfRemainingBytes; // success - if (actualByteCount > 0) + if (actualByteCount >= minimumByteCount) { // remove semaphore from list if (myRetrySemaphore is not null) diff --git a/src/Nexus/Services/ProcessingService.cs b/src/Nexus/Services/ProcessingService.cs index 9b74f8d0..50b980ab 100644 --- a/src/Nexus/Services/ProcessingService.cs +++ b/src/Nexus/Services/ProcessingService.cs @@ -1,396 +1,510 @@ -using MathNet.Numerics.LinearAlgebra; -using MathNet.Numerics.Statistics; -using Microsoft.Extensions.Options; +using Microsoft.Extensions.Options; +using MudBlazor; using Nexus.Core; using Nexus.DataModel; using Nexus.Utilities; using System.Buffers; using System.Reflection; +using System.Runtime.CompilerServices; -namespace Nexus.Services +namespace Nexus.Services; + +internal interface IProcessingService +{ + void Resample( + NexusDataType dataType, + ReadOnlyMemory data, + ReadOnlyMemory status, + Memory targetBuffer, + int blockSize, + int offset); + + void Aggregate( + NexusDataType dataType, + RepresentationKind kind, + Memory data, + ReadOnlyMemory status, + Memory targetBuffer, + int blockSize); +} + +internal class ProcessingService : IProcessingService { - internal interface IProcessingService + private readonly double _nanThreshold; + + public ProcessingService(IOptions dataOptions) { - void Resample( - NexusDataType dataType, - ReadOnlyMemory data, - ReadOnlyMemory status, - Memory targetBuffer, - int blockSize, - int offset); - - void Aggregate( - NexusDataType dataType, - 
RepresentationKind kind, - Memory data, - ReadOnlyMemory status, - Memory targetBuffer, - int blockSize); + _nanThreshold = dataOptions.Value.AggregationNaNThreshold; } - internal class ProcessingService : IProcessingService + public void Resample( + NexusDataType dataType, + ReadOnlyMemory data, + ReadOnlyMemory status, + Memory targetBuffer, + int blockSize, + int offset) { - private readonly double _nanThreshold; + using var memoryOwner = MemoryPool.Shared.Rent(status.Length); + var doubleData = memoryOwner.Memory.Slice(0, status.Length); + + BufferUtilities.ApplyRepresentationStatusByDataType( + dataType, + data, + status, + target: doubleData); + + var sourceBufferSpan = doubleData.Span; + var targetBufferSpan = targetBuffer.Span; + var length = targetBuffer.Length; - public ProcessingService(IOptions dataOptions) + for (int i = 0; i < length; i++) { - _nanThreshold = dataOptions.Value.AggregationNaNThreshold; + targetBufferSpan[i] = sourceBufferSpan[(i + offset) / blockSize]; } + } - public void Resample( - NexusDataType dataType, - ReadOnlyMemory data, - ReadOnlyMemory status, - Memory targetBuffer, - int blockSize, - int offset) - { - using var memoryOwner = MemoryPool.Shared.Rent(status.Length); - var doubleData = memoryOwner.Memory.Slice(0, status.Length); + public void Aggregate( + NexusDataType dataType, + RepresentationKind kind, + Memory data, + ReadOnlyMemory status, + Memory targetBuffer, + int blockSize) + { + var targetType = NexusUtilities.GetTypeFromNexusDataType(dataType); - BufferUtilities.ApplyRepresentationStatusByDataType( - dataType, - data, - status, - target: doubleData); + // TODO: cache + var method = typeof(ProcessingService) + .GetMethod(nameof(GenericProcess), BindingFlags.Instance | BindingFlags.NonPublic)! 
+ .MakeGenericMethod(targetType); - var sourceBufferSpan = doubleData.Span; - var targetBufferSpan = targetBuffer.Span; - var length = targetBuffer.Length; + method.Invoke(this, new object[] { kind, data, status, targetBuffer, blockSize }); + } - for (int i = 0; i < length; i++) - { - targetBufferSpan[i] = sourceBufferSpan[(i + offset) / blockSize]; - } - } + private void GenericProcess( + RepresentationKind kind, + Memory data, + ReadOnlyMemory status, + Memory targetBuffer, + int blockSize) where T : unmanaged + { + var Tdata = new CastMemoryManager(data).Memory; - public void Aggregate( - NexusDataType dataType, - RepresentationKind kind, - Memory data, - ReadOnlyMemory status, - Memory targetBuffer, - int blockSize) + switch (kind) { - var targetType = NexusUtilities.GetTypeFromNexusDataType(dataType); + case RepresentationKind.Mean: + case RepresentationKind.MeanPolarDeg: + case RepresentationKind.Min: + case RepresentationKind.Max: + case RepresentationKind.Std: + case RepresentationKind.Rms: + case RepresentationKind.Sum: + + using (var memoryOwner = MemoryPool.Shared.Rent(Tdata.Length)) + { + var doubleData2 = memoryOwner.Memory.Slice(0, Tdata.Length); - var method = typeof(ProcessingService) - .GetMethod(nameof(GenericProcess), BindingFlags.Instance | BindingFlags.NonPublic)! 
- .MakeGenericMethod(targetType); + BufferUtilities.ApplyRepresentationStatus(Tdata, status, target: doubleData2); + ApplyAggregationFunction(kind, blockSize, doubleData2, targetBuffer); + } - method.Invoke(this, new object[] { kind, data, status, targetBuffer, blockSize }); - } + break; - private void GenericProcess( - RepresentationKind kind, - Memory data, - ReadOnlyMemory status, - Memory targetBuffer, - int blockSize) where T : unmanaged - { - var Tdata = new CastMemoryManager(data).Memory; + case RepresentationKind.MinBitwise: + case RepresentationKind.MaxBitwise: - switch (kind) - { - case RepresentationKind.Mean: - case RepresentationKind.MeanPolarDeg: - case RepresentationKind.Min: - case RepresentationKind.Max: - case RepresentationKind.Std: - case RepresentationKind.Rms: - case RepresentationKind.Sum: - - using (var memoryOwner = MemoryPool.Shared.Rent(Tdata.Length)) - { - var doubleData2 = memoryOwner.Memory.Slice(0, Tdata.Length); + ApplyAggregationFunction(kind, blockSize, Tdata, status, targetBuffer); - BufferUtilities.ApplyRepresentationStatus(Tdata, status, target: doubleData2); - ApplyAggregationFunction(kind, blockSize, doubleData2, targetBuffer); - } + break; - break; + default: + throw new Exception($"The representation kind {kind} is not supported."); + } + } - case RepresentationKind.MinBitwise: - case RepresentationKind.MaxBitwise: + private void ApplyAggregationFunction( + RepresentationKind kind, + int blockSize, + Memory data, + Memory targetBuffer) + { + switch (kind) + { + case RepresentationKind.Mean: - ApplyAggregationFunction(kind, blockSize, Tdata, status, targetBuffer); + Parallel.For(0, targetBuffer.Length, x => + { + var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).Span; + var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; - break; + if (isHighQuality) + targetBuffer.Span[x] = Mean(chunkData); - default: - throw new Exception($"The representation kind {kind} is not supported."); - } - 
} + else + targetBuffer.Span[x] = double.NaN; + }); - private void ApplyAggregationFunction( - RepresentationKind kind, - int blockSize, - Memory data, - Memory targetBuffer) - { - switch (kind) - { - case RepresentationKind.Mean: + break; + + case RepresentationKind.MeanPolarDeg: + + using (var memoryOwner_sin = MemoryPool.Shared.Rent(targetBuffer.Length)) + using (var memoryOwner_cos = MemoryPool.Shared.Rent(targetBuffer.Length)) + { + var sinBuffer = memoryOwner_sin.Memory.Slice(0, targetBuffer.Length); + var cosBuffer = memoryOwner_cos.Memory.Slice(0, targetBuffer.Length); + var limit = 360; + var factor = 2 * Math.PI / limit; Parallel.For(0, targetBuffer.Length, x => { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).ToArray(); - var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; + var sin = sinBuffer.Span; + var cos = cosBuffer.Span; + var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).Span; + var length = chunkData.Length; + var isHighQuality = (length / (double)blockSize) >= _nanThreshold; if (isHighQuality) - targetBuffer.Span[x] = ArrayStatistics.Mean(chunkData); + { + for (int i = 0; i < chunkData.Length; i++) + { + sin[x] += Math.Sin(chunkData[i] * factor); + cos[x] += Math.Cos(chunkData[i] * factor); + } + + targetBuffer.Span[x] = Math.Atan2(sin[x], cos[x]) / factor; + if (targetBuffer.Span[x] < 0) + targetBuffer.Span[x] += limit; + } else + { targetBuffer.Span[x] = double.NaN; + } }); + } - break; - - case RepresentationKind.MeanPolarDeg: - - using (var memoryOwner_sin = MemoryPool.Shared.Rent(targetBuffer.Length)) - using (var memoryOwner_cos = MemoryPool.Shared.Rent(targetBuffer.Length)) - { - var sinBuffer = memoryOwner_sin.Memory.Slice(0, targetBuffer.Length); - var cosBuffer = memoryOwner_cos.Memory.Slice(0, targetBuffer.Length); - var limit = 360; - var factor = 2 * Math.PI / limit; + break; - Parallel.For(0, targetBuffer.Length, x => - { - var sin = sinBuffer.Span; - var cos = 
cosBuffer.Span; - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).ToArray(); - var length = chunkData.Length; - var isHighQuality = (length / (double)blockSize) >= _nanThreshold; + case RepresentationKind.Min: - if (isHighQuality) - { - for (int i = 0; i < chunkData.Length; i++) - { - sin[x] += Math.Sin(chunkData[i] * factor); - cos[x] += Math.Cos(chunkData[i] * factor); - } + Parallel.For(0, targetBuffer.Length, x => + { + var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).Span; + var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; - targetBuffer.Span[x] = Math.Atan2(sin[x], cos[x]) / factor; + if (isHighQuality) + targetBuffer.Span[x] = Minimum(chunkData); - if (targetBuffer.Span[x] < 0) - targetBuffer.Span[x] += limit; - } - else - { - targetBuffer.Span[x] = double.NaN; - } - }); - } + else + targetBuffer.Span[x] = double.NaN; + }); - break; + break; - case RepresentationKind.Min: + case RepresentationKind.Max: - Parallel.For(0, targetBuffer.Length, x => - { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).ToArray(); - var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; + Parallel.For(0, targetBuffer.Length, x => + { + var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).Span; + var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; - if (isHighQuality) - targetBuffer.Span[x] = ArrayStatistics.Minimum(chunkData); + if (isHighQuality) + targetBuffer.Span[x] = Maximum(chunkData); - else - targetBuffer.Span[x] = double.NaN; - }); + else + targetBuffer.Span[x] = double.NaN; + }); - break; + break; - case RepresentationKind.Max: + case RepresentationKind.Std: - Parallel.For(0, targetBuffer.Length, x => - { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).ToArray(); - var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; + Parallel.For(0, targetBuffer.Length, x => + { + var 
chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).Span; + var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; - if (isHighQuality) - targetBuffer.Span[x] = ArrayStatistics.Maximum(chunkData); + if (isHighQuality) + targetBuffer.Span[x] = StandardDeviation(chunkData); - else - targetBuffer.Span[x] = double.NaN; - }); + else + targetBuffer.Span[x] = double.NaN; + }); - break; + break; - case RepresentationKind.Std: + case RepresentationKind.Rms: - Parallel.For(0, targetBuffer.Length, x => - { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).ToArray(); - var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; + Parallel.For(0, targetBuffer.Length, x => + { + var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).Span; + var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; - if (isHighQuality) - targetBuffer.Span[x] = ArrayStatistics.StandardDeviation(chunkData); + if (isHighQuality) + targetBuffer.Span[x] = RootMeanSquare(chunkData); - else - targetBuffer.Span[x] = double.NaN; - }); + else + targetBuffer.Span[x] = double.NaN; + }); - break; + break; - case RepresentationKind.Rms: + case RepresentationKind.Sum: - Parallel.For(0, targetBuffer.Length, x => - { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).ToArray(); - var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; + Parallel.For(0, targetBuffer.Length, x => + { + var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).Span; + var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; - if (isHighQuality) - targetBuffer.Span[x] = ArrayStatistics.RootMeanSquare(chunkData); + if (isHighQuality) + targetBuffer.Span[x] = Sum(chunkData); - else - targetBuffer.Span[x] = double.NaN; - }); + else + targetBuffer.Span[x] = double.NaN; + }); - break; + break; - case RepresentationKind.Sum: + default: + throw new Exception($"The 
representation kind {kind} is not supported."); - Parallel.For(0, targetBuffer.Length, x => - { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).ToArray(); - var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; + } + } - if (isHighQuality) - targetBuffer.Span[x] = Vector.Build.Dense(chunkData).Sum(); + private void ApplyAggregationFunction( + RepresentationKind kind, + int blockSize, + Memory data, + ReadOnlyMemory status, + Memory targetBuffer) where T : unmanaged + { + switch (kind) + { + case RepresentationKind.MinBitwise: - else - targetBuffer.Span[x] = double.NaN; - }); + T[] bitField_and = new T[targetBuffer.Length]; - break; + Parallel.For(0, targetBuffer.Length, x => + { + var chunkData = GetNaNFreeData( + data.Slice(x * blockSize, blockSize), + status.Slice(x * blockSize, blockSize)).Span; - default: - throw new Exception($"The representation kind {kind} is not supported."); + var targetBufferSpan = targetBuffer.Span; + var length = chunkData.Length; + var isHighQuality = (length / (double)blockSize) >= _nanThreshold; - } - } + if (isHighQuality) + { + for (int i = 0; i < length; i++) + { + if (i == 0) + bitField_and[x] = GenericBitOr.BitOr(bitField_and[x], chunkData[i]); - private void ApplyAggregationFunction( - RepresentationKind kind, - int blockSize, - Memory data, - ReadOnlyMemory status, - Memory targetBuffer) where T : unmanaged - { - switch (kind) - { - case RepresentationKind.MinBitwise: + else + bitField_and[x] = GenericBitAnd.BitAnd(bitField_and[x], chunkData[i]); + } - T[] bitField_and = new T[targetBuffer.Length]; + targetBuffer.Span[x] = Convert.ToDouble(bitField_and[x]); + } - Parallel.For(0, targetBuffer.Length, x => + else { - var chunkData = GetNaNFreeData( - data.Slice(x * blockSize, blockSize), - status.Slice(x * blockSize, blockSize)).Span; + targetBuffer.Span[x] = double.NaN; + } + }); - var targetBufferSpan = targetBuffer.Span; - var length = chunkData.Length; - var isHighQuality = 
(length / (double)blockSize) >= _nanThreshold; + break; - if (isHighQuality) - { - for (int i = 0; i < length; i++) - { - if (i == 0) - bitField_and[x] = GenericBitOr.BitOr(bitField_and[x], chunkData[i]); + case RepresentationKind.MaxBitwise: - else - bitField_and[x] = GenericBitAnd.BitAnd(bitField_and[x], chunkData[i]); - } + T[] bitField_or = new T[targetBuffer.Length]; - targetBuffer.Span[x] = Convert.ToDouble(bitField_and[x]); - } + Parallel.For(0, targetBuffer.Length, x => + { + var chunkData = GetNaNFreeData(data + .Slice(x * blockSize, blockSize), status + .Slice(x * blockSize, blockSize)).Span; - else + var length = chunkData.Length; + var isHighQuality = (length / (double)blockSize) >= _nanThreshold; + + if (isHighQuality) + { + for (int i = 0; i < length; i++) { - targetBuffer.Span[x] = double.NaN; + bitField_or[x] = GenericBitOr.BitOr(bitField_or[x], chunkData[i]); } - }); - break; + targetBuffer.Span[x] = Convert.ToDouble(bitField_or[x]); + } - case RepresentationKind.MaxBitwise: + else + { + targetBuffer.Span[x] = double.NaN; + } + }); - T[] bitField_or = new T[targetBuffer.Length]; + break; - Parallel.For(0, targetBuffer.Length, x => - { - var chunkData = GetNaNFreeData(data - .Slice(x * blockSize, blockSize), status - .Slice(x * blockSize, blockSize)).Span; + default: + throw new Exception($"The representation kind {kind} is not supported."); - var length = chunkData.Length; - var isHighQuality = (length / (double)blockSize) >= _nanThreshold; + } + } - if (isHighQuality) - { - for (int i = 0; i < length; i++) - { - bitField_or[x] = GenericBitOr.BitOr(bitField_or[x], chunkData[i]); - } + private static Memory GetNaNFreeData(Memory data, ReadOnlyMemory status) where T : unmanaged + { + var targetLength = 0; + var sourceLength = data.Length; + var spanData = data.Span; + var spanStatus = status.Span; - targetBuffer.Span[x] = Convert.ToDouble(bitField_or[x]); - } + for (int i = 0; i < sourceLength; i++) + { + if (spanStatus[i] == 1) + { + 
spanData[targetLength] = spanData[i]; + targetLength++; + } + } - else - { - targetBuffer.Span[x] = double.NaN; - } - }); + return data[..targetLength]; + } - break; + private static Memory GetNaNFreeData(Memory data) + { + var targetLength = 0; + var sourceLength = data.Length; + var spanData = data.Span; - default: - throw new Exception($"The representation kind {kind} is not supported."); + for (int i = 0; i < sourceLength; i++) + { + var value = spanData[i]; + if (!double.IsNaN(value)) + { + spanData[targetLength] = value; + targetLength++; } } - private static Memory GetNaNFreeData(Memory data, ReadOnlyMemory status) where T : unmanaged + return data[..targetLength]; + } + + // TODO: vectorize + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static double Sum(Span data) + { + if (data.Length == 0) + return double.NaN; + + var sum = 0.0; + + for (int i = 0; i < data.Length; i++) + { + sum += data[i]; + } + + return sum; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static double Mean(Span data) + { + if (data.Length == 0) + return double.NaN; + + var mean = 0.0; + var m = 0UL; + + for (int i = 0; i < data.Length; i++) { - var targetLength = 0; - var sourceLength = data.Length; - var spanData = data.Span; - var spanStatus = status.Span; + mean += (data[i] - mean)/++m; + } - for (int i = 0; i < sourceLength; i++) + return mean; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static double Minimum(Span data) + { + if (data.Length == 0) + return double.NaN; + + var min = double.PositiveInfinity; + + for (int i = 0; i < data.Length; i++) + { + if (data[i] < min || double.IsNaN(data[i])) { - if (spanStatus[i] == 1) - { - spanData[targetLength] = spanData[i]; - targetLength++; - } + min = data[i]; } + } + + return min; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static double Maximum(Span data) + { + if (data.Length == 0) + return double.NaN; + + var max = double.NegativeInfinity; - return 
data[..targetLength]; + for (int i = 0; i < data.Length; i++) + { + if (data[i] > max || double.IsNaN(data[i])) + { + max = data[i]; + } } - private static Memory GetNaNFreeData(Memory data) + return max; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static double StandardDeviation(Span samples) + { + return Math.Sqrt(Variance(samples)); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static double Variance(Span samples) + { + if (samples.Length <= 1) + return double.NaN; + + var variance = 0.0; + var t = samples[0]; + + for (int i = 1; i < samples.Length; i++) { - var targetLength = 0; - var sourceLength = data.Length; - var spanData = data.Span; + t += samples[i]; + var diff = ((i + 1)*samples[i]) - t; + variance += diff * diff / ((i + 1.0) * i); + } - for (int i = 0; i < sourceLength; i++) - { - var value = spanData[i]; + return variance/(samples.Length - 1); + } - if (!double.IsNaN(value)) - { - spanData[targetLength] = value; - targetLength++; - } - } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static double RootMeanSquare(Span data) + { + if (data.Length == 0) + return double.NaN; + + var mean = 0.0; + var m = 0UL; - return data[..targetLength]; + for (int i = 0; i < data.Length; i++) + { + mean += (data[i]*data[i] - mean)/++m; } + + return Math.Sqrt(mean); } -} +} \ No newline at end of file From e8fa5c7bd494a2816d976f6cc881dc7635313082 Mon Sep 17 00:00:00 2001 From: Apollo3zehn Date: Mon, 11 Sep 2023 16:46:15 +0200 Subject: [PATCH 3/4] Improve memory management --- src/Nexus/Services/MemoryTracker.cs | 12 +++++++----- tests/Nexus.Tests/Services/MemoryTrackerTests.cs | 8 +++++++- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/Nexus/Services/MemoryTracker.cs b/src/Nexus/Services/MemoryTracker.cs index a9355e3a..fc9fea6d 100644 --- a/src/Nexus/Services/MemoryTracker.cs +++ b/src/Nexus/Services/MemoryTracker.cs @@ -47,6 +47,8 @@ public MemoryTracker(IOptions dataOptions, ILogger _ = 
Task.Run(MonitorFullGC); } + internal int Factor { get; set; } = 8; + public async Task RegisterAllocationAsync(long minimumByteCount, long maximumByteCount, CancellationToken cancellationToken) { if (minimumByteCount > _dataOptions.TotalBufferMemoryConsumption) @@ -60,17 +62,17 @@ public async Task RegisterAllocationAsync(long minimumBy // get exclusive access to _consumedBytes and _retrySemaphores lock (this) { - var eightsOfRemainingBytes = _consumedBytes >= _dataOptions.TotalBufferMemoryConsumption + var fractionOfRemainingBytes = _consumedBytes >= _dataOptions.TotalBufferMemoryConsumption ? 0 - : (_dataOptions.TotalBufferMemoryConsumption - _consumedBytes) / 8; + : (_dataOptions.TotalBufferMemoryConsumption - _consumedBytes) / Factor /* normal = 8, tests = 2 */; long actualByteCount = 0; - if (eightsOfRemainingBytes >= maximumByteCount) + if (fractionOfRemainingBytes >= maximumByteCount) actualByteCount = maximumByteCount; - else if (eightsOfRemainingBytes >= minimumByteCount) - actualByteCount = eightsOfRemainingBytes; + else if (fractionOfRemainingBytes >= minimumByteCount) + actualByteCount = fractionOfRemainingBytes; // success if (actualByteCount >= minimumByteCount) diff --git a/tests/Nexus.Tests/Services/MemoryTrackerTests.cs b/tests/Nexus.Tests/Services/MemoryTrackerTests.cs index a0d06e48..d40a1f10 100644 --- a/tests/Nexus.Tests/Services/MemoryTrackerTests.cs +++ b/tests/Nexus.Tests/Services/MemoryTrackerTests.cs @@ -14,7 +14,13 @@ public async Task CanHandleMultipleRequests() // Arrange var weAreWaiting = new AutoResetEvent(initialState: false); var dataOptions = new DataOptions() { TotalBufferMemoryConsumption = 200 }; - var memoryTracker = new MemoryTracker(Options.Create(dataOptions), NullLogger.Instance); + + var memoryTracker = new MemoryTracker(Options.Create(dataOptions), NullLogger.Instance) + { + // TODO: remove this property and test with factor 8 + Factor = 2 + }; + var firstRegistration = default(AllocationRegistration); var 
secondRegistration = default(AllocationRegistration); From c226821149756c895934eb6412cbc397c7ec053c Mon Sep 17 00:00:00 2001 From: Apollo3zehn Date: Mon, 18 Sep 2023 16:26:57 +0200 Subject: [PATCH 4/4] Fixed a multithreading bug affecting the aggregation calculations --- CHANGELOG.md | 5 ++ src/Nexus/Services/ProcessingService.cs | 108 ++++++++++++++++++------ version.json | 2 +- 3 files changed, 89 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d874d9bf..4f6059da 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## v2.0.0-beta.16 - 2023-09-18 + +### Bugs fixed: +- Fixed a multithreading bug affecting the aggregation calculations. This bug very likely caused incorrect aggregation data probably for a long period of time for datasets with many NaN values. + ## v2.0.0-beta.15 - 2023-07-13 ### Bugs fixed: diff --git a/src/Nexus/Services/ProcessingService.cs b/src/Nexus/Services/ProcessingService.cs index 50b980ab..90f51762 100644 --- a/src/Nexus/Services/ProcessingService.cs +++ b/src/Nexus/Services/ProcessingService.cs @@ -135,7 +135,13 @@ private void ApplyAggregationFunction( Parallel.For(0, targetBuffer.Length, x => { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).Span; + using var nanFreeDataOwner = MemoryPool.Shared.Rent(blockSize); + var nanFreeData = nanFreeDataOwner.Memory; + + var chunkData = GetNaNFreeData( + source: data.Slice(x * blockSize, blockSize), + target: nanFreeData).Span; + var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; if (isHighQuality) @@ -161,7 +167,14 @@ private void ApplyAggregationFunction( { var sin = sinBuffer.Span; var cos = cosBuffer.Span; - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).Span; + + using var nanFreeDataOwner = MemoryPool.Shared.Rent(blockSize); + var nanFreeData = nanFreeDataOwner.Memory; + + var chunkData = GetNaNFreeData( + source: data.Slice(x * blockSize, blockSize), + target: nanFreeData).Span; + 
var length = chunkData.Length; var isHighQuality = (length / (double)blockSize) >= _nanThreshold; @@ -191,7 +204,13 @@ private void ApplyAggregationFunction( Parallel.For(0, targetBuffer.Length, x => { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).Span; + using var nanFreeDataOwner = MemoryPool.Shared.Rent(blockSize); + var nanFreeData = nanFreeDataOwner.Memory; + + var chunkData = GetNaNFreeData( + source: data.Slice(x * blockSize, blockSize), + target: nanFreeData).Span; + var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; if (isHighQuality) @@ -207,7 +226,13 @@ private void ApplyAggregationFunction( Parallel.For(0, targetBuffer.Length, x => { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).Span; + using var nanFreeDataOwner = MemoryPool.Shared.Rent(blockSize); + var nanFreeData = nanFreeDataOwner.Memory; + + var chunkData = GetNaNFreeData( + source: data.Slice(x * blockSize, blockSize), + target: nanFreeData).Span; + var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; if (isHighQuality) @@ -223,7 +248,13 @@ private void ApplyAggregationFunction( Parallel.For(0, targetBuffer.Length, x => { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).Span; + using var nanFreeDataOwner = MemoryPool.Shared.Rent(blockSize); + var nanFreeData = nanFreeDataOwner.Memory; + + var chunkData = GetNaNFreeData( + source: data.Slice(x * blockSize, blockSize), + target: nanFreeData).Span; + var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; if (isHighQuality) @@ -239,7 +270,13 @@ private void ApplyAggregationFunction( Parallel.For(0, targetBuffer.Length, x => { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).Span; + using var nanFreeDataOwner = MemoryPool.Shared.Rent(blockSize); + var nanFreeData = nanFreeDataOwner.Memory; + + var chunkData = GetNaNFreeData( + source: data.Slice(x * blockSize, blockSize), + target: 
nanFreeData).Span; + var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; if (isHighQuality) @@ -255,7 +292,13 @@ private void ApplyAggregationFunction( Parallel.For(0, targetBuffer.Length, x => { - var chunkData = GetNaNFreeData(data.Slice(x * blockSize, blockSize)).Span; + using var nanFreeDataOwner = MemoryPool.Shared.Rent(blockSize); + var nanFreeData = nanFreeDataOwner.Memory; + + var chunkData = GetNaNFreeData( + source: data.Slice(x * blockSize, blockSize), + target: nanFreeData).Span; + var isHighQuality = (chunkData.Length / (double)blockSize) >= _nanThreshold; if (isHighQuality) @@ -288,9 +331,13 @@ private void ApplyAggregationFunction( Parallel.For(0, targetBuffer.Length, x => { + using var nanFreeDataOwner = MemoryPool.Shared.Rent(blockSize); + var nanFreeData = nanFreeDataOwner.Memory; + var chunkData = GetNaNFreeData( - data.Slice(x * blockSize, blockSize), - status.Slice(x * blockSize, blockSize)).Span; + source: data.Slice(x * blockSize, blockSize), + status: status.Slice(x * blockSize, blockSize), + target: nanFreeData).Span; var targetBufferSpan = targetBuffer.Span; var length = chunkData.Length; @@ -324,9 +371,13 @@ private void ApplyAggregationFunction( Parallel.For(0, targetBuffer.Length, x => { - var chunkData = GetNaNFreeData(data - .Slice(x * blockSize, blockSize), status - .Slice(x * blockSize, blockSize)).Span; + using var nanFreeDataOwner = MemoryPool.Shared.Rent(blockSize); + var nanFreeData = nanFreeDataOwner.Memory; + + var chunkData = GetNaNFreeData( + source: data.Slice(x * blockSize, blockSize), + status: status.Slice(x * blockSize, blockSize), + target: nanFreeData).Span; var length = chunkData.Length; var isHighQuality = (length / (double)blockSize) >= _nanThreshold; @@ -355,43 +406,50 @@ private void ApplyAggregationFunction( } } - private static Memory GetNaNFreeData(Memory data, ReadOnlyMemory status) where T : unmanaged + private static Memory GetNaNFreeData( + ReadOnlyMemory source, + ReadOnlyMemory 
status, + Memory target) where T : unmanaged { var targetLength = 0; - var sourceLength = data.Length; - var spanData = data.Span; - var spanStatus = status.Span; + var sourceLength = source.Length; + var sourceSpan = source.Span; + var targetSpan = target.Span; + var statusSpan = status.Span; for (int i = 0; i < sourceLength; i++) { - if (spanStatus[i] == 1) + if (statusSpan[i] == 1) { - spanData[targetLength] = spanData[i]; + targetSpan[targetLength] = sourceSpan[i]; targetLength++; } } - return data[..targetLength]; + return target[..targetLength]; } - private static Memory GetNaNFreeData(Memory data) + private static Memory GetNaNFreeData( + ReadOnlyMemory source, + Memory target) { var targetLength = 0; - var sourceLength = data.Length; - var spanData = data.Span; + var sourceLength = source.Length; + var sourceSpan = source.Span; + var targetSpan = target.Span; for (int i = 0; i < sourceLength; i++) { - var value = spanData[i]; + var value = sourceSpan[i]; if (!double.IsNaN(value)) { - spanData[targetLength] = value; + targetSpan[targetLength] = value; targetLength++; } } - return data[..targetLength]; + return target[..targetLength]; } // TODO: vectorize diff --git a/version.json b/version.json index 73c135e6..b143d82d 100644 --- a/version.json +++ b/version.json @@ -1,4 +1,4 @@ { "version": "2.0.0", - "suffix": "beta.15" + "suffix": "beta.16" } \ No newline at end of file