Skip to content

Commit

Permalink
Process pure ordering changes with windowing operators (#15241)
Browse files Browse the repository at this point in the history
- adds a new query build path: DruidQuery#toScanAndSortQuery which:
- builds a ScanQuery without considering the current ordering
- builds an operator to execute the sort
- fixes the frame serializer code converting a null string into the literal string "null"
- fixes some DrillWindowQueryTest cases
- fix NPE in NaiveSortOperator in case there was no input
- re-enables CoreRules.AGGREGATE_REMOVE
- adds a processing level OffsetLimit class and uses that instead of just the limit in the rac parts
- earlier window expressions on top of a subquery with an offset may have ignored the offset
  • Loading branch information
kgyrtkirk authored Oct 29, 2023
1 parent 7379477 commit f4a7471
Show file tree
Hide file tree
Showing 45 changed files with 2,367 additions and 345 deletions.
5 changes: 5 additions & 0 deletions processing/src/main/java/org/apache/druid/query/Druids.java
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,11 @@ public ScanQueryBuilder dataSource(String ds)
dataSource = new TableDataSource(ds);
return this;
}
/**
 * Uses the given query as the data source by wrapping it in a {@link QueryDataSource}.
 *
 * @param q the query to wrap
 * @return this builder
 */
public ScanQueryBuilder dataSource(Query<?> q)
{
  final QueryDataSource wrappedQuery = new QueryDataSource(q);
  this.dataSource = wrappedQuery;
  return this;
}

public ScanQueryBuilder dataSource(DataSource ds)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class NaivePartitioningOperatorFactory implements OperatorFactory
{
Expand Down Expand Up @@ -65,4 +66,23 @@ public String toString()
"partitionColumns=" + partitionColumns +
'}';
}

/**
 * Hash code derived solely from {@code partitionColumns}, the same field compared by {@code equals}.
 */
@Override
public final int hashCode()
{
  final int columnsHash = Objects.hash(partitionColumns);
  return columnsHash;
}

/**
 * Two factories are equal when they are of exactly the same class and their
 * {@code partitionColumns} are equal.
 */
@Override
public final boolean equals(Object obj)
{
  if (obj == this) {
    return true;
  }
  if (obj != null && obj.getClass() == getClass()) {
    final NaivePartitioningOperatorFactory that = (NaivePartitioningOperatorFactory) obj;
    return Objects.equals(partitionColumns, that.partitionColumns);
  }
  return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

/**
* A naive sort operator is an operation that sorts a stream of data in-place. Generally speaking this means
Expand All @@ -33,11 +34,11 @@
public class NaiveSortOperator implements Operator
{
private final Operator child;
private final ArrayList<ColumnWithDirection> sortColumns;
private final List<ColumnWithDirection> sortColumns;

public NaiveSortOperator(
Operator child,
ArrayList<ColumnWithDirection> sortColumns
List<ColumnWithDirection> sortColumns
)
{
this.child = child;
Expand All @@ -57,7 +58,7 @@ public Closeable goOrContinue(Closeable continuation, Receiver receiver)
public Signal push(RowsAndColumns rac)
{
if (sorter == null) {
sorter = NaiveSortMaker.fromRAC(rac).make(sortColumns);
sorter = NaiveSortMaker.fromRAC(rac).make(new ArrayList<>(sortColumns));
} else {
sorter.moreData(rac);
}
Expand All @@ -67,7 +68,9 @@ public Signal push(RowsAndColumns rac)
@Override
public void completed()
{
receiver.push(sorter.complete());
if (sorter != null) {
receiver.push(sorter.complete());
}
receiver.completed();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,23 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class NaiveSortOperatorFactory implements OperatorFactory
{
private final ArrayList<ColumnWithDirection> sortColumns;
private final List<ColumnWithDirection> sortColumns;

@JsonCreator
public NaiveSortOperatorFactory(
@JsonProperty("columns") ArrayList<ColumnWithDirection> sortColumns
@JsonProperty("columns") List<ColumnWithDirection> sortColumns
)
{
this.sortColumns = sortColumns;
}

@JsonProperty("columns")
public ArrayList<ColumnWithDirection> getSortColumns()
public List<ColumnWithDirection> getSortColumns()
{
return sortColumns;
}
Expand All @@ -56,4 +57,29 @@ public boolean validateEquivalent(OperatorFactory other)
}
return false;
}

/**
 * Hash code derived solely from {@code sortColumns}, the same field compared by {@code equals}.
 */
@Override
public int hashCode()
{
  final int columnsHash = Objects.hash(sortColumns);
  return columnsHash;
}

/**
 * Two factories are equal when they are of exactly the same class and their
 * {@code sortColumns} are equal.
 */
@Override
public boolean equals(Object obj)
{
  if (obj == this) {
    return true;
  }
  if (obj != null && getClass() == obj.getClass()) {
    final NaiveSortOperatorFactory that = (NaiveSortOperatorFactory) obj;
    return Objects.equals(sortColumns, that.sortColumns);
  }
  return false;
}

/**
 * Renders the factory together with its sort columns, for debugging and logging.
 */
@Override
public String toString()
{
  final StringBuilder text = new StringBuilder("NaiveSortOperatorFactory{sortColumns=");
  text.append(sortColumns);
  text.append("}");
  return text.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.operator;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import java.util.Objects;

/**
 * An offset/limit pair describing which rows of a row range should be fetched.
 *
 * <ul>
 * <li>{@code offset} is the index of the first row to fetch; it must be non-negative.</li>
 * <li>{@code limit} is the maximal number of rows to fetch; any negative value means "no limit"
 * and is normalized to {@code -1}.</li>
 * </ul>
 */
public class OffsetLimit
{
  protected final long offset;
  protected final long limit;

  /**
   * Instance with neither an offset nor a limit.
   */
  public static final OffsetLimit NONE = new OffsetLimit(0, -1);

  /**
   * @param offset index of the first row to fetch; must be {@code >= 0}
   * @param limit  maximal number of rows to fetch; any negative value is stored as {@code -1} (no limit)
   * @throws IllegalArgumentException if {@code offset} is negative
   */
  @JsonCreator
  public OffsetLimit(
      @JsonProperty("offset") long offset,
      @JsonProperty("limit") long limit
  )
  {
    Preconditions.checkArgument(offset >= 0, "offset >= 0");
    this.offset = offset;
    // Normalize every "no limit" value to -1 so that equals/hashCode treat them alike.
    this.limit = limit < 0 ? -1 : limit;
  }

  @JsonProperty("offset")
  public long getOffset()
  {
    return offset;
  }

  /**
   * Returns the limit, or {@code -1} when there is no limit.
   */
  @JsonProperty("limit")
  public long getLimit()
  {
    return limit;
  }

  /**
   * Returns true if there is either an offset or a limit.
   */
  public boolean isPresent()
  {
    return hasOffset() || hasLimit();
  }

  /**
   * Returns true if the offset is greater than zero.
   */
  public boolean hasOffset()
  {
    return offset > 0;
  }

  /**
   * Returns true if a limit is set; a limit of zero counts as a limit.
   */
  public boolean hasLimit()
  {
    return limit >= 0;
  }

  /**
   * Creates an instance with the given limit and no offset.
   */
  public static OffsetLimit limit(int limit2)
  {
    return new OffsetLimit(0, limit2);
  }

  /**
   * Returns the limit, or {@link Long#MAX_VALUE} when there is no limit.
   */
  public long getLimitOrMax()
  {
    return hasLimit() ? limit : Long.MAX_VALUE;
  }

  @Override
  public final boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (!(o instanceof OffsetLimit)) {
      return false;
    }
    OffsetLimit that = (OffsetLimit) o;
    return limit == that.limit && offset == that.offset;
  }

  @Override
  public final int hashCode()
  {
    return Objects.hash(limit, offset);
  }

  @Override
  public String toString()
  {
    return "OffsetLimit{" +
           "offset=" + offset +
           ", limit=" + limit +
           '}';
  }

  /**
   * Returns the first row index to fetch.
   *
   * When the offset is at or beyond {@code maxIndex}, this returns 0; {@link #getToIndex(long)}
   * returns 0 in that case as well, so together they describe an empty range.
   *
   * @param maxIndex maximal index accessible
   */
  public long getFromIndex(long maxIndex)
  {
    if (maxIndex <= offset) {
      return 0;
    }
    return offset;
  }

  /**
   * Returns the last row index to fetch (non-inclusive).
   *
   * When the offset is at or beyond {@code maxIndex}, this returns 0; otherwise the result is
   * {@code offset + limit} capped at {@code maxIndex}, or {@code maxIndex} when there is no limit.
   *
   * @param maxIndex maximal index accessible
   */
  public long getToIndex(long maxIndex)
  {
    if (maxIndex <= offset) {
      return 0;
    }
    if (!hasLimit()) {
      return maxIndex;
    }
    // Both values are non-negative here. Saturate instead of letting the sum wrap around to a
    // negative number, which Math.min would otherwise pick as the end index.
    final long toIndex = limit > Long.MAX_VALUE - offset ? Long.MAX_VALUE : limit + offset;
    return Math.min(maxIndex, toIndex);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class ScanOperator implements Operator
private final Operator subOperator;
private final Interval timeRange;
private final Filter filter;
private final int limit;
private final OffsetLimit offsetLimit;
private final List<String> projectedColumns;
private final VirtualColumns virtualColumns;
private final List<ColumnWithDirection> ordering;
Expand All @@ -55,7 +55,7 @@ public ScanOperator(
Interval timeRange,
Filter filter,
List<ColumnWithDirection> ordering,
int limit
OffsetLimit offsetLimit
)
{
this.subOperator = subOperator;
Expand All @@ -64,7 +64,7 @@ public ScanOperator(
this.timeRange = timeRange;
this.filter = filter;
this.ordering = ordering;
this.limit = limit;
this.offsetLimit = offsetLimit == null ? OffsetLimit.NONE : offsetLimit;
}

@Nullable
Expand Down Expand Up @@ -93,8 +93,8 @@ public Signal push(RowsAndColumns rac)
decor.limitTimeRange(timeRange);
}

if (limit > 0) {
decor.setLimit(limit);
if (offsetLimit.isPresent()) {
decor.setOffsetLimit(offsetLimit);
}

if (!(ordering == null || ordering.isEmpty())) {
Expand Down
Loading

0 comments on commit f4a7471

Please sign in to comment.