Skip to content

Commit

Permalink
MSQ window functions: Fix partition boundary issues for arrays (apach…
Browse files Browse the repository at this point in the history
…e#16780)

* MSQ window functions: Fix partition boundary issues for arrays

* Address review comments

* Cache type strategies

* Trigger Build

* Convert typeStrategies from list to array
  • Loading branch information
Akshat-Jain authored Jul 24, 2024
1 parent 302739a commit a0437b6
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.column.NullableTypeStrategy;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nullable;
Expand All @@ -59,7 +60,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

Expand All @@ -85,6 +85,10 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private ResultRow outputRow = null;
private FrameWriter frameWriter = null;

// Type strategies used to compare the partition column values across rows.
// typeStrategies[i] is the strategy for column i of frameReader.signature(), so the array is indexed by column position.
private final NullableTypeStrategy[] typeStrategies;

public WindowOperatorQueryFrameProcessor(
WindowOperatorQuery query,
ReadableFrameChannel inputChannel,
Expand All @@ -103,13 +107,18 @@ public WindowOperatorQueryFrameProcessor(
this.frameWriterFactory = frameWriterFactory;
this.operatorFactoryList = operatorFactoryList;
this.jsonMapper = jsonMapper;
this.frameReader = frameReader;
this.query = query;
this.frameRowsAndCols = new ArrayList<>();
this.resultRowAndCols = new ArrayList<>();
this.objectsOfASingleRac = new ArrayList<>();
this.maxRowsMaterialized = maxRowsMaterializedInWindow;
this.partitionColumnNames = partitionColumnNames;

this.frameReader = frameReader;
this.typeStrategies = new NullableTypeStrategy[frameReader.signature().size()];
for (int i = 0; i < frameReader.signature().size(); i++) {
typeStrategies[i] = frameReader.signature().getColumnType(i).get().getNullableStrategy();
}
}

@Override
Expand Down Expand Up @@ -499,7 +508,7 @@ private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<String
int match = 0;
for (String columnName : partitionColumnNames) {
int i = frameReader.signature().indexOf(columnName);
if (Objects.equals(row1.get(i), row2.get(i))) {
if (typeStrategies[i].compare(row1.get(i), row2.get(i)) == 0) {
match++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7750,4 +7750,25 @@ public void test_empty_and_non_empty_over_wikipedia_query_3()
{
windowQueryTest();
}

/**
 * Window-function test case for {@code druid_queries/partition_by_array/wikipedia_query_1}.
 * The body only delegates to {@code windowQueryTest()}; presumably that helper locates the
 * query and its expected results through the {@code @DrillTest} path (confirm against the helper).
 */
@DrillTest("druid_queries/partition_by_array/wikipedia_query_1")
@Test
public void test_partition_by_array_wikipedia_query_1()
{
windowQueryTest();
}

/**
 * Window-function test case for {@code druid_queries/partition_by_array/wikipedia_query_2}.
 * The body only delegates to {@code windowQueryTest()}; presumably that helper locates the
 * query and its expected results through the {@code @DrillTest} path (confirm against the helper).
 */
@DrillTest("druid_queries/partition_by_array/wikipedia_query_2")
@Test
public void test_partition_by_array_wikipedia_query_2()
{
windowQueryTest();
}

/**
 * Window-function test case for {@code druid_queries/partition_by_array/wikipedia_query_3}.
 * The body only delegates to {@code windowQueryTest()}; presumably that helper locates the
 * query and its expected results through the {@code @DrillTest} path (confirm against the helper).
 */
@DrillTest("druid_queries/partition_by_array/wikipedia_query_3")
@Test
public void test_partition_by_array_wikipedia_query_3()
{
windowQueryTest();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Austria null #de.wikipedia 1
Republic of Korea null #en.wikipedia 2
Republic of Korea null #ja.wikipedia 3
Republic of Korea null #ko.wikipedia 4
Republic of Korea Seoul #ko.wikipedia 1
Austria Vienna #de.wikipedia 1
Austria Vienna #es.wikipedia 2
Austria Vienna #tr.wikipedia 3
Republic of Korea Jeonju #ko.wikipedia 4
Republic of Korea Suwon-si #ko.wikipedia 1
Austria Horsching #de.wikipedia 1
Republic of Korea Seongnam-si #ko.wikipedia 1
Republic of Korea Yongsan-dong #ko.wikipedia 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
SELECT
  countryName,
  cityName,
  channel,
  row_number() OVER (PARTITION BY ARRAY[1, 2, length(cityName)] ORDER BY countryName) AS c
FROM wikipedia
WHERE countryName IN ('Austria', 'Republic of Korea')
GROUP BY countryName, cityName, channel
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Austria null #de.wikipedia 1
Austria Horsching #de.wikipedia 2
Austria Vienna #de.wikipedia 3
Austria Vienna #es.wikipedia 4
Austria Vienna #tr.wikipedia 5
Republic of Korea null #en.wikipedia 6
Republic of Korea null #ja.wikipedia 7
Republic of Korea null #ko.wikipedia 8
Republic of Korea Jeonju #ko.wikipedia 9
Republic of Korea Seongnam-si #ko.wikipedia 10
Republic of Korea Seoul #ko.wikipedia 11
Republic of Korea Suwon-si #ko.wikipedia 12
Republic of Korea Yongsan-dong #ko.wikipedia 13
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
SELECT
  countryName,
  cityName,
  channel,
  row_number() OVER (PARTITION BY ARRAY[1, 2, 3] ORDER BY countryName) AS c
FROM wikipedia
WHERE countryName IN ('Austria', 'Republic of Korea')
GROUP BY countryName, cityName, channel
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Austria null #de.wikipedia 1
Austria Vienna #de.wikipedia 1
Austria Vienna #es.wikipedia 2
Austria Vienna #tr.wikipedia 3
Austria Horsching #de.wikipedia 1
Republic of Korea null #en.wikipedia 1
Republic of Korea null #ja.wikipedia 2
Republic of Korea null #ko.wikipedia 3
Republic of Korea Seoul #ko.wikipedia 1
Republic of Korea Jeonju #ko.wikipedia 1
Republic of Korea Suwon-si #ko.wikipedia 1
Republic of Korea Seongnam-si #ko.wikipedia 1
Republic of Korea Yongsan-dong #ko.wikipedia 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
SELECT
  countryName,
  cityName,
  channel,
  row_number() OVER (PARTITION BY ARRAY[1, length(countryName), length(cityName)] ORDER BY countryName) AS c
FROM wikipedia
WHERE countryName IN ('Austria', 'Republic of Korea')
GROUP BY countryName, cityName, channel

0 comments on commit a0437b6

Please sign in to comment.