diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 087f8c2cd2ed..3b66bc97fc6e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -1438,6 +1438,8 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf .setPartitionDedupMetadataManager(partitionDedupMetadataManager) .setUpsertComparisonColumns(tableConfig.getUpsertComparisonColumns()) .setUpsertDeleteRecordColumn(tableConfig.getUpsertDeleteRecordColumn()) + .setUpsertOutOfOrderRecordColumn(tableConfig.getOutOfOrderRecordColumn()) + .setUpsertDropOutOfOrderRecord(tableConfig.isDropOutOfOrderRecord()) .setFieldConfigList(tableConfig.getFieldConfigList()); // Create message decoder diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java index 9db6b126325b..807ef5f9a0d0 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java @@ -131,8 +131,8 @@ public void loadSegment() _upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap); ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert( new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"), - Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null, false, false, 0, INDEX_DIR, - serverMetrics), new ThreadSafeMutableRoaringBitmap(), null); + Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null, + false, 0, INDEX_DIR, serverMetrics), new ThreadSafeMutableRoaringBitmap(), null); } @AfterClass diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java index 7adeae3d7b11..c47d420c829a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java @@ -165,6 +165,8 @@ public class MutableSegmentImpl implements MutableSegment { private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager; private final List _upsertComparisonColumns; private final String _deleteRecordColumn; + private final String _upsertOutOfOrderRecordColumn; + private final boolean _upsertDropOutOfOrderRecord; // The valid doc ids are maintained locally instead of in the upsert metadata manager because: // 1. There is only one consuming segment per partition, the committed segments do not need to modify the valid doc // ids for the consuming segment. @@ -381,6 +383,8 @@ public boolean isMutableSegment() { _upsertComparisonColumns = upsertComparisonColumns != null ? upsertComparisonColumns : Collections.singletonList(_timeColumnName); _deleteRecordColumn = config.getUpsertDeleteRecordColumn(); + _upsertOutOfOrderRecordColumn = config.getUpsertOutOfOrderRecordColumn(); + _upsertDropOutOfOrderRecord = config.isUpsertDropOutOfOrderRecord(); _validDocIds = new ThreadSafeMutableRoaringBitmap(); if (_deleteRecordColumn != null) { _queryableDocIds = new ThreadSafeMutableRoaringBitmap(); @@ -392,6 +396,8 @@ public boolean isMutableSegment() { _deleteRecordColumn = null; _validDocIds = null; _queryableDocIds = null; + _upsertOutOfOrderRecordColumn = null; + _upsertDropOutOfOrderRecord = false; } } @@ -496,7 +502,11 @@ public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) // segment indexing or addNewRow call errors out in those scenario, there can be metadata inconsistency where // a key is pointing to some other key's docID // TODO fix this metadata mismatch scenario - if (_partitionUpsertMetadataManager.addRecord(this, recordInfo)) { + boolean isOutOfOrderRecord = !_partitionUpsertMetadataManager.addRecord(this, recordInfo); + if (_upsertOutOfOrderRecordColumn != null) { + updatedRow.putValue(_upsertOutOfOrderRecordColumn, BooleanUtils.toInt(isOutOfOrderRecord)); + } + if (!isOutOfOrderRecord || !_upsertDropOutOfOrderRecord) { updateDictionary(updatedRow); addNewRow(numDocsIndexed, updatedRow); // Update number of documents indexed before handling the upsert metadata so that the record becomes queryable diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java index 2ad4a177d4fa..a95a68893c2e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java @@ -63,6 +63,8 @@ public class RealtimeSegmentConfig { private final UpsertConfig.Mode _upsertMode; private final List _upsertComparisonColumns; private final String _upsertDeleteRecordColumn; + private final String _upsertOutOfOrderRecordColumn; + private final boolean _upsertDropOutOfOrderRecord; private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager; private final PartitionDedupMetadataManager _partitionDedupMetadataManager; private final String _consumerDir; @@ -76,7 +78,8 @@ private RealtimeSegmentConfig(String tableNameWithType, String segmentName, Stri PinotDataBufferMemoryManager memoryManager, RealtimeSegmentStatsHistory statsHistory, String partitionColumn, PartitionFunction partitionFunction, int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled, String consumerDir, UpsertConfig.Mode upsertMode, List upsertComparisonColumns, - String upsertDeleteRecordColumn, PartitionUpsertMetadataManager partitionUpsertMetadataManager, + String upsertDeleteRecordColumn, String upsertOutOfOrderRecordColumn, boolean upsertDropOutOfOrderRecord, + PartitionUpsertMetadataManager partitionUpsertMetadataManager, PartitionDedupMetadataManager partitionDedupMetadataManager, List fieldConfigList, List ingestionAggregationConfigs) { _tableNameWithType = tableNameWithType; @@ -100,6 +103,8 @@ private RealtimeSegmentConfig(String tableNameWithType, String segmentName, Stri _upsertMode = upsertMode != null ? upsertMode : UpsertConfig.Mode.NONE; _upsertComparisonColumns = upsertComparisonColumns; _upsertDeleteRecordColumn = upsertDeleteRecordColumn; + _upsertOutOfOrderRecordColumn = upsertOutOfOrderRecordColumn; + _upsertDropOutOfOrderRecord = upsertDropOutOfOrderRecord; _partitionUpsertMetadataManager = partitionUpsertMetadataManager; _partitionDedupMetadataManager = partitionDedupMetadataManager; _fieldConfigList = fieldConfigList; @@ -195,6 +200,14 @@ public String getUpsertDeleteRecordColumn() { return _upsertDeleteRecordColumn; } + public String getUpsertOutOfOrderRecordColumn() { + return _upsertOutOfOrderRecordColumn; + } + + public boolean isUpsertDropOutOfOrderRecord() { + return _upsertDropOutOfOrderRecord; + } + public PartitionUpsertMetadataManager getPartitionUpsertMetadataManager() { return _partitionUpsertMetadataManager; } @@ -233,6 +246,8 @@ public static class Builder { private UpsertConfig.Mode _upsertMode; private List _upsertComparisonColumns; private String _upsertDeleteRecordColumn; + private String _upsertOutOfOrderRecordColumn; + private boolean _upsertDropOutOfOrderRecord; private PartitionUpsertMetadataManager _partitionUpsertMetadataManager; private PartitionDedupMetadataManager _partitionDedupMetadataManager; private List _fieldConfigList; @@ -373,6 +388,16 @@ public Builder setUpsertDeleteRecordColumn(String upsertDeleteRecordColumn) { return this; } + public Builder setUpsertOutOfOrderRecordColumn(String upsertOutOfOrderRecordColumn) { + _upsertOutOfOrderRecordColumn = upsertOutOfOrderRecordColumn; + return this; + } + + public Builder setUpsertDropOutOfOrderRecord(boolean upsertDropOutOfOrderRecord) { + _upsertDropOutOfOrderRecord = upsertDropOutOfOrderRecord; + return this; + } + public Builder setPartitionUpsertMetadataManager(PartitionUpsertMetadataManager partitionUpsertMetadataManager) { _partitionUpsertMetadataManager = partitionUpsertMetadataManager; return this; @@ -403,6 +428,7 @@ public RealtimeSegmentConfig build() { _capacity, _avgNumMultiValues, Collections.unmodifiableMap(indexConfigByCol), _segmentZKMetadata, _offHeap, _memoryManager, _statsHistory, _partitionColumn, _partitionFunction, _partitionId, _aggregateMetrics, _nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumns, _upsertDeleteRecordColumn, + _upsertOutOfOrderRecordColumn, _upsertDropOutOfOrderRecord, _partitionUpsertMetadataManager, _partitionDedupMetadataManager, _fieldConfigList, _ingestionAggregationConfigs); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index 7d3de0346e2f..0e54d399bfe7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -71,7 +71,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps protected final PartialUpsertHandler _partialUpsertHandler; protected final boolean _enableSnapshot; protected final double _metadataTTL; - protected final boolean _dropOutOfOrderRecord; protected final File _tableIndexDir; protected final ServerMetrics _serverMetrics; protected final Logger _logger; @@ -99,7 +98,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId, List primaryKeyColumns, List comparisonColumns, @Nullable String deleteRecordColumn, HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, - boolean dropOutOfOrderRecord, double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) { + double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) { _tableNameWithType = tableNameWithType; _partitionId = partitionId; _primaryKeyColumns = primaryKeyColumns; @@ -109,7 +108,6 @@ protected BasePartitionUpsertMetadataManager(String tableNameWithType, int parti _partialUpsertHandler = partialUpsertHandler; _enableSnapshot = enableSnapshot; _metadataTTL = metadataTTL; - _dropOutOfOrderRecord = dropOutOfOrderRecord; _tableIndexDir = tableIndexDir; _snapshotLock = enableSnapshot ? new ReentrantReadWriteLock() : null; _serverMetrics = serverMetrics; @@ -362,6 +360,10 @@ public boolean addRecord(MutableSegment segment, RecordInfo recordInfo) { } } + /** + Returns {@code true} when the record is added to the upsert metadata manager, + {@code false} when the record is out-of-order thus not added. + */ protected abstract boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo); @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java index 7b6c66c0aec8..bc783480b9db 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java @@ -73,7 +73,6 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad protected ServerMetrics _serverMetrics; protected HelixManager _helixManager; protected ExecutorService _segmentPreloadExecutor; - protected boolean _dropOutOfOrderRecord; private volatile boolean _isPreloading = false; @@ -112,7 +111,6 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD _enableSnapshot = upsertConfig.isEnableSnapshot(); _metadataTTL = upsertConfig.getMetadataTTL(); - _dropOutOfOrderRecord = upsertConfig.isDropOutOfOrderRecord(); _tableIndexDir = tableDataManager.getTableDataDir(); _serverMetrics = serverMetrics; _helixManager = helixManager; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index 47f2fa2a96ae..435bab20d119 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -61,9 +61,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId, List primaryKeyColumns, List comparisonColumns, @Nullable String deleteRecordColumn, HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, - boolean dropOutOfOrderRecord, double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) { - super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction, - partialUpsertHandler, enableSnapshot, dropOutOfOrderRecord, metadataTTL, tableIndexDir, serverMetrics); + double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) { + super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn, + hashFunction, partialUpsertHandler, enableSnapshot, metadataTTL, tableIndexDir, serverMetrics); } @Override @@ -240,9 +240,13 @@ public void doRemoveExpiredPrimaryKeys() { persistWatermark(_largestSeenComparisonValue); } + /** + Returns {@code true} when the record is added to the upsert metadata manager, + {@code false} when the record is out-of-order thus not added. + */ @Override protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { - AtomicBoolean shouldDropRecord = new AtomicBoolean(false); + AtomicBoolean isOutOfOrderRecord = new AtomicBoolean(false); ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds()); ThreadSafeMutableRoaringBitmap queryableDocIds = segment.getQueryableDocIds(); int newDocId = recordInfo.getDocId(); @@ -273,9 +277,8 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { return new RecordLocation(segment, newDocId, newComparisonValue); } else { handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue()); - // this is a out-of-order record, if upsert config _dropOutOfOrderRecord is true, then set - // shouldDropRecord to true. This method returns inverse of this value - shouldDropRecord.set(_dropOutOfOrderRecord); + // this is a out-of-order record then set value to true - this indicates whether out-of-order or not + isOutOfOrderRecord.set(true); return currentRecordLocation; } } else { @@ -288,7 +291,7 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { // Update metrics _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, _primaryKeyToRecordLocationMap.size()); - return !shouldDropRecord.get(); + return !isOutOfOrderRecord.get(); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java index 55db795883b9..33802036560b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java @@ -37,7 +37,7 @@ public ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(i return _partitionMetadataManagerMap.computeIfAbsent(partitionId, k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler, - _enableSnapshot, _dropOutOfOrderRecord, _metadataTTL, _tableIndexDir, _serverMetrics)); + _enableSnapshot, _metadataTTL, _tableIndexDir, _serverMetrics)); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index f6d9ea957b67..7605925432ac 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -723,6 +723,18 @@ static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema) fieldSpec != null && fieldSpec.isSingleValueField() && fieldSpec.getDataType() == DataType.BOOLEAN, "The delete record column must be a single-valued BOOLEAN column"); } + + String outOfOrderRecordColumn = upsertConfig.getOutOfOrderRecordColumn(); + Preconditions.checkState( + outOfOrderRecordColumn == null || !upsertConfig.isDropOutOfOrderRecord(), + "outOfOrderRecordColumn and dropOutOfOrderRecord shouldn't exist together for upsert table"); + + if (outOfOrderRecordColumn != null) { + FieldSpec fieldSpec = schema.getFieldSpecFor(outOfOrderRecordColumn); + Preconditions.checkState( + fieldSpec != null && fieldSpec.isSingleValueField() && fieldSpec.getDataType() == DataType.BOOLEAN, + "The outOfOrderRecordColumn must be a single-valued BOOLEAN column"); + } } Preconditions.checkState( diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java index 10270e5b6fb8..ade22fcad6f2 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java @@ -100,6 +100,8 @@ public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set comparisonColumns = upsertConfig == null ? null : upsertConfig.getComparisonColumns(); + boolean isUpsertDropOutOfOrderRecord = upsertConfig == null ? false : upsertConfig.isDropOutOfOrderRecord(); + String upsertOutOfOrderRecordColumn = upsertConfig == null ? null : upsertConfig.getOutOfOrderRecordColumn(); DictionaryIndexConfig varLengthDictConf = new DictionaryIndexConfig(false, true); RealtimeSegmentConfig.Builder segmentConfBuilder = new RealtimeSegmentConfig.Builder() .setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME) @@ -114,7 +116,9 @@ public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set entry : jsonIndexConfigs.entrySet()) { segmentConfBuilder.setIndex(entry.getKey(), StandardIndexes.json(), entry.getValue()); } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java index aae9b9cec699..fe17e40dc2e2 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java @@ -29,6 +29,7 @@ import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager; import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory; +import org.apache.pinot.spi.config.table.HashFunction; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.UpsertConfig; @@ -37,10 +38,10 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFactory; +import org.apache.pinot.spi.utils.BooleanUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import org.testng.Assert; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.mockito.Mockito.mock; @@ -55,16 +56,20 @@ public class MutableSegmentImplUpsertComparisonColTest { private static MutableSegmentImpl _mutableSegmentImpl; private static PartitionUpsertMetadataManager _partitionUpsertMetadataManager; - @BeforeClass - public void setup() + private UpsertConfig createFullUpsertConfig(HashFunction hashFunction) { + UpsertConfig upsertConfigWithHash = new UpsertConfig(UpsertConfig.Mode.FULL); + upsertConfigWithHash.setHashFunction(hashFunction); + upsertConfigWithHash.setComparisonColumn("offset"); + return upsertConfigWithHash; + } + + public void setup(UpsertConfig upsertConfig) throws Exception { URL schemaResourceUrl = this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH); URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH); _schema = Schema.fromFile(new File(schemaResourceUrl.getFile())); - UpsertConfig offsetUpsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL); - offsetUpsertConfig.setComparisonColumn("offset"); _tableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(offsetUpsertConfig) + new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfig) .build(); _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema); File jsonFile = new File(dataResourceUrl.getFile()); @@ -74,7 +79,7 @@ public void setup() _partitionUpsertMetadataManager = tableUpsertMetadataManager.getOrCreatePartitionManager(0); _mutableSegmentImpl = MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), - Collections.emptySet(), false, true, offsetUpsertConfig, "secondsSinceEpoch", + Collections.emptySet(), false, true, upsertConfig, "secondsSinceEpoch", _partitionUpsertMetadataManager, null); GenericRow reuse = new GenericRow(); try (RecordReader recordReader = RecordReaderFactory.getRecordReader(FileFormat.JSON, jsonFile, @@ -89,12 +94,69 @@ public void setup() } @Test - public void testUpsertIngestion() { + public void testHashFunctions() + throws Exception { + testUpsertIngestion(createFullUpsertConfig(HashFunction.NONE)); + testUpsertIngestion(createFullUpsertConfig(HashFunction.MD5)); + testUpsertIngestion(createFullUpsertConfig(HashFunction.MURMUR3)); + } + + @Test + public void testUpsertDropOutOfOrderRecord() + throws Exception { + testUpsertDropOfOrderRecordIngestion(createFullUpsertConfig(HashFunction.NONE)); + testUpsertDropOfOrderRecordIngestion(createFullUpsertConfig(HashFunction.MD5)); + testUpsertDropOfOrderRecordIngestion(createFullUpsertConfig(HashFunction.MURMUR3)); + } + + @Test + public void testUpsertOutOfOrderRecordColumn() + throws Exception { + testUpsertOutOfOrderRecordColumnIngestion(createFullUpsertConfig(HashFunction.NONE)); + testUpsertOutOfOrderRecordColumnIngestion(createFullUpsertConfig(HashFunction.MD5)); + testUpsertOutOfOrderRecordColumnIngestion(createFullUpsertConfig(HashFunction.MURMUR3)); + } + + public void testUpsertIngestion(UpsertConfig upsertConfig) + throws Exception { + setup(upsertConfig); + ImmutableRoaringBitmap bitmap = _mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap(); + // note offset column is used for determining sequence but not time column + Assert.assertEquals(_mutableSegmentImpl.getNumDocsIndexed(), 4); + Assert.assertFalse(bitmap.contains(0)); + Assert.assertTrue(bitmap.contains(1)); + Assert.assertTrue(bitmap.contains(2)); + Assert.assertFalse(bitmap.contains(3)); + } + + public void testUpsertDropOfOrderRecordIngestion(UpsertConfig upsertConfig) + throws Exception { + upsertConfig.setDropOutOfOrderRecord(true); + setup(upsertConfig); + ImmutableRoaringBitmap bitmap = _mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap(); + // note offset column is used for determining sequence but not time column + Assert.assertEquals(_mutableSegmentImpl.getNumDocsIndexed(), 3); + Assert.assertFalse(bitmap.contains(0)); + Assert.assertTrue(bitmap.contains(1)); + Assert.assertTrue(bitmap.contains(2)); + } + + public void testUpsertOutOfOrderRecordColumnIngestion(UpsertConfig upsertConfig) + throws Exception { + String outOfOrderRecordColumn = "outOfOrderRecordColumn"; + upsertConfig.setOutOfOrderRecordColumn(outOfOrderRecordColumn); + setup(upsertConfig); ImmutableRoaringBitmap bitmap = _mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap(); // note offset column is used for determining sequence but not time column + Assert.assertEquals(_mutableSegmentImpl.getNumDocsIndexed(), 4); Assert.assertFalse(bitmap.contains(0)); Assert.assertTrue(bitmap.contains(1)); Assert.assertTrue(bitmap.contains(2)); Assert.assertFalse(bitmap.contains(3)); + + Assert.assertFalse(BooleanUtils.toBoolean(_mutableSegmentImpl.getValue(0, outOfOrderRecordColumn))); + Assert.assertFalse(BooleanUtils.toBoolean(_mutableSegmentImpl.getValue(1, outOfOrderRecordColumn))); + Assert.assertFalse(BooleanUtils.toBoolean(_mutableSegmentImpl.getValue(2, outOfOrderRecordColumn))); + Assert.assertTrue(BooleanUtils.toBoolean(_mutableSegmentImpl.getValue(3, outOfOrderRecordColumn))); } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java index feaa8da3043d..b212a8b918d0 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java @@ -91,7 +91,7 @@ public void tearDown() public void testStartFinishOperation() { ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, false, 0, INDEX_DIR, + Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 0, INDEX_DIR, mock(ServerMetrics.class)); // Start 2 operations @@ -205,7 +205,7 @@ private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean en String comparisonColumn = "timeCol"; ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList(comparisonColumn), null, hashFunction, null, false, false, 0, INDEX_DIR, + Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR, mock(ServerMetrics.class)); Map recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; Set trackedSegments = upsertMetadataManager._trackedSegments; @@ -368,7 +368,7 @@ private void verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction hashFunc String deleteRecordColumn = "deleteCol"; ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false, false, 0, + Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false, 0, INDEX_DIR, mock(ServerMetrics.class)); Map recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; Set trackedSegments = upsertMetadataManager._trackedSegments; @@ -660,7 +660,7 @@ private void verifyAddRecord(HashFunction hashFunction) String comparisonColumn = "timeCol"; ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList(comparisonColumn), null, hashFunction, null, false, false, 0, INDEX_DIR, + Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR, mock(ServerMetrics.class)); Map recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; @@ -751,10 +751,9 @@ public void testAddOutOfOrderRecord() private void verifyAddOutOfOrderRecord(HashFunction hashFunction) throws IOException { String comparisonColumn = "timeCol"; - // here dropOutOfOrderRecord = true ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList(comparisonColumn), null, hashFunction, null, false, true, 0, INDEX_DIR, + Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR, mock(ServerMetrics.class)); Map recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; @@ -773,10 +772,10 @@ private void verifyAddOutOfOrderRecord(HashFunction hashFunction) ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap(); MutableSegment segment2 = mockMutableSegment(1, validDocIds2, null); - // new record, should return true to add it - boolean shouldAddRecord = - upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, new IntWrapper(100), false)); - assertTrue(shouldAddRecord); + // new record, should return false for out of order event + boolean isOutOfOrderRecord = + !upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, new IntWrapper(100), false)); + assertFalse(isOutOfOrderRecord); // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100} // segment2: 3 -> {0, 100} @@ -787,15 +786,15 @@ private void verifyAddOutOfOrderRecord(HashFunction hashFunction) assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2}); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0}); - // send an out-of-order event, should return false to drop event - shouldAddRecord = - upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(80), false)); - assertFalse(shouldAddRecord); + // send an out-of-order event, should return true for orderness of event + isOutOfOrderRecord = + !upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(80), false)); + assertTrue(isOutOfOrderRecord); // ordered event for an existing key - shouldAddRecord = - upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(150), false)); - assertTrue(shouldAddRecord); + isOutOfOrderRecord = + !upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(150), false)); + assertFalse(isOutOfOrderRecord); // segment1: 0 -> {0, 100}, 1 -> {1, 120} // segment2: 3 -> {0, 100}, 2 -> {1, 150} @@ -822,7 +821,7 @@ private void verifyPreloadSegment(HashFunction hashFunction) { String comparisonColumn = "timeCol"; ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList(comparisonColumn), null, hashFunction, null, false, false, 0, INDEX_DIR, + Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR, mock(ServerMetrics.class)); Map recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; @@ -878,8 +877,8 @@ private void verifyAddRecordWithDeleteColumn(HashFunction hashFunction) String deleteColumn = "deleteCol"; ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null, false, false, 0, INDEX_DIR, - mock(ServerMetrics.class)); + Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null, + false, 0, INDEX_DIR, mock(ServerMetrics.class)); Map recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // queryableDocIds is same as validDocIds in the absence of delete markers @@ -980,7 +979,7 @@ private void verifyRemoveExpiredPrimaryKeys(Comparable earlierComparisonValue, C ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, false, 30, tableDir, + Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 30, tableDir, mock(ServerMetrics.class)); Map recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; @@ -1048,7 +1047,7 @@ private void verifyAddOutOfTTLSegment() ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, false, 30, tableDir, + Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir, mock(ServerMetrics.class)); Map recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; @@ -1121,7 +1120,7 @@ private void verifyAddOutOfTTLSegmentWithRecordDelete() String deleteRecordColumn = "deleteCol"; ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, false, 30, + Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, 30, INDEX_DIR, mock(ServerMetrics.class)); Map recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; Set trackedSegments = upsertMetadataManager._trackedSegments; @@ -1206,7 +1205,7 @@ public void verifyGetQueryableDocIds(boolean isDeleteColumnNull, boolean[] delet String deleteRecordColumn = "deleteCol"; ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, false, 30, + Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, 30, INDEX_DIR, mock(ServerMetrics.class)); try (MockedConstruction deleteColReader = mockConstruction(PinotSegmentColumnReader.class, @@ -1238,7 +1237,7 @@ private void verifyAddSegmentForTTL(Comparable comparisonValue) ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, false, 30, tableDir, + Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir, mock(ServerMetrics.class)); Map recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; @@ -1299,7 +1298,7 @@ private void verifyPersistAndLoadWatermark() throws IOException { ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, false, 10, INDEX_DIR, + Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 10, INDEX_DIR, mock(ServerMetrics.class)); double currentTimeMs = System.currentTimeMillis(); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index 1ad4cc74d602..4f691188f6e4 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -1694,6 +1694,49 @@ public void testValidateUpsertConfig() { } catch (IllegalStateException e) { Assert.fail("Shouldn't fail table creation when delete column type is boolean."); } + + // upsert out-of-order configs + String outOfOrderRecordColumn = "outOfOrderRecordColumn"; + boolean dropOutOfOrderRecord = true; + schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(Lists.newArrayList("myPkCol")) + .addSingleValueDimension("myCol", FieldSpec.DataType.STRING) + .addSingleValueDimension(outOfOrderRecordColumn, FieldSpec.DataType.BOOLEAN).build(); + streamConfigs = getStreamConfigs(); + streamConfigs.put("stream.kafka.consumer.type", "simple"); + + upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL); + upsertConfig.setDropOutOfOrderRecord(dropOutOfOrderRecord); + upsertConfig.setOutOfOrderRecordColumn(outOfOrderRecordColumn); + tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs) + .setUpsertConfig(upsertConfig) + .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE)) + .build(); + try { + TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema); + } catch (IllegalStateException e) { + Assert.assertEquals(e.getMessage(), + "outOfOrderRecordColumn and dropOutOfOrderRecord shouldn't exist together for upsert table"); + } + + // outOfOrderRecordColumn not of type BOOLEAN + schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(Lists.newArrayList("myPkCol")) + .addSingleValueDimension("myCol", FieldSpec.DataType.STRING) + .addSingleValueDimension(outOfOrderRecordColumn, FieldSpec.DataType.STRING).build(); + streamConfigs = getStreamConfigs(); + streamConfigs.put("stream.kafka.consumer.type", "simple"); + + upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL); + upsertConfig.setOutOfOrderRecordColumn(outOfOrderRecordColumn); + tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs) + .setUpsertConfig(upsertConfig) + .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE)) + .build(); + try { + TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema); + } catch (IllegalStateException e) { + Assert.assertEquals(e.getMessage(), + "The outOfOrderRecordColumn must be a single-valued BOOLEAN column"); + } } @Test diff --git a/pinot-segment-local/src/test/resources/data/test_upsert_comparison_col_schema.json b/pinot-segment-local/src/test/resources/data/test_upsert_comparison_col_schema.json index 4448f694d1b4..692ea60dafa6 100644 --- a/pinot-segment-local/src/test/resources/data/test_upsert_comparison_col_schema.json +++ b/pinot-segment-local/src/test/resources/data/test_upsert_comparison_col_schema.json @@ -12,6 +12,10 @@ { "name": "offset", "dataType": "LONG" + }, + { + "name": "outOfOrderRecordColumn", + "dataType": "BOOLEAN" } ], "timeFieldSpec": { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java index c71b5481d590..19f6397ddb15 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java @@ -392,6 +392,17 @@ public String getUpsertDeleteRecordColumn() { return _upsertConfig == null ? null : _upsertConfig.getDeleteRecordColumn(); } + @JsonIgnore + @Nullable + public String getOutOfOrderRecordColumn() { + return _upsertConfig == null ? null : _upsertConfig.getOutOfOrderRecordColumn(); + } + + @JsonIgnore + public boolean isDropOutOfOrderRecord() { + return _upsertConfig != null && _upsertConfig.isDropOutOfOrderRecord(); + } + @JsonProperty(TUNER_CONFIG_LIST_KEY) public List getTunerConfigsList() { return _tunerConfigList; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java index 2b389b8133c1..3f9b67c59ccf 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java @@ -57,6 +57,9 @@ public enum Strategy { @JsonPropertyDescription("Boolean column to indicate whether a records should be deleted") private String _deleteRecordColumn; + @JsonPropertyDescription("Boolean column to indicate whether a records is out-of-order") + private String _outOfOrderRecordColumn; + @JsonPropertyDescription("Whether to use snapshot for fast upsert metadata recovery") private boolean _enableSnapshot; @@ -113,6 +116,11 @@ public String getDeleteRecordColumn() { return _deleteRecordColumn; } + @Nullable + public String getOutOfOrderRecordColumn() { + return _outOfOrderRecordColumn; + } + public boolean isEnableSnapshot() { return _enableSnapshot; } @@ -184,9 +192,11 @@ public void setComparisonColumn(String comparisonColumn) { } public void setDeleteRecordColumn(String deleteRecordColumn) { - if (deleteRecordColumn != null) { - _deleteRecordColumn = deleteRecordColumn; - } + _deleteRecordColumn = deleteRecordColumn; + } + + public void setOutOfOrderRecordColumn(String outOfOrderRecordColumn) { + _outOfOrderRecordColumn = outOfOrderRecordColumn; } public void setEnableSnapshot(boolean enableSnapshot) { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/BooleanUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/BooleanUtils.java index aeee5286c0b1..07566bd14b60 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/BooleanUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/BooleanUtils.java @@ -64,6 +64,17 @@ public static int toInt(String booleanString) { return toBoolean(booleanString) ? INTERNAL_TRUE : INTERNAL_FALSE; } + /** + * Returns the int value (1 for true, 0 for false) for the given boolean value. + *
    + *
  • 'true' -> '1'
  • + *
  • 'false' -> '0'
  • + *
+ */ + public static int toInt(boolean booleanValue) { + return booleanValue ? INTERNAL_TRUE : INTERNAL_FALSE; + } + /** * Returns the boolean value for the given non-null Integer object (internal value for BOOLEAN). */