diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java index 6135982e185c..423eb527faee 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java @@ -18,8 +18,6 @@ */ package org.apache.pinot.broker.routing.segmentpruner; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -37,7 +35,6 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.DateTimeFieldSpec; -import org.apache.pinot.spi.data.DateTimeFormatSpec; import org.apache.pinot.spi.data.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +65,7 @@ public static List getSegmentPruners(TableConfig tableConfig, List configuredSegmentPruners = new ArrayList<>(segmentPrunerTypes.size()); for (String segmentPrunerType : segmentPrunerTypes) { if (RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE.equalsIgnoreCase(segmentPrunerType)) { - SegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig, propertyStore); + SegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig); if (partitionSegmentPruner != null) { configuredSegmentPruners.add(partitionSegmentPruner); } @@ -91,7 +88,7 @@ public static List getSegmentPruners(TableConfig tableConfig, if ((tableType == TableType.OFFLINE && LEGACY_PARTITION_AWARE_OFFLINE_ROUTING.equalsIgnoreCase( routingTableBuilderName)) || (tableType == TableType.REALTIME && LEGACY_PARTITION_AWARE_REALTIME_ROUTING.equalsIgnoreCase(routingTableBuilderName))) { - SegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig, propertyStore); + SegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig); if (partitionSegmentPruner != null) { segmentPruners.add(partitionSegmentPruner); } @@ -102,8 +99,7 @@ public static List getSegmentPruners(TableConfig tableConfig, } @Nullable - private static SegmentPruner getPartitionSegmentPruner(TableConfig tableConfig, - ZkHelixPropertyStore propertyStore) { + private static SegmentPruner getPartitionSegmentPruner(TableConfig tableConfig) { String tableNameWithType = tableConfig.getTableName(); SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig(); if (segmentPartitionConfig == null) { @@ -137,26 +133,20 @@ private static TimeSegmentPruner getTimeSegmentPruner(TableConfig tableConfig, LOGGER.warn("Cannot enable time range pruning without time column for table: {}", tableNameWithType); return null; } - return createTimeSegmentPruner(tableConfig, propertyStore); - } - - @VisibleForTesting - static TimeSegmentPruner createTimeSegmentPruner(TableConfig tableConfig, - ZkHelixPropertyStore propertyStore) { - String tableNameWithType = tableConfig.getTableName(); - String timeColumn = tableConfig.getValidationConfig().getTimeColumnName(); - Preconditions.checkNotNull(timeColumn, "Time column must be configured in table config for table: %s", - tableNameWithType); - Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, tableNameWithType); - Preconditions.checkNotNull(schema, "Failed to find schema for table: %s", tableNameWithType); - DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(timeColumn); - Preconditions.checkNotNull(dateTimeSpec, "Field spec must be specified in schema for time column: %s of table: %s", - timeColumn, tableNameWithType); - DateTimeFormatSpec timeFormatSpec = dateTimeSpec.getFormatSpec(); - - LOGGER.info("Using TimeRangePruner on time column: {} for table: {} with DateTimeFormatSpec: {}", - timeColumn, tableNameWithType, timeFormatSpec); - return new TimeSegmentPruner(tableConfig, timeColumn, timeFormatSpec); + Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, tableConfig); + if (schema == null) { + LOGGER.warn("Cannot enable time range pruning without schema for table: {}", tableNameWithType); + return null; + } + DateTimeFieldSpec timeFieldSpec = schema.getSpecForTimeColumn(timeColumn); + if (timeFieldSpec == null) { + LOGGER.warn("Cannot enable time range pruning without field spec for table: {}, time column: {}", + tableNameWithType, timeColumn); + return null; + } + LOGGER.info("Using TimeRangePruner on time column: {} for table: {} with DateTimeFieldSpec: {}", timeColumn, + tableNameWithType, timeFieldSpec); + return new TimeSegmentPruner(tableConfig, timeFieldSpec); } private static List sortSegmentPruners(List pruners) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java index a7ac4fce4bdf..c2e6b20cce54 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.broker.routing.segmentpruner; +import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -37,7 +38,9 @@ import org.apache.pinot.common.request.Expression; import org.apache.pinot.common.request.Function; import org.apache.pinot.common.request.Identifier; +import org.apache.pinot.common.request.Literal; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DateTimeFormatSpec; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Query.Range; @@ -64,10 +67,10 @@ public class TimeSegmentPruner implements SegmentPruner { private volatile IntervalTree _intervalTree; private final Map _intervalMap = new HashMap<>(); - public TimeSegmentPruner(TableConfig tableConfig, String timeColumn, DateTimeFormatSpec timeFormatSpec) { + public TimeSegmentPruner(TableConfig tableConfig, DateTimeFieldSpec timeFieldSpec) { _tableNameWithType = tableConfig.getTableName(); - _timeColumn = timeColumn; - _timeFormatSpec = timeFormatSpec; + _timeColumn = timeFieldSpec.getName(); + _timeFormatSpec = timeFieldSpec.getFormatSpec(); } @Override @@ -206,97 +209,53 @@ private List getFilterTimeIntervals(Expression filterExpression) { } else { return getComplementSortedIntervals(childIntervals); } - case EQUALS: { - Identifier identifier = operands.get(0).getIdentifier(); - if (identifier != null && identifier.getName().equals(_timeColumn)) { - long timeStamp = _timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString()); - return Collections.singletonList(new Interval(timeStamp, timeStamp)); - } else { - return null; + case EQUALS: + if (isTimeColumn(operands.get(0))) { + long timestamp = toMillisSinceEpoch(operands.get(1)); + return List.of(new Interval(timestamp, timestamp)); } - } - case IN: { - Identifier identifier = operands.get(0).getIdentifier(); - if (identifier != null && identifier.getName().equals(_timeColumn)) { + return null; + case IN: + if (isTimeColumn(operands.get(0))) { int numOperands = operands.size(); List intervals = new ArrayList<>(numOperands - 1); for (int i = 1; i < numOperands; i++) { - long timeStamp = - _timeFormatSpec.fromFormatToMillis(operands.get(i).getLiteral().getFieldValue().toString()); - intervals.add(new Interval(timeStamp, timeStamp)); + long timestamp = toMillisSinceEpoch(operands.get(i)); + intervals.add(new Interval(timestamp, timestamp)); } return intervals; - } else { - return null; } - } - case GREATER_THAN: { - Identifier identifier = operands.get(0).getIdentifier(); - if (identifier != null && identifier.getName().equals(_timeColumn)) { - long timeStamp = _timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString()); - return Collections.singletonList(new Interval(timeStamp + 1, MAX_END_TIME)); - } else { - return null; + return null; + case GREATER_THAN: + if (isTimeColumn(operands.get(0))) { + return getInterval(toMillisSinceEpoch(operands.get(1)) + 1, MAX_END_TIME); } - } - case GREATER_THAN_OR_EQUAL: { - Identifier identifier = operands.get(0).getIdentifier(); - if (identifier != null && identifier.getName().equals(_timeColumn)) { - long timeStamp = _timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString()); - return Collections.singletonList(new Interval(timeStamp, MAX_END_TIME)); - } else { - return null; + return null; + case GREATER_THAN_OR_EQUAL: + if (isTimeColumn(operands.get(0))) { + return getInterval(toMillisSinceEpoch(operands.get(1)), MAX_END_TIME); } - } - case LESS_THAN: { - Identifier identifier = operands.get(0).getIdentifier(); - if (identifier != null && identifier.getName().equals(_timeColumn)) { - long timeStamp = _timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString()); - if (timeStamp > MIN_START_TIME) { - return Collections.singletonList(new Interval(MIN_START_TIME, timeStamp - 1)); - } else { - return Collections.emptyList(); - } - } else { - return null; + return null; + case LESS_THAN: + if (isTimeColumn(operands.get(0))) { + return getInterval(MIN_START_TIME, toMillisSinceEpoch(operands.get(1)) - 1); } - } - case LESS_THAN_OR_EQUAL: { - Identifier identifier = operands.get(0).getIdentifier(); - if (identifier != null && identifier.getName().equals(_timeColumn)) { - long timeStamp = _timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString()); - if (timeStamp >= MIN_START_TIME) { - return Collections.singletonList(new Interval(MIN_START_TIME, timeStamp)); - } else { - return Collections.emptyList(); - } - } else { - return null; + return null; + case LESS_THAN_OR_EQUAL: + if (isTimeColumn(operands.get(0))) { + return getInterval(MIN_START_TIME, toMillisSinceEpoch(operands.get(1))); } - } - case BETWEEN: { - Identifier identifier = operands.get(0).getIdentifier(); - if (identifier != null && identifier.getName().equals(_timeColumn)) { - long startTimestamp = - _timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString()); - long endTimestamp = - _timeFormatSpec.fromFormatToMillis(operands.get(2).getLiteral().getFieldValue().toString()); - if (endTimestamp >= startTimestamp) { - return Collections.singletonList(new Interval(startTimestamp, endTimestamp)); - } else { - return Collections.emptyList(); - } - } else { - return null; + return null; + case BETWEEN: + if (isTimeColumn(operands.get(0))) { + return getInterval(toMillisSinceEpoch(operands.get(1)), toMillisSinceEpoch(operands.get(2))); } - } - case RANGE: { - Identifier identifier = operands.get(0).getIdentifier(); - if (identifier != null && identifier.getName().equals(_timeColumn)) { + return null; + case RANGE: + if (isTimeColumn(operands.get(0))) { return parseInterval(operands.get(1).getLiteral().getFieldValue().toString()); } return null; - } default: return null; } @@ -408,6 +367,17 @@ private List getComplementSortedIntervals(List intervals) { return res; } + private boolean isTimeColumn(Expression expression) { + Identifier identifier = expression.getIdentifier(); + return identifier != null && identifier.getName().equals(_timeColumn); + } + + private long toMillisSinceEpoch(Expression expression) { + Literal literal = expression.getLiteral(); + Preconditions.checkArgument(literal != null, "Literal is required for time column filter, got: %s", expression); + return _timeFormatSpec.fromFormatToMillis(literal.getFieldValue().toString()); + } + /** * Parse interval to millisecond as [min, max] with both sides included. * E.g. '(* 16311]' is parsed as [0, 16311], '(1455 16311)' is parsed as [1456, 16310] @@ -432,10 +402,10 @@ private List parseInterval(String rangeString) { endTime--; } } + return getInterval(startTime, endTime); + } - if (startTime > endTime) { - return Collections.emptyList(); - } - return Collections.singletonList(new Interval(startTime, endTime)); + private static List getInterval(long inclusiveStart, long inclusiveEnd) { + return inclusiveStart <= inclusiveEnd ? List.of(new Interval(inclusiveStart, inclusiveEnd)) : List.of(); } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java index feaad35169ba..5e48a981ccc4 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java @@ -18,8 +18,7 @@ */ package org.apache.pinot.broker.routing.segmentpruner; -import java.util.Arrays; -import java.util.Collections; +import java.sql.Timestamp; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -50,11 +49,11 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; +import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DateTimeFormatSpec; -import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.stream.StreamConfigProperties; -import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; import org.mockito.Mockito; @@ -78,29 +77,45 @@ public class SegmentPrunerTest extends ControllerTest { private static final String SDF_PATTERN = "yyyyMMdd"; private static final String QUERY_1 = "SELECT * FROM testTable"; - private static final String QUERY_2 = "SELECT * FROM testTable where memberId = 0"; - private static final String QUERY_3 = "SELECT * FROM testTable where memberId IN (1, 2)"; - private static final String QUERY_4 = "SELECT * FROM testTable where memberId = 0 AND memberName='xyz'"; - - private static final String TIME_QUERY_1 = "SELECT * FROM testTable where timeColumn = 40"; - private static final String TIME_QUERY_2 = "SELECT * FROM testTable where timeColumn BETWEEN 20 AND 30"; - private static final String TIME_QUERY_3 = "SELECT * FROM testTable where 30 < timeColumn AND timeColumn <= 50"; - private static final String TIME_QUERY_4 = "SELECT * FROM testTable where timeColumn < 15 OR timeColumn > 45"; + private static final String QUERY_2 = "SELECT * FROM testTable WHERE memberId = 0"; + private static final String QUERY_3 = "SELECT * FROM testTable WHERE memberId IN (1, 2)"; + private static final String QUERY_4 = "SELECT * FROM testTable WHERE memberId = 0 AND memberName = 'xyz'"; + + private static final String TIME_QUERY_1 = "SELECT * FROM testTable WHERE timeColumn = 40"; + private static final String TIME_QUERY_2 = "SELECT * FROM testTable WHERE timeColumn BETWEEN 20 AND 30"; + private static final String TIME_QUERY_3 = "SELECT * FROM testTable WHERE 30 < timeColumn AND timeColumn <= 50"; + private static final String TIME_QUERY_4 = "SELECT * FROM testTable WHERE timeColumn < 15 OR timeColumn > 45"; private static final String TIME_QUERY_5 = - "SELECT * FROM testTable where timeColumn < 15 OR (60 < timeColumn AND timeColumn < 70)"; - private static final String TIME_QUERY_6 = "SELECT * FROM testTable where timeColumn < 0 AND timeColumn > 0"; + "SELECT * FROM testTable WHERE timeColumn < 15 OR (60 < timeColumn AND timeColumn < 70)"; + private static final String TIME_QUERY_6 = "SELECT * FROM testTable WHERE timeColumn NOT BETWEEN 20 AND 30"; + private static final String TIME_QUERY_7 = "SELECT * FROM testTable WHERE NOT timeColumn > 30"; + private static final String TIME_QUERY_8 = "SELECT * FROM testTable WHERE timeColumn < 0 AND timeColumn > 0"; - private static final String SDF_QUERY_1 = "SELECT * FROM testTable where timeColumn = 20200131"; - private static final String SDF_QUERY_2 = "SELECT * FROM testTable where timeColumn BETWEEN 20200101 AND 20200331"; + private static final String SDF_QUERY_1 = "SELECT * FROM testTable WHERE timeColumn = 20200131"; + private static final String SDF_QUERY_2 = "SELECT * FROM testTable WHERE timeColumn BETWEEN 20200101 AND 20200331"; private static final String SDF_QUERY_3 = - "SELECT * FROM testTable where 20200430 < timeColumn AND timeColumn < 20200630"; + "SELECT * FROM testTable WHERE 20200430 < timeColumn AND timeColumn < 20200630"; private static final String SDF_QUERY_4 = - "SELECT * FROM testTable where timeColumn <= 20200101 OR timeColumn in (20200201, 20200401)"; + "SELECT * FROM testTable WHERE timeColumn <= 20200101 OR timeColumn IN (20200201, 20200401)"; private static final String SDF_QUERY_5 = - "SELECT * FROM testTable where timeColumn in (20200101, 20200102) AND timeColumn >= 20200530"; - - private static final String SQL_TIME_QUERY_1 = "SELECT * FROM testTable WHERE timeColumn NOT BETWEEN 20 AND 30"; - private static final String SQL_TIME_QUERY_2 = "SELECT * FROM testTable WHERE NOT timeColumn > 30"; + "SELECT * FROM testTable WHERE timeColumn IN (20200101, 20200102) AND timeColumn >= 20200530"; + + // Timestamp can be passed as string or long + private static final String TIMESTAMP_QUERY_1 = "SELECT * FROM testTable WHERE timeColumn = '2020-01-31 00:00:00'"; + private static final String TIMESTAMP_QUERY_2 = String.format("SELECT * FROM testTable WHERE timeColumn = %d", + Timestamp.valueOf("2020-01-31 00:00:00").getTime()); + private static final String TIMESTAMP_QUERY_3 = + "SELECT * FROM testTable WHERE timeColumn BETWEEN '2020-01-01 00:00:00' AND '2020-03-31 00:00:00'"; + private static final String TIMESTAMP_QUERY_4 = + String.format("SELECT * FROM testTable WHERE timeColumn BETWEEN %d AND %d", + Timestamp.valueOf("2020-01-01 00:00:00").getTime(), Timestamp.valueOf("2020-03-31 00:00:00").getTime()); + private static final String TIMESTAMP_QUERY_5 = + "SELECT * FROM testTable WHERE timeColumn <= '2020-01-01 00:00:00' OR timeColumn IN ('2020-02-01 00:00:00', " + + "'2020-04-01 00:00:00')"; + private static final String TIMESTAMP_QUERY_6 = + String.format("SELECT * FROM testTable WHERE timeColumn <= %d OR timeColumn IN (%d, %d)", + Timestamp.valueOf("2020-01-01 00:00:00").getTime(), Timestamp.valueOf("2020-02-01 00:00:00").getTime(), + Timestamp.valueOf("2020-04-01 00:00:00").getTime()); // this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use KinesisConfig.STREAM_TYPE directly, we // hardcode the value here to avoid pulling the entire pinot-kinesis module as dependency. @@ -127,6 +142,7 @@ public void tearDown() { @Test public void testSegmentPrunerFactoryForPartitionPruner() { TableConfig tableConfig = mock(TableConfig.class); + when(tableConfig.getTableName()).thenReturn(OFFLINE_TABLE_NAME); IndexingConfig indexingConfig = mock(IndexingConfig.class); when(tableConfig.getIndexingConfig()).thenReturn(indexingConfig); @@ -141,8 +157,7 @@ public void testSegmentPrunerFactoryForPartitionPruner() { assertEquals(segmentPruners.size(), 0); // Segment partition config is missing - when(routingConfig.getSegmentPrunerTypes()).thenReturn( - Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE)); + when(routingConfig.getSegmentPrunerTypes()).thenReturn(List.of(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE)); segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); assertEquals(segmentPruners.size(), 0); @@ -189,8 +204,7 @@ public void testSegmentPrunerFactoryForPartitionPruner() { @Test public void testSegmentPrunerFactoryForTimeRangePruner() { TableConfig tableConfig = mock(TableConfig.class); - when(tableConfig.getTableName()).thenReturn(RAW_TABLE_NAME); - setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.HOURS); + when(tableConfig.getTableName()).thenReturn(OFFLINE_TABLE_NAME); // Routing config is missing List segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); @@ -203,8 +217,7 @@ public void testSegmentPrunerFactoryForTimeRangePruner() { assertEquals(segmentPruners.size(), 0); // Validation config is missing - when(routingConfig.getSegmentPrunerTypes()).thenReturn( - Collections.singletonList(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE)); + when(routingConfig.getSegmentPrunerTypes()).thenReturn(List.of(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE)); segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); assertEquals(segmentPruners.size(), 0); @@ -214,41 +227,54 @@ public void testSegmentPrunerFactoryForTimeRangePruner() { segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); assertEquals(segmentPruners.size(), 0); - // Time range pruner should be returned + // Schema is missing when(validationConfig.getTimeColumnName()).thenReturn(TIME_COLUMN); segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); + assertEquals(segmentPruners.size(), 0); + + // Field spec is missing + Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).build(); + ZKMetadataProvider.setSchema(_propertyStore, schema); + segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); + assertEquals(segmentPruners.size(), 0); + + // Time range pruner should be returned + schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME) + .addDateTimeField(TIME_COLUMN, DataType.TIMESTAMP, "TIMESTAMP", "1:MILLISECONDS").build(); + ZKMetadataProvider.setSchema(_propertyStore, schema); + segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); assertEquals(segmentPruners.size(), 1); assertTrue(segmentPruners.get(0) instanceof TimeSegmentPruner); } @Test - public void testEnablingEmptySegmentPruner() { + public void testSegmentPrunerFactoryForEmptySegmentPruner() { TableConfig tableConfig = mock(TableConfig.class); + when(tableConfig.getTableName()).thenReturn(REALTIME_TABLE_NAME); IndexingConfig indexingConfig = mock(IndexingConfig.class); + when(tableConfig.getIndexingConfig()).thenReturn(indexingConfig); RoutingConfig routingConfig = mock(RoutingConfig.class); - StreamIngestionConfig streamIngestionConfig = mock(StreamIngestionConfig.class); + when(tableConfig.getRoutingConfig()).thenReturn(routingConfig); + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); // When routingConfig is configured with EmptySegmentPruner, EmptySegmentPruner should be returned. - when(tableConfig.getRoutingConfig()).thenReturn(routingConfig); - when(routingConfig.getSegmentPrunerTypes()).thenReturn( - Collections.singletonList(RoutingConfig.EMPTY_SEGMENT_PRUNER_TYPE)); - List segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); + when(routingConfig.getSegmentPrunerTypes()).thenReturn(List.of(RoutingConfig.EMPTY_SEGMENT_PRUNER_TYPE)); + List segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, propertyStore); assertEquals(segmentPruners.size(), 1); assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); // When indexingConfig is configured with Kinesis streaming, EmptySegmentPruner should be returned. - when(indexingConfig.getStreamConfigs()).thenReturn( - Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE)); - segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); + when(indexingConfig.getStreamConfigs()).thenReturn(Map.of(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE)); + segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, propertyStore); assertEquals(segmentPruners.size(), 1); assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); // When streamIngestionConfig is configured with Kinesis streaming, EmptySegmentPruner should be returned. + StreamIngestionConfig streamIngestionConfig = mock(StreamIngestionConfig.class); when(streamIngestionConfig.getStreamConfigMaps()).thenReturn( - Collections.singletonList(Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE))); - when(indexingConfig.getStreamConfigs()).thenReturn( - Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE)); - segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); + List.of(Map.of(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE))); + when(indexingConfig.getStreamConfigs()).thenReturn(Map.of(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE)); + segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, propertyStore); assertEquals(segmentPruners.size(), 1); assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); } @@ -259,95 +285,76 @@ public void testPartitionAwareSegmentPruner() { BrokerRequest brokerRequest2 = CalciteSqlCompiler.compileToBrokerRequest(QUERY_2); BrokerRequest brokerRequest3 = CalciteSqlCompiler.compileToBrokerRequest(QUERY_3); BrokerRequest brokerRequest4 = CalciteSqlCompiler.compileToBrokerRequest(QUERY_4); + // NOTE: Ideal state and external view are not used in the current implementation IdealState idealState = Mockito.mock(IdealState.class); ExternalView externalView = Mockito.mock(ExternalView.class); SinglePartitionColumnSegmentPruner singlePartitionColumnSegmentPruner = new SinglePartitionColumnSegmentPruner(OFFLINE_TABLE_NAME, PARTITION_COLUMN_1); - SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, - _propertyStore); + SegmentZkMetadataFetcher segmentZkMetadataFetcher = + new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore); segmentZkMetadataFetcher.register(singlePartitionColumnSegmentPruner); Set onlineSegments = new HashSet<>(); segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments); - assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, Collections.emptySet()), - Collections.emptySet()); - assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, Collections.emptySet()), - Collections.emptySet()); - assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, Collections.emptySet()), - Collections.emptySet()); + + Set input = Set.of(); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, input), input); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, input), input); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, input), input); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4, input), input); // Segments without metadata (not updated yet) should not be pruned String newSegment = "newSegment"; - assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, Collections.singleton(newSegment)), - Collections.singletonList(newSegment)); - assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, Collections.singleton(newSegment)), - Collections.singletonList(newSegment)); - assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, Collections.singleton(newSegment)), - Collections.singletonList(newSegment)); + onlineSegments.add(newSegment); + segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); + input = Set.of(newSegment); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, input), input); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, input), input); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, input), input); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4, input), input); // Segments without partition metadata should not be pruned String segmentWithoutPartitionMetadata = "segmentWithoutPartitionMetadata"; - onlineSegments.add(segmentWithoutPartitionMetadata); - SegmentZKMetadata segmentZKMetadataWithoutPartitionMetadata = - new SegmentZKMetadata(segmentWithoutPartitionMetadata); ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, - segmentZKMetadataWithoutPartitionMetadata); + new SegmentZKMetadata(segmentWithoutPartitionMetadata)); + onlineSegments.add(segmentWithoutPartitionMetadata); segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); - assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, - new HashSet<>(Collections.singletonList(segmentWithoutPartitionMetadata))), - Collections.singletonList(segmentWithoutPartitionMetadata)); - assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, - new HashSet<>(Collections.singletonList(segmentWithoutPartitionMetadata))), - Collections.singletonList(segmentWithoutPartitionMetadata)); - assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, - new HashSet<>(Collections.singletonList(segmentWithoutPartitionMetadata))), - Collections.singletonList(segmentWithoutPartitionMetadata)); + input = Set.of(segmentWithoutPartitionMetadata); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, input), input); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, input), input); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, input), input); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4, input), input); // Test different partition functions and number of partitions // 0 % 5 = 0; 1 % 5 = 1; 2 % 5 = 2 String segment0 = "segment0"; - onlineSegments.add(segment0); setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment0, "Modulo", 5, 0); + onlineSegments.add(segment0); // Murmur(0) % 4 = 0; Murmur(1) % 4 = 3; Murmur(2) % 4 = 0 String segment1 = "segment1"; - onlineSegments.add(segment1); setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment1, "Murmur", 4, 0); + onlineSegments.add(segment1); segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); - assertEquals( - singlePartitionColumnSegmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1))), - new HashSet<>(Arrays.asList(segment0, segment1))); - assertEquals( - singlePartitionColumnSegmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1))), - new HashSet<>(Arrays.asList(segment0, segment1))); - assertEquals( - singlePartitionColumnSegmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1))), - new HashSet<>(Collections.singletonList(segment1))); + input = Set.of(segment0, segment1); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, input), input); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, input), input); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, input), Set.of(segment1)); // Update partition metadata without refreshing should have no effect setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment0, "Modulo", 4, 1); segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); - assertEquals( - singlePartitionColumnSegmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1))), - new HashSet<>(Arrays.asList(segment0, segment1))); - assertEquals( - singlePartitionColumnSegmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1))), - new HashSet<>(Arrays.asList(segment0, segment1))); - assertEquals( - singlePartitionColumnSegmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1))), - new HashSet<>(Collections.singletonList(segment1))); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, input), input); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, input), input); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, input), Set.of(segment1)); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4, input), input); // Refresh the changed segment should update the segment pruner segmentZkMetadataFetcher.refreshSegment(segment0); - assertEquals( - singlePartitionColumnSegmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1))), - new HashSet<>(Arrays.asList(segment0, segment1))); - assertEquals( - singlePartitionColumnSegmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1))), - new HashSet<>(Collections.singletonList(segment1))); - assertEquals( - singlePartitionColumnSegmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1))), - new HashSet<>(Arrays.asList(segment0, segment1))); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, input), input); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, input), Set.of(segment1)); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, input), input); + assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4, input), Set.of(segment1)); // Multi-column partitioned segment. MultiPartitionColumnsSegmentPruner multiPartitionColumnsSegmentPruner = @@ -356,38 +363,25 @@ public void testPartitionAwareSegmentPruner() { segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore); segmentZkMetadataFetcher.register(multiPartitionColumnsSegmentPruner); segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments); - assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest1, Collections.emptySet()), - Collections.emptySet()); - assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest2, Collections.emptySet()), - Collections.emptySet()); - assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest3, Collections.emptySet()), - Collections.emptySet()); - assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest4, Collections.emptySet()), - Collections.emptySet()); + + assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest1, input), input); + assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest2, input), Set.of(segment1)); + assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest3, input), input); + assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest4, input), Set.of(segment1)); String segment2 = "segment2"; - onlineSegments.add(segment2); Map columnPartitionMetadataMap = new HashMap<>(); - columnPartitionMetadataMap.put(PARTITION_COLUMN_1, - new ColumnPartitionMetadata("Modulo", 4, Collections.singleton(0), null)); - Map partitionColumn2FunctionConfig = new HashMap<>(); - partitionColumn2FunctionConfig.put("columnValues", "xyz|abc"); - partitionColumn2FunctionConfig.put("columnValuesDelimiter", "|"); - columnPartitionMetadataMap.put(PARTITION_COLUMN_2, new ColumnPartitionMetadata( - "BoundedColumnValue", 3, Collections.singleton(1), partitionColumn2FunctionConfig)); + columnPartitionMetadataMap.put(PARTITION_COLUMN_1, new ColumnPartitionMetadata("Modulo", 4, Set.of(0), null)); + columnPartitionMetadataMap.put(PARTITION_COLUMN_2, new ColumnPartitionMetadata("BoundedColumnValue", 3, Set.of(1), + Map.of("columnValues", "xyz|abc", "columnValuesDelimiter", "|"))); setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment2, columnPartitionMetadataMap); + onlineSegments.add(segment2); segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); - assertEquals( - multiPartitionColumnsSegmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1))), - new HashSet<>(Arrays.asList(segment0, segment1))); - assertEquals( - multiPartitionColumnsSegmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1))), - new HashSet<>(Collections.singletonList(segment1))); - assertEquals( - multiPartitionColumnsSegmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1))), - new HashSet<>(Arrays.asList(segment0, segment1))); - assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest4, - new HashSet<>(Arrays.asList(segment0, segment1, segment2))), new HashSet<>(Arrays.asList(segment1, segment2))); + input = Set.of(segment0, segment1, segment2); + assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest1, input), input); + assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest2, input), Set.of(segment1, segment2)); + assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest3, input), Set.of(segment0, segment1)); + assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest4, input), Set.of(segment1, segment2)); } @Test @@ -399,143 +393,112 @@ public void testTimeSegmentPruner() { BrokerRequest brokerRequest5 = CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_4); BrokerRequest brokerRequest6 = CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_5); BrokerRequest brokerRequest7 = CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_6); + BrokerRequest brokerRequest8 = CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_7); + BrokerRequest brokerRequest9 = CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_8); + // NOTE: Ideal state and external view are not used in the current implementation IdealState idealState = Mockito.mock(IdealState.class); ExternalView externalView = Mockito.mock(ExternalView.class); - TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, TableType.REALTIME); - setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS); - TimeSegmentPruner segmentPruner = SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, - _propertyStore); - Set onlineSegments = new HashSet<>(); - SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, - _propertyStore); + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN).build(); + DateTimeFieldSpec timeFieldSpec = new DateTimeFieldSpec(TIME_COLUMN, DataType.INT, "EPOCH|DAYS", "1:DAYS"); + TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, timeFieldSpec); + SegmentZkMetadataFetcher segmentZkMetadataFetcher = + new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore); segmentZkMetadataFetcher.register(segmentPruner); + Set onlineSegments = new HashSet<>(); segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments); - assertEquals(segmentPruner.prune(brokerRequest1, Collections.emptySet()), Collections.emptySet()); - assertEquals(segmentPruner.prune(brokerRequest2, Collections.emptySet()), Collections.emptySet()); - assertEquals(segmentPruner.prune(brokerRequest3, Collections.emptySet()), Collections.emptySet()); - assertEquals(segmentPruner.prune(brokerRequest4, Collections.emptySet()), Collections.emptySet()); - assertEquals(segmentPruner.prune(brokerRequest5, Collections.emptySet()), Collections.emptySet()); - assertEquals(segmentPruner.prune(brokerRequest6, Collections.emptySet()), Collections.emptySet()); - assertEquals(segmentPruner.prune(brokerRequest7, Collections.emptySet()), Collections.emptySet()); - - // Initialize with non-empty onlineSegments + + Set input = Set.of(); + assertEquals(segmentPruner.prune(brokerRequest1, input), input); + assertEquals(segmentPruner.prune(brokerRequest2, input), input); + assertEquals(segmentPruner.prune(brokerRequest3, input), input); + assertEquals(segmentPruner.prune(brokerRequest4, input), input); + assertEquals(segmentPruner.prune(brokerRequest5, input), input); + assertEquals(segmentPruner.prune(brokerRequest6, input), input); + assertEquals(segmentPruner.prune(brokerRequest7, input), input); + assertEquals(segmentPruner.prune(brokerRequest8, input), input); + assertEquals(segmentPruner.prune(brokerRequest9, input), input); + // Segments without metadata (not updated yet) should not be pruned - segmentPruner = SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, _propertyStore); - segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore); - segmentZkMetadataFetcher.register(segmentPruner); String newSegment = "newSegment"; onlineSegments.add(newSegment); - segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments); - assertEquals(segmentPruner.prune(brokerRequest1, Collections.singleton(newSegment)), - Collections.singletonList(newSegment)); - assertEquals(segmentPruner.prune(brokerRequest2, Collections.singleton(newSegment)), - Collections.singletonList(newSegment)); - assertEquals(segmentPruner.prune(brokerRequest3, Collections.singleton(newSegment)), - Collections.singletonList(newSegment)); - assertEquals(segmentPruner.prune(brokerRequest4, Collections.singleton(newSegment)), - Collections.singletonList(newSegment)); - assertEquals(segmentPruner.prune(brokerRequest5, Collections.singleton(newSegment)), - Collections.singletonList(newSegment)); - assertEquals(segmentPruner.prune(brokerRequest6, Collections.singleton(newSegment)), - Collections.singletonList(newSegment)); - assertEquals(segmentPruner.prune(brokerRequest7, Collections.singleton(newSegment)), - Collections.emptySet()); // query with invalid range will always have empty filtered result + segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); + input = Set.of(newSegment); + assertEquals(segmentPruner.prune(brokerRequest1, input), input); + assertEquals(segmentPruner.prune(brokerRequest2, input), input); + assertEquals(segmentPruner.prune(brokerRequest3, input), input); + assertEquals(segmentPruner.prune(brokerRequest4, input), input); + assertEquals(segmentPruner.prune(brokerRequest5, input), input); + assertEquals(segmentPruner.prune(brokerRequest6, input), input); + assertEquals(segmentPruner.prune(brokerRequest7, input), input); + assertEquals(segmentPruner.prune(brokerRequest8, input), input); + assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // Query with invalid range // Segments without time range metadata should not be pruned String segmentWithoutTimeRangeMetadata = "segmentWithoutTimeRangeMetadata"; - onlineSegments.add(segmentWithoutTimeRangeMetadata); SegmentZKMetadata segmentZKMetadataWithoutTimeRangeMetadata = new SegmentZKMetadata(segmentWithoutTimeRangeMetadata); - segmentZKMetadataWithoutTimeRangeMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE); ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, REALTIME_TABLE_NAME, segmentZKMetadataWithoutTimeRangeMetadata); + onlineSegments.add(segmentWithoutTimeRangeMetadata); segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); - assertEquals( - segmentPruner.prune(brokerRequest1, new HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))), - Collections.singletonList(segmentWithoutTimeRangeMetadata)); - assertEquals( - segmentPruner.prune(brokerRequest2, new HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))), - Collections.singletonList(segmentWithoutTimeRangeMetadata)); - assertEquals( - segmentPruner.prune(brokerRequest3, new HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))), - Collections.singletonList(segmentWithoutTimeRangeMetadata)); - assertEquals( - segmentPruner.prune(brokerRequest4, new HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))), - Collections.singletonList(segmentWithoutTimeRangeMetadata)); - assertEquals( - segmentPruner.prune(brokerRequest5, new HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))), - Collections.singletonList(segmentWithoutTimeRangeMetadata)); - assertEquals( - segmentPruner.prune(brokerRequest6, new HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))), - Collections.singletonList(segmentWithoutTimeRangeMetadata)); - assertEquals( - segmentPruner.prune(brokerRequest7, new HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))), - Collections.emptySet()); + assertEquals(segmentPruner.prune(brokerRequest1, input), input); + assertEquals(segmentPruner.prune(brokerRequest2, input), input); + assertEquals(segmentPruner.prune(brokerRequest3, input), input); + assertEquals(segmentPruner.prune(brokerRequest4, input), input); + assertEquals(segmentPruner.prune(brokerRequest5, input), input); + assertEquals(segmentPruner.prune(brokerRequest6, input), input); + assertEquals(segmentPruner.prune(brokerRequest7, input), input); + assertEquals(segmentPruner.prune(brokerRequest8, input), input); + assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // Query with invalid range // Test different time range String segment0 = "segment0"; - onlineSegments.add(segment0); setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, 10, 60, TimeUnit.DAYS); - + onlineSegments.add(segment0); String segment1 = "segment1"; - onlineSegments.add(segment1); setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, 20, 30, TimeUnit.DAYS); - + onlineSegments.add(segment1); String segment2 = "segment2"; - onlineSegments.add(segment2); setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 50, 65, TimeUnit.DAYS); - + onlineSegments.add(segment2); segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); - assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Arrays.asList(segment0, segment1, segment2))); - assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - Collections.singleton(segment0)); - assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Arrays.asList(segment0, segment1))); - assertEquals(segmentPruner.prune(brokerRequest4, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Arrays.asList(segment0, segment2))); - assertEquals(segmentPruner.prune(brokerRequest5, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Arrays.asList(segment0, segment2))); - assertEquals(segmentPruner.prune(brokerRequest6, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Arrays.asList(segment0, segment2))); - assertEquals(segmentPruner.prune(brokerRequest7, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - Collections.emptySet()); + input = Set.of(segment0, segment1, segment2); + assertEquals(segmentPruner.prune(brokerRequest1, input), input); + assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0)); + assertEquals(segmentPruner.prune(brokerRequest3, input), Set.of(segment0, segment1)); + assertEquals(segmentPruner.prune(brokerRequest4, input), Set.of(segment0, segment2)); + assertEquals(segmentPruner.prune(brokerRequest5, input), Set.of(segment0, segment2)); + assertEquals(segmentPruner.prune(brokerRequest6, input), Set.of(segment0, segment2)); + assertEquals(segmentPruner.prune(brokerRequest7, input), Set.of(segment0, segment2)); + assertEquals(segmentPruner.prune(brokerRequest8, input), Set.of(segment0, segment1)); + assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // Query with invalid range // Update metadata without external view change or refreshing should have no effect setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 20, 30, TimeUnit.DAYS); - assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Arrays.asList(segment0, segment1, segment2))); - assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - Collections.singleton(segment0)); - assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Arrays.asList(segment0, segment1))); - assertEquals(segmentPruner.prune(brokerRequest4, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Arrays.asList(segment0, segment2))); - assertEquals(segmentPruner.prune(brokerRequest5, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Arrays.asList(segment0, segment2))); - assertEquals(segmentPruner.prune(brokerRequest6, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Arrays.asList(segment0, segment2))); - assertEquals(segmentPruner.prune(brokerRequest7, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - Collections.emptySet()); + assertEquals(segmentPruner.prune(brokerRequest1, input), input); + assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0)); + assertEquals(segmentPruner.prune(brokerRequest3, input), Set.of(segment0, segment1)); + assertEquals(segmentPruner.prune(brokerRequest4, input), Set.of(segment0, segment2)); + assertEquals(segmentPruner.prune(brokerRequest5, input), Set.of(segment0, segment2)); + assertEquals(segmentPruner.prune(brokerRequest6, input), Set.of(segment0, segment2)); + assertEquals(segmentPruner.prune(brokerRequest7, input), Set.of(segment0, segment2)); + assertEquals(segmentPruner.prune(brokerRequest8, input), Set.of(segment0, segment1)); + assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // Query with invalid range // Refresh the changed segment should update the segment pruner segmentZkMetadataFetcher.refreshSegment(segment2); - assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Arrays.asList(segment0, segment1, segment2))); - assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - Collections.singleton(segment0)); - assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Arrays.asList(segment0, segment1, segment2))); - assertEquals(segmentPruner.prune(brokerRequest4, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - Collections.singleton(segment0)); - assertEquals(segmentPruner.prune(brokerRequest5, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - Collections.singleton(segment0)); - assertEquals(segmentPruner.prune(brokerRequest6, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - Collections.singleton(segment0)); - assertEquals(segmentPruner.prune(brokerRequest7, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - Collections.emptySet()); + assertEquals(segmentPruner.prune(brokerRequest1, input), input); + assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0)); + assertEquals(segmentPruner.prune(brokerRequest3, input), input); + assertEquals(segmentPruner.prune(brokerRequest4, input), Set.of(segment0)); + assertEquals(segmentPruner.prune(brokerRequest5, input), Set.of(segment0)); + assertEquals(segmentPruner.prune(brokerRequest6, input), Set.of(segment0)); + assertEquals(segmentPruner.prune(brokerRequest7, input), Set.of(segment0)); + assertEquals(segmentPruner.prune(brokerRequest8, input), input); + assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // Query with invalid range } @Test @@ -545,215 +508,175 @@ public void testTimeSegmentPrunerSimpleDateFormat() { BrokerRequest brokerRequest3 = CalciteSqlCompiler.compileToBrokerRequest(SDF_QUERY_3); BrokerRequest brokerRequest4 = CalciteSqlCompiler.compileToBrokerRequest(SDF_QUERY_4); BrokerRequest brokerRequest5 = CalciteSqlCompiler.compileToBrokerRequest(SDF_QUERY_5); + // NOTE: Ideal state and external view are not used in the current implementation IdealState idealState = Mockito.mock(IdealState.class); ExternalView externalView = Mockito.mock(ExternalView.class); - TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, TableType.REALTIME); - setSchemaDateTimeFieldSpecSDF(RAW_TABLE_NAME, SDF_PATTERN); - - TimeSegmentPruner segmentPruner = SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, _propertyStore); - SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, - _propertyStore); + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN).build(); + DateTimeFieldSpec timeFieldSpec = + new DateTimeFieldSpec(TIME_COLUMN, DataType.STRING, "SIMPLE_DATE_FORMAT|" + SDF_PATTERN, "1:DAYS"); + TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, timeFieldSpec); + SegmentZkMetadataFetcher segmentZkMetadataFetcher = + new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore); segmentZkMetadataFetcher.register(segmentPruner); - Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, RAW_TABLE_NAME); - DateTimeFormatSpec dateTimeFormatSpec = schema.getSpecForTimeColumn(TIME_COLUMN).getFormatSpec(); - + DateTimeFormatSpec timeFormatSpec = timeFieldSpec.getFormatSpec(); Set onlineSegments = new HashSet<>(); String segment0 = "segment0"; + setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, timeFormatSpec.fromFormatToMillis("20200101"), + timeFormatSpec.fromFormatToMillis("20200228"), TimeUnit.MILLISECONDS); onlineSegments.add(segment0); - setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, dateTimeFormatSpec.fromFormatToMillis("20200101"), - dateTimeFormatSpec.fromFormatToMillis("20200228"), TimeUnit.MILLISECONDS); - String segment1 = "segment1"; + setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, timeFormatSpec.fromFormatToMillis("20200201"), + timeFormatSpec.fromFormatToMillis("20200530"), TimeUnit.MILLISECONDS); onlineSegments.add(segment1); - setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, dateTimeFormatSpec.fromFormatToMillis("20200201"), - dateTimeFormatSpec.fromFormatToMillis("20200530"), TimeUnit.MILLISECONDS); - String segment2 = "segment2"; + setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, timeFormatSpec.fromFormatToMillis("20200401"), + timeFormatSpec.fromFormatToMillis("20200430"), TimeUnit.MILLISECONDS); onlineSegments.add(segment2); - setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, dateTimeFormatSpec.fromFormatToMillis("20200401"), - dateTimeFormatSpec.fromFormatToMillis("20200430"), TimeUnit.MILLISECONDS); - segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments); - assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), Collections.singleton(segment0)); - assertEquals(segmentPruner.prune(brokerRequest2, onlineSegments), new HashSet<>(Arrays.asList(segment0, segment1))); - assertEquals(segmentPruner.prune(brokerRequest3, onlineSegments), Collections.singleton(segment1)); - assertEquals(segmentPruner.prune(brokerRequest4, onlineSegments), - new HashSet<>(Arrays.asList(segment0, segment1, segment2))); - assertEquals(segmentPruner.prune(brokerRequest5, onlineSegments), Collections.emptySet()); + + Set input = Set.of(segment0, segment1, segment2); + assertEquals(segmentPruner.prune(brokerRequest1, input), Set.of(segment0)); + assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0, segment1)); + assertEquals(segmentPruner.prune(brokerRequest3, input), Set.of(segment1)); + assertEquals(segmentPruner.prune(brokerRequest4, input), input); + assertEquals(segmentPruner.prune(brokerRequest5, input), Set.of()); } @Test - public void testTimeSegmentPrunerSql() { - BrokerRequest brokerRequest1 = CalciteSqlCompiler.compileToBrokerRequest(SQL_TIME_QUERY_1); - BrokerRequest brokerRequest2 = CalciteSqlCompiler.compileToBrokerRequest(SQL_TIME_QUERY_2); + public void testTimeSegmentPrunerTimestampFormat() { + BrokerRequest brokerRequest1 = CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_1); + BrokerRequest brokerRequest2 = CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_2); + BrokerRequest brokerRequest3 = CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_3); + BrokerRequest brokerRequest4 = CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_4); + BrokerRequest brokerRequest5 = CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_5); + BrokerRequest brokerRequest6 = CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_6); + // NOTE: Ideal state and external view are not used in the current implementation IdealState idealState = Mockito.mock(IdealState.class); ExternalView externalView = Mockito.mock(ExternalView.class); - TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, TableType.REALTIME); - setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS); - - TimeSegmentPruner segmentPruner = SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, _propertyStore); - SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, - _propertyStore); + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN).build(); + // Intentionally put EPOCH as the format which Pinot should handle + DateTimeFieldSpec timeFieldSpec = + new DateTimeFieldSpec(TIME_COLUMN, DataType.TIMESTAMP, "EPOCH|MILLISECONDS", "1:DAYS"); + TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, timeFieldSpec); + SegmentZkMetadataFetcher segmentZkMetadataFetcher = + new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore); segmentZkMetadataFetcher.register(segmentPruner); + DateTimeFormatSpec timeFormatSpec = timeFieldSpec.getFormatSpec(); Set onlineSegments = new HashSet<>(); String segment0 = "segment0"; + setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, + timeFormatSpec.fromFormatToMillis("2020-01-01 00:00:00"), + timeFormatSpec.fromFormatToMillis("2020-02-28 00:00:00"), TimeUnit.MILLISECONDS); onlineSegments.add(segment0); - setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, 10, 60, TimeUnit.DAYS); String segment1 = "segment1"; + setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, + timeFormatSpec.fromFormatToMillis("2020-02-01 00:00:00"), + timeFormatSpec.fromFormatToMillis("2020-05-30 00:00:00"), TimeUnit.MILLISECONDS); onlineSegments.add(segment1); - setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, 20, 30, TimeUnit.DAYS); String segment2 = "segment2"; + setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, + timeFormatSpec.fromFormatToMillis("2020-04-01 00:00:00"), + timeFormatSpec.fromFormatToMillis("2020-04-30 00:00:00"), TimeUnit.MILLISECONDS); onlineSegments.add(segment2); - setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 50, 65, TimeUnit.DAYS); segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments); - assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), new HashSet<>(Arrays.asList(segment0, segment2))); - assertEquals(segmentPruner.prune(brokerRequest2, onlineSegments), new HashSet<>(Arrays.asList(segment0, segment1))); + Set input = Set.of(segment0, segment1, segment2); + assertEquals(segmentPruner.prune(brokerRequest1, input), Set.of(segment0)); + assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0)); + assertEquals(segmentPruner.prune(brokerRequest3, input), Set.of(segment0, segment1)); + assertEquals(segmentPruner.prune(brokerRequest4, input), Set.of(segment0, segment1)); + assertEquals(segmentPruner.prune(brokerRequest5, input), input); + assertEquals(segmentPruner.prune(brokerRequest6, input), input); } @Test public void testEmptySegmentPruner() { BrokerRequest brokerRequest1 = CalciteSqlCompiler.compileToBrokerRequest(QUERY_1); - BrokerRequest brokerRequest2 = CalciteSqlCompiler.compileToBrokerRequest(QUERY_2); - BrokerRequest brokerRequest3 = CalciteSqlCompiler.compileToBrokerRequest(QUERY_3); + // NOTE: Ideal state and external view are not used in the current implementation IdealState idealState = Mockito.mock(IdealState.class); ExternalView externalView = Mockito.mock(ExternalView.class); - TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, TableType.REALTIME); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).build(); - // init with list of segments + // Init with a list of segments EmptySegmentPruner segmentPruner = new EmptySegmentPruner(tableConfig); - SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, - _propertyStore); + SegmentZkMetadataFetcher segmentZkMetadataFetcher = + new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore); segmentZkMetadataFetcher.register(segmentPruner); Set onlineSegments = new HashSet<>(); String segment0 = "segment0"; - onlineSegments.add(segment0); setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment0, 10); + onlineSegments.add(segment0); String segment1 = "segment1"; - onlineSegments.add(segment1); setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment1, 0); + onlineSegments.add(segment1); segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments); - assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1))), - new HashSet<>(Collections.singletonList(segment0))); - assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1))), - new HashSet<>(Collections.singletonList(segment0))); - assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1))), - new HashSet<>(Collections.singletonList(segment0))); - - // init with empty list of segments + assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), Set.of(segment0)); + + // Init with no segment segmentPruner = new EmptySegmentPruner(tableConfig); segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore); segmentZkMetadataFetcher.register(segmentPruner); onlineSegments.clear(); segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments); - assertEquals(segmentPruner.prune(brokerRequest1, Collections.emptySet()), Collections.emptySet()); - assertEquals(segmentPruner.prune(brokerRequest2, Collections.emptySet()), Collections.emptySet()); - assertEquals(segmentPruner.prune(brokerRequest3, Collections.emptySet()), Collections.emptySet()); + assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), onlineSegments); // Segments without metadata (not updated yet) should not be pruned String newSegment = "newSegment"; onlineSegments.add(newSegment); segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); - assertEquals(segmentPruner.prune(brokerRequest1, Collections.singleton(newSegment)), - Collections.singleton(newSegment)); - assertEquals(segmentPruner.prune(brokerRequest2, Collections.singleton(newSegment)), - Collections.singleton(newSegment)); - assertEquals(segmentPruner.prune(brokerRequest3, Collections.singleton(newSegment)), - Collections.singleton(newSegment)); + assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), onlineSegments); // Segments without totalDocs metadata should not be pruned - onlineSegments.clear(); String segmentWithoutTotalDocsMetadata = "segmentWithoutTotalDocsMetadata"; - onlineSegments.add(segmentWithoutTotalDocsMetadata); SegmentZKMetadata segmentZKMetadataWithoutTotalDocsMetadata = new SegmentZKMetadata(segmentWithoutTotalDocsMetadata); - segmentZKMetadataWithoutTotalDocsMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS); ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, REALTIME_TABLE_NAME, segmentZKMetadataWithoutTotalDocsMetadata); + onlineSegments.add(segmentWithoutTotalDocsMetadata); segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); - assertEquals(segmentPruner.prune(brokerRequest1, Collections.singleton(segmentWithoutTotalDocsMetadata)), - Collections.singleton(segmentWithoutTotalDocsMetadata)); - assertEquals(segmentPruner.prune(brokerRequest2, Collections.singleton(segmentWithoutTotalDocsMetadata)), - Collections.singleton(segmentWithoutTotalDocsMetadata)); - assertEquals(segmentPruner.prune(brokerRequest3, Collections.singleton(segmentWithoutTotalDocsMetadata)), - Collections.singleton(segmentWithoutTotalDocsMetadata)); + assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), onlineSegments); // Segments with -1 totalDocs should not be pruned - onlineSegments.clear(); String segmentWithNegativeTotalDocsMetadata = "segmentWithNegativeTotalDocsMetadata"; - onlineSegments.add(segmentWithNegativeTotalDocsMetadata); setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segmentWithNegativeTotalDocsMetadata, -1); + onlineSegments.add(segmentWithNegativeTotalDocsMetadata); segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); - assertEquals(segmentPruner.prune(brokerRequest1, Collections.singleton(segmentWithNegativeTotalDocsMetadata)), - Collections.singleton(segmentWithNegativeTotalDocsMetadata)); - assertEquals(segmentPruner.prune(brokerRequest2, Collections.singleton(segmentWithNegativeTotalDocsMetadata)), - Collections.singleton(segmentWithNegativeTotalDocsMetadata)); - assertEquals(segmentPruner.prune(brokerRequest3, Collections.singleton(segmentWithNegativeTotalDocsMetadata)), - Collections.singleton(segmentWithNegativeTotalDocsMetadata)); + assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), onlineSegments); // Prune segments with 0 total docs onlineSegments.clear(); - onlineSegments.add(segment0); setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment0, 10); - onlineSegments.add(segment1); + onlineSegments.add(segment0); setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment1, 0); + onlineSegments.add(segment1); String segment2 = "segment2"; - onlineSegments.add(segment2); setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment2, -1); - + onlineSegments.add(segment2); segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); - assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Arrays.asList(segment0, segment2))); - assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Arrays.asList(segment0, segment2))); - assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Arrays.asList(segment0, segment2))); + assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), Set.of(segment0, segment2)); // Update metadata without external view change or refreshing should have no effect - setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 20, 30, TimeUnit.DAYS); setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment2, 0); - assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Arrays.asList(segment0, segment2))); - assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Arrays.asList(segment0, segment2))); - assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Arrays.asList(segment0, segment2))); + assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), Set.of(segment0, segment2)); // Refresh the changed segment should update the segment pruner segmentZkMetadataFetcher.refreshSegment(segment2); - assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Collections.singletonList(segment0))); - assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Collections.singletonList(segment0))); - assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), - new HashSet<>(Collections.singletonList(segment0))); - } - - private TableConfig getTableConfig(String rawTableName, TableType type) { - return new TableConfigBuilder(type).setTableName(rawTableName).setTimeColumnName(TIME_COLUMN).build(); - } - - private void setSchemaDateTimeFieldSpec(String rawTableName, TimeUnit timeUnit) { - ZKMetadataProvider.setSchema(_propertyStore, new Schema.SchemaBuilder().setSchemaName(rawTableName) - .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:" + timeUnit + ":EPOCH", "1:" + timeUnit).build()); - } - - private void setSchemaDateTimeFieldSpecSDF(String rawTableName, String format) { - ZKMetadataProvider.setSchema(_propertyStore, new Schema.SchemaBuilder().setSchemaName(rawTableName) - .addDateTime(TIME_COLUMN, FieldSpec.DataType.STRING, "1:DAYS:SIMPLE_DATE_FORMAT:" + format, "1:DAYS").build()); + assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), Set.of(segment0)); } private void setSegmentZKPartitionMetadata(String tableNameWithType, String segment, String partitionFunction, int numPartitions, int partitionId) { SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segment); - segmentZKMetadata.setPartitionMetadata(new SegmentPartitionMetadata(Collections.singletonMap(PARTITION_COLUMN_1, - new ColumnPartitionMetadata(partitionFunction, numPartitions, Collections.singleton(partitionId), null)))); + segmentZKMetadata.setPartitionMetadata(new SegmentPartitionMetadata(Map.of(PARTITION_COLUMN_1, + new ColumnPartitionMetadata(partitionFunction, numPartitions, Set.of(partitionId), null)))); ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, tableNameWithType, segmentZKMetadata); } diff --git a/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java b/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java index 626a09100555..e8fd1287293c 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java @@ -221,8 +221,7 @@ public void testFetchFieldSpecForTime() { .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.DAYS, "time"), null) .addDateTime("dateTime0", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS") .addDateTime("dateTime1", FieldSpec.DataType.TIMESTAMP, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS") - .addDateTime("dateTime2", FieldSpec.DataType.INT, "1:DAYS:EPOCH", "1:DAYS") - .build(); + .addDateTime("dateTime2", FieldSpec.DataType.INT, "1:DAYS:EPOCH", "1:DAYS").build(); // Test method which fetches the DateTimeFieldSpec given the timeColumnName // Test is on TIME @@ -254,7 +253,7 @@ public void testFetchFieldSpecForTime() { Assert.assertEquals(dateTimeFieldSpec.getDataType(), FieldSpec.DataType.TIMESTAMP); Assert.assertTrue(dateTimeFieldSpec.isSingleValueField()); Assert.assertEquals(dateTimeFieldSpec.getDefaultNullValue(), 0L); - Assert.assertEquals(dateTimeFieldSpec.getFormat(), "1:MILLISECONDS:EPOCH"); + Assert.assertEquals(dateTimeFieldSpec.getFormat(), "TIMESTAMP"); Assert.assertEquals(dateTimeFieldSpec.getGranularity(), "1:MILLISECONDS"); dateTimeFieldSpec = schema.getSpecForTimeColumn("dateTime2"); @@ -326,15 +325,10 @@ public void testSerializeDeserialize() @Test public void testSerializeDeserializeOptions() throws IOException { - String json = "{\n" - + " \"primaryKeyColumns\" : null,\n" - + " \"timeFieldSpec\" : null,\n" - + " \"schemaName\" : null,\n" - + " \"enableColumnBasedNullHandling\" : true,\n" - + " \"dimensionFieldSpecs\" : [ ],\n" - + " \"metricFieldSpecs\" : [ ],\n" - + " \"dateTimeFieldSpecs\" : [ ]\n" - + "}"; + String json = + "{\n" + " \"primaryKeyColumns\" : null,\n" + " \"timeFieldSpec\" : null,\n" + " \"schemaName\" : null,\n" + + " \"enableColumnBasedNullHandling\" : true,\n" + " \"dimensionFieldSpecs\" : [ ],\n" + + " \"metricFieldSpecs\" : [ ],\n" + " \"dateTimeFieldSpecs\" : [ ]\n" + "}"; JsonNode expectedNode = JsonUtils.stringToJsonNode(json); Schema schema = JsonUtils.jsonNodeToObject(expectedNode, Schema.class); @@ -363,6 +357,17 @@ public void testSimpleDateFormat() Assert.assertEquals(schemaFromJson.hashCode(), schema.hashCode()); } + @Test + public void testTimestampFormatOverride() + throws Exception { + URL resourceUrl = getClass().getClassLoader().getResource("schemaTest.schema"); + Assert.assertNotNull(resourceUrl); + Schema schema = Schema.fromFile(new File(resourceUrl.getFile())); + DateTimeFieldSpec fieldSpec = schema.getDateTimeSpec("dateTime3"); + Assert.assertNotNull(fieldSpec); + Assert.assertEquals(fieldSpec.getFormat(), "TIMESTAMP"); + } + @Test public void testByteType() throws Exception { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFieldSpec.java index ea9285a10487..dbb92090d177 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFieldSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFieldSpec.java @@ -25,6 +25,7 @@ import javax.annotation.Nullable; import org.apache.pinot.spi.utils.EqualityUtils; + @SuppressWarnings("unused") @JsonIgnoreProperties(ignoreUnknown = true) public final class DateTimeFieldSpec extends FieldSpec { @@ -74,6 +75,10 @@ public DateTimeFieldSpec(String name, DataType dataType, String format, String g @Nullable Object sampleValue) { super(name, dataType, true); + // Override format to be "TIMESTAMP" for TIMESTAMP data type because the format is implicit + if (dataType == DataType.TIMESTAMP) { + format = TimeFormat.TIMESTAMP.name(); + } _format = format; _granularity = granularity; _formatSpec = new DateTimeFormatSpec(format); @@ -119,13 +124,23 @@ public void setSingleValueField(boolean isSingleValueField) { Preconditions.checkArgument(isSingleValueField, "Unsupported multi-value for date time field."); } + @Override + public void setDataType(DataType dataType) { + super.setDataType(dataType); + if (dataType == DataType.TIMESTAMP) { + _format = TimeFormat.TIMESTAMP.name(); + } + } + public String getFormat() { return _format; } // Required by JSON de-serializer. DO NOT REMOVE. public void setFormat(String format) { - _format = format; + if (_dataType != DataType.TIMESTAMP) { + _format = format; + } } @JsonIgnore