Fix the time segment pruner on TIMESTAMP data type (apache#12789)
Jackie-Jiang authored Apr 8, 2024
1 parent 32a02bc commit a0a2171
Showing 5 changed files with 375 additions and 472 deletions.
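Note: only the first two changed files, both in the broker's segment-pruner routing package, rendered below; the diff for the remaining three files is not shown. At a high level, the visible change has three parts: the TimeSegmentPruner is now constructed from the schema's DateTimeFieldSpec instead of a (time column, DateTimeFormatSpec) pair, the repeated identifier and literal handling in the filter-to-interval logic is centralized into small helpers, and the factory stops failing hard (via Guava Preconditions) when schema metadata is missing, warning and skipping time pruning instead. Per the commit title, the motivation is that time segment pruning previously misbehaved when the time column uses the TIMESTAMP data type.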
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java

@@ -18,8 +18,6 @@
  */
 package org.apache.pinot.broker.routing.segmentpruner;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -37,7 +35,6 @@
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,7 +65,7 @@ public static List<SegmentPruner> getSegmentPruners(TableConfig tableConfig,
     List<SegmentPruner> configuredSegmentPruners = new ArrayList<>(segmentPrunerTypes.size());
     for (String segmentPrunerType : segmentPrunerTypes) {
       if (RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE.equalsIgnoreCase(segmentPrunerType)) {
-        SegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig, propertyStore);
+        SegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig);
         if (partitionSegmentPruner != null) {
           configuredSegmentPruners.add(partitionSegmentPruner);
         }
@@ -91,7 +88,7 @@ public static List<SegmentPruner> getSegmentPruners(TableConfig tableConfig,
       if ((tableType == TableType.OFFLINE && LEGACY_PARTITION_AWARE_OFFLINE_ROUTING.equalsIgnoreCase(
           routingTableBuilderName)) || (tableType == TableType.REALTIME
           && LEGACY_PARTITION_AWARE_REALTIME_ROUTING.equalsIgnoreCase(routingTableBuilderName))) {
-        SegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig, propertyStore);
+        SegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig);
         if (partitionSegmentPruner != null) {
           segmentPruners.add(partitionSegmentPruner);
         }
@@ -102,8 +99,7 @@ public static List<SegmentPruner> getSegmentPruners(TableConfig tableConfig,
   }
 
   @Nullable
-  private static SegmentPruner getPartitionSegmentPruner(TableConfig tableConfig,
-      ZkHelixPropertyStore<ZNRecord> propertyStore) {
+  private static SegmentPruner getPartitionSegmentPruner(TableConfig tableConfig) {
     String tableNameWithType = tableConfig.getTableName();
     SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
     if (segmentPartitionConfig == null) {
@@ -137,26 +133,20 @@ private static TimeSegmentPruner getTimeSegmentPruner(TableConfig tableConfig,
       LOGGER.warn("Cannot enable time range pruning without time column for table: {}", tableNameWithType);
       return null;
     }
-    return createTimeSegmentPruner(tableConfig, propertyStore);
-  }
-
-  @VisibleForTesting
-  static TimeSegmentPruner createTimeSegmentPruner(TableConfig tableConfig,
-      ZkHelixPropertyStore<ZNRecord> propertyStore) {
-    String tableNameWithType = tableConfig.getTableName();
-    String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
-    Preconditions.checkNotNull(timeColumn, "Time column must be configured in table config for table: %s",
-        tableNameWithType);
-    Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, tableNameWithType);
-    Preconditions.checkNotNull(schema, "Failed to find schema for table: %s", tableNameWithType);
-    DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(timeColumn);
-    Preconditions.checkNotNull(dateTimeSpec, "Field spec must be specified in schema for time column: %s of table: %s",
-        timeColumn, tableNameWithType);
-    DateTimeFormatSpec timeFormatSpec = dateTimeSpec.getFormatSpec();
-
-    LOGGER.info("Using TimeRangePruner on time column: {} for table: {} with DateTimeFormatSpec: {}",
-        timeColumn, tableNameWithType, timeFormatSpec);
-    return new TimeSegmentPruner(tableConfig, timeColumn, timeFormatSpec);
+    Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, tableConfig);
+    if (schema == null) {
+      LOGGER.warn("Cannot enable time range pruning without schema for table: {}", tableNameWithType);
+      return null;
+    }
+    DateTimeFieldSpec timeFieldSpec = schema.getSpecForTimeColumn(timeColumn);
+    if (timeFieldSpec == null) {
+      LOGGER.warn("Cannot enable time range pruning without field spec for table: {}, time column: {}",
+          tableNameWithType, timeColumn);
+      return null;
+    }
+    LOGGER.info("Using TimeRangePruner on time column: {} for table: {} with DateTimeFieldSpec: {}", timeColumn,
+        tableNameWithType, timeFieldSpec);
+    return new TimeSegmentPruner(tableConfig, timeFieldSpec);
   }
 
   private static List<SegmentPruner> sortSegmentPruners(List<SegmentPruner> pruners) {
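One behavioral consequence of the factory change above: previously a missing schema or time-column field spec threw from Preconditions.checkNotNull and aborted routing setup, whereas now the factory logs a warning and returns null, so such a table is still routable, just without time-based segment pruning. Inlining createTimeSegmentPruner also removes the last uses of the two Guava imports deleted at the top of this file.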
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java

@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.broker.routing.segmentpruner;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,7 +38,9 @@
 import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.Function;
 import org.apache.pinot.common.request.Identifier;
+import org.apache.pinot.common.request.Literal;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Query.Range;
@@ -64,10 +67,10 @@ public class TimeSegmentPruner implements SegmentPruner {
   private volatile IntervalTree<String> _intervalTree;
   private final Map<String, Interval> _intervalMap = new HashMap<>();
 
-  public TimeSegmentPruner(TableConfig tableConfig, String timeColumn, DateTimeFormatSpec timeFormatSpec) {
+  public TimeSegmentPruner(TableConfig tableConfig, DateTimeFieldSpec timeFieldSpec) {
     _tableNameWithType = tableConfig.getTableName();
-    _timeColumn = timeColumn;
-    _timeFormatSpec = timeFormatSpec;
+    _timeColumn = timeFieldSpec.getName();
+    _timeFormatSpec = timeFieldSpec.getFormatSpec();
   }
 
   @Override
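With the new constructor, the column name and the format both come from one DateTimeFieldSpec. A minimal sketch (not from the commit) of where such a spec originates; the schema name "myTable" and column "ts" are invented, and TableConfig construction is omitted:

```java
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;

public class PrunerWiringSketch {
  public static void main(String[] args) {
    // In-memory schema with a TIMESTAMP time column, standing in for what the
    // broker fetches from ZK via ZKMetadataProvider.getTableSchema(...).
    Schema schema = new Schema.SchemaBuilder()
        .setSchemaName("myTable")
        .addDateTime("ts", DataType.TIMESTAMP, "1:MILLISECONDS:TIMESTAMP", "1:MILLISECONDS")
        .build();
    DateTimeFieldSpec timeFieldSpec = schema.getSpecForTimeColumn("ts");
    // One object now carries both pieces the old constructor took separately:
    System.out.println(timeFieldSpec.getName());        // column name
    System.out.println(timeFieldSpec.getFormatSpec());  // DateTimeFormatSpec
  }
}
```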
@@ -206,97 +209,53 @@ private List<Interval> getFilterTimeIntervals(Expression filterExpression) {
       } else {
         return getComplementSortedIntervals(childIntervals);
       }
-      case EQUALS: {
-        Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_timeColumn)) {
-          long timeStamp = _timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
-          return Collections.singletonList(new Interval(timeStamp, timeStamp));
-        } else {
-          return null;
-        }
-      }
+      case EQUALS:
+        if (isTimeColumn(operands.get(0))) {
+          long timestamp = toMillisSinceEpoch(operands.get(1));
+          return List.of(new Interval(timestamp, timestamp));
+        }
+        return null;
-      case IN: {
-        Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_timeColumn)) {
-          int numOperands = operands.size();
-          List<Interval> intervals = new ArrayList<>(numOperands - 1);
-          for (int i = 1; i < numOperands; i++) {
-            long timeStamp =
-                _timeFormatSpec.fromFormatToMillis(operands.get(i).getLiteral().getFieldValue().toString());
-            intervals.add(new Interval(timeStamp, timeStamp));
-          }
-          return intervals;
-        } else {
-          return null;
-        }
-      }
+      case IN:
+        if (isTimeColumn(operands.get(0))) {
+          int numOperands = operands.size();
+          List<Interval> intervals = new ArrayList<>(numOperands - 1);
+          for (int i = 1; i < numOperands; i++) {
+            long timestamp = toMillisSinceEpoch(operands.get(i));
+            intervals.add(new Interval(timestamp, timestamp));
+          }
+          return intervals;
+        }
+        return null;
-      case GREATER_THAN: {
-        Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_timeColumn)) {
-          long timeStamp = _timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
-          return Collections.singletonList(new Interval(timeStamp + 1, MAX_END_TIME));
-        } else {
-          return null;
-        }
-      }
+      case GREATER_THAN:
+        if (isTimeColumn(operands.get(0))) {
+          return getInterval(toMillisSinceEpoch(operands.get(1)) + 1, MAX_END_TIME);
+        }
+        return null;
-      case GREATER_THAN_OR_EQUAL: {
-        Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_timeColumn)) {
-          long timeStamp = _timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
-          return Collections.singletonList(new Interval(timeStamp, MAX_END_TIME));
-        } else {
-          return null;
-        }
-      }
+      case GREATER_THAN_OR_EQUAL:
+        if (isTimeColumn(operands.get(0))) {
+          return getInterval(toMillisSinceEpoch(operands.get(1)), MAX_END_TIME);
+        }
+        return null;
-      case LESS_THAN: {
-        Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_timeColumn)) {
-          long timeStamp = _timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
-          if (timeStamp > MIN_START_TIME) {
-            return Collections.singletonList(new Interval(MIN_START_TIME, timeStamp - 1));
-          } else {
-            return Collections.emptyList();
-          }
-        } else {
-          return null;
-        }
-      }
+      case LESS_THAN:
+        if (isTimeColumn(operands.get(0))) {
+          return getInterval(MIN_START_TIME, toMillisSinceEpoch(operands.get(1)) - 1);
+        }
+        return null;
-      case LESS_THAN_OR_EQUAL: {
-        Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_timeColumn)) {
-          long timeStamp = _timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
-          if (timeStamp >= MIN_START_TIME) {
-            return Collections.singletonList(new Interval(MIN_START_TIME, timeStamp));
-          } else {
-            return Collections.emptyList();
-          }
-        } else {
-          return null;
-        }
-      }
+      case LESS_THAN_OR_EQUAL:
+        if (isTimeColumn(operands.get(0))) {
+          return getInterval(MIN_START_TIME, toMillisSinceEpoch(operands.get(1)));
+        }
+        return null;
-      case BETWEEN: {
-        Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_timeColumn)) {
-          long startTimestamp =
-              _timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
-          long endTimestamp =
-              _timeFormatSpec.fromFormatToMillis(operands.get(2).getLiteral().getFieldValue().toString());
-          if (endTimestamp >= startTimestamp) {
-            return Collections.singletonList(new Interval(startTimestamp, endTimestamp));
-          } else {
-            return Collections.emptyList();
-          }
-        } else {
-          return null;
-        }
-      }
+      case BETWEEN:
+        if (isTimeColumn(operands.get(0))) {
+          return getInterval(toMillisSinceEpoch(operands.get(1)), toMillisSinceEpoch(operands.get(2)));
+        }
+        return null;
-      case RANGE: {
-        Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_timeColumn)) {
-          return parseInterval(operands.get(1).getLiteral().getFieldValue().toString());
-        }
-        return null;
-      }
+      case RANGE:
+        if (isTimeColumn(operands.get(0))) {
+          return parseInterval(operands.get(1).getLiteral().getFieldValue().toString());
+        }
+        return null;
       default:
         return null;
     }
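In the rewritten switch, returning null still means "this predicate says nothing about time" (no pruning from it), while an empty interval list means "no segment can match". The range-shaped cases now delegate the inverted-bound handling that LESS_THAN, LESS_THAN_OR_EQUAL, and BETWEEN used to special-case to the getInterval helper added at the bottom of the file. A self-contained sketch of that semantics; the MIN_START_TIME/MAX_END_TIME values here are placeholders, not Pinot's actual constants:

```java
import java.util.List;

public class IntervalSemanticsDemo {
  // Placeholder bounds; the real MIN_START_TIME/MAX_END_TIME constants live in
  // TimeSegmentPruner and are not shown in this diff.
  static final long MIN_START_TIME = 0;
  static final long MAX_END_TIME = Long.MAX_VALUE - 1;

  record Interval(long min, long max) {}

  // Same contract as the getInterval helper added in this commit: an inverted
  // range collapses to an empty list, i.e. "prune every segment".
  static List<Interval> getInterval(long inclusiveStart, long inclusiveEnd) {
    return inclusiveStart <= inclusiveEnd ? List.of(new Interval(inclusiveStart, inclusiveEnd)) : List.of();
  }

  public static void main(String[] args) {
    System.out.println(getInterval(1001, MAX_END_TIME));   // ts > 1000
    System.out.println(getInterval(MIN_START_TIME, 999));  // ts < 1000
    System.out.println(getInterval(5, 3));                 // ts BETWEEN 5 AND 3 -> []
  }
}
```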
@@ -408,6 +367,17 @@ private List<Interval> getComplementSortedIntervals(List<Interval> intervals) {
     return res;
   }
 
+  private boolean isTimeColumn(Expression expression) {
+    Identifier identifier = expression.getIdentifier();
+    return identifier != null && identifier.getName().equals(_timeColumn);
+  }
+
+  private long toMillisSinceEpoch(Expression expression) {
+    Literal literal = expression.getLiteral();
+    Preconditions.checkArgument(literal != null, "Literal is required for time column filter, got: %s", expression);
+    return _timeFormatSpec.fromFormatToMillis(literal.getFieldValue().toString());
+  }
+
   /**
    * Parse interval to millisecond as [min, max] with both sides included.
    * E.g. '(* 16311]' is parsed as [0, 16311], '(1455 16311)' is parsed as [1456, 16310]
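toMillisSinceEpoch is now the single point where a filter literal becomes epoch millis, using the format spec taken from the schema's DateTimeFieldSpec; the new Preconditions guard rejects time-column comparisons against non-literals (say, another column) with an IllegalArgumentException. To my understanding, the SPI's TIMESTAMP format accepts both epoch-millis strings and SQL timestamp strings, which is plausibly what the fix relies on for TIMESTAMP columns. A runnable sketch (mine, not from the commit; "ts" is an invented column, and the second conversion uses the JVM's default time zone):

```java
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;

public class TimestampLiteralDemo {
  public static void main(String[] args) {
    DateTimeFieldSpec ts = new DateTimeFieldSpec("ts", DataType.TIMESTAMP,
        "1:MILLISECONDS:TIMESTAMP", "1:MILLISECONDS");
    DateTimeFormatSpec formatSpec = ts.getFormatSpec();
    // Epoch millis passed through as-is:
    System.out.println(formatSpec.fromFormatToMillis("1712534400000"));
    // SQL timestamp string parsed in the JVM's default time zone:
    System.out.println(formatSpec.fromFormatToMillis("2024-04-08 00:00:00"));
  }
}
```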
@@ -432,10 +402,10 @@ private List<Interval> parseInterval(String rangeString) {
         endTime--;
       }
     }
-
-    if (startTime > endTime) {
-      return Collections.emptyList();
-    }
-    return Collections.singletonList(new Interval(startTime, endTime));
+    return getInterval(startTime, endTime);
+  }
+
+  private static List<Interval> getInterval(long inclusiveStart, long inclusiveEnd) {
+    return inclusiveStart <= inclusiveEnd ? List.of(new Interval(inclusiveStart, inclusiveEnd)) : List.of();
   }
 }
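parseInterval itself is unchanged except for delegating the empty-range check to getInterval. For intuition about its contract, here is a hedged, standalone re-implementation (mine, not Pinot's): the real method reads its bracket and delimiter characters from CommonConstants.Query.Range, while this sketch assumes the space-delimited form shown in the javadoc above and placeholder bounds.

```java
import java.util.List;

public class ParseIntervalSketch {
  static final long MIN_START_TIME = 0;                 // placeholder bound
  static final long MAX_END_TIME = Long.MAX_VALUE - 1;  // placeholder bound

  record Interval(long min, long max) {}

  // Parses "(* 16311]" -> [0, 16311] and "(1455 16311)" -> [1456, 16310]; an
  // inverted range yields an empty list, matching the new getInterval helper.
  static List<Interval> parseInterval(String range) {
    String[] bounds = range.substring(1, range.length() - 1).split(" ");
    long start = bounds[0].equals("*") ? MIN_START_TIME : Long.parseLong(bounds[0]);
    long end = bounds[1].equals("*") ? MAX_END_TIME : Long.parseLong(bounds[1]);
    if (range.charAt(0) == '(' && !bounds[0].equals("*")) {
      start++;  // exclusive lower bound
    }
    if (range.charAt(range.length() - 1) == ')' && !bounds[1].equals("*")) {
      end--;  // exclusive upper bound
    }
    return start <= end ? List.of(new Interval(start, end)) : List.of();
  }

  public static void main(String[] args) {
    System.out.println(parseInterval("(* 16311]"));     // [Interval[min=0, max=16311]]
    System.out.println(parseInterval("(1455 16311)"));  // [Interval[min=1456, max=16310]]
  }
}
```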
(The diff for the remaining 3 changed files is not shown.)
