Skip to content

Commit

Permalink
Fix #5: Reading for irregular data sources is quite slow because the …
Browse files Browse the repository at this point in the history
…current algorithm does not fast forward to the next available file
  • Loading branch information
Apollo3zehn committed Aug 26, 2024
1 parent 96f31d5 commit 4b51555
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 15 deletions.
5 changes: 5 additions & 0 deletions src/Nexus.Sources.StructuredFile/DateTimeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,9 @@ public static DateTime RoundDown(this DateTime dateTime, TimeSpan timeSpan)
{
return new DateTime(dateTime.Ticks - (dateTime.Ticks % timeSpan.Ticks), dateTime.Kind);
}

public static TimeSpan RoundDown(this TimeSpan timespan, TimeSpan timeSpan2)
{
return new TimeSpan(timespan.Ticks - (timespan.Ticks % timeSpan2.Ticks));
}
}
54 changes: 40 additions & 14 deletions src/Nexus.Sources.StructuredFile/StructuredFileDataSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -402,14 +402,24 @@ protected virtual async Task ReadAsync(
var consumedFilePeriod = currentBegin - regularUtcFileBegin;
var remainingFilePeriod = fileSource.FilePeriod - consumedFilePeriod;
var currentPeriod = TimeSpan.FromTicks(Math.Min(remainingFilePeriod.Ticks, remainingPeriod.Ticks));

Logger.LogTrace("Process period {CurrentBegin} to {CurrentEnd}", currentBegin, currentBegin + currentPeriod);

var fileBlock = (int)(currentPeriod.Ticks / samplePeriod.Ticks);
var fileOffset = consumedFilePeriod.Ticks / samplePeriod.Ticks;

foreach (var (filePath, fileBeginOffset) in fileInfos)
{
/* This happens when FindFileBeginAndPathsAsync did not find files for the current time period */
if (fileBeginOffset >= fileSource.FilePeriod)
{
/* Modify loop status update variables and leave the loop */
currentPeriod = fileBeginOffset.RoundDown(fileSource.FilePeriod);
fileBlock = (int)(currentPeriod.Ticks / samplePeriod.Ticks);

break;
}

if (File.Exists(filePath))
{
// compensate offsets and lengths in case of incomplete or irregular file
Expand All @@ -423,7 +433,7 @@ protected virtual async Task ReadAsync(
? 0

/* The irregular or incomplete file contains no data for the current buffer position, so compensate for it */
: + fileCompensation
: +fileCompensation
);

/* The irregular or incomplete file contains not enough data, so make the file block smaller */
Expand All @@ -435,15 +445,15 @@ protected virtual async Task ReadAsync(
fileCompensation < 0 /* = irregular file */

/* The irregular file starts earlier than expected, so compensate for it */
? - fileCompensation
? -fileCompensation

/* The irregular or incomplete file starts later than expected */
: - fileCompensation
: -fileCompensation
);

/* The maximum value for fileCompensation is MaxFileBlock = FilePeriod / SamplePeriod
* so there is no need to check for actualFileOffset >= MaxFileBlock.
* However, it might happen that actualFileOffset < 0. This must be compensated. */
* so there is no need to check for actualFileOffset >= MaxFileBlock.
* However, it might happen that actualFileOffset < 0. This must be compensated. */
if (actualFileOffset < 0)
actualFileOffset = 0;

Expand All @@ -458,13 +468,13 @@ protected virtual async Task ReadAsync(

var slicedData = request.Data
.Slice(
start: actualBufferOffset * representation.ElementSize,
start: actualBufferOffset * representation.ElementSize,
length: actualFileBlock * representation.ElementSize
);

var slicedStatus = request.Status
.Slice(
start: actualBufferOffset,
start: actualBufferOffset,
length: actualFileBlock
);

Expand Down Expand Up @@ -534,7 +544,7 @@ protected virtual async Task ReadAsync(
Logger.LogError(ex, "Could not read file source group");
}

++fileSourceGroupIndex;
fileSourceGroupIndex++;
}
}

Expand All @@ -550,7 +560,7 @@ protected abstract Task ReadAsync(
StructuredFileReadRequest[] readRequests,
CancellationToken cancellationToken);

private protected Task<(DateTime RegularUtcFileBegin, IEnumerable<(string FilePath, TimeSpan FileBeginOffset)>)>
private protected Task<(DateTime RegularUtcFileBegin, (string FilePath, TimeSpan FileBeginOffset)[])>
FindFileBeginAndPathsAsync(DateTime begin, FileSource fileSource)
{
/* This implementation assumes that files are stored in regular time intervals.
Expand Down Expand Up @@ -588,7 +598,7 @@ protected abstract Task ReadAsync(
fileSource.UtcOffset
).UtcDateTime;

IEnumerable<(string FilePath, TimeSpan FileBeginOffset)> fileInfos;
(string FilePath, TimeSpan FileBeginOffset)[] fileInfos;

if (fileSource.FileTemplate.Contains('?') || fileSource.FileTemplate.Contains('*'))
{
Expand All @@ -598,14 +608,18 @@ protected abstract Task ReadAsync(

var regularUtcFileEnd = regularUtcFileBegin + fileSource.FilePeriod;

fileInfos = GetCandidateFiles(
// get all files that match the pattern
var tmpFileInfos = GetCandidateFiles(
rootPath: Root,
begin: regularUtcFileBegin,
end: regularUtcFileBegin + fileSource.FilePeriod,
fileSource,
CancellationToken.None
)
.Where(current =>
.ToArray();

// keep only files for current period
fileInfos = tmpFileInfos.Where(current =>
{
if (fileSource.IrregularTimeInterval)
{
Expand All @@ -621,7 +635,19 @@ protected abstract Task ReadAsync(
current.DateTimeOffset.UtcDateTime < regularUtcFileEnd;
}
})
.Select(current => (current.FilePath, current.DateTimeOffset.UtcDateTime - regularUtcFileBegin));
.Select(current => (current.FilePath, current.DateTimeOffset.UtcDateTime - regularUtcFileBegin))
.ToArray();

// no files left for current time period - try to find next file after this period
if (!fileInfos.Any())
{
fileInfos = tmpFileInfos
.OrderBy(x => x.DateTimeOffset.DateTime)
.Where(current => regularUtcFileEnd <= current.DateTimeOffset.UtcDateTime)
.Select(current => (current.FilePath, current.DateTimeOffset.UtcDateTime - regularUtcFileBegin))
.Take(1)
.ToArray();
}
}

else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public StructuredFileDataSourceTester(

public Dictionary<string, Dictionary<string, IReadOnlyList<FileSource>>> Config { get; private set; } = default!;

public new Task<(DateTime RegularUtcFileBegin, IEnumerable<(string FilePath, TimeSpan FileBeginOffset)>)>
public new Task<(DateTime RegularUtcFileBegin, (string FilePath, TimeSpan FileBeginOffset)[])>
FindFileBeginAndPathsAsync(DateTime begin, FileSource fileSource)
{
return base.FindFileBeginAndPathsAsync(begin, fileSource);
Expand Down

0 comments on commit 4b51555

Please sign in to comment.