Skip to content

Commit

Permalink
WINDOWING - Fix 2 nodes with same digest causing mapping issue (apach…
Browse files Browse the repository at this point in the history
…e#16301)

Fixes the mapping issue in window fucntions where 2 nodes get the same reference.
  • Loading branch information
sreemanamala authored Apr 24, 2024
1 parent 274ccbf commit 080476f
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1724,6 +1724,38 @@ public void testSimpleWindowWithEmptyOverNoGroupBy(String contextName, Map<Strin
.verifyResults();
}

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testSimpleWindowWithDuplicateSelectNode(String contextName, Map<String, Object> context)
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("m1", ColumnType.FLOAT)
.add("cc", ColumnType.DOUBLE)
.add("cc_dup", ColumnType.DOUBLE)
.build();

testIngestQuery().setSql(" REPLACE INTO foo OVERWRITE ALL\n"
+ "select __time, m1,SUM(m1) OVER() cc,SUM(m1) OVER() cc_dup from foo\n"
+ "PARTITIONED BY ALL CLUSTERED BY m1")
.setExpectedDataSource("foo")
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
.setExpectedResultRows(
ImmutableList.of(
new Object[]{946684800000L, 1.0f, 21.0, 21.0},
new Object[]{946771200000L, 2.0f, 21.0, 21.0},
new Object[]{946857600000L, 3.0f, 21.0, 21.0},
new Object[]{978307200000L, 4.0f, 21.0, 21.0},
new Object[]{978393600000L, 5.0f, 21.0, 21.0},
new Object[]{978480000000L, 6.0f, 21.0, 21.0}
)
)
.setExpectedSegment(ImmutableSet.of(SegmentId.of("foo", Intervals.ETERNITY, "test", 0)))
.verifyResults();
}

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testSimpleWindowWithJoins(String contextName, Map<String, Object> context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,16 @@ public static Windowing fromCalciteStuff(

// Apply windowProject, if present.
if (partialQuery.getWindowProject() != null) {
// We know windowProject is a mapping due to the isMapping() check in DruidRules. Check for null anyway,
// as defensive programming.
// We know windowProject is a mapping due to the isMapping() check in DruidRules.
// check anyway as defensive programming.
Preconditions.checkArgument(partialQuery.getWindowProject().isMapping());
final Mappings.TargetMapping mapping = Preconditions.checkNotNull(
partialQuery.getWindowProject().getMapping(),
"mapping for windowProject[%s]", partialQuery.getWindowProject()
Project.getPartialMapping(
partialQuery.getWindowProject().getInput().getRowType().getFieldCount(),
partialQuery.getWindowProject().getProjects()
),
"mapping for windowProject[%s]",
partialQuery.getWindowProject()
);

final List<String> windowProjectOutputColumns = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
Expand All @@ -37,6 +38,7 @@
import org.apache.druid.sql.calcite.QueryVerification.QueryResultsVerifier;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

Expand Down Expand Up @@ -231,6 +233,25 @@ public void windowQueryTestWithCustomContextMaxSubqueryBytes(String filename) th
}
}

@Test
public void testWindow()
{
testBuilder()
.sql("SELECT\n" +
"(rank() over (order by count(*) desc)),\n" +
"(rank() over (order by count(*) desc))\n" +
"FROM \"wikipedia\"")
.queryContext(ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true,
QueryContexts.WINDOWING_STRICT_VALIDATION, false
))
.expectedResults(ImmutableList.of(
new Object[]{1L, 1L}
))
.run();
}

private WindowOperatorQuery getWindowOperatorQuery(List<Query<?>> queries)
{
assertEquals(1, queries.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4698,7 +4698,6 @@ public void test_aggregates_winFnQry_21()
windowQueryTest();
}

@NotYetSupported(Modes.NPE)
@DrillTest("first_val/firstValFn_5")
@Test
public void test_first_val_firstValFn_5()
Expand Down Expand Up @@ -4922,15 +4921,13 @@ public void test_frameclause_subQueries_frmInSubQry_46()
windowQueryTest();
}

@NotYetSupported(Modes.NPE)
@DrillTest("lag_func/lag_Fn_82")
@Test
public void test_lag_func_lag_Fn_82()
{
windowQueryTest();
}

@NotYetSupported(Modes.NPE)
@DrillTest("last_val/lastValFn_5")
@Test
public void test_last_val_lastValFn_5()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ enum Modes
COLUMN_NOT_FOUND(DruidException.class, "CalciteContextException.*Column.*not found in any table"),
NULLS_FIRST_LAST(DruidException.class, "NULLS (FIRST|LAST)"),
BIGINT_TO_DATE(DruidException.class, "BIGINT to type (DATE|TIME)"),
NPE(DruidException.class, "java.lang.NullPointerException"),
AGGREGATION_NOT_SUPPORT_TYPE(DruidException.class, "Aggregation \\[(MIN|MAX)\\] does not support type \\[STRING\\]"),
ALLDATA_CSV(DruidException.class, "allData.csv"),
BIGINT_TIME_COMPARE(DruidException.class, "Cannot apply '.' to arguments of type"),
Expand Down

0 comments on commit 080476f

Please sign in to comment.