diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
index b3337f4307c5..2691d827f895 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java
@@ -32,6 +32,7 @@ import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * Input spec representing a Druid table.
@@ -45,28 +46,35 @@ public class TableInputSpec implements InputSpec
   @Nullable
   private final DimFilter filter;
 
+  @Nullable
+  private final Set<String> filterFields;
+
   /**
    * Create a table input spec.
    *
-   * @param dataSource datasource to read
-   * @param intervals  intervals to filter, or null if no time filtering is desired. Interval filtering is strict,
-   *                   meaning that when this spec is sliced and read, the returned {@link SegmentWithDescriptor}
-   *                   from {@link ReadableInput#getSegment()} are clipped to these intervals.
-   * @param filter     other filters to use for pruning, or null if no pruning is desired. Pruning filters are
-   *                   *not strict*, which means that processors must re-apply them when processing the returned
-   *                   {@link SegmentWithDescriptor} from {@link ReadableInput#getSegment()}. This matches how
-   *                   Broker-based pruning works for native queries.
+   * @param dataSource   datasource to read
+   * @param intervals    intervals to filter, or null if no time filtering is desired. Interval filtering is strict,
+   *                     meaning that when this spec is sliced and read, the returned {@link SegmentWithDescriptor}
+   *                     from {@link ReadableInput#getSegment()} are clipped to these intervals.
+   * @param filter       other filters to use for pruning, or null if no pruning is desired. Pruning filters are
+   *                     *not strict*, which means that processors must re-apply them when processing the returned
+   *                     {@link SegmentWithDescriptor} from {@link ReadableInput#getSegment()}. This matches how
+   *                     Broker-based pruning works for native queries.
+   * @param filterFields list of fields from {@link DimFilter#getRequiredColumns()} to consider for pruning. If null,
+   *                     all fields are considered for pruning.
    */
   @JsonCreator
   public TableInputSpec(
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("intervals") @Nullable List<Interval> intervals,
-      @JsonProperty("filter") @Nullable DimFilter filter
+      @JsonProperty("filter") @Nullable DimFilter filter,
+      @JsonProperty("filterFields") @Nullable Set<String> filterFields
   )
   {
     this.dataSource = dataSource;
     this.intervals = intervals == null ? Intervals.ONLY_ETERNITY : intervals;
     this.filter = filter;
+    this.filterFields = filterFields;
   }
 
   @JsonProperty
@@ -97,6 +105,14 @@ public DimFilter getFilter()
     return filter;
   }
 
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  @Nullable
+  public Set<String> getFilterFields()
+  {
+    return filterFields;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -109,13 +125,14 @@ public boolean equals(Object o)
     TableInputSpec that = (TableInputSpec) o;
     return Objects.equals(dataSource, that.dataSource)
            && Objects.equals(intervals, that.intervals)
-           && Objects.equals(filter, that.filter);
+           && Objects.equals(filter, that.filter)
+           && Objects.equals(filterFields, that.filterFields);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(dataSource, intervals, filter);
+    return Objects.hash(dataSource, intervals, filter, filterFields);
   }
 
   @Override
@@ -124,7 +141,8 @@ public String toString()
     return "TableInputSpec{" +
            "dataSource='" + dataSource + '\'' +
            ", intervals=" + intervals +
-           ", filter=" + filter +
+           (filter == null ? "" : ", filter=" + filter) +
+           (filterFields == null ? "" : ", filterFields=" + filterFields) +
            '}';
   }
 }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java
index 91f2e681e1ea..1cd82f726ed6 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java
@@ -33,6 +33,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -115,8 +116,10 @@ private Set<DataSegmentWithInterval> getPrunedSegmentSet(final TableInputSpec ta
 
     return DimFilterUtils.filterShards(
         tableInputSpec.getFilter(),
+        tableInputSpec.getFilterFields(),
         () -> dataSegmentIterator,
-        segment -> segment.getSegment().getShardSpec()
+        segment -> segment.getSegment().getShardSpec(),
+        new HashMap<>()
     );
   }
 }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
index 16eaef63c497..503153ec275d 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
@@ -54,6 +54,7 @@
 import org.apache.druid.query.UnionDataSource;
 import org.apache.druid.query.UnnestDataSource;
 import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.filter.DimFilterUtils;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.query.planning.PreJoinableClause;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
@@ -74,8 +75,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
+/**
+ * Plan for getting data from a {@link DataSource}. Used by {@link QueryKit} implementations.
+ */
 public class DataSourcePlan
 {
   /**
@@ -116,6 +121,22 @@
     }
   }
 
+  /**
+   * Build a plan.
+   *
+   * @param queryKit         query kit reference for recursive planning
+   * @param queryId          query ID
+   * @param queryContext     query context
+   * @param dataSource       datasource to plan
+   * @param querySegmentSpec intervals for mandatory pruning. Must be {@link MultipleIntervalSegmentSpec}. The returned
+   *                         plan is guaranteed to be filtered to this interval.
+   * @param filter           filter for best-effort pruning. The returned plan may or may not be filtered to this
+   *                         filter. Query processing must still apply the filter to generate correct results.
+   * @param filterFields     which fields from the filter to consider for pruning, or null to consider all fields.
+   * @param maxWorkerCount   maximum number of workers for subqueries
+   * @param minStageNumber   starting stage number for subqueries
+   * @param broadcast        whether the plan should broadcast data for this datasource
+   */
   @SuppressWarnings("rawtypes")
   public static DataSourcePlan forDataSource(
       final QueryKit queryKit,
@@ -124,13 +145,31 @@ public static DataSourcePlan forDataSource(
       final DataSource dataSource,
       final QuerySegmentSpec querySegmentSpec,
       @Nullable DimFilter filter,
+      @Nullable Set<String> filterFields,
       final int maxWorkerCount,
       final int minStageNumber,
       final boolean broadcast
   )
   {
+    if (!queryContext.isSecondaryPartitionPruningEnabled()) {
+      // Clear filter, we don't want to prune today.
+      filter = null;
+    }
+
+    if (filter != null && filterFields == null) {
+      // Ensure filterFields is nonnull if filter is nonnull. Helps for other forXYZ methods, so they don't need to
+      // deal with the case where filter is nonnull but filterFields is null.
+      filterFields = filter.getRequiredColumns();
+    }
+
     if (dataSource instanceof TableDataSource) {
-      return forTable((TableDataSource) dataSource, querySegmentSpecIntervals(querySegmentSpec), filter, broadcast);
+      return forTable(
+          (TableDataSource) dataSource,
+          querySegmentSpecIntervals(querySegmentSpec),
+          filter,
+          filterFields,
+          broadcast
+      );
     } else if (dataSource instanceof ExternalDataSource) {
       checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
       return forExternal((ExternalDataSource) dataSource, broadcast);
@@ -179,6 +218,7 @@ public static DataSourcePlan forDataSource(
           (UnionDataSource) dataSource,
           querySegmentSpec,
           filter,
+          filterFields,
           maxWorkerCount,
           minStageNumber,
           broadcast
@@ -198,6 +238,8 @@
           queryContext,
           (JoinDataSource) dataSource,
           querySegmentSpec,
+          filter,
+          filterFields,
           maxWorkerCount,
           minStageNumber,
           broadcast
@@ -222,21 +264,36 @@
     }
   }
 
+  /**
+   * Possibly remapped datasource that should be used when processing. Will be either the original datasource, or the
+   * original datasource with itself or some children replaced by {@link InputNumberDataSource}. Any added
+   * {@link InputNumberDataSource} refers to a {@link StageInputSpec} in {@link #getInputSpecs()}.
+   */
   public DataSource getNewDataSource()
   {
     return newDataSource;
   }
 
+  /**
+   * Input specs that should be used when processing.
+   */
   public List<InputSpec> getInputSpecs()
   {
     return inputSpecs;
   }
 
+  /**
+   * Which input specs from {@link #getInputSpecs()} are broadcast.
+   */
   public IntSet getBroadcastInputs()
   {
     return broadcastInputs;
   }
 
+  /**
+   * Returns a {@link QueryDefinitionBuilder} that includes any {@link StageInputSpec} from {@link #getInputSpecs()}.
+   * Absent if this plan does not involve reading from prior stages.
+   */
   public Optional<QueryDefinitionBuilder> getSubQueryDefBuilder()
   {
     return Optional.ofNullable(subQueryDefBuilder);
@@ -302,12 +359,13 @@ private static DataSourcePlan forTable(
       final TableDataSource dataSource,
       final List<Interval> intervals,
       @Nullable final DimFilter filter,
+      @Nullable final Set<String> filterFields,
       final boolean broadcast
   )
   {
     return new DataSourcePlan(
         (broadcast && dataSource.isGlobal()) ? dataSource : new InputNumberDataSource(0),
-        Collections.singletonList(new TableInputSpec(dataSource.getName(), intervals, filter)),
+        Collections.singletonList(new TableInputSpec(dataSource.getName(), intervals, filter, filterFields)),
         broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(),
         null
     );
@@ -407,6 +465,7 @@ private static DataSourcePlan forFilteredDataSource(
         dataSource.getBase(),
         querySegmentSpec,
         null,
+        null,
         maxWorkerCount,
         minStageNumber,
         broadcast
@@ -447,6 +506,7 @@ private static DataSourcePlan forUnnest(
         dataSource.getBase(),
         querySegmentSpec,
         null,
+        null,
         maxWorkerCount,
         minStageNumber,
         broadcast
@@ -478,6 +538,7 @@ private static DataSourcePlan forUnion(
       final UnionDataSource unionDataSource,
       final QuerySegmentSpec querySegmentSpec,
       @Nullable DimFilter filter,
+      @Nullable Set<String> filterFields,
       final int maxWorkerCount,
       final int minStageNumber,
       final boolean broadcast
@@ -499,6 +560,7 @@ private static DataSourcePlan forUnion(
           child,
           querySegmentSpec,
           filter,
+          filterFields,
           maxWorkerCount,
           Math.max(minStageNumber, subqueryDefBuilder.getNextStageNumber()),
           broadcast
@@ -528,6 +590,8 @@ private static DataSourcePlan forBroadcastHashJoin(
       final QueryContext queryContext,
       final JoinDataSource dataSource,
       final QuerySegmentSpec querySegmentSpec,
+      @Nullable final DimFilter filter,
+      @Nullable final Set<String> filterFields,
       final int maxWorkerCount,
       final int minStageNumber,
       final boolean broadcast
@@ -542,7 +606,8 @@ private static DataSourcePlan forBroadcastHashJoin(
         queryContext,
         analysis.getBaseDataSource(),
         querySegmentSpec,
-        null, // Don't push query filters down through a join: this needs some work to ensure pruning works properly.
+        filter,
+        filter == null ? null : DimFilterUtils.onlyBaseFields(filterFields, analysis),
         maxWorkerCount,
         Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
         broadcast
@@ -561,7 +626,8 @@ private static DataSourcePlan forBroadcastHashJoin(
         queryContext,
         clause.getDataSource(),
         new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY),
-        null, // Don't push query filters down through a join: this needs some work to ensure pruning works properly.
+        null, // Don't push down query filters for right-hand side: needs some work to ensure it works properly.
+        null,
         maxWorkerCount,
         Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
         true // Always broadcast right-hand side of the join.
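
Note (illustrative, not part of the patch): the forBroadcastHashJoin change above pushes the query filter below the join, but restricts pruning to base-table columns via DimFilterUtils.onlyBaseFields. A minimal sketch of that narrowing, assuming a hypothetical join whose right-hand clause is prefixed "j0." and a JoinDataSource named joinDataSource in scope:

    import com.google.common.collect.ImmutableList;
    import org.apache.druid.query.filter.AndDimFilter;
    import org.apache.druid.query.filter.DimFilter;
    import org.apache.druid.query.filter.DimFilterUtils;
    import org.apache.druid.query.filter.SelectorDimFilter;
    import org.apache.druid.query.planning.DataSourceAnalysis;
    import java.util.Set;

    // Hypothetical filter touching one base column ("dim") and one join column ("j0.k").
    final DimFilter filter = new AndDimFilter(
        ImmutableList.of(
            new SelectorDimFilter("dim", "bar", null),   // base-table column
            new SelectorDimFilter("j0.k", "v", null)     // right-hand-side column
        )
    );

    final DataSourceAnalysis analysis = joinDataSource.getAnalysis();

    // "j0.k" matches the join prefix, so isBaseColumn("j0.k") is false and it is
    // dropped; only "dim" is passed down as filterFields for segment pruning.
    final Set<String> baseFields = DimFilterUtils.onlyBaseFields(filter.getRequiredColumns(), analysis);
    // baseFields == {"dim"}

The filter itself still travels down unchanged; filterFields only limits which of its columns the slicer consults when pruning shards.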
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
index b12048a315f2..469a8a8aa46c 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
@@ -84,6 +84,7 @@ public QueryDefinition makeQueryDefinition(
         originalQuery.getDataSource(),
         originalQuery.getQuerySegmentSpec(),
         originalQuery.getFilter(),
+        null,
         maxWorkerCount,
         minStageNumber,
         false
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
index 681d0ae9c00a..986554f86c80 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
@@ -95,6 +95,7 @@ public QueryDefinition makeQueryDefinition(
         originalQuery.getDataSource(),
         originalQuery.getQuerySegmentSpec(),
         originalQuery.getFilter(),
+        null,
         maxWorkerCount,
         minStageNumber,
         false
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java
index fd5db7e75f64..45831a779f5f 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java
@@ -112,13 +112,13 @@ public void setUp()
   @Test
   public void test_canSliceDynamic()
   {
-    Assert.assertTrue(slicer.canSliceDynamic(new TableInputSpec(DATASOURCE, null, null)));
+    Assert.assertTrue(slicer.canSliceDynamic(new TableInputSpec(DATASOURCE, null, null, null)));
   }
 
   @Test
   public void test_sliceStatic_noDataSource()
   {
-    final TableInputSpec spec = new TableInputSpec("no such datasource", null, null);
+    final TableInputSpec spec = new TableInputSpec("no such datasource", null, null, null);
     Assert.assertEquals(
         ImmutableList.of(NilInputSlice.INSTANCE, NilInputSlice.INSTANCE),
         slicer.sliceStatic(spec, 2)
@@ -134,6 +134,7 @@ public void test_sliceStatic_intervalFilter()
             Intervals.of("2000/P1M"),
             Intervals.of("2000-06-01/P1M")
         ),
+        null,
         null
     );
@@ -183,6 +184,7 @@ public void test_sliceStatic_intervalFilterMatchesNothing()
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         Collections.singletonList(Intervals.of("2002/P1M")),
+        null,
         null
     );
@@ -192,13 +194,65 @@
     Assert.assertEquals(
         ImmutableList.of(NilInputSlice.INSTANCE, NilInputSlice.INSTANCE),
         slicer.sliceStatic(spec, 2)
     );
   }
 
+  @Test
+  public void test_sliceStatic_dimFilterNotUsed()
+  {
+    final TableInputSpec spec = new TableInputSpec(
+        DATASOURCE,
+        ImmutableList.of(
+            Intervals.of("2000/P1M"),
+            Intervals.of("2000-06-01/P1M")
+        ),
+        new SelectorDimFilter("dim", "bar", null),
+        Collections.emptySet()
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            new SegmentsInputSlice(
+                DATASOURCE,
+                ImmutableList.of(
+                    new RichSegmentDescriptor(
+                        SEGMENT1.getInterval(),
+                        Intervals.of("2000/P1M"),
+                        SEGMENT1.getVersion(),
+                        SEGMENT1.getShardSpec().getPartitionNum(),
+                        null
+                    ),
+                    new RichSegmentDescriptor(
+                        SEGMENT2.getInterval(),
+                        Intervals.of("2000/P1M"),
+                        SEGMENT2.getVersion(),
+                        SEGMENT2.getShardSpec().getPartitionNum(),
+                        null
+                    ),
+                    new RichSegmentDescriptor(
+                        SEGMENT1.getInterval(),
+                        Intervals.of("2000-06-01/P1M"),
+                        SEGMENT1.getVersion(),
+                        SEGMENT1.getShardSpec().getPartitionNum(),
+                        null
+                    ),
+                    new RichSegmentDescriptor(
+                        SEGMENT2.getInterval(),
+                        Intervals.of("2000-06-01/P1M"),
+                        SEGMENT2.getVersion(),
+                        SEGMENT2.getShardSpec().getPartitionNum(),
+                        null
+                    )
+                )
+            ),
+            NilInputSlice.INSTANCE
+        ),
+        slicer.sliceStatic(spec, 2)
+    );
+  }
+
   @Test
   public void test_sliceStatic_dimFilter()
   {
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         null,
-        new SelectorDimFilter("dim", "bar", null)
+        new SelectorDimFilter("dim", "bar", null),
+        null
     );
 
     Assert.assertEquals(
@@ -230,7 +284,8 @@ public void test_sliceStatic_intervalAndDimFilter()
             Intervals.of("2000/P1M"),
             Intervals.of("2000-06-01/P1M")
         ),
-        new SelectorDimFilter("dim", "bar", null)
+        new SelectorDimFilter("dim", "bar", null),
+        null
     );
 
     Assert.assertEquals(
@@ -267,7 +322,7 @@
   @Test
   public void test_sliceStatic_oneSlice()
   {
-    final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null);
+    final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, null);
     Assert.assertEquals(
         Collections.singletonList(
             new SegmentsInputSlice(
@@ -297,7 +352,7 @@
   @Test
   public void test_sliceStatic_needTwoSlices()
   {
-    final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null);
+    final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, null);
     Assert.assertEquals(
         ImmutableList.of(
             new SegmentsInputSlice(
@@ -332,7 +387,7 @@
   @Test
   public void test_sliceStatic_threeSlices()
   {
-    final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null);
+    final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, null);
     Assert.assertEquals(
         ImmutableList.of(
             new SegmentsInputSlice(
@@ -371,6 +426,7 @@ public void test_sliceDynamic_none()
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         ImmutableList.of(Intervals.of("2002/P1M")),
+        null,
         null
     );
@@ -386,6 +442,7 @@ public void test_sliceDynamic_maxOneSlice()
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         ImmutableList.of(Intervals.of("2000/P1M")),
+        null,
         null
     );
@@ -421,6 +478,7 @@ public void test_sliceDynamic_needOne()
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         ImmutableList.of(Intervals.of("2000/P1M")),
+        null,
         null
     );
@@ -456,6 +514,7 @@ public void test_sliceDynamic_needTwoDueToFiles()
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         ImmutableList.of(Intervals.of("2000/P1M")),
+        null,
         null
     );
@@ -496,6 +555,7 @@ public void test_sliceDynamic_needTwoDueToBytes()
     final TableInputSpec spec = new TableInputSpec(
         DATASOURCE,
         ImmutableList.of(Intervals.of("2000/P1M")),
+        null,
         null
     );
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
index 10aba22939b6..6f4c788c86a1 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java
@@ -43,7 +43,27 @@ public void testSerde() throws Exception
     final TableInputSpec spec = new TableInputSpec(
         "myds",
         Collections.singletonList(Intervals.of("2000/P1M")),
-        new SelectorDimFilter("dim", "val", null)
+        new SelectorDimFilter("dim", "val", null),
+        Collections.singleton("dim")
+    );
+
+    Assert.assertEquals(
+        spec,
+        mapper.readValue(mapper.writeValueAsString(spec), InputSpec.class)
+    );
+  }
+
+  @Test
+  public void testSerdeEmptyFilterFields() throws Exception
+  {
+    final ObjectMapper mapper = TestHelper.makeJsonMapper()
+                                          .registerModules(new MSQIndexingModule().getJacksonModules());
+
+    final TableInputSpec spec = new TableInputSpec(
+        "myds",
+        Collections.singletonList(Intervals.of("2000/P1M")),
+        new SelectorDimFilter("dim", "val", null),
+        Collections.emptySet()
     );
 
     Assert.assertEquals(
@@ -61,7 +81,8 @@ public void testSerdeEternityInterval() throws Exception
     final TableInputSpec spec = new TableInputSpec(
         "myds",
         Intervals.ONLY_ETERNITY,
-        new SelectorDimFilter("dim", "val", null)
+        new SelectorDimFilter("dim", "val", null),
+        null
     );
 
     Assert.assertEquals(
diff --git a/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java b/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
index ed03efac38a6..919159831624 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
@@ -21,10 +21,13 @@
 import com.google.common.base.Function;
 import com.google.common.collect.RangeSet;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.timeline.partition.ShardSpec;
 
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -86,26 +89,6 @@ static byte[] computeCacheKey(byte cacheIdKey, List<DimFilter> filters)
     return retVal.array();
   }
 
-  /**
-   * Filter the given iterable of objects by removing any object whose ShardSpec, obtained from the converter function,
-   * does not fit in the RangeSet of the dimFilter {@link DimFilter#getDimensionRangeSet(String)}. The returned set
-   * contains the filtered objects in the same order as they appear in input.
-   *
-   * If you plan to call this multiple times with the same dimFilter, consider using
-   * {@link #filterShards(DimFilter, Iterable, Function, Map)} instead with a cached map
-   *
-   * @param dimFilter The filter to use
-   * @param input     The iterable of objects to be filtered
-   * @param converter The function to convert T to ShardSpec that can be filtered by
-   * @param <T>       This can be any type, as long as transform function is provided to convert this to ShardSpec
-   *
-   * @return The set of filtered object, in the same order as input
-   */
-  public static <T> Set<T> filterShards(DimFilter dimFilter, Iterable<T> input, Function<T, ShardSpec> converter)
-  {
-    return filterShards(dimFilter, input, converter, new HashMap<>());
-  }
-
   /**
    * Filter the given iterable of objects by removing any object whose ShardSpec, obtained from the converter function,
    * does not fit in the RangeSet of the dimFilter {@link DimFilter#getDimensionRangeSet(String)}. The returned set
    * contains the filtered objects in the same order as they appear in input.
@@ -116,6 +99,7 @@ public static <T> Set<T> filterShards(DimFilter dimFilter, Iterable<T> input, Fu
    * on same dimensions.
    *
    * @param dimFilter           The filter to use
+   * @param filterFields        Set of fields to consider for pruning, or null to consider all fields
    * @param input               The iterable of objects to be filtered
    * @param converter           The function to convert T to ShardSpec that can be filtered by
    * @param dimensionRangeCache The cache of RangeSets of different dimensions for the dimFilter
@@ -124,7 +108,8 @@ public static <T> Set<T> filterShards(DimFilter dimFilter, Iterable<T> input, Fu
    *
    * @return The set of filtered object, in the same order as input
    */
   public static <T> Set<T> filterShards(
-      final DimFilter dimFilter,
+      @Nullable final DimFilter dimFilter,
+      @Nullable final Set<String> filterFields,
       final Iterable<T> input,
       final Function<T, ShardSpec> converter,
       final Map<String, Optional<RangeSet<String>>> dimensionRangeCache
@@ -140,11 +125,13 @@
       Map<String, RangeSet<String>> filterDomain = new HashMap<>();
       List<String> dimensions = shard.getDomainDimensions();
       for (String dimension : dimensions) {
-        Optional<RangeSet<String>> optFilterRangeSet = dimensionRangeCache
-            .computeIfAbsent(dimension, d -> Optional.ofNullable(dimFilter.getDimensionRangeSet(d)));
+        if (filterFields == null || filterFields.contains(dimension)) {
+          Optional<RangeSet<String>> optFilterRangeSet = dimensionRangeCache
+              .computeIfAbsent(dimension, d -> Optional.ofNullable(dimFilter.getDimensionRangeSet(d)));
 
-        if (optFilterRangeSet.isPresent()) {
-          filterDomain.put(dimension, optFilterRangeSet.get());
+          if (optFilterRangeSet.isPresent()) {
+            filterDomain.put(dimension, optFilterRangeSet.get());
+          }
         }
       }
       if (!filterDomain.isEmpty() && !shard.possibleInDomain(filterDomain)) {
@@ -158,4 +145,26 @@
     }
     return retSet;
   }
+
+  /**
+   * Returns a copy of "fields" only including base fields from {@link DataSourceAnalysis}.
+   *
+   * @param fields             field list, must be nonnull
+   * @param dataSourceAnalysis analyzed datasource
+   */
+  public static Set<String> onlyBaseFields(
+      final Set<String> fields,
+      final DataSourceAnalysis dataSourceAnalysis
+  )
+  {
+    final Set<String> retVal = new HashSet<>();
+
+    for (final String field : fields) {
+      if (dataSourceAnalysis.isBaseColumn(field)) {
+        retVal.add(field);
+      }
+    }
+
+    return retVal;
+  }
 }
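
A quick usage sketch of the widened filterShards signature. The candidateSegments iterable and the column name are hypothetical; passing null for filterFields consults every filter column, while an empty set disables pruning entirely (as the new TableInputSpecSlicerTest case shows):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Set;
    import org.apache.druid.query.filter.DimFilter;
    import org.apache.druid.query.filter.DimFilterUtils;
    import org.apache.druid.query.filter.SelectorDimFilter;
    import org.apache.druid.timeline.DataSegment;

    final DimFilter filter = new SelectorDimFilter("dim", "bar", null);

    final Set<DataSegment> pruned = DimFilterUtils.filterShards(
        filter,
        Collections.singleton("dim"),  // only consult "dim" when pruning
        candidateSegments,             // hypothetical Iterable<DataSegment>
        DataSegment::getShardSpec,     // converter from the element type to ShardSpec
        new HashMap<>()                // per-dimension RangeSet cache, reusable across calls
    );
    // Segments whose shard-spec domain on "dim" cannot possibly contain "bar" are
    // dropped; everything else is kept, in the input's order.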
diff --git a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
index f17ab6aec235..8d3f741087fc 100644
--- a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
+++ b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
@@ -29,6 +29,7 @@
 import org.apache.druid.query.UnnestDataSource;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.segment.join.JoinPrefixUtils;
 
 import javax.annotation.Nullable;
 import java.util.List;
@@ -247,6 +248,24 @@ public boolean isJoin()
     return !preJoinableClauses.isEmpty();
   }
 
+  /**
+   * Returns whether "column" on the analyzed datasource refers to a column from the base datasource.
+   */
+  public boolean isBaseColumn(final String column)
+  {
+    if (baseQuery != null) {
+      return false;
+    }
+
+    for (final PreJoinableClause clause : preJoinableClauses) {
+      if (JoinPrefixUtils.isPrefixedBy(column, clause.getPrefix())) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
   @Override
   public boolean equals(Object o)
   {
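
The truth table for isBaseColumn, sketched for a hypothetical analysis of a join with one clause prefixed "j0." (this mirrors the assertions added to DataSourceAnalysisTest below; joinDataSource is assumed in scope):

    final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
    analysis.isBaseColumn("dim");   // true: no join prefix matches
    analysis.isBaseColumn("j0.k");  // false: prefixed by the "j0." clause
    // When the base datasource is wrapped in a subquery (baseQuery != null),
    // every column returns false: it names a column of the subquery's results,
    // not of the underlying table, so it cannot drive shard pruning.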
diff --git a/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java b/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java
index 778db21b0a3e..13e15891d2f3 100644
--- a/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java
@@ -31,6 +31,7 @@
 import org.junit.Test;
 
 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -73,18 +74,31 @@ public void testFilterShards()
     EasyMock.replay(filter1, shard1, shard2, shard3, shard4, shard5, shard6, shard7);
 
     Set<ShardSpec> expected1 = ImmutableSet.of(shard1, shard4, shard5, shard6, shard7);
-    assertFilterResult(filter1, shards, expected1);
+    assertFilterResult(filter1, null, shards, expected1);
+    assertFilterResult(filter1, Collections.singleton("dim1"), shards, expected1);
+    assertFilterResult(filter1, Collections.singleton("dim2"), shards, ImmutableSet.copyOf(shards));
+    assertFilterResult(filter1, Collections.emptySet(), shards, ImmutableSet.copyOf(shards));
   }
 
-  private void assertFilterResult(DimFilter filter, Iterable<ShardSpec> input, Set<ShardSpec> expected)
+  private void assertFilterResult(
+      DimFilter filter,
+      Set<String> filterFields,
+      Iterable<ShardSpec> input,
+      Set<ShardSpec> expected
+  )
   {
-    Set<ShardSpec> result = DimFilterUtils.filterShards(filter, input, CONVERTER);
-    Assert.assertEquals(expected, result);
-
+    Set<ShardSpec> result = new HashSet<>();
     Map<String, Optional<RangeSet<String>>> dimensionRangeMap = new HashMap<>();
-    result = new HashSet<>();
     for (ShardSpec shard : input) {
-      result.addAll(DimFilterUtils.filterShards(filter, ImmutableList.of(shard), CONVERTER, dimensionRangeMap));
+      result.addAll(
+          DimFilterUtils.filterShards(
+              filter,
+              filterFields,
+              ImmutableList.of(shard),
+              CONVERTER,
+              dimensionRangeMap
+          )
+      );
     }
     Assert.assertEquals(expected, result);
   }
diff --git a/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
index 1302e504dc9c..1240115221d7 100644
--- a/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
+++ b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
@@ -74,6 +74,7 @@ public void testTable()
     Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
     Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses());
     Assert.assertFalse(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
   }
 
   @Test
@@ -92,6 +93,7 @@ public void testUnion()
     Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
     Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses());
     Assert.assertFalse(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
   }
 
   @Test
@@ -113,6 +115,7 @@ public void testQueryOnTable()
     );
     Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses());
     Assert.assertFalse(analysis.isJoin());
+    Assert.assertFalse(analysis.isBaseColumn("foo"));
   }
 
   @Test
@@ -135,6 +138,7 @@ public void testQueryOnUnion()
     );
     Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses());
     Assert.assertFalse(analysis.isJoin());
+    Assert.assertFalse(analysis.isBaseColumn("foo"));
   }
 
   @Test
@@ -152,6 +156,7 @@ public void testLookup()
     Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
     Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses());
     Assert.assertFalse(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
   }
 
   @Test
@@ -173,6 +178,7 @@ public void testQueryOnLookup()
     );
     Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses());
     Assert.assertFalse(analysis.isJoin());
+    Assert.assertFalse(analysis.isBaseColumn("foo"));
   }
 
   @Test
@@ -190,6 +196,7 @@ public void testInline()
     Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
     Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses());
     Assert.assertFalse(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
   }
 
   @Test
@@ -237,6 +244,10 @@ public void testJoinSimpleLeftLeaning()
         analysis.getPreJoinableClauses()
     );
     Assert.assertTrue(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
+    Assert.assertFalse(analysis.isBaseColumn("1.foo"));
+    Assert.assertFalse(analysis.isBaseColumn("2.foo"));
+    Assert.assertFalse(analysis.isBaseColumn("3.foo"));
   }
 
   @Test
@@ -282,6 +293,10 @@ public void testJoinSimpleLeftLeaningWithLeftFilter()
         analysis.getPreJoinableClauses()
     );
     Assert.assertTrue(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
+    Assert.assertFalse(analysis.isBaseColumn("1.foo"));
+    Assert.assertFalse(analysis.isBaseColumn("2.foo"));
+    Assert.assertFalse(analysis.isBaseColumn("3.foo"));
   }
 
   @Test
@@ -331,6 +346,10 @@ public void testJoinSimpleRightLeaning()
         analysis.getPreJoinableClauses()
     );
     Assert.assertTrue(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
+    Assert.assertTrue(analysis.isBaseColumn("1.foo"));
+    Assert.assertTrue(analysis.isBaseColumn("2.foo"));
+    Assert.assertFalse(analysis.isBaseColumn("3.foo"));
  }
 
   @Test
@@ -376,6 +395,10 @@ public void testJoinSimpleRightLeaningWithLeftFilter()
         analysis.getPreJoinableClauses()
     );
     Assert.assertTrue(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
+    Assert.assertTrue(analysis.isBaseColumn("1.foo"));
+    Assert.assertTrue(analysis.isBaseColumn("2.foo"));
+    Assert.assertFalse(analysis.isBaseColumn("3.foo"));
   }
 
   @Test
@@ -405,6 +428,8 @@ public void testJoinOverTableSubquery()
         analysis.getPreJoinableClauses()
     );
     Assert.assertTrue(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
+    Assert.assertFalse(analysis.isBaseColumn("1.foo"));
   }
 
   @Test
@@ -436,6 +461,8 @@ public void testJoinTableUnionToLookup()
         analysis.getPreJoinableClauses()
     );
     Assert.assertTrue(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
+    Assert.assertFalse(analysis.isBaseColumn("1.foo"));
   }
 
   @Test
@@ -488,6 +515,8 @@ public void testJoinUnderTopLevelSubqueries()
         analysis.getPreJoinableClauses()
     );
     Assert.assertTrue(analysis.isJoin());
+    Assert.assertFalse(analysis.isBaseColumn("foo"));
+    Assert.assertFalse(analysis.isBaseColumn("1.foo"));
   }
 
   @Test
@@ -518,6 +547,8 @@ public void testJoinLookupToLookup()
         analysis.getPreJoinableClauses()
     );
     Assert.assertTrue(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
+    Assert.assertFalse(analysis.isBaseColumn("1.foo"));
  }
 
   @Test
@@ -548,6 +579,8 @@ public void testJoinLookupToTable()
         analysis.getPreJoinableClauses()
     );
     Assert.assertTrue(analysis.isJoin());
+    Assert.assertTrue(analysis.isBaseColumn("foo"));
+    Assert.assertFalse(analysis.isBaseColumn("1.foo"));
   }
 
   @Test
diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 19df276344ef..2eea9aa594db 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -440,9 +440,10 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
     // Filter unneeded chunks based on partition dimension
     for (TimelineObjectHolder<String, ServerSelector> holder : serversLookup) {
       final Set<PartitionChunk<ServerSelector>> filteredChunks;
-      if (query.context().isSecondaryPartitionPruningEnabled()) {
+      if (query.getFilter() != null && query.context().isSecondaryPartitionPruningEnabled()) {
         filteredChunks = DimFilterUtils.filterShards(
             query.getFilter(),
+            DimFilterUtils.onlyBaseFields(query.getFilter().getRequiredColumns(), dataSourceAnalysis),
             holder.getObject(),
             partitionChunk -> partitionChunk.getObject().getSegment().getShardSpec(),
             dimensionRangeCache
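
Since the final hunk above is cut off mid-call, here is the complete statement reassembled as a sketch for context (the else branch and surrounding locals belong to the unmodified method; treat their details as approximate):

    if (query.getFilter() != null && query.context().isSecondaryPartitionPruningEnabled()) {
      filteredChunks = DimFilterUtils.filterShards(
          query.getFilter(),
          DimFilterUtils.onlyBaseFields(query.getFilter().getRequiredColumns(), dataSourceAnalysis),
          holder.getObject(),
          partitionChunk -> partitionChunk.getObject().getSegment().getShardSpec(),
          dimensionRangeCache
      );
    } else {
      // No filter (or pruning disabled): keep every chunk in this timeline holder.
      filteredChunks = Sets.newHashSet(holder.getObject());
    }

The added null check is load-bearing: the new argument expression dereferences query.getFilter() to call getRequiredColumns(), so without the guard a filterless query would throw a NullPointerException on the Broker.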