Fix Windowing/scanAndSort query issues on top of Joins. (#15996)
allow a hash-join result to be converted to RowsAndColumns (see the sketch below)
add StorageAdapterRowsAndColumns
fix incorrect isConcrete() return values during the early phase of planning
kgyrtkirk authored Mar 5, 2024
1 parent 00f8041 commit bb88272
Showing 10 changed files with 314 additions and 7 deletions.
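
A hedged sketch of what the fix enables, from the consumer side; `segment` stands in for the hash-join segment produced during query processing, and none of this is engine code from the commit:

import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.SegmentReference;

import java.io.IOException;

class WindowOverJoinSketch
{
  // Illustrative only: with this commit, asking a join segment to shapeshift
  // yields a StorageAdapterRowsAndColumns, which implements both
  // CloseableShapeshifter and RowsAndColumns, so window/scanAndSort operators
  // can read the join result as rows and columns.
  static int rowCount(SegmentReference segment) throws IOException
  {
    try (CloseableShapeshifter shifter = segment.as(CloseableShapeshifter.class)) {
      return ((RowsAndColumns) shifter).numRows(); // assumes the segment supports the conversion
    }
  }
}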
StorageAdapterRowsAndColumns.java (new file)
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.rowsandcols;

import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.Collections;

/**
 * Provides {@link RowsAndColumns} on top of a {@link StorageAdapter}.
 */
public class StorageAdapterRowsAndColumns implements CloseableShapeshifter, RowsAndColumns
{
  private final StorageAdapter storageAdapter;
  private RowsAndColumns materialized;

  public StorageAdapterRowsAndColumns(StorageAdapter storageAdapter)
  {
    this.storageAdapter = storageAdapter;
  }

  @SuppressWarnings("unchecked")
  @Override
  public <T> T as(Class<T> clazz)
  {
    if (StorageAdapter.class == clazz) {
      return (T) storageAdapter;
    }
    return null;
  }

  @Override
  public Collection<String> getColumnNames()
  {
    return storageAdapter.getRowSignature().getColumnNames();
  }

  @Override
  public int numRows()
  {
    return storageAdapter.getNumRows();
  }

  @Override
  public Column findColumn(String name)
  {
    return getRealRAC().findColumn(name);
  }

  @Override
  public void close()
  {
  }

  protected RowsAndColumns getRealRAC()
  {
    if (materialized == null) {
      materialized = materialize(storageAdapter);
    }
    return materialized;
  }

  @Nonnull
  private static RowsAndColumns materialize(StorageAdapter as)
  {
    final Sequence<Cursor> cursors = as.makeCursors(
        null,
        Intervals.ETERNITY,
        VirtualColumns.EMPTY,
        Granularities.ALL,
        false,
        null
    );

    RowSignature rowSignature = as.getRowSignature();

    FrameWriter writer = cursors.accumulate(null, (accumulated, in) -> {
      if (accumulated != null) {
        // We should not get multiple cursors because we set the granularity to ALL. So this should never
        // actually happen, but it doesn't hurt to be defensive here, so we test against it.
        throw new ISE("accumulated[%s] non-null, why did we get multiple cursors?", accumulated);
      }

      final ColumnSelectorFactory columnSelectorFactory = in.getColumnSelectorFactory();

      final FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory(
          FrameType.COLUMNAR,
          new ArenaMemoryAllocatorFactory(200 << 20), // 200 MB, because, why not?
          rowSignature,
          Collections.emptyList()
      );

      final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(columnSelectorFactory);
      while (!in.isDoneOrInterrupted()) {
        frameWriter.addSelection();
        in.advance();
      }
      return frameWriter;
    });

    if (writer == null) {
      return new EmptyRowsAndColumns();
    } else {
      final byte[] bytes = writer.toByteArray();
      return new FrameRowsAndColumns(Frame.wrap(bytes), rowSignature);
    }
  }
}
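
A minimal usage sketch for the class above, assuming you already hold a StorageAdapter; the helper class and column name are hypothetical:

import org.apache.druid.query.rowsandcols.StorageAdapterRowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.StorageAdapter;

class LazyMaterializationSketch
{
  // Hypothetical helper, not part of the commit. Row counts are answered
  // directly by the adapter; the first findColumn() call materializes every
  // row into a single columnar Frame, which is cached for later lookups.
  static Column readColumnTwice(StorageAdapter adapter)
  {
    StorageAdapterRowsAndColumns rac = new StorageAdapterRowsAndColumns(adapter);
    int rows = rac.numRows();           // cheap: delegates to adapter.getNumRows()
    Column first = rac.findColumn("x"); // triggers materialize(): makeCursors + FrameWriter
    return rac.findColumn("x");         // served from the cached FrameRowsAndColumns
  }
}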
HashJoinSegment.java
@@ -23,6 +23,8 @@
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.rowsandcols.StorageAdapterRowsAndColumns;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
@@ -32,6 +34,7 @@
import org.joda.time.Interval;

import javax.annotation.Nullable;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
@@ -148,4 +151,14 @@ public Optional<Closeable> acquireReferences()
      return Optional.empty();
    }
  }

  @SuppressWarnings("unchecked")
  @Override
  public <T> T as(Class<T> clazz)
  {
    if (CloseableShapeshifter.class.equals(clazz)) {
      return (T) new StorageAdapterRowsAndColumns(this.asStorageAdapter());
    }
    return SegmentReference.super.as(clazz);
  }
}
RowsAndColumnsHelper.java
@@ -99,6 +99,13 @@ public RowsAndColumnsHelper expectColumn(String col, double[] expectedVals)
    return this;
  }

  public RowsAndColumnsHelper expectColumn(String col, float[] expectedVals)
  {
    final ColumnHelper helper = columnHelper(col, expectedVals.length, ColumnType.FLOAT);
    helper.setExpectation(expectedVals);
    return this;
  }

  public RowsAndColumnsHelper expectColumn(String col, ColumnType type, Object... expectedVals)
  {
    return expectColumn(col, expectedVals, type);
RowsAndColumnsTestBase.java
@@ -28,6 +28,7 @@
import org.junit.Test;

import javax.annotation.Nonnull;

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -66,7 +67,8 @@ private static ArrayList<Object[]> getMakers()
        new Object[]{ArrayListRowsAndColumns.class, ArrayListRowsAndColumnsTest.MAKER},
        new Object[]{ConcatRowsAndColumns.class, ConcatRowsAndColumnsTest.MAKER},
        new Object[]{RearrangedRowsAndColumns.class, RearrangedRowsAndColumnsTest.MAKER},
        new Object[]{FrameRowsAndColumns.class, FrameRowsAndColumnsTest.MAKER}
        new Object[]{FrameRowsAndColumns.class, FrameRowsAndColumnsTest.MAKER},
        new Object[]{StorageAdapterRowsAndColumns.class, StorageAdapterRowsAndColumnsTest.MAKER}
    );
  }
StorageAdapterRowsAndColumnsTest.java (new file)
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.rowsandcols;

import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumnsTest;
import org.apache.druid.segment.StorageAdapter;

import java.util.function.Function;

public class StorageAdapterRowsAndColumnsTest extends RowsAndColumnsTestBase
{
  public StorageAdapterRowsAndColumnsTest()
  {
    super(StorageAdapterRowsAndColumns.class);
  }

  public static Function<MapOfColumnsRowsAndColumns, StorageAdapterRowsAndColumns> MAKER = input -> {
    return buildFrame(input);
  };

  private static StorageAdapterRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input)
  {
    FrameRowsAndColumns fRAC = FrameRowsAndColumnsTest.buildFrame(input);
    return new StorageAdapterRowsAndColumns(fRAC.as(StorageAdapter.class));
  }
}
FrameRowsAndColumnsTest.java
@@ -37,7 +37,7 @@ public FrameRowsAndColumnsTest()
    return buildFrame(input);
  };

  private static FrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input)
  public static FrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input)
  {
    LazilyDecoratedRowsAndColumns rac = new LazilyDecoratedRowsAndColumns(input, null, null, null, OffsetLimit.limit(Integer.MAX_VALUE), null, null);
ClusteredGroupPartitionerTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.query.rowsandcols.semantic;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
@@ -36,6 +37,8 @@
import java.util.function.BiFunction;
import java.util.function.Function;

import static org.junit.Assume.assumeTrue;

public class ClusteredGroupPartitionerTest extends SemanticTestBase
{
  public ClusteredGroupPartitionerTest(
@@ -132,9 +135,13 @@ public void testDefaultClusteredGroupPartitioner()
  @Test
  public void testDefaultClusteredGroupPartitionerWithNulls()
  {
    assumeTrue("testcase only enabled in sqlCompatible mode", NullHandling.sqlCompatible());

    RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(
        ImmutableMap.of(
            "sorted", new ObjectArrayColumn(new Object[]{null, null, null, 1, 1, 2, 4, 4, 4}, ColumnType.LONG),
            "col_d", new ObjectArrayColumn(new Object[]{null, null, null, 1.0, 1.0, 2.0, 4.0, 4.0, 4.0}, ColumnType.DOUBLE),
            "col_f", new ObjectArrayColumn(new Object[]{null, null, null, 1.0f, 1.0f, 2.0f, 4.0f, 4.0f, 4.0f}, ColumnType.FLOAT),
            "unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92})
        )
    ));
@@ -146,18 +153,26 @@ public void testDefaultClusteredGroupPartitionerWithNulls()
    List<RowsAndColumnsHelper> expectations = Arrays.asList(
        new RowsAndColumnsHelper()
            .expectColumn("sorted", new Object[]{null, null, null}, ColumnType.LONG)
            .expectColumn("col_d", new Object[]{null, null, null}, ColumnType.DOUBLE)
            .expectColumn("col_f", new Object[]{null, null, null}, ColumnType.FLOAT)
            .expectColumn("unsorted", new int[]{3, 54, 21})
            .allColumnsRegistered(),
        new RowsAndColumnsHelper()
            .expectColumn("sorted", new int[]{1, 1})
            .expectColumn("col_d", new double[]{1.0, 1.0})
            .expectColumn("col_f", new float[]{1.0f, 1.0f})
            .expectColumn("unsorted", new int[]{1, 5})
            .allColumnsRegistered(),
        new RowsAndColumnsHelper()
            .expectColumn("sorted", new int[]{2})
            .expectColumn("col_d", new double[]{2.0})
            .expectColumn("col_f", new float[]{2.0f})
            .expectColumn("unsorted", new int[]{54})
            .allColumnsRegistered(),
        new RowsAndColumnsHelper()
            .expectColumn("sorted", new int[]{4, 4, 4})
            .expectColumn("col_d", new double[]{4.0, 4.0, 4.0})
            .expectColumn("col_f", new float[]{4.0f, 4.0f, 4.0f})
            .expectColumn("unsorted", new int[]{2, 3, 92})
            .allColumnsRegistered()
    );
DruidCorrelateUnnestRel.java
@@ -78,7 +78,14 @@
 */
public class DruidCorrelateUnnestRel extends DruidRel<DruidCorrelateUnnestRel>
{
  private static final TableDataSource DUMMY_DATA_SOURCE = new TableDataSource("__correlate_unnest__");
  private static final TableDataSource DUMMY_DATA_SOURCE = new TableDataSource("__correlate_unnest__")
  {
    @Override
    public boolean isConcrete()
    {
      return false;
    }
  };
  private static final String BASE_UNNEST_OUTPUT_COLUMN = "unnest";

  private final Correlate correlateRel;
DruidJoinQueryRel.java
@@ -70,7 +70,14 @@
 */
public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
{
  private static final TableDataSource DUMMY_DATA_SOURCE = new TableDataSource("__join__");
  static final TableDataSource DUMMY_DATA_SOURCE = new TableDataSource("__join__")
  {
    @Override
    public boolean isConcrete()
    {
      return false;
    }
  };

  private final Filter leftFilter;
  private final PartialDruidQuery partialQuery;
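
Both rels rely on the same trick: the placeholder table must not look concrete during early planning. A hedged illustration of the kind of check this affects; the guard method is made up, but DataSource#isConcrete() is the real hook overridden above:

import org.apache.druid.query.DataSource;

class PlanningSketch
{
  // Illustrative only: planner logic can branch on isConcrete() to decide
  // whether a data source maps directly to scannable segments. The dummy
  // "__join__" and "__correlate_unnest__" tables above now answer false, so
  // early planning no longer treats them as real tables.
  static boolean canScanDirectly(DataSource dataSource)
  {
    return dataSource.isConcrete();
  }
}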