From 40414cfe780b5b45375f98b26b5b61b9ca58d05f Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Mon, 23 Sep 2024 11:39:35 +0530 Subject: [PATCH] MSQ window functions: Reject MVDs during window processing (#17036) * MSQ window functions: Reject MVDs during window processing * MSQ window functions: Reject MVDs during window processing * Remove parameterization from MSQWindowTest --- .../WindowOperatorQueryFrameProcessor.java | 10 + .../apache/druid/msq/exec/MSQWindowTest.java | 367 +++++++++--------- .../rowsandcols/RearrangedRowsAndColumns.java | 13 +- 3 files changed, 200 insertions(+), 190 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index 3dc62f3a60de..aab8f1f1a6bb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -34,6 +34,7 @@ import org.apache.druid.frame.util.SettableLongVirtualColumn; import org.apache.druid.frame.write.FrameWriter; import org.apache.druid.frame.write.FrameWriterFactory; +import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.Unit; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.indexing.error.MSQException; @@ -54,6 +55,7 @@ import org.apache.druid.segment.Cursor; import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.NullableTypeStrategy; import org.apache.druid.segment.column.RowSignature; @@ -451,6 +453,14 @@ private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List data() - { - Object[][] data = new Object[][]{ - {DEFAULT, DEFAULT_MSQ_CONTEXT} - }; - - return Arrays.asList(data); - } - - - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWithPartitionByAndInnerGroupBy(String contextName, Map context) + @Test + public void testWindowOnFooWithPartitionByAndInnerGroupBy() { RowSignature rowSignature = RowSignature.builder() .add("m1", ColumnType.FLOAT) @@ -108,7 +97,7 @@ public void testWindowOnFooWithPartitionByAndInnerGroupBy(String contextName, Ma ColumnType.FLOAT ) )) - .setContext(context) + .setContext(DEFAULT_MSQ_CONTEXT) .build(); @@ -121,7 +110,7 @@ public void testWindowOnFooWithPartitionByAndInnerGroupBy(String contextName, Ma final WindowOperatorQuery query = new WindowOperatorQuery( new QueryDataSource(groupByQuery), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder().add("d0", ColumnType.FLOAT).add("w0", ColumnType.DOUBLE).build(), ImmutableList.of( new NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("d0"))), @@ -151,7 +140,7 @@ public void testWindowOnFooWithPartitionByAndInnerGroupBy(String contextName, Ma new Object[]{5.0f, 5.0}, new Object[]{6.0f, 6.0} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher .with().totalFiles(1), @@ -170,9 +159,8 @@ public void testWindowOnFooWithPartitionByAndInnerGroupBy(String contextName, Ma .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty(String contextName, Map context) + @Test + public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty() { RowSignature rowSignature = RowSignature.builder() .add("m1", ColumnType.FLOAT) @@ -198,7 +186,7 @@ public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty(String contex ColumnType.DOUBLE ) )) - .setContext(context) + .setContext(DEFAULT_MSQ_CONTEXT) .build(); @@ -215,7 +203,7 @@ public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty(String contex final WindowOperatorQuery query = new WindowOperatorQuery( new QueryDataSource(groupByQuery), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder() .add("d0", ColumnType.FLOAT) .add("d1", ColumnType.DOUBLE) @@ -258,7 +246,7 @@ public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty(String contex new Object[]{5.0f, 5.0, 5.0, 21.0}, new Object[]{6.0f, 6.0, 6.0, 21.0} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher .with().totalFiles(1), @@ -277,12 +265,8 @@ public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty(String contex .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWith2WindowsBothWindowsHavingPartitionByInnerGroupBy( - String contextName, - Map context - ) + @Test + public void testWindowOnFooWith2WindowsBothWindowsHavingPartitionByInnerGroupBy() { RowSignature rowSignature = RowSignature.builder() .add("m1", ColumnType.FLOAT) @@ -308,7 +292,7 @@ public void testWindowOnFooWith2WindowsBothWindowsHavingPartitionByInnerGroupBy( ColumnType.DOUBLE ) )) - .setContext(context) + .setContext(DEFAULT_MSQ_CONTEXT) .build(); @@ -325,7 +309,7 @@ public void testWindowOnFooWith2WindowsBothWindowsHavingPartitionByInnerGroupBy( final WindowOperatorQuery query = new WindowOperatorQuery( new QueryDataSource(groupByQuery), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder() .add("d0", ColumnType.FLOAT) .add("d1", ColumnType.DOUBLE) @@ -372,7 +356,7 @@ public void testWindowOnFooWith2WindowsBothWindowsHavingPartitionByInnerGroupBy( new Object[]{5.0f, 5.0, 5.0, 5.0}, new Object[]{6.0f, 6.0, 6.0, 6.0} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher .with().totalFiles(1), @@ -391,12 +375,8 @@ public void testWindowOnFooWith2WindowsBothWindowsHavingPartitionByInnerGroupBy( .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWith2WindowsBothPartitionByWithOrderReversed( - String contextName, - Map context - ) + @Test + public void testWindowOnFooWith2WindowsBothPartitionByWithOrderReversed() { RowSignature rowSignature = RowSignature.builder() .add("m1", ColumnType.FLOAT) @@ -421,7 +401,7 @@ public void testWindowOnFooWith2WindowsBothPartitionByWithOrderReversed( ColumnType.DOUBLE ) )) - .setContext(context) + .setContext(DEFAULT_MSQ_CONTEXT) .build(); @@ -438,7 +418,7 @@ public void testWindowOnFooWith2WindowsBothPartitionByWithOrderReversed( final WindowOperatorQuery query = new WindowOperatorQuery( new QueryDataSource(groupByQuery), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder() .add("d0", ColumnType.FLOAT) .add("d1", ColumnType.DOUBLE) @@ -485,7 +465,7 @@ public void testWindowOnFooWith2WindowsBothPartitionByWithOrderReversed( new Object[]{5.0f, 5.0, 5.0, 5.0}, new Object[]{6.0f, 6.0, 6.0, 6.0} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher .with().totalFiles(1), @@ -504,9 +484,8 @@ public void testWindowOnFooWith2WindowsBothPartitionByWithOrderReversed( .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWithEmptyOverWithGroupBy(String contextName, Map context) + @Test + public void testWindowOnFooWithEmptyOverWithGroupBy() { RowSignature rowSignature = RowSignature.builder() .add("m1", ColumnType.FLOAT) @@ -525,7 +504,7 @@ public void testWindowOnFooWithEmptyOverWithGroupBy(String contextName, Map context) + @Test + public void testWindowOnFooWithNoGroupByAndPartition() { RowSignature rowSignature = RowSignature.builder() .add("m1", ColumnType.FLOAT) @@ -603,7 +581,7 @@ public void testWindowOnFooWithNoGroupByAndPartition(String contextName, Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put(DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"m1\",\"type\":\"FLOAT\"}]") .build(); @@ -617,7 +595,7 @@ public void testWindowOnFooWithNoGroupByAndPartition(String contextName, Map context) + @Test + public void testWindowOnFooWithNoGroupByAndPartitionOnTwoElements() { RowSignature rowSignature = RowSignature.builder() .add("m1", ColumnType.FLOAT) @@ -668,7 +645,7 @@ public void testWindowOnFooWithNoGroupByAndPartitionOnTwoElements(String context final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"m1\",\"type\":\"FLOAT\"},{\"name\":\"m2\",\"type\":\"DOUBLE\"}]" @@ -685,7 +662,7 @@ public void testWindowOnFooWithNoGroupByAndPartitionOnTwoElements(String context .context(contextWithRowSignature) .build()), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder().add("m1", ColumnType.FLOAT).add("w0", ColumnType.DOUBLE).build(), ImmutableList.of( new NaiveSortOperatorFactory(ImmutableList.of( @@ -718,13 +695,12 @@ public void testWindowOnFooWithNoGroupByAndPartitionOnTwoElements(String context new Object[]{5.0f, 5.0}, new Object[]{6.0f, 6.0} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWithNoGroupByAndPartitionByAnother(String contextName, Map context) + @Test + public void testWindowOnFooWithNoGroupByAndPartitionByAnother() { RowSignature rowSignature = RowSignature.builder() .add("m1", ColumnType.FLOAT) @@ -739,7 +715,7 @@ public void testWindowOnFooWithNoGroupByAndPartitionByAnother(String contextName final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"m1\",\"type\":\"FLOAT\"},{\"name\":\"m2\",\"type\":\"DOUBLE\"}]" @@ -756,7 +732,7 @@ public void testWindowOnFooWithNoGroupByAndPartitionByAnother(String contextName .context(contextWithRowSignature) .build()), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder().add("m1", ColumnType.FLOAT).add("w0", ColumnType.DOUBLE).build(), ImmutableList.of( new NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("m2"))), @@ -786,13 +762,12 @@ public void testWindowOnFooWithNoGroupByAndPartitionByAnother(String contextName new Object[]{5.0f, 5.0}, new Object[]{6.0f, 6.0} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWithGroupByAndInnerLimit(String contextName, Map context) + @Test + public void testWindowOnFooWithGroupByAndInnerLimit() { RowSignature rowSignature = RowSignature.builder() .add("m1", ColumnType.FLOAT) @@ -825,10 +800,10 @@ public void testWindowOnFooWithGroupByAndInnerLimit(String contextName, Map context) + @Test + public void testWindowOnFooWithNoGroupByAndPartitionAndVirtualColumns() { final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"m1\",\"type\":\"FLOAT\"},{\"name\":\"v0\",\"type\":\"LONG\"}]" @@ -901,7 +875,7 @@ public void testWindowOnFooWithNoGroupByAndPartitionAndVirtualColumns(String con .context(contextWithRowSignature) .build()), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder() .add("v0", ColumnType.LONG) .add("m1", ColumnType.FLOAT) @@ -936,19 +910,18 @@ public void testWindowOnFooWithNoGroupByAndPartitionAndVirtualColumns(String con new Object[]{3, 5.0f, 5.0}, new Object[]{3, 6.0f, 6.0} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWithNoGroupByAndEmptyOver(String contextName, Map context) + @Test + public void testWindowOnFooWithNoGroupByAndEmptyOver() { final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put(DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"m1\",\"type\":\"FLOAT\"}]") .build(); @@ -973,7 +946,7 @@ public void testWindowOnFooWithNoGroupByAndEmptyOver(String contextName, Map context) + @Test + public void testWindowOnFooWithPartitionByOrderBYWithJoin() { final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"j0.m2\",\"type\":\"DOUBLE\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]" @@ -1021,7 +993,7 @@ public void testWindowOnFooWithPartitionByOrderBYWithJoin(String contextName, Ma final Map contextWithRowSignature1 = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"m2\",\"type\":\"DOUBLE\"},{\"name\":\"v0\",\"type\":\"FLOAT\"}]" @@ -1070,7 +1042,7 @@ public void testWindowOnFooWithPartitionByOrderBYWithJoin(String contextName, Ma .context(contextWithRowSignature) .build()), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder() .add("m1", ColumnType.FLOAT) .add("w0", ColumnType.DOUBLE) @@ -1106,17 +1078,16 @@ public void testWindowOnFooWithPartitionByOrderBYWithJoin(String contextName, Ma new Object[]{5.0f, 5.0, 5.0}, new Object[]{6.0f, 6.0, 6.0} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWithEmptyOverWithJoin(String contextName, Map context) + @Test + public void testWindowOnFooWithEmptyOverWithJoin() { final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"j0.m2\",\"type\":\"DOUBLE\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]" @@ -1125,7 +1096,7 @@ public void testWindowOnFooWithEmptyOverWithJoin(String contextName, Map contextWithRowSignature1 = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"m2\",\"type\":\"DOUBLE\"},{\"name\":\"v0\",\"type\":\"FLOAT\"}]" @@ -1174,7 +1145,7 @@ public void testWindowOnFooWithEmptyOverWithJoin(String contextName, Map context) + @Test + public void testWindowOnFooWithDim2() { RowSignature rowSignature = RowSignature.builder() .add("dim2", ColumnType.STRING) @@ -1230,7 +1200,7 @@ public void testWindowOnFooWithDim2(String contextName, Map cont final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"dim2\",\"type\":\"STRING\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]" @@ -1247,7 +1217,7 @@ public void testWindowOnFooWithDim2(String contextName, Map cont .context(contextWithRowSignature) .build()), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder().add("dim2", ColumnType.STRING).add("w0", ColumnType.DOUBLE).build(), ImmutableList.of( new NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("dim2"))), @@ -1287,17 +1257,16 @@ public void testWindowOnFooWithDim2(String contextName, Map cont new Object[]{"abc", 5.0}, new Object[]{null, 8.0} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWithEmptyOverWithUnnest(String contextName, Map context) + @Test + public void testWindowOnFooWithEmptyOverWithUnnest() { final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"j0.unnest\",\"type\":\"STRING\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]" @@ -1333,7 +1302,7 @@ public void testWindowOnFooWithEmptyOverWithUnnest(String contextName, Map context) + @Test + public void testWindowOnFooWithPartitionByAndWithUnnest() { final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"j0.unnest\",\"type\":\"STRING\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]" @@ -1416,7 +1384,7 @@ public void testWindowOnFooWithPartitionByAndWithUnnest(String contextName, Map< .context(contextWithRowSignature) .build()), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder() .add("m1", ColumnType.FLOAT) .add("w0", ColumnType.DOUBLE) @@ -1454,14 +1422,13 @@ public void testWindowOnFooWithPartitionByAndWithUnnest(String contextName, Map< new Object[]{5.0f, 5.0, NullHandling.sqlCompatible() ? null : ""}, new Object[]{6.0f, 6.0, NullHandling.sqlCompatible() ? null : ""} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .verifyResults(); } // Insert Tests - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testInsertWithWindow(String contextName, Map context) + @Test + public void testInsertWithWindow() { List expectedRows = ImmutableList.of( new Object[]{946684800000L, 1.0f, 1.0}, @@ -1484,7 +1451,7 @@ public void testInsertWithWindow(String contextName, Map context + "SUM(m1) OVER(PARTITION BY m1) as summ1\n" + "from foo\n" + "GROUP BY __time, m1 PARTITIONED BY ALL") - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .setExpectedResultRows(expectedRows) .setExpectedDataSource("foo1") .setExpectedRowSignature(rowSignature) @@ -1492,9 +1459,8 @@ public void testInsertWithWindow(String contextName, Map context } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testInsertWithWindowEmptyOver(String contextName, Map context) + @Test + public void testInsertWithWindowEmptyOver() { List expectedRows = ImmutableList.of( new Object[]{946684800000L, 1.0f, 21.0}, @@ -1517,7 +1483,7 @@ public void testInsertWithWindowEmptyOver(String contextName, Map context) + @Test + public void testInsertWithWindowPartitionByOrderBy() { List expectedRows = ImmutableList.of( new Object[]{946684800000L, 1.0f, 1.0}, @@ -1550,7 +1515,7 @@ public void testInsertWithWindowPartitionByOrderBy(String contextName, Map context) + @Test + public void testReplaceWithWindowsAndUnnest() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -1576,7 +1540,7 @@ public void testReplaceWithWindowsAndUnnest(String contextName, Map context) + @Test + public void testSimpleWindowWithPartitionBy() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -1609,7 +1572,7 @@ public void testSimpleWindowWithPartitionBy(String contextName, Map context) + @Test + public void testSimpleWindowWithEmptyOver() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -1640,7 +1602,7 @@ public void testSimpleWindowWithEmptyOver(String contextName, Map context) + @Test + public void testSimpleWindowWithEmptyOverNoGroupBy() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -1671,7 +1632,7 @@ public void testSimpleWindowWithEmptyOverNoGroupBy(String contextName, Map context) + @Test + public void testSimpleWindowWithDuplicateSelectNode() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -1703,7 +1663,7 @@ public void testSimpleWindowWithDuplicateSelectNode(String contextName, Map context) + @Test + public void testSimpleWindowWithJoins() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -1735,7 +1694,7 @@ public void testSimpleWindowWithJoins(String contextName, Map co + "PARTITIONED BY DAY CLUSTERED BY m1") .setExpectedDataSource("foo1") .setExpectedRowSignature(rowSignature) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY) .setExpectedResultRows( ImmutableList.of( @@ -1761,9 +1720,8 @@ public void testSimpleWindowWithJoins(String contextName, Map co } // Bigger dataset tests - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testSelectWithWikipedia(String contextName, Map context) + @Test + public void testSelectWithWikipedia() { RowSignature rowSignature = RowSignature.builder() .add("cityName", ColumnType.STRING) @@ -1779,7 +1737,7 @@ public void testSelectWithWikipedia(String contextName, Map cont final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"added\",\"type\":\"LONG\"},{\"name\":\"cityName\",\"type\":\"STRING\"}]" @@ -1797,7 +1755,7 @@ public void testSelectWithWikipedia(String contextName, Map cont .context(contextWithRowSignature) .build()), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder().add("cityName", ColumnType.STRING) .add("added", ColumnType.LONG) .add("w0", ColumnType.LONG).build(), @@ -1830,17 +1788,16 @@ public void testSelectWithWikipedia(String contextName, Map cont new Object[]{"Albuquerque", 9L, 140L}, new Object[]{"Albuquerque", 2L, 140L} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testSelectWithWikipediaEmptyOverWithCustomContext(String contextName, Map context) + @Test + public void testSelectWithWikipediaEmptyOverWithCustomContext() { final Map customContext = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW, 200) .build(); @@ -1852,9 +1809,8 @@ public void testSelectWithWikipediaEmptyOverWithCustomContext(String contextName .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testSelectWithWikipediaWithPartitionKeyNotInSelect(String contextName, Map context) + @Test + public void testSelectWithWikipediaWithPartitionKeyNotInSelect() { RowSignature rowSignature = RowSignature.builder() .add("cityName", ColumnType.STRING) @@ -1870,7 +1826,7 @@ public void testSelectWithWikipediaWithPartitionKeyNotInSelect(String contextNam final Map innerContextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"added\",\"type\":\"LONG\"},{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryIsoCode\",\"type\":\"STRING\"}]" @@ -1888,7 +1844,7 @@ public void testSelectWithWikipediaWithPartitionKeyNotInSelect(String contextNam .context(innerContextWithRowSignature) .build()), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder().add("cityName", ColumnType.STRING) .add("added", ColumnType.LONG) .add("w0", ColumnType.LONG).build(), @@ -1902,7 +1858,7 @@ public void testSelectWithWikipediaWithPartitionKeyNotInSelect(String contextNam final Map outerContextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"added\",\"type\":\"LONG\"},{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"w0\",\"type\":\"LONG\"}]" @@ -1941,13 +1897,12 @@ public void testSelectWithWikipediaWithPartitionKeyNotInSelect(String contextNam new Object[]{"Tokyo", 0L, 12615L}, new Object[]{"Santiago", 161L, 401L} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testGroupByWithWikipedia(String contextName, Map context) + @Test + public void testGroupByWithWikipedia() { RowSignature rowSignature = RowSignature.builder() .add("cityName", ColumnType.STRING) @@ -1972,7 +1927,7 @@ public void testGroupByWithWikipedia(String contextName, Map con ColumnType.LONG ) )) - .setContext(context) + .setContext(DEFAULT_MSQ_CONTEXT) .build(); @@ -1985,7 +1940,7 @@ public void testGroupByWithWikipedia(String contextName, Map con final WindowOperatorQuery query = new WindowOperatorQuery( new QueryDataSource(groupByQuery), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder().add("d0", ColumnType.STRING) .add("d1", ColumnType.LONG) .add("w0", ColumnType.LONG).build(), @@ -2019,13 +1974,12 @@ public void testGroupByWithWikipedia(String contextName, Map con new Object[]{"Albuquerque", 9L, 140L}, new Object[]{"Albuquerque", 129L, 140L} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testReplaceGroupByOnWikipedia(String contextName, Map context) + @Test + public void testReplaceGroupByOnWikipedia() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -2041,7 +1995,7 @@ public void testReplaceGroupByOnWikipedia(String contextName, Map context) + @Test + public void testWindowOnMixOfEmptyAndNonEmptyOverWithMultipleWorkers() { - final Map multipleWorkerContext = new HashMap<>(context); + final Map multipleWorkerContext = new HashMap<>(DEFAULT_MSQ_CONTEXT); multipleWorkerContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 5); final RowSignature rowSignature = RowSignature.builder() @@ -2286,9 +2239,8 @@ public void testWindowOnMixOfEmptyAndNonEmptyOverWithMultipleWorkers(String cont .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testReplaceWithPartitionedByDayOnWikipedia(String contextName, Map context) + @Test + public void testReplaceWithPartitionedByDayOnWikipedia() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -2304,7 +2256,7 @@ public void testReplaceWithPartitionedByDayOnWikipedia(String contextName, Map