Skip to content

Commit

Permalink
undo+cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
kgyrtkirk committed Oct 18, 2023
1 parent 43df657 commit 0918a4f
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
Expand All @@ -32,8 +31,6 @@
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nullable;
Expand All @@ -54,74 +51,36 @@
*/
public class WindowOperatorQuery extends BaseQuery<RowsAndColumns>
{
private final RowSignature rowSignature;
private final List<OperatorFactory> operators;
private final List<OperatorFactory> leafOperators;

public static WindowOperatorQuery build(
DataSource dataSource,
QuerySegmentSpec intervals,
Map<String, Object> context,
RowSignature rowSignature,
List<OperatorFactory> operators
)
private static DataSource validateMaybeRewriteDataSource(DataSource dataSource, boolean hasLeafs)
{
List<OperatorFactory> leafOperators = new ArrayList<OperatorFactory>();
if (hasLeafs) {
return dataSource;
}

// We can re-write scan-style sub queries into an operator instead of doing the actual Scan query. So, we
// check for that and, if we are going to do the rewrite, then we return the sub datasource such that the
// parent constructor in BaseQuery stores the actual data source that we want to be distributed to.

// At this point, we could also reach into a QueryDataSource and validate that the ordering expected by the
// partitioning at least aligns with the ordering coming from the underlying query. We unfortunately don't
// have enough information to validate that the underlying ordering aligns with expectations for the actual
// window operator queries, but maybe we could get that and validate it here too.
if (dataSource instanceof QueryDataSource) {
final Query<?> subQuery = ((QueryDataSource) dataSource).getQuery();
if (subQuery instanceof ScanQuery) {
// transform the scan query into a leaf operator
ScanQuery scan = (ScanQuery) subQuery;
dataSource = subQuery.getDataSource();

ArrayList<ColumnWithDirection> ordering = new ArrayList<>();
for (ScanQuery.OrderBy orderBy : scan.getOrderBys()) {
ordering.add(
new ColumnWithDirection(
orderBy.getColumnName(),
ScanQuery.Order.DESCENDING == orderBy.getOrder()
? ColumnWithDirection.Direction.DESC
: ColumnWithDirection.Direction.ASC));
}

leafOperators.add(
new ScanOperatorFactory(
null,
scan.getFilter(),
(int) scan.getScanRowsLimit(),
scan.getColumns(),
scan.getVirtualColumns(),
ordering));
return subQuery.getDataSource();
}
return dataSource;
} else if (dataSource instanceof InlineDataSource) {
// ok
return dataSource;
} else {
throw new IAE("WindowOperatorQuery must run on top of a query or inline data source, got [%s]", dataSource);
}

return new WindowOperatorQuery(dataSource, intervals, context, rowSignature, operators, leafOperators);
}

private static VirtualColumns vc_union(VirtualColumns virtualColumns, VirtualColumns virtualColumns2)
{
if (virtualColumns2.isEmpty()) {
return virtualColumns;
}

VirtualColumn[] aa = virtualColumns.getVirtualColumns();
VirtualColumn[] aa2 = virtualColumns2.getVirtualColumns();
List<VirtualColumn> vcs = new ArrayList<VirtualColumn>();
for (VirtualColumn virtualColumn : aa) {
vcs.add(virtualColumn);

}
for (VirtualColumn virtualColumn : aa2) {
vcs.add(virtualColumn);

}
return VirtualColumns.create(vcs);
}
private final RowSignature rowSignature;
private final List<OperatorFactory> operators;
private final List<OperatorFactory> leafOperators;

@JsonCreator
public WindowOperatorQuery(
Expand All @@ -134,14 +93,51 @@ public WindowOperatorQuery(
)
{
super(
dataSource,
validateMaybeRewriteDataSource(dataSource, leafOperators != null),
intervals,
false,
context
);
this.rowSignature = rowSignature;
this.operators = operators;
this.leafOperators = Preconditions.checkNotNull(leafOperators, "leafOperators may not be null at this point!");

if (leafOperators == null) {
this.leafOperators = new ArrayList<>();
// We have to double check again because this was validated in a static context before passing to the `super()`
// and we cannot save state from that... Ah well.

if (dataSource instanceof QueryDataSource) {
final Query<?> subQuery = ((QueryDataSource) dataSource).getQuery();
if (subQuery instanceof ScanQuery) {
ScanQuery scan = (ScanQuery) subQuery;

ArrayList<ColumnWithDirection> ordering = new ArrayList<>();
for (ScanQuery.OrderBy orderBy : scan.getOrderBys()) {
ordering.add(
new ColumnWithDirection(
orderBy.getColumnName(),
ScanQuery.Order.DESCENDING == orderBy.getOrder()
? ColumnWithDirection.Direction.DESC
: ColumnWithDirection.Direction.ASC
)
);
}

this.leafOperators.add(
new ScanOperatorFactory(
null,
scan.getFilter(),
(int) scan.getScanRowsLimit(),
scan.getColumns(),
scan.getVirtualColumns(),
ordering
)
);
}
}
} else {
this.leafOperators = leafOperators;
}
}

@JsonProperty("operatorDefinition")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,29 +160,23 @@ public ColumnValueSelector makeColumnValueSelector(@Nonnull String columnName)
@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
return withColumnAccessor(column, columnAccessor -> {
if (columnAccessor == null) {
return ColumnCapabilitiesImpl.createDefault();
} else {
return new ColumnCapabilitiesImpl()
return withColumnAccessor(column, columnAccessor ->
new ColumnCapabilitiesImpl()
.setType(columnAccessor.getType())
.setHasMultipleValues(false)
.setDictionaryEncoded(false)
.setHasBitmapIndexes(false);
}
});
.setHasBitmapIndexes(false));
}

private <T> T withColumnAccessor(String column, Function<ColumnAccessor, T> fn)
{
@Nullable
ColumnAccessor retVal = accessorCache.get(column);
if (retVal == null) {
Column racColumn = rac.findColumn(column);
if (racColumn == null) {
throw DruidException.defensive("didnt expected this!");
throw DruidException.defensive("Can't find column[%s]", column);
}
retVal = racColumn == null ? null : racColumn.toAccessor();
retVal = racColumn.toAccessor();
accessorCache.put(column, retVal);
}
return fn.apply(retVal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ public class WindowOperatorQueryTest
@Before
public void setUp()
{
query = WindowOperatorQuery.build(
query = new WindowOperatorQuery(
InlineDataSource.fromIterable(new ArrayList<>(), RowSignature.empty()),
new LegacySegmentSpec(Intervals.ETERNITY),
ImmutableMap.of("sally", "sue"),
RowSignature.empty(),
new ArrayList<>()
new ArrayList<>(),
null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@ public static DruidQuery fromPartialQuery(

if (partialQuery.getWindow() != null) {
if (plannerContext.featureAvailable(EngineFeature.WINDOW_FUNCTIONS)) {
assert (virtualColumnRegistry.isEmpty());
windowing = Preconditions.checkNotNull(
Windowing.fromCalciteStuff(
partialQuery,
Expand Down Expand Up @@ -1443,13 +1442,13 @@ private WindowOperatorQuery toWindowQuery()
return null;
}

// all virtual cols are needed - these columns are only referenced from the aggregates
return WindowOperatorQuery.build(
return new WindowOperatorQuery(
dataSource,
new LegacySegmentSpec(Intervals.ETERNITY),
plannerContext.queryContextMap(),
windowing.getSignature(),
windowing.getOperators()
windowing.getOperators(),
null
);
}

Expand Down

0 comments on commit 0918a4f

Please sign in to comment.