Skip to content

Commit

Permalink
Be more conservative when collecting old records
Browse files Browse the repository at this point in the history
Summary: Because records of different streams may have the same timestamp, when collecting "old" records to write, it's safer to exclude the limit time, so that if a record is later produced with the same timestamp as the limit given, it is more likely to be sorted correctly when written out to disk. This condition can happen when copying files with vrstool.

Reviewed By: kiminoue7

Differential Revision: D52425279

fbshipit-source-id: 166fe9049c5913a238837b5bbebeb598326677b3
  • Loading branch information
Georges Berenger authored and facebook-github-bot committed Dec 28, 2023
1 parent f1cde52 commit 346713e
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 11 deletions.
2 changes: 1 addition & 1 deletion vrs/RecordManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ void RecordManager::collectOldRecords(double maxAge, list<Record*>& outCollected
if (!activeRecords_.empty()) {
auto iterator = upper_bound(
activeRecords_.begin(), activeRecords_.end(), maxAge, [](double age, Record* record) {
return age < record->timestamp_;
return record->timestamp_ >= age;
});

// Move a range without copying elements.
Expand Down
8 changes: 5 additions & 3 deletions vrs/test/RecordManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ TEST(RecordManager, CollectOldRecords) {
// Verify that we can pull a subsection of the records.
list<Record*> recordData;
manager.collectOldRecords(1.33, recordData);
EXPECT_EQ(recordData.size(), 134);
EXPECT_EQ(recordData.size(), 133);
EXPECT_DOUBLE_EQ(recordData.front()->getTimestamp(), 0.0);
EXPECT_DOUBLE_EQ(recordData.back()->getTimestamp(), 1.33);
EXPECT_LT(recordData.back()->getTimestamp(), 1.33);
for (auto* r : recordData) {
r->recycle();
}
Expand All @@ -52,9 +52,11 @@ TEST(RecordManager, CollectOldRecords) {
list<Record*> noRecords;
manager.collectOldRecords(0.74, noRecords);
EXPECT_TRUE(noRecords.empty());
manager.collectOldRecords(1.33, noRecords);
EXPECT_TRUE(noRecords.empty());

// Purge the rest of the records.
EXPECT_EQ(1000 - 134, manager.purgeOldRecords(10.0));
EXPECT_EQ(1000 - recordData.size(), manager.purgeOldRecords(10.0));

// Call it again. There shouldn't be anymore.
manager.collectOldRecords(1000.0, noRecords);
Expand Down
14 changes: 7 additions & 7 deletions vrs/test/RecordTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ size_t collect(
list<Record*>& records = batch.back().second;
r.first->collectOldRecords(maxTime, records);
for (auto record : records) {
EXPECT_LE(record->getTimestamp(), maxTime);
EXPECT_LT(record->getTimestamp(), maxTime);
}
count += records.size();
}
Expand Down Expand Up @@ -150,32 +150,32 @@ TEST_F(RecordTester, addRecordBatchesToSortedRecordsTester) {
vector<pair<RecordManager*, StreamId>> recordManagerBOnly{{&recordManagerB, idB}};
vector<pair<RecordManager*, StreamId>> recordManagerCOnly{{&recordManagerC, idC}};

EXPECT_EQ(collect(batches, recordManagersAll, 5), 86);
EXPECT_EQ(collect(batches, recordManagersAll, 5), 85);
RecordFileWriterTester::addRecordBatchesToSortedRecords(batches, sr);
EXPECT_EQ(sr.size(), 86);
EXPECT_EQ(sr.size(), 85);
EXPECT_TRUE(isProperlySorted(sr));
batches.clear();

EXPECT_EQ(collect(batches, recordManagerAB, 8), 33);
EXPECT_EQ(collect(batches, recordManagerCOnly, 8), 20);
recordManagerA.createRecord(6.25, Record::Type::DATA, 1, DataSource());
recordManagerB.createRecord(4, Record::Type::DATA, 1, DataSource());
EXPECT_EQ(collect(batches, recordManagersAll, 10), 38);
EXPECT_EQ(collect(batches, recordManagersAll, 10), 37);
RecordFileWriterTester::addRecordBatchesToSortedRecords(batches, sr);
EXPECT_EQ(sr.size(), 177);
EXPECT_EQ(sr.size(), 175);
EXPECT_TRUE(isProperlySorted(sr));
batches.clear();

// don't collect anything this time
EXPECT_EQ(collect(batches, recordManagersAll, 10), 0);
RecordFileWriterTester::addRecordBatchesToSortedRecords(batches, sr);
EXPECT_EQ(sr.size(), 177);
EXPECT_EQ(sr.size(), 175);
EXPECT_TRUE(isProperlySorted(sr));
batches.clear();

recordManagerA.createRecord(2.5, Record::Type::DATA, 1, DataSource());
recordManagerA.createRecord(3.5, Record::Type::DATA, 1, DataSource());
EXPECT_EQ(collect(batches, recordManagersAll, 100), 277);
EXPECT_EQ(collect(batches, recordManagersAll, 100), 279);
RecordFileWriterTester::addRecordBatchesToSortedRecords(batches, sr);
EXPECT_EQ(sr.size(), 454);
EXPECT_TRUE(isProperlySorted(sr));
Expand Down

0 comments on commit 346713e

Please sign in to comment.