Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MSQ window functions: Fix partition boundary issues for arrays #16780

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.column.NullableTypeStrategy;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nullable;
Expand All @@ -59,7 +60,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

Expand All @@ -85,6 +85,10 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private ResultRow outputRow = null;
private FrameWriter frameWriter = null;

// List of type strategies to compare the partition columns across rows.
// Type strategies are pushed in the same order as column types in frameReader.signature()
private final List<NullableTypeStrategy<Object>> typeStrategies = new ArrayList<>();
Akshat-Jain marked this conversation as resolved.
Show resolved Hide resolved

public WindowOperatorQueryFrameProcessor(
WindowOperatorQuery query,
ReadableFrameChannel inputChannel,
Expand All @@ -103,13 +107,17 @@ public WindowOperatorQueryFrameProcessor(
this.frameWriterFactory = frameWriterFactory;
this.operatorFactoryList = operatorFactoryList;
this.jsonMapper = jsonMapper;
this.frameReader = frameReader;
this.query = query;
this.frameRowsAndCols = new ArrayList<>();
this.resultRowAndCols = new ArrayList<>();
this.objectsOfASingleRac = new ArrayList<>();
this.maxRowsMaterialized = maxRowsMaterializedInWindow;
this.partitionColumnNames = partitionColumnNames;

this.frameReader = frameReader;
for (int i = 0; i < frameReader.signature().size(); i++) {
typeStrategies.add(frameReader.signature().getColumnType(i).get().getNullableStrategy());
Akshat-Jain marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Override
Expand Down Expand Up @@ -499,7 +507,7 @@ private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<String
int match = 0;
for (String columnName : partitionColumnNames) {
int i = frameReader.signature().indexOf(columnName);
if (Objects.equals(row1.get(i), row2.get(i))) {
if (typeStrategies.get(i).compare(row1.get(i), row2.get(i)) == 0) {
match++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7750,4 +7750,25 @@ public void test_empty_and_non_empty_over_wikipedia_query_3()
{
windowQueryTest();
}

// Window-query test case registered under the resource name
// "druid_queries/partition_by_array/wikipedia_query_1" via @DrillTest.
// NOTE(review): presumably windowQueryTest() resolves the query and expected
// results from that name -- confirm against its definition.
@DrillTest("druid_queries/partition_by_array/wikipedia_query_1")
@Test
public void test_partition_by_array_wikipedia_query_1()
{
windowQueryTest();
}

// Window-query test case registered under the resource name
// "druid_queries/partition_by_array/wikipedia_query_2" via @DrillTest.
// NOTE(review): presumably windowQueryTest() resolves the query and expected
// results from that name -- confirm against its definition.
@DrillTest("druid_queries/partition_by_array/wikipedia_query_2")
@Test
public void test_partition_by_array_wikipedia_query_2()
{
windowQueryTest();
}

// Window-query test case registered under the resource name
// "druid_queries/partition_by_array/wikipedia_query_3" via @DrillTest.
// NOTE(review): presumably windowQueryTest() resolves the query and expected
// results from that name -- confirm against its definition.
@DrillTest("druid_queries/partition_by_array/wikipedia_query_3")
@Test
public void test_partition_by_array_wikipedia_query_3()
{
windowQueryTest();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Austria null #de.wikipedia 1
Republic of Korea null #en.wikipedia 2
Republic of Korea null #ja.wikipedia 3
Republic of Korea null #ko.wikipedia 4
Republic of Korea Seoul #ko.wikipedia 1
Austria Vienna #de.wikipedia 1
Austria Vienna #es.wikipedia 2
Austria Vienna #tr.wikipedia 3
Republic of Korea Jeonju #ko.wikipedia 4
Republic of Korea Suwon-si #ko.wikipedia 1
Austria Horsching #de.wikipedia 1
Republic of Korea Seongnam-si #ko.wikipedia 1
Republic of Korea Yongsan-dong #ko.wikipedia 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
select
countryName, cityName, channel,
row_number() over (partition by array[1,2,length(cityName)] order by countryName) as c
Akshat-Jain marked this conversation as resolved.
Show resolved Hide resolved
from wikipedia
where countryName in ('Austria', 'Republic of Korea')
group by countryName, cityName, channel
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Austria null #de.wikipedia 1
Austria Horsching #de.wikipedia 2
Austria Vienna #de.wikipedia 3
Austria Vienna #es.wikipedia 4
Austria Vienna #tr.wikipedia 5
Republic of Korea null #en.wikipedia 6
Republic of Korea null #ja.wikipedia 7
Republic of Korea null #ko.wikipedia 8
Republic of Korea Jeonju #ko.wikipedia 9
Republic of Korea Seongnam-si #ko.wikipedia 10
Republic of Korea Seoul #ko.wikipedia 11
Republic of Korea Suwon-si #ko.wikipedia 12
Republic of Korea Yongsan-dong #ko.wikipedia 13
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
select
countryName, cityName, channel,
row_number() over (partition by array[1,2,3] order by countryName) as c
from wikipedia
where countryName in ('Austria', 'Republic of Korea')
group by countryName, cityName, channel
Akshat-Jain marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Austria null #de.wikipedia 1
Austria Vienna #de.wikipedia 1
Austria Vienna #es.wikipedia 2
Austria Vienna #tr.wikipedia 3
Austria Horsching #de.wikipedia 1
Republic of Korea null #en.wikipedia 1
Republic of Korea null #ja.wikipedia 2
Republic of Korea null #ko.wikipedia 3
Republic of Korea Seoul #ko.wikipedia 1
Republic of Korea Jeonju #ko.wikipedia 1
Republic of Korea Suwon-si #ko.wikipedia 1
Republic of Korea Seongnam-si #ko.wikipedia 1
Republic of Korea Yongsan-dong #ko.wikipedia 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
select
countryName, cityName, channel,
row_number() over (partition by array[1,length(countryName),length(cityName)] order by countryName) as c
from wikipedia
where countryName in ('Austria', 'Republic of Korea')
group by countryName, cityName, channel
Loading