Skip to content

Commit

Permalink
Add upsert config - outOfOrderRecordColumn to track out-of-order even…
Browse files Browse the repository at this point in the history
…ts (apache#11877)
  • Loading branch information
tibrewalpratik17 authored Nov 17, 2023
1 parent 0cb43eb commit c1fdb66
Show file tree
Hide file tree
Showing 17 changed files with 253 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1438,6 +1438,8 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
.setPartitionDedupMetadataManager(partitionDedupMetadataManager)
.setUpsertComparisonColumns(tableConfig.getUpsertComparisonColumns())
.setUpsertDeleteRecordColumn(tableConfig.getUpsertDeleteRecordColumn())
.setUpsertOutOfOrderRecordColumn(tableConfig.getOutOfOrderRecordColumn())
.setUpsertDropOutOfOrderRecord(tableConfig.isDropOutOfOrderRecord())
.setFieldConfigList(tableConfig.getFieldConfigList());

// Create message decoder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ public void loadSegment()
_upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null, false, false, 0, INDEX_DIR,
serverMetrics), new ThreadSafeMutableRoaringBitmap(), null);
Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null,
false, 0, INDEX_DIR, serverMetrics), new ThreadSafeMutableRoaringBitmap(), null);
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ public class MutableSegmentImpl implements MutableSegment {
private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private final List<String> _upsertComparisonColumns;
private final String _deleteRecordColumn;
private final String _upsertOutOfOrderRecordColumn;
private final boolean _upsertDropOutOfOrderRecord;
// The valid doc ids are maintained locally instead of in the upsert metadata manager because:
// 1. There is only one consuming segment per partition, the committed segments do not need to modify the valid doc
// ids for the consuming segment.
Expand Down Expand Up @@ -381,6 +383,8 @@ public boolean isMutableSegment() {
_upsertComparisonColumns =
upsertComparisonColumns != null ? upsertComparisonColumns : Collections.singletonList(_timeColumnName);
_deleteRecordColumn = config.getUpsertDeleteRecordColumn();
_upsertOutOfOrderRecordColumn = config.getUpsertOutOfOrderRecordColumn();
_upsertDropOutOfOrderRecord = config.isUpsertDropOutOfOrderRecord();
_validDocIds = new ThreadSafeMutableRoaringBitmap();
if (_deleteRecordColumn != null) {
_queryableDocIds = new ThreadSafeMutableRoaringBitmap();
Expand All @@ -392,6 +396,8 @@ public boolean isMutableSegment() {
_deleteRecordColumn = null;
_validDocIds = null;
_queryableDocIds = null;
_upsertOutOfOrderRecordColumn = null;
_upsertDropOutOfOrderRecord = false;
}
}

Expand Down Expand Up @@ -496,7 +502,11 @@ public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata)
// segment indexing or addNewRow call errors out in those scenario, there can be metadata inconsistency where
// a key is pointing to some other key's docID
// TODO fix this metadata mismatch scenario
if (_partitionUpsertMetadataManager.addRecord(this, recordInfo)) {
boolean isOutOfOrderRecord = !_partitionUpsertMetadataManager.addRecord(this, recordInfo);
if (_upsertOutOfOrderRecordColumn != null) {
updatedRow.putValue(_upsertOutOfOrderRecordColumn, BooleanUtils.toInt(isOutOfOrderRecord));
}
if (!isOutOfOrderRecord || !_upsertDropOutOfOrderRecord) {
updateDictionary(updatedRow);
addNewRow(numDocsIndexed, updatedRow);
// Update number of documents indexed before handling the upsert metadata so that the record becomes queryable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class RealtimeSegmentConfig {
private final UpsertConfig.Mode _upsertMode;
private final List<String> _upsertComparisonColumns;
private final String _upsertDeleteRecordColumn;
private final String _upsertOutOfOrderRecordColumn;
private final boolean _upsertDropOutOfOrderRecord;
private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
private final String _consumerDir;
Expand All @@ -76,7 +78,8 @@ private RealtimeSegmentConfig(String tableNameWithType, String segmentName, Stri
PinotDataBufferMemoryManager memoryManager, RealtimeSegmentStatsHistory statsHistory, String partitionColumn,
PartitionFunction partitionFunction, int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled,
String consumerDir, UpsertConfig.Mode upsertMode, List<String> upsertComparisonColumns,
String upsertDeleteRecordColumn, PartitionUpsertMetadataManager partitionUpsertMetadataManager,
String upsertDeleteRecordColumn, String upsertOutOfOrderRecordColumn, boolean upsertDropOutOfOrderRecord,
PartitionUpsertMetadataManager partitionUpsertMetadataManager,
PartitionDedupMetadataManager partitionDedupMetadataManager, List<FieldConfig> fieldConfigList,
List<AggregationConfig> ingestionAggregationConfigs) {
_tableNameWithType = tableNameWithType;
Expand All @@ -100,6 +103,8 @@ private RealtimeSegmentConfig(String tableNameWithType, String segmentName, Stri
_upsertMode = upsertMode != null ? upsertMode : UpsertConfig.Mode.NONE;
_upsertComparisonColumns = upsertComparisonColumns;
_upsertDeleteRecordColumn = upsertDeleteRecordColumn;
_upsertOutOfOrderRecordColumn = upsertOutOfOrderRecordColumn;
_upsertDropOutOfOrderRecord = upsertDropOutOfOrderRecord;
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
_partitionDedupMetadataManager = partitionDedupMetadataManager;
_fieldConfigList = fieldConfigList;
Expand Down Expand Up @@ -195,6 +200,14 @@ public String getUpsertDeleteRecordColumn() {
return _upsertDeleteRecordColumn;
}

/**
 * Returns the upsert out-of-order record column name held by this config, exactly as it was supplied at
 * construction time.
 *
 * @return the out-of-order record column name; presumably {@code null} when no such column was configured
 *         (TODO confirm against callers)
 */
public String getUpsertOutOfOrderRecordColumn() {
return _upsertOutOfOrderRecordColumn;
}

/**
 * Returns the drop-out-of-order-record flag held by this config, exactly as it was supplied at construction time.
 *
 * @return {@code true} if the flag for dropping out-of-order upsert records is set, {@code false} otherwise
 */
public boolean isUpsertDropOutOfOrderRecord() {
return _upsertDropOutOfOrderRecord;
}

/**
 * Returns the partition upsert metadata manager held by this config, exactly as it was supplied at construction
 * time.
 *
 * @return the partition upsert metadata manager
 */
public PartitionUpsertMetadataManager getPartitionUpsertMetadataManager() {
return _partitionUpsertMetadataManager;
}
Expand Down Expand Up @@ -233,6 +246,8 @@ public static class Builder {
private UpsertConfig.Mode _upsertMode;
private List<String> _upsertComparisonColumns;
private String _upsertDeleteRecordColumn;
private String _upsertOutOfOrderRecordColumn;
private boolean _upsertDropOutOfOrderRecord;
private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private PartitionDedupMetadataManager _partitionDedupMetadataManager;
private List<FieldConfig> _fieldConfigList;
Expand Down Expand Up @@ -373,6 +388,16 @@ public Builder setUpsertDeleteRecordColumn(String upsertDeleteRecordColumn) {
return this;
}

/**
 * Stores the upsert out-of-order record column name on this builder.
 *
 * @param upsertOutOfOrderRecordColumn the out-of-order record column name to store
 * @return this builder, so that calls can be chained
 */
public Builder setUpsertOutOfOrderRecordColumn(String upsertOutOfOrderRecordColumn) {
_upsertOutOfOrderRecordColumn = upsertOutOfOrderRecordColumn;
return this;
}

/**
 * Stores the drop-out-of-order-record flag on this builder.
 *
 * @param upsertDropOutOfOrderRecord the flag value to store
 * @return this builder, so that calls can be chained
 */
public Builder setUpsertDropOutOfOrderRecord(boolean upsertDropOutOfOrderRecord) {
_upsertDropOutOfOrderRecord = upsertDropOutOfOrderRecord;
return this;
}

public Builder setPartitionUpsertMetadataManager(PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
return this;
Expand Down Expand Up @@ -403,6 +428,7 @@ public RealtimeSegmentConfig build() {
_capacity, _avgNumMultiValues, Collections.unmodifiableMap(indexConfigByCol), _segmentZKMetadata, _offHeap,
_memoryManager, _statsHistory, _partitionColumn, _partitionFunction, _partitionId, _aggregateMetrics,
_nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumns, _upsertDeleteRecordColumn,
_upsertOutOfOrderRecordColumn, _upsertDropOutOfOrderRecord,
_partitionUpsertMetadataManager, _partitionDedupMetadataManager, _fieldConfigList,
_ingestionAggregationConfigs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
protected final PartialUpsertHandler _partialUpsertHandler;
protected final boolean _enableSnapshot;
protected final double _metadataTTL;
protected final boolean _dropOutOfOrderRecord;
protected final File _tableIndexDir;
protected final ServerMetrics _serverMetrics;
protected final Logger _logger;
Expand Down Expand Up @@ -99,7 +98,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
boolean dropOutOfOrderRecord, double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
_tableNameWithType = tableNameWithType;
_partitionId = partitionId;
_primaryKeyColumns = primaryKeyColumns;
Expand All @@ -109,7 +108,6 @@ protected BasePartitionUpsertMetadataManager(String tableNameWithType, int parti
_partialUpsertHandler = partialUpsertHandler;
_enableSnapshot = enableSnapshot;
_metadataTTL = metadataTTL;
_dropOutOfOrderRecord = dropOutOfOrderRecord;
_tableIndexDir = tableIndexDir;
_snapshotLock = enableSnapshot ? new ReentrantReadWriteLock() : null;
_serverMetrics = serverMetrics;
Expand Down Expand Up @@ -362,6 +360,10 @@ public boolean addRecord(MutableSegment segment, RecordInfo recordInfo) {
}
}

/**
 * Adds the given record to the upsert metadata manager, unless the record is out-of-order.
 *
 * @return {@code true} when the record is added to the upsert metadata manager, {@code false} when the record is
 *         out-of-order and thus not added
 */
protected abstract boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
protected ServerMetrics _serverMetrics;
protected HelixManager _helixManager;
protected ExecutorService _segmentPreloadExecutor;
protected boolean _dropOutOfOrderRecord;

private volatile boolean _isPreloading = false;

Expand Down Expand Up @@ -112,7 +111,6 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD

_enableSnapshot = upsertConfig.isEnableSnapshot();
_metadataTTL = upsertConfig.getMetadataTTL();
_dropOutOfOrderRecord = upsertConfig.isDropOutOfOrderRecord();
_tableIndexDir = tableDataManager.getTableDataDir();
_serverMetrics = serverMetrics;
_helixManager = helixManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
boolean dropOutOfOrderRecord, double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction,
partialUpsertHandler, enableSnapshot, dropOutOfOrderRecord, metadataTTL, tableIndexDir, serverMetrics);
double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn,
hashFunction, partialUpsertHandler, enableSnapshot, metadataTTL, tableIndexDir, serverMetrics);
}

@Override
Expand Down Expand Up @@ -240,9 +240,13 @@ public void doRemoveExpiredPrimaryKeys() {
persistWatermark(_largestSeenComparisonValue);
}

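/**
 * Adds the given record to the upsert metadata manager, unless the record is out-of-order.
 *
 * @return {@code true} when the record is added to the upsert metadata manager, {@code false} when the record is
 *         out-of-order and thus not added
 */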
@Override
protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
AtomicBoolean shouldDropRecord = new AtomicBoolean(false);
AtomicBoolean isOutOfOrderRecord = new AtomicBoolean(false);
ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds());
ThreadSafeMutableRoaringBitmap queryableDocIds = segment.getQueryableDocIds();
int newDocId = recordInfo.getDocId();
Expand Down Expand Up @@ -273,9 +277,8 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
return new RecordLocation(segment, newDocId, newComparisonValue);
} else {
handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue());
// this is a out-of-order record, if upsert config _dropOutOfOrderRecord is true, then set
// shouldDropRecord to true. This method returns inverse of this value
shouldDropRecord.set(_dropOutOfOrderRecord);
// This is an out-of-order record: flag it so that this method returns false to signal it was not added
isOutOfOrderRecord.set(true);
return currentRecordLocation;
}
} else {
Expand All @@ -288,7 +291,7 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
// Update metrics
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
_primaryKeyToRecordLocationMap.size());
return !shouldDropRecord.get();
return !isOutOfOrderRecord.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(i
return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns,
_comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler,
_enableSnapshot, _dropOutOfOrderRecord, _metadataTTL, _tableIndexDir, _serverMetrics));
_enableSnapshot, _metadataTTL, _tableIndexDir, _serverMetrics));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,18 @@ static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema)
fieldSpec != null && fieldSpec.isSingleValueField() && fieldSpec.getDataType() == DataType.BOOLEAN,
"The delete record column must be a single-valued BOOLEAN column");
}

String outOfOrderRecordColumn = upsertConfig.getOutOfOrderRecordColumn();
Preconditions.checkState(
outOfOrderRecordColumn == null || !upsertConfig.isDropOutOfOrderRecord(),
"outOfOrderRecordColumn and dropOutOfOrderRecord shouldn't exist together for upsert table");

if (outOfOrderRecordColumn != null) {
FieldSpec fieldSpec = schema.getFieldSpecFor(outOfOrderRecordColumn);
Preconditions.checkState(
fieldSpec != null && fieldSpec.isSingleValueField() && fieldSpec.getDataType() == DataType.BOOLEAN,
"The outOfOrderRecordColumn must be a single-valued BOOLEAN column");
}
}

Preconditions.checkState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<Str

UpsertConfig.Mode upsertMode = upsertConfig == null ? UpsertConfig.Mode.NONE : upsertConfig.getMode();
List<String> comparisonColumns = upsertConfig == null ? null : upsertConfig.getComparisonColumns();
boolean isUpsertDropOutOfOrderRecord = upsertConfig == null ? false : upsertConfig.isDropOutOfOrderRecord();
String upsertOutOfOrderRecordColumn = upsertConfig == null ? null : upsertConfig.getOutOfOrderRecordColumn();
DictionaryIndexConfig varLengthDictConf = new DictionaryIndexConfig(false, true);
RealtimeSegmentConfig.Builder segmentConfBuilder = new RealtimeSegmentConfig.Builder()
.setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME)
Expand All @@ -114,7 +116,9 @@ public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<Str
.setUpsertComparisonColumns(comparisonColumns)
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
.setPartitionDedupMetadataManager(partitionDedupMetadataManager)
.setIngestionAggregationConfigs(aggregationConfigs);
.setIngestionAggregationConfigs(aggregationConfigs)
.setUpsertDropOutOfOrderRecord(isUpsertDropOutOfOrderRecord)
.setUpsertOutOfOrderRecordColumn(upsertOutOfOrderRecordColumn);
for (Map.Entry<String, JsonIndexConfig> entry : jsonIndexConfigs.entrySet()) {
segmentConfBuilder.setIndex(entry.getKey(), StandardIndexes.json(), entry.getValue());
}
Expand Down
Loading

0 comments on commit c1fdb66

Please sign in to comment.