Add dropOutOfOrderRecord config to drop out-of-order events (apache#11811)

* Add upsert config - dropOutOfOrderRecord to drop out-of-order events

* address comments

* refactor position of dropOOOrecord in class constructor

* refactor addRecord to return boolean
tibrewalpratik17 authored Oct 25, 2023
1 parent 649477a commit 91bba48
Showing 9 changed files with 128 additions and 28 deletions.
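For readers wanting to try the new flag, here is a minimal sketch of enabling it programmatically. The snippet is not part of this commit: it assumes UpsertConfig exposes a setDropOutOfOrderRecord(boolean) setter matching the isDropOutOfOrderRecord() getter read in BaseTableUpsertMetadataManager below, so treat it as illustrative rather than the exact API.

import org.apache.pinot.spi.config.table.UpsertConfig;

public class DropOutOfOrderRecordConfigSketch {
  // Hypothetical helper: builds a FULL upsert config that drops out-of-order events.
  public static UpsertConfig fullUpsertDroppingOutOfOrderRecords() {
    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
    // Assumed setter name, mirroring the isDropOutOfOrderRecord() getter in this diff
    upsertConfig.setDropOutOfOrderRecord(true);
    return upsertConfig;
  }
}

In a realtime table config, the flag would presumably sit in the upsertConfig section alongside the mode and comparison columns.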
@@ -131,7 +131,7 @@ public void loadSegment()
_upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null, false, 0, INDEX_DIR,
Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null, false, false, 0, INDEX_DIR,
serverMetrics), new ThreadSafeMutableRoaringBitmap(), null);
}

@@ -489,12 +489,19 @@ public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata)
if (isUpsertEnabled()) {
RecordInfo recordInfo = getRecordInfo(row, numDocsIndexed);
GenericRow updatedRow = _partitionUpsertMetadataManager.updateRecord(row, recordInfo);
- updateDictionary(updatedRow);
- addNewRow(numDocsIndexed, updatedRow);
- // Update number of documents indexed before handling the upsert metadata so that the record becomes queryable
- // once validated
- canTakeMore = numDocsIndexed++ < _capacity;
- _partitionUpsertMetadataManager.addRecord(this, recordInfo);
+ // If the record doesn't need to be dropped, persist it in the segment and update the metadata map.
+ // Note: the metadata is updated first, followed by the segment data update, so if segment indexing or the
+ // addNewRow call errors out, the metadata can become inconsistent, with a key pointing to some other
+ // key's docId.
+ // TODO: fix this metadata mismatch scenario
+ if (_partitionUpsertMetadataManager.addRecord(this, recordInfo)) {
+ updateDictionary(updatedRow);
+ addNewRow(numDocsIndexed, updatedRow);
+ // Update number of documents indexed before handling the upsert metadata so that the record becomes
+ // queryable once validated
+ numDocsIndexed++;
+ }
+ canTakeMore = numDocsIndexed < _capacity;
} else {
// Update dictionary first
updateDictionary(row);
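The TODO in the hunk above describes a real ordering hazard: the upsert metadata map is updated before the row is written to the segment, so a failure between the two steps leaves a primary key pointing at a docId that was never indexed. A self-contained toy illustration of that window, using plain maps and hypothetical names rather than Pinot classes:

import java.util.HashMap;
import java.util.Map;

public class MetadataFirstInconsistencySketch {
  static final Map<String, Integer> PK_TO_DOC_ID = new HashMap<>(); // stands in for the upsert metadata map
  static final Map<Integer, String> SEGMENT_ROWS = new HashMap<>(); // stands in for the segment's rows

  static void index(String pk, String row, int docId, boolean failDuringWrite) {
    PK_TO_DOC_ID.put(pk, docId); // step 1: metadata updated first
    if (failDuringWrite) {
      throw new RuntimeException("addNewRow failed"); // step 2 never happens
    }
    SEGMENT_ROWS.put(docId, row);
  }

  public static void main(String[] args) {
    try {
      index("pk1", "row-0", 0, true);
    } catch (RuntimeException e) {
      // Metadata now points pk1 -> docId 0, but the segment holds no row 0:
      // prints "pk1 -> 0, segment row: null"
      System.out.println("pk1 -> " + PK_TO_DOC_ID.get("pk1") + ", segment row: " + SEGMENT_ROWS.get(0));
    }
  }
}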
@@ -68,6 +68,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUpsertMetadataManager
protected final PartialUpsertHandler _partialUpsertHandler;
protected final boolean _enableSnapshot;
protected final double _metadataTTL;
+ protected final boolean _dropOutOfOrderRecord;
protected final File _tableIndexDir;
protected final ServerMetrics _serverMetrics;
protected final Logger _logger;
@@ -95,7 +96,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUpsertMetadataManager
protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
- double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
+ boolean dropOutOfOrderRecord, double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
_tableNameWithType = tableNameWithType;
_partitionId = partitionId;
_primaryKeyColumns = primaryKeyColumns;
@@ -105,6 +106,7 @@ protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
_partialUpsertHandler = partialUpsertHandler;
_enableSnapshot = enableSnapshot;
_metadataTTL = metadataTTL;
+ _dropOutOfOrderRecord = dropOutOfOrderRecord;
_tableIndexDir = tableIndexDir;
_snapshotLock = enableSnapshot ? new ReentrantReadWriteLock() : null;
_serverMetrics = serverMetrics;
@@ -317,24 +319,25 @@ protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap
}

@Override
- public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
+ public boolean addRecord(MutableSegment segment, RecordInfo recordInfo) {
_gotFirstConsumingSegment = true;
if (!startOperation()) {
_logger.debug("Skip adding record to segment: {} because metadata manager is already stopped",
segment.getSegmentName());
- return;
+ return false;
}
// NOTE: We don't acquire snapshot read lock here because snapshot is always taken before a new consuming segment
// starts consuming, so it won't overlap with this method
try {
- doAddRecord(segment, recordInfo);
+ boolean addRecord = doAddRecord(segment, recordInfo);
_trackedSegments.add(segment);
+ return addRecord;
} finally {
finishOperation();
}
}

- protected abstract void doAddRecord(MutableSegment segment, RecordInfo recordInfo);
+ protected abstract boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo);

@Override
public void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment) {
@@ -73,6 +73,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetadataManager
protected ServerMetrics _serverMetrics;
protected HelixManager _helixManager;
protected ExecutorService _segmentPreloadExecutor;
+ protected boolean _dropOutOfOrderRecord;

private volatile boolean _isPreloading = false;

@@ -111,6 +112,7 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager,

_enableSnapshot = upsertConfig.isEnableSnapshot();
_metadataTTL = upsertConfig.getMetadataTTL();
+ _dropOutOfOrderRecord = upsertConfig.isDropOutOfOrderRecord();
_tableIndexDir = tableDataManager.getTableDataDir();
_serverMetrics = serverMetrics;
_helixManager = helixManager;
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -56,9 +57,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUpsertMetadataManager
public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
- double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
+ boolean dropOutOfOrderRecord, double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction,
- partialUpsertHandler, enableSnapshot, metadataTTL, tableIndexDir, serverMetrics);
+ partialUpsertHandler, enableSnapshot, dropOutOfOrderRecord, metadataTTL, tableIndexDir, serverMetrics);
}

@Override
@@ -236,7 +237,8 @@ public void doRemoveExpiredPrimaryKeys() {
}

@Override
- protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
+ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
+ AtomicBoolean shouldDropRecord = new AtomicBoolean(false);
ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds());
ThreadSafeMutableRoaringBitmap queryableDocIds = segment.getQueryableDocIds();
int newDocId = recordInfo.getDocId();
@@ -267,6 +269,9 @@ protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
return new RecordLocation(segment, newDocId, newComparisonValue);
} else {
handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue());
+ // This is an out-of-order record. If the upsert config dropOutOfOrderRecord is true, set
+ // shouldDropRecord to true; this method returns the inverse of that value.
+ shouldDropRecord.set(_dropOutOfOrderRecord);
return currentRecordLocation;
}
} else {
@@ -279,6 +284,7 @@
// Update metrics
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
_primaryKeyToRecordLocationMap.size());
+ return !shouldDropRecord.get();
}

@Override
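The AtomicBoolean introduced in doAddRecord above captures a second result from the lambda passed to the concurrent map's compute call, since that lambda can only return the new mapped value (a RecordLocation). A minimal standalone sketch of the same result-capture pattern, with illustrative names rather than Pinot code:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class ComputeResultCaptureSketch {
  // Returns true when the candidate comparison value was accepted (new key, or
  // not older than the current value), false when it was dropped as out-of-order.
  static boolean putIfNotOutOfOrder(Map<String, Integer> map, String key, int candidate) {
    AtomicBoolean dropped = new AtomicBoolean(false);
    map.compute(key, (k, current) -> {
      if (current == null || candidate >= current) {
        return candidate; // accept: becomes the new mapped value
      }
      dropped.set(true); // out-of-order: keep the current value
      return current;
    });
    return !dropped.get();
  }

  public static void main(String[] args) {
    Map<String, Integer> map = new ConcurrentHashMap<>();
    System.out.println(putIfNotOutOfOrder(map, "pk", 100)); // true  (new key)
    System.out.println(putIfNotOutOfOrder(map, "pk", 80));  // false (out-of-order)
    System.out.println(putIfNotOutOfOrder(map, "pk", 150)); // true  (newer value)
  }
}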
@@ -37,7 +37,7 @@ public ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) {
return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns,
_comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler,
- _enableSnapshot, _metadataTTL, _tableIndexDir, _serverMetrics));
+ _enableSnapshot, _dropOutOfOrderRecord, _metadataTTL, _tableIndexDir, _serverMetrics));
}

@Override
@@ -74,7 +74,7 @@ public interface PartitionUpsertMetadataManager extends Closeable {
/**
* Updates the upsert metadata for a new consumed record in the given consuming segment.
*/
- void addRecord(MutableSegment segment, RecordInfo recordInfo);
+ boolean addRecord(MutableSegment segment, RecordInfo recordInfo);

/**
* Replaces the upsert metadata for the old segment with the new immutable segment.
@@ -88,7 +88,7 @@ public void tearDown()
public void testStartFinishOperation() {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 0, INDEX_DIR,
Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, false, 0, INDEX_DIR,
mock(ServerMetrics.class));

// Start 2 operations
@@ -163,7 +163,7 @@ private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean en
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
+ Collections.singletonList(comparisonColumn), null, hashFunction, null, false, false, 0, INDEX_DIR,
mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -326,8 +326,8 @@ private void verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction hashFunction
String deleteRecordColumn = "deleteCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false, 0, INDEX_DIR,
- mock(ServerMetrics.class));
+ Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false, false, 0,
+ INDEX_DIR, mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;

@@ -609,7 +609,7 @@ private void verifyAddRecord(HashFunction hashFunction)
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
+ Collections.singletonList(comparisonColumn), null, hashFunction, null, false, false, 0, INDEX_DIR,
mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;

@@ -689,6 +689,77 @@ private void verifyAddRecord(HashFunction hashFunction)
upsertMetadataManager.close();
}

+ @Test
+ public void testAddOutOfOrderRecord()
+ throws IOException {
+ verifyAddOutOfOrderRecord(HashFunction.NONE);
+ verifyAddOutOfOrderRecord(HashFunction.MD5);
+ verifyAddOutOfOrderRecord(HashFunction.MURMUR3);
+ }
+
+ private void verifyAddOutOfOrderRecord(HashFunction hashFunction)
+ throws IOException {
+ String comparisonColumn = "timeCol";
+ // here dropOutOfOrderRecord = true
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+ Collections.singletonList(comparisonColumn), null, hashFunction, null, false, true, 0, INDEX_DIR,
+ mock(ServerMetrics.class));
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+ // Add the first segment
+ // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+ int numRecords = 3;
+ int[] primaryKeys = new int[]{0, 1, 2};
+ int[] timestamps = new int[]{100, 120, 100};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment1 =
+ mockImmutableSegment(1, validDocIds1, null, getPrimaryKeyList(numRecords, primaryKeys));
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+ getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator());
+
+ // Update records from the second segment
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ MutableSegment segment2 = mockMutableSegment(1, validDocIds2, null);
+
+ // New record: addRecord should return true so that the record is added
+ boolean shouldAddRecord =
+ upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, new IntWrapper(100), false));
+ assertTrue(shouldAddRecord);
+
+ // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+ // segment2: 3 -> {0, 100}
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
+
+ // Send an out-of-order event: addRecord should return false so that the event is dropped
+ shouldAddRecord =
+ upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(80), false));
+ assertFalse(shouldAddRecord);
+
+ // In-order event for an existing key
+ shouldAddRecord =
+ upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(150), false));
+ assertTrue(shouldAddRecord);
+
+ // segment1: 0 -> {0, 100}, 1 -> {1, 120}
+ // segment2: 3 -> {0, 100}, 2 -> {1, 150}
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+
+ // Close the metadata manager
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }

@Test
public void testPreloadSegment() {
verifyPreloadSegment(HashFunction.NONE);
@@ -700,7 +771,7 @@ private void verifyPreloadSegment(HashFunction hashFunction) {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
+ Collections.singletonList(comparisonColumn), null, hashFunction, null, false, false, 0, INDEX_DIR,
mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;

@@ -756,7 +827,7 @@ private void verifyAddRecordWithDeleteColumn(HashFunction hashFunction)
String deleteColumn = "deleteCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null, false, 0, INDEX_DIR,
+ Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null, false, false, 0, INDEX_DIR,
mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;

@@ -857,7 +928,7 @@ private void verifyRemoveExpiredPrimaryKeys(Comparable earlierComparisonValue, C

ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 30, tableDir,
Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, false, 30, tableDir,
mock(ServerMetrics.class));
Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -918,7 +989,7 @@ private void verifyAddOutOfTTLSegment() {

ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir,
Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, false, 30, tableDir,
mock(ServerMetrics.class));
Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -984,7 +1055,7 @@ private void verifyAddSegmentForTTL(Comparable comparisonValue) {

ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir,
Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, false, 30, tableDir,
mock(ServerMetrics.class));
Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -1038,7 +1109,7 @@ private static void checkRecordLocationForTTL(Map<Object, RecordLocation> recordLocationMap,
private void verifyPersistAndLoadWatermark() {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 10, INDEX_DIR,
Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, false, 10, INDEX_DIR,
mock(ServerMetrics.class));

double currentTimeMs = System.currentTimeMillis();
(1 of the 9 changed files is not shown here.)
