diff --git a/src/Nexus.Sources.StructuredFile/DateTimeExtensions.cs b/src/Nexus.Sources.StructuredFile/DateTimeExtensions.cs index 11c29c3..817d300 100644 --- a/src/Nexus.Sources.StructuredFile/DateTimeExtensions.cs +++ b/src/Nexus.Sources.StructuredFile/DateTimeExtensions.cs @@ -6,4 +6,9 @@ public static DateTime RoundDown(this DateTime dateTime, TimeSpan timeSpan) { return new DateTime(dateTime.Ticks - (dateTime.Ticks % timeSpan.Ticks), dateTime.Kind); } + + public static TimeSpan RoundDown(this TimeSpan timespan, TimeSpan timeSpan2) + { + return new TimeSpan(timespan.Ticks - (timespan.Ticks % timeSpan2.Ticks)); + } } diff --git a/src/Nexus.Sources.StructuredFile/StructuredFileDataSource.cs b/src/Nexus.Sources.StructuredFile/StructuredFileDataSource.cs index db0e2df..d5454d9 100644 --- a/src/Nexus.Sources.StructuredFile/StructuredFileDataSource.cs +++ b/src/Nexus.Sources.StructuredFile/StructuredFileDataSource.cs @@ -402,7 +402,7 @@ protected virtual async Task ReadAsync( var consumedFilePeriod = currentBegin - regularUtcFileBegin; var remainingFilePeriod = fileSource.FilePeriod - consumedFilePeriod; var currentPeriod = TimeSpan.FromTicks(Math.Min(remainingFilePeriod.Ticks, remainingPeriod.Ticks)); - + Logger.LogTrace("Process period {CurrentBegin} to {CurrentEnd}", currentBegin, currentBegin + currentPeriod); var fileBlock = (int)(currentPeriod.Ticks / samplePeriod.Ticks); @@ -410,6 +410,16 @@ protected virtual async Task ReadAsync( foreach (var (filePath, fileBeginOffset) in fileInfos) { + /* This happens when FindFileBeginAndPathsAsync did not find files for the current time period */ + if (fileBeginOffset >= fileSource.FilePeriod) + { + /* Modify loop status update variables and leave the loop */ + currentPeriod = fileBeginOffset.RoundDown(fileSource.FilePeriod); + fileBlock = (int)(currentPeriod.Ticks / samplePeriod.Ticks); + + break; + } + if (File.Exists(filePath)) { // compensate offsets and lengths in case of incomplete or irregular file @@ -423,7 +433,7 @@ protected virtual async Task ReadAsync( ? 0 /* The irregular or incomplete file contains no data for the current buffer position, so compensate for it */ - : + fileCompensation + : +fileCompensation ); /* The irregular or incomplete file contains not enough data, so make the file block smaller */ @@ -435,15 +445,15 @@ protected virtual async Task ReadAsync( fileCompensation < 0 /* = irregular file */ /* The irregular file starts earlier than expected, so compensate for it */ - ? - fileCompensation + ? -fileCompensation /* The irregular or incomplete file starts later than expected */ - : - fileCompensation + : -fileCompensation ); /* The maximum value for fileCompensation is MaxFileBlock = FilePeriod / SamplePeriod - * so there is no need to check for actualFileOffset >= MaxFileBlock. - * However, it might happen that actualFileOffset < 0. This must be compensated. */ + * so there is no need to check for actualFileOffset >= MaxFileBlock. + * However, it might happen that actualFileOffset < 0. This must be compensated. */ if (actualFileOffset < 0) actualFileOffset = 0; @@ -458,13 +468,13 @@ protected virtual async Task ReadAsync( var slicedData = request.Data .Slice( - start: actualBufferOffset * representation.ElementSize, + start: actualBufferOffset * representation.ElementSize, length: actualFileBlock * representation.ElementSize ); var slicedStatus = request.Status .Slice( - start: actualBufferOffset, + start: actualBufferOffset, length: actualFileBlock ); @@ -534,7 +544,7 @@ protected virtual async Task ReadAsync( Logger.LogError(ex, "Could not read file source group"); } - ++fileSourceGroupIndex; + fileSourceGroupIndex++; } } @@ -550,7 +560,7 @@ protected abstract Task ReadAsync( StructuredFileReadRequest[] readRequests, CancellationToken cancellationToken); - private protected Task<(DateTime RegularUtcFileBegin, IEnumerable<(string FilePath, TimeSpan FileBeginOffset)>)> + private protected Task<(DateTime RegularUtcFileBegin, (string FilePath, TimeSpan FileBeginOffset)[])> FindFileBeginAndPathsAsync(DateTime begin, FileSource fileSource) { /* This implementation assumes that files are stored in regular time intervals. @@ -588,7 +598,7 @@ protected abstract Task ReadAsync( fileSource.UtcOffset ).UtcDateTime; - IEnumerable<(string FilePath, TimeSpan FileBeginOffset)> fileInfos; + (string FilePath, TimeSpan FileBeginOffset)[] fileInfos; if (fileSource.FileTemplate.Contains('?') || fileSource.FileTemplate.Contains('*')) { @@ -598,14 +608,18 @@ protected abstract Task ReadAsync( var regularUtcFileEnd = regularUtcFileBegin + fileSource.FilePeriod; - fileInfos = GetCandidateFiles( + // get all files that match the pattern + var tmpFileInfos = GetCandidateFiles( rootPath: Root, begin: regularUtcFileBegin, end: regularUtcFileBegin + fileSource.FilePeriod, fileSource, CancellationToken.None ) - .Where(current => + .ToArray(); + + // keep only files for current period + fileInfos = tmpFileInfos.Where(current => { if (fileSource.IrregularTimeInterval) { @@ -621,7 +635,19 @@ protected abstract Task ReadAsync( current.DateTimeOffset.UtcDateTime < regularUtcFileEnd; } }) - .Select(current => (current.FilePath, current.DateTimeOffset.UtcDateTime - regularUtcFileBegin)); + .Select(current => (current.FilePath, current.DateTimeOffset.UtcDateTime - regularUtcFileBegin)) + .ToArray(); + + // no files left for current time period - try to find next file after this period + if (!fileInfos.Any()) + { + fileInfos = tmpFileInfos + .OrderBy(x => x.DateTimeOffset.DateTime) + .Where(current => regularUtcFileEnd <= current.DateTimeOffset.UtcDateTime) + .Select(current => (current.FilePath, current.DateTimeOffset.UtcDateTime - regularUtcFileBegin)) + .Take(1) + .ToArray(); + } } else diff --git a/tests/Nexus.Sources.StructuredFile.Tests/StructuredFileDataSourceTester.cs b/tests/Nexus.Sources.StructuredFile.Tests/StructuredFileDataSourceTester.cs index 950222b..be1064a 100644 --- a/tests/Nexus.Sources.StructuredFile.Tests/StructuredFileDataSourceTester.cs +++ b/tests/Nexus.Sources.StructuredFile.Tests/StructuredFileDataSourceTester.cs @@ -16,7 +16,7 @@ public StructuredFileDataSourceTester( public Dictionary>> Config { get; private set; } = default!; - public new Task<(DateTime RegularUtcFileBegin, IEnumerable<(string FilePath, TimeSpan FileBeginOffset)>)> + public new Task<(DateTime RegularUtcFileBegin, (string FilePath, TimeSpan FileBeginOffset)[])> FindFileBeginAndPathsAsync(DateTime begin, FileSource fileSource) { return base.FindFileBeginAndPathsAsync(begin, fileSource);