Use filters for pruning properly for hash-joins.
Native queries used them too aggressively: they might use filters on RHS
columns to prune the LHS. MSQ did not use them at all. Now, both use them
properly, pruning based on base (LHS) columns only.
gianm committed Nov 1, 2023
1 parent 2ea7177 commit 2c99005
Showing 12 changed files with 305 additions and 59 deletions.
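
For context on what "properly" means: in a broadcast hash-join, only filter fields that refer to base (LHS) columns can safely prune LHS segments; fields on the joined (RHS) side say nothing about which LHS segments contain matching rows. The sketch below is a hypothetical illustration of that rule, not code from this commit; it assumes Druid's convention of addressing RHS columns through a join prefix (here "j.").

import java.util.Set;
import java.util.stream.Collectors;

public class BasePruningExample
{
  // Keep only the filter fields that belong to the base (LHS) table. Columns carrying
  // the join prefix belong to the RHS and must not be used to prune LHS segments.
  static Set<String> prunableFields(Set<String> filterFields, String rightPrefix)
  {
    return filterFields.stream()
                       .filter(field -> !field.startsWith(rightPrefix))
                       .collect(Collectors.toSet());
  }

  public static void main(String[] args)
  {
    // A filter referencing one base column and one joined column.
    Set<String> fields = Set.of("countryName", "j.channel");
    System.out.println(prunableFields(fields, "j."));  // prints [countryName]
  }
}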
@@ -32,6 +32,7 @@
import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/**
* Input spec representing a Druid table.
@@ -45,28 +46,35 @@ public class TableInputSpec implements InputSpec
@Nullable
private final DimFilter filter;

@Nullable
private final Set<String> filterFields;

/**
* Create a table input spec.
*
* @param dataSource   datasource to read
* @param intervals    intervals to filter, or null if no time filtering is desired. Interval filtering is strict,
*                     meaning that when this spec is sliced and read, the returned {@link SegmentWithDescriptor}
*                     instances from {@link ReadableInput#getSegment()} are clipped to these intervals.
* @param filter       other filters to use for pruning, or null if no pruning is desired. Pruning filters are
*                     *not strict*, which means that processors must re-apply them when processing the returned
*                     {@link SegmentWithDescriptor} from {@link ReadableInput#getSegment()}. This matches how
*                     Broker-based pruning works for native queries.
* @param filterFields list of fields from {@link DimFilter#getRequiredColumns()} to consider for pruning. If null,
* all fields are considered for pruning.
*/
@JsonCreator
public TableInputSpec(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("intervals") @Nullable List<Interval> intervals,
@JsonProperty("filter") @Nullable DimFilter filter
@JsonProperty("filter") @Nullable DimFilter filter,
@JsonProperty("filterFields") @Nullable Set<String> filterFields
)
{
this.dataSource = dataSource;
this.intervals = intervals == null ? Intervals.ONLY_ETERNITY : intervals;
this.filter = filter;
this.filterFields = filterFields;
}

@JsonProperty
@@ -97,6 +105,14 @@ public DimFilter getFilter()
return filter;
}

@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
public Set<String> getFilterFields()
{
return filterFields;
}

@Override
public boolean equals(Object o)
{
@@ -109,13 +125,14 @@ public boolean equals(Object o)
TableInputSpec that = (TableInputSpec) o;
return Objects.equals(dataSource, that.dataSource)
&& Objects.equals(intervals, that.intervals)
&& Objects.equals(filter, that.filter);
&& Objects.equals(filter, that.filter)
&& Objects.equals(filterFields, that.filterFields);
}

@Override
public int hashCode()
{
return Objects.hash(dataSource, intervals, filter, filterFields);
}

@Override
@@ -124,7 +141,8 @@ public String toString()
return "TableInputSpec{" +
"dataSource='" + dataSource + '\'' +
", intervals=" + intervals +
(filter == null ? "" : ", filter=" + filter) +
(filterFields == null ? "" : ", filterFields=" + filterFields) +
'}';
}
}
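
A minimal construction sketch for the extended constructor (hypothetical values; the package imports and the choice of SelectorDimFilter are assumptions, not taken from this diff):

import java.util.Collections;
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.query.filter.SelectorDimFilter;

// Prune "wikipedia" segments using only the "countryName" field of the filter.
// Pruning is best-effort; processors must still re-apply the full filter.
TableInputSpec spec = new TableInputSpec(
    "wikipedia",                                       // dataSource
    null,                                              // intervals: null means eternity
    new SelectorDimFilter("countryName", "US", null),  // pruning filter (not strict)
    Collections.singleton("countryName")               // filterFields considered for pruning
);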
@@ -33,6 +33,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -115,8 +116,10 @@ private Set<DataSegmentWithInterval> getPrunedSegmentSet(final TableInputSpec tableInputSpec)

return DimFilterUtils.filterShards(
tableInputSpec.getFilter(),
tableInputSpec.getFilterFields(),
() -> dataSegmentIterator,
segment -> segment.getSegment().getShardSpec(),
new HashMap<>()
);
}
}
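
The slicer now forwards the filter, the new filterFields, and a fresh HashMap, which from its position in the call appears to act as a cache for ranges computed while pruning. Below is a simplified, self-contained model of the keep-unless-provably-excluded loop; it is a sketch under stated assumptions, not the actual DimFilterUtils implementation.

import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.function.Function;

final class ShardPruningSketch
{
  // Model of filterShards-style pruning: a segment is retained unless the filter,
  // restricted to filterFields, provably excludes its shard. "couldMatch" stands in
  // for Druid's real range-based check, which is not shown in this diff.
  static <T, S> Set<T> filterShards(
      Set<String> filterFields,
      Iterable<T> segments,
      Function<T, S> shardSpecFn,
      BiPredicate<S, Set<String>> couldMatch
  )
  {
    Set<T> retained = new LinkedHashSet<>();
    for (T segment : segments) {
      if (couldMatch.test(shardSpecFn.apply(segment), filterFields)) {
        retained.add(segment);  // exclusion not provable: keep the segment
      }
    }
    return retained;
  }
}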
@@ -54,6 +54,7 @@
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.DimFilterUtils;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
@@ -74,8 +75,12 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Plan for getting data from a {@link DataSource}. Used by {@link QueryKit} implementations.
*/
public class DataSourcePlan
{
/**
@@ -116,6 +121,22 @@ public class DataSourcePlan
}
}

/**
* Build a plan.
*
* @param queryKit query kit reference for recursive planning
* @param queryId query ID
* @param queryContext query context
* @param dataSource datasource to plan
* @param querySegmentSpec intervals for mandatory pruning. Must be {@link MultipleIntervalSegmentSpec}. The returned
* plan is guaranteed to be filtered to this interval.
* @param filter filter for best-effort pruning. The returned plan may or may not be filtered by this
* filter. Query processing must still apply the filter to generate correct results.
* @param filterFields which fields from the filter to consider for pruning, or null to consider all fields.
* @param maxWorkerCount maximum number of workers for subqueries
* @param minStageNumber starting stage number for subqueries
* @param broadcast whether the plan should broadcast data for this datasource
*/
@SuppressWarnings("rawtypes")
public static DataSourcePlan forDataSource(
final QueryKit queryKit,
@@ -124,13 +145,31 @@ public static DataSourcePlan forDataSource(
final DataSource dataSource,
final QuerySegmentSpec querySegmentSpec,
@Nullable DimFilter filter,
@Nullable Set<String> filterFields,
final int maxWorkerCount,
final int minStageNumber,
final boolean broadcast
)
{
if (!queryContext.isSecondaryPartitionPruningEnabled()) {
// Clear filter, we don't want to prune today.
filter = null;
}

if (filter != null && filterFields == null) {
// Ensure filterFields is nonnull if filter is nonnull. Helps for other forXYZ methods, so they don't need to
// deal with the case where filter is nonnull but filterFields is null.
filterFields = filter.getRequiredColumns();
}

if (dataSource instanceof TableDataSource) {
return forTable(
(TableDataSource) dataSource,
querySegmentSpecIntervals(querySegmentSpec),
filter,
filterFields,
broadcast
);
} else if (dataSource instanceof ExternalDataSource) {
checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
return forExternal((ExternalDataSource) dataSource, broadcast);
@@ -179,6 +218,7 @@ public static DataSourcePlan forDataSource(
(UnionDataSource) dataSource,
querySegmentSpec,
filter,
filterFields,
maxWorkerCount,
minStageNumber,
broadcast
@@ -198,6 +238,8 @@ public static DataSourcePlan forDataSource(
queryContext,
(JoinDataSource) dataSource,
querySegmentSpec,
filter,
filterFields,
maxWorkerCount,
minStageNumber,
broadcast
@@ -222,21 +264,36 @@
}
}

/**
* Possibly remapped datasource that should be used when processing. Will be either the original datasource, or the
* original datasource with itself or some children replaced by {@link InputNumberDataSource}. Any added
* {@link InputNumberDataSource} refers to a {@link StageInputSpec} in {@link #getInputSpecs()}.
*/
public DataSource getNewDataSource()
{
return newDataSource;
}

/**
* Input specs that should be used when processing.
*/
public List<InputSpec> getInputSpecs()
{
return inputSpecs;
}

/**
* Which input specs from {@link #getInputSpecs()} are broadcast.
*/
public IntSet getBroadcastInputs()
{
return broadcastInputs;
}

/**
* Returns a {@link QueryDefinitionBuilder} that includes any {@link StageInputSpec} from {@link #getInputSpecs()}.
* Absent if this plan does not involve reading from prior stages.
*/
public Optional<QueryDefinitionBuilder> getSubQueryDefBuilder()
{
return Optional.ofNullable(subQueryDefBuilder);
@@ -302,12 +359,13 @@ private static DataSourcePlan forTable(
final TableDataSource dataSource,
final List<Interval> intervals,
@Nullable final DimFilter filter,
@Nullable final Set<String> filterFields,
final boolean broadcast
)
{
return new DataSourcePlan(
(broadcast && dataSource.isGlobal()) ? dataSource : new InputNumberDataSource(0),
Collections.singletonList(new TableInputSpec(dataSource.getName(), intervals, filter)),
Collections.singletonList(new TableInputSpec(dataSource.getName(), intervals, filter, filterFields)),
broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(),
null
);
@@ -407,6 +465,7 @@ private static DataSourcePlan forFilteredDataSource(
dataSource.getBase(),
querySegmentSpec,
null,
null,
maxWorkerCount,
minStageNumber,
broadcast
@@ -447,6 +506,7 @@ private static DataSourcePlan forUnnest(
dataSource.getBase(),
querySegmentSpec,
null,
null,
maxWorkerCount,
minStageNumber,
broadcast
@@ -478,6 +538,7 @@ private static DataSourcePlan forUnion(
final UnionDataSource unionDataSource,
final QuerySegmentSpec querySegmentSpec,
@Nullable DimFilter filter,
@Nullable Set<String> filterFields,
final int maxWorkerCount,
final int minStageNumber,
final boolean broadcast
@@ -499,6 +560,7 @@
child,
querySegmentSpec,
filter,
filterFields,
maxWorkerCount,
Math.max(minStageNumber, subqueryDefBuilder.getNextStageNumber()),
broadcast
@@ -528,6 +590,8 @@ private static DataSourcePlan forBroadcastHashJoin(
final QueryContext queryContext,
final JoinDataSource dataSource,
final QuerySegmentSpec querySegmentSpec,
@Nullable final DimFilter filter,
@Nullable final Set<String> filterFields,
final int maxWorkerCount,
final int minStageNumber,
final boolean broadcast
@@ -542,7 +606,8 @@
queryContext,
analysis.getBaseDataSource(),
querySegmentSpec,
filter,
filter == null ? null : DimFilterUtils.onlyBaseFields(filterFields, analysis),
maxWorkerCount,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
broadcast
@@ -561,7 +626,8 @@
queryContext,
clause.getDataSource(),
new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY),
null, // Don't push down query filters for right-hand side: needs some work to ensure it works properly.
null,
maxWorkerCount,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
true // Always broadcast right-hand side of the join.
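
The join-specific fix is in forBroadcastHashJoin above: the base (LHS) input now receives the query filter narrowed by DimFilterUtils.onlyBaseFields, while the broadcast right-hand side still receives no pruning filter. The body of onlyBaseFields is not part of this diff; the following is a plausible sketch only, assuming DataSourceAnalysis exposes an isBaseColumn check.

import java.util.Set;
import java.util.stream.Collectors;
import org.apache.druid.query.planning.DataSourceAnalysis;

final class OnlyBaseFieldsSketch
{
  // Assumed behavior of DimFilterUtils.onlyBaseFields: keep only the fields the join
  // analysis attributes to the base (LHS) datasource, so that RHS columns can never
  // prune LHS segments.
  static Set<String> onlyBaseFields(Set<String> fields, DataSourceAnalysis analysis)
  {
    return fields.stream()
                 .filter(analysis::isBaseColumn)  // assumes DataSourceAnalysis#isBaseColumn
                 .collect(Collectors.toSet());
  }
}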
@@ -84,6 +84,7 @@ public QueryDefinition makeQueryDefinition(
originalQuery.getDataSource(),
originalQuery.getQuerySegmentSpec(),
originalQuery.getFilter(),
null,
maxWorkerCount,
minStageNumber,
false
@@ -95,6 +95,7 @@ public QueryDefinition makeQueryDefinition(
originalQuery.getDataSource(),
originalQuery.getQuerySegmentSpec(),
originalQuery.getFilter(),
null,
maxWorkerCount,
minStageNumber,
false
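
Both QueryKit call sites above pass null for the new filterFields argument. Per the forDataSource javadoc, a null filterFields means every required column of the filter is considered for pruning; the defaulting added earlier in DataSourcePlan makes the two spellings equivalent.

// Defaulting inside forDataSource (shown earlier in this diff): a null filterFields
// expands to all of the filter's required columns. For a hypothetical filter
// `page = 'Main' AND countryName = 'US'`, that would be {"page", "countryName"}.
if (filter != null && filterFields == null) {
  filterFields = filter.getRequiredColumns();
}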