diff --git a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQuery.java b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQuery.java index fcbe2533f4b3..a9e4b21fe81c 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQuery.java +++ b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQuery.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.IAE; import org.apache.druid.query.BaseQuery; import org.apache.druid.query.DataSource; @@ -32,8 +31,6 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.spec.QuerySegmentSpec; -import org.apache.druid.segment.VirtualColumn; -import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; @@ -54,74 +51,36 @@ */ public class WindowOperatorQuery extends BaseQuery { - private final RowSignature rowSignature; - private final List operators; - private final List leafOperators; - - public static WindowOperatorQuery build( - DataSource dataSource, - QuerySegmentSpec intervals, - Map context, - RowSignature rowSignature, - List operators - ) + private static DataSource validateMaybeRewriteDataSource(DataSource dataSource, boolean hasLeafs) { - List leafOperators = new ArrayList(); + if (hasLeafs) { + return dataSource; + } + // We can re-write scan-style sub queries into an operator instead of doing the actual Scan query. So, we + // check for that and, if we are going to do the rewrite, then we return the sub datasource such that the + // parent constructor in BaseQuery stores the actual data source that we want to be distributed to. + + // At this point, we could also reach into a QueryDataSource and validate that the ordering expected by the + // partitioning at least aligns with the ordering coming from the underlying query. We unfortunately don't + // have enough information to validate that the underlying ordering aligns with expectations for the actual + // window operator queries, but maybe we could get that and validate it here too. if (dataSource instanceof QueryDataSource) { final Query subQuery = ((QueryDataSource) dataSource).getQuery(); if (subQuery instanceof ScanQuery) { - // transform the scan query into a leaf operator - ScanQuery scan = (ScanQuery) subQuery; - dataSource = subQuery.getDataSource(); - - ArrayList ordering = new ArrayList<>(); - for (ScanQuery.OrderBy orderBy : scan.getOrderBys()) { - ordering.add( - new ColumnWithDirection( - orderBy.getColumnName(), - ScanQuery.Order.DESCENDING == orderBy.getOrder() - ? ColumnWithDirection.Direction.DESC - : ColumnWithDirection.Direction.ASC)); - } - - leafOperators.add( - new ScanOperatorFactory( - null, - scan.getFilter(), - (int) scan.getScanRowsLimit(), - scan.getColumns(), - scan.getVirtualColumns(), - ordering)); + return subQuery.getDataSource(); } + return dataSource; } else if (dataSource instanceof InlineDataSource) { - // ok + return dataSource; } else { throw new IAE("WindowOperatorQuery must run on top of a query or inline data source, got [%s]", dataSource); } - - return new WindowOperatorQuery(dataSource, intervals, context, rowSignature, operators, leafOperators); } - private static VirtualColumns vc_union(VirtualColumns virtualColumns, VirtualColumns virtualColumns2) - { - if (virtualColumns2.isEmpty()) { - return virtualColumns; - } - - VirtualColumn[] aa = virtualColumns.getVirtualColumns(); - VirtualColumn[] aa2 = virtualColumns2.getVirtualColumns(); - List vcs = new ArrayList(); - for (VirtualColumn virtualColumn : aa) { - vcs.add(virtualColumn); - - } - for (VirtualColumn virtualColumn : aa2) { - vcs.add(virtualColumn); - - } - return VirtualColumns.create(vcs); - } + private final RowSignature rowSignature; + private final List operators; + private final List leafOperators; @JsonCreator public WindowOperatorQuery( @@ -134,14 +93,51 @@ public WindowOperatorQuery( ) { super( - dataSource, + validateMaybeRewriteDataSource(dataSource, leafOperators != null), intervals, false, context ); this.rowSignature = rowSignature; this.operators = operators; - this.leafOperators = Preconditions.checkNotNull(leafOperators, "leafOperators may not be null at this point!"); + + if (leafOperators == null) { + this.leafOperators = new ArrayList<>(); + // We have to double check again because this was validated in a static context before passing to the `super()` + // and we cannot save state from that... Ah well. + + if (dataSource instanceof QueryDataSource) { + final Query subQuery = ((QueryDataSource) dataSource).getQuery(); + if (subQuery instanceof ScanQuery) { + ScanQuery scan = (ScanQuery) subQuery; + + ArrayList ordering = new ArrayList<>(); + for (ScanQuery.OrderBy orderBy : scan.getOrderBys()) { + ordering.add( + new ColumnWithDirection( + orderBy.getColumnName(), + ScanQuery.Order.DESCENDING == orderBy.getOrder() + ? ColumnWithDirection.Direction.DESC + : ColumnWithDirection.Direction.ASC + ) + ); + } + + this.leafOperators.add( + new ScanOperatorFactory( + null, + scan.getFilter(), + (int) scan.getScanRowsLimit(), + scan.getColumns(), + scan.getVirtualColumns(), + ordering + ) + ); + } + } + } else { + this.leafOperators = leafOperators; + } } @JsonProperty("operatorDefinition") diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultColumnSelectorFactoryMaker.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultColumnSelectorFactoryMaker.java index ec5335f1b46e..cd40eca68f9e 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultColumnSelectorFactoryMaker.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultColumnSelectorFactoryMaker.java @@ -160,29 +160,23 @@ public ColumnValueSelector makeColumnValueSelector(@Nonnull String columnName) @Override public ColumnCapabilities getColumnCapabilities(String column) { - return withColumnAccessor(column, columnAccessor -> { - if (columnAccessor == null) { - return ColumnCapabilitiesImpl.createDefault(); - } else { - return new ColumnCapabilitiesImpl() + return withColumnAccessor(column, columnAccessor -> + new ColumnCapabilitiesImpl() .setType(columnAccessor.getType()) .setHasMultipleValues(false) .setDictionaryEncoded(false) - .setHasBitmapIndexes(false); - } - }); + .setHasBitmapIndexes(false)); } private T withColumnAccessor(String column, Function fn) { - @Nullable ColumnAccessor retVal = accessorCache.get(column); if (retVal == null) { Column racColumn = rac.findColumn(column); if (racColumn == null) { - throw DruidException.defensive("didnt expected this!"); + throw DruidException.defensive("Can't find column[%s]", column); } - retVal = racColumn == null ? null : racColumn.toAccessor(); + retVal = racColumn.toAccessor(); accessorCache.put(column, retVal); } return fn.apply(retVal); diff --git a/processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryTest.java b/processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryTest.java index c1b30aa53e64..69712c06493c 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryTest.java @@ -49,12 +49,13 @@ public class WindowOperatorQueryTest @Before public void setUp() { - query = WindowOperatorQuery.build( + query = new WindowOperatorQuery( InlineDataSource.fromIterable(new ArrayList<>(), RowSignature.empty()), new LegacySegmentSpec(Intervals.ETERNITY), ImmutableMap.of("sally", "sue"), RowSignature.empty(), - new ArrayList<>() + new ArrayList<>(), + null ); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java index 7ad67e48bf7a..1cf79b6dc123 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java @@ -277,7 +277,6 @@ public static DruidQuery fromPartialQuery( if (partialQuery.getWindow() != null) { if (plannerContext.featureAvailable(EngineFeature.WINDOW_FUNCTIONS)) { - assert (virtualColumnRegistry.isEmpty()); windowing = Preconditions.checkNotNull( Windowing.fromCalciteStuff( partialQuery, @@ -1443,13 +1442,13 @@ private WindowOperatorQuery toWindowQuery() return null; } - // all virtual cols are needed - these columns are only referenced from the aggregates - return WindowOperatorQuery.build( + return new WindowOperatorQuery( dataSource, new LegacySegmentSpec(Intervals.ETERNITY), plannerContext.queryContextMap(), windowing.getSignature(), - windowing.getOperators() + windowing.getOperators(), + null ); }