Skip to content

Commit

Permalink
Track progress during file processing operations more accurately
Browse files Browse the repository at this point in the history
Summary: Some VRS files have an empty timestamp range, which can heavily skew progress tracking. By tracking progress using the record index instead, we hope to improve such situations. Note that it's still possible to build weird files in which a few very large records are grouped together among tons of small records, so that the large records take the vast majority of the processing time, and get a lousy progress estimation. But such files are very rare, much rarer than files with a huge time range caused by a few records that are way outside the time range of the other records.

Reviewed By: finik

Differential Revision: D60616414

fbshipit-source-id: 925f57586cd0bc108ffdbc889732ac4d06c95693
  • Loading branch information
Georges Berenger authored and facebook-github-bot committed Aug 7, 2024
1 parent b4cc00a commit 5adf845
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 10 deletions.
2 changes: 1 addition & 1 deletion vrs/utils/FilterCopy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ int filterCopy(
if (copyResult == 0) {
// Init tracker progress early, to be sure we track the background thread queue size
filteredReader.preRollConfigAndState(); // make sure to copy most recent config & state records
throttledWriter.initTimeRange(startTimestamp, endTimestamp);
throttledWriter.initTimeRange(startTimestamp, endTimestamp, &filteredReader.reader);
filteredReader.iterateAdvanced(&throttledWriter);
for (auto& filter : filters) {
filter->flush();
Expand Down
33 changes: 29 additions & 4 deletions vrs/utils/ThrottleHelpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <iomanip>
#include <iostream>

#include <vrs/RecordFileReader.h>
#include <vrs/helpers/Strings.h>
#include <vrs/os/Time.h>

Expand Down Expand Up @@ -73,9 +74,24 @@ RecordFileWriter& ThrottledWriter::getWriter() {
return writer_;
}

void ThrottledWriter::initTimeRange(double minTimestamp, double maxTimestamp) {
/// Record the expected timestamp range of the operation and, when a reader is provided,
/// the matching range of record indexes, so progress can be estimated from record indexes.
/// @param minTimestamp: earliest timestamp of the operation
/// @param maxTimestamp: latest timestamp of the operation
/// @param r: optional source file reader used to map timestamps to record indexes; may be null.
/// When no usable index range can be established, indexRange_ is left at 0.
void ThrottledWriter::initTimeRange(double minTimestamp, double maxTimestamp, RecordFileReader* r) {
  minTimestamp_ = minTimestamp;
  duration_ = maxTimestamp - minTimestamp;
  reader_ = r;

  // Assume no usable index range until both ends have been resolved.
  indexRange_ = 0;
  if (reader_ == nullptr || !(minTimestamp < maxTimestamp)) {
    return;
  }

  auto* firstRecord = reader_->getRecordByTime(minTimestamp);
  if (firstRecord == nullptr) {
    return;
  }
  minIndex_ = reader_->getRecordIndex(firstRecord);

  // When the lookup for maxTimestamp finds no record, use the total record count as upper bound.
  auto* lastRecord = reader_->getRecordByTime(maxTimestamp);
  uint32_t endIndex;
  if (lastRecord != nullptr) {
    endIndex = reader_->getRecordIndex(lastRecord);
  } else {
    endIndex = reader_->getRecordCount();
  }
  indexRange_ = endIndex - minIndex_;
}

void ThrottledWriter::onRecordDecoded(double timestamp, double writeGraceWindow) {
Expand All @@ -102,9 +118,18 @@ void ThrottledWriter::onRecordDecoded(double timestamp, double writeGraceWindow)
if (showProgress()) {
double now = os::getTimestampSec();
if (now >= nextUpdateTime_) {
double progress = duration_ > 0.0001 ? (timestamp - minTimestamp_) / duration_ : 1.;
// timestamp ranges only include data records, but config & state records might be beyond
percent_ = max<int32_t>(static_cast<int32_t>(progress * 100), 0);
// timestamp range only includes data records, and config & state records might be beyond
if (reader_ == nullptr || indexRange_ == 0) {
double progress = duration_ > 0.0001 ? (timestamp - minTimestamp_) / duration_ : 1.;
percent_ = static_cast<int32_t>(progress * 100);
} else {
auto* record = reader_->getRecordByTime(timestamp);
uint32_t index = std::max<uint32_t>(
record != nullptr ? reader_->getRecordIndex(record) : reader_->getRecordCount(),
minIndex_);
percent_ = (100 * (index - minIndex_)) / indexRange_;
}
percent_ = max<int32_t>(percent_, 0);
percent_ = min<int32_t>(percent_, 100);
printPercentAndQueueSize(writer_.getBackgroundThreadQueueByteSize(), false);
nextUpdateTime_ = now + kRefreshDelaySec;
Expand Down
8 changes: 6 additions & 2 deletions vrs/utils/ThrottleHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ class ThrottledWriter {
/// @return The RecordFileWriter used to write the output file.
RecordFileWriter& getWriter();

/// Set the range of timestamps expected, to track progress on the time range.
/// Set the range of timestamps expected, to track operation progress.
/// @param minTimestamp: earliest timestamp of the operation
/// @param maxTimestamp: latest timestamp of the operation
void initTimeRange(double minTimestamp, double maxTimestamp);
/// @param reader: the source file reader, which might allow us to track progress more precisely
void initTimeRange(double minTimestamp, double maxTimestamp, RecordFileReader* reader = nullptr);

/// Called when a record is read, which can allow you to slow down decoding by adding a mere sleep
/// in the callback itself. This is the main use case of this callback, as data is queued for
Expand Down Expand Up @@ -81,6 +82,9 @@ class ThrottledWriter {
int32_t percent_ = 0;
double minTimestamp_ = 0;
double duration_ = 0;
RecordFileReader* reader_ = nullptr;
uint32_t minIndex_ = 0;
uint32_t indexRange_ = 0;
};

/// Default handling of file creation & closing, offering customization opportunities
Expand Down
6 changes: 3 additions & 3 deletions vrs/utils/Validation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ string decodeValidation(FilteredFileReader& filteredReader, const CopyOptions& c
filteredReader.preRollConfigAndState(); // make sure to copy most recent config & state records

ThrottledWriter throttledWriter(copyOptions);
throttledWriter.initTimeRange(startTimestamp, endTimestamp);
throttledWriter.initTimeRange(startTimestamp, endTimestamp, &filteredReader.reader);

size_t readRecordCount = 0;
bool noError = true;
Expand Down Expand Up @@ -382,7 +382,7 @@ string checkRecords(
filteredReader.preRollConfigAndState(); // make sure to copy most recent config & state records

ThrottledWriter throttledWriter(copyOptions);
throttledWriter.initTimeRange(startTimestamp, endTimestamp);
throttledWriter.initTimeRange(startTimestamp, endTimestamp, &filteredReader.reader);

size_t decodedCount = 0;
bool noError = true;
Expand Down Expand Up @@ -755,7 +755,7 @@ bool compareVRSfiles(
first.preRollConfigAndState(); // make sure to copy most recent config & state records

ThrottledWriter throttledWriter(copyOptions);
throttledWriter.initTimeRange(startTimestamp, endTimestamp);
throttledWriter.initTimeRange(startTimestamp, endTimestamp, &first.reader);

map<StreamId, StreamId> idMap;
if (!buildIdMap(first, second, idMap)) {
Expand Down

0 comments on commit 5adf845

Please sign in to comment.