
Commit

Adding multiWorker test case.
cryptoe committed Oct 12, 2023
1 parent 0def1aa commit 00e0829
Showing 1 changed file with 143 additions and 1 deletion.
@@ -62,6 +62,7 @@
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.query.filter.LikeDimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
@@ -1253,7 +1254,7 @@ public void testGroupByOrderByAggregationWithLimitAndOffset()
}

@Test
-public void testExternSelect1() throws IOException
+public void testExternGroupBy() throws IOException
{
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json");
final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
@@ -1339,6 +1340,147 @@ public void testExternSelect1() throws IOException
.verifyResults();
}


@Test
public void testExternSelectWithMultipleWorkers() throws IOException
{
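// CTX_MAX_NUM_TASKS counts the controller task as well, so a value of 3
// leaves two worker tasks to split the external input between.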
Map<String, Object> multipleWorkerContext = new HashMap<>(context);
multipleWorkerContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 3);

final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json");
final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());

RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("user", ColumnType.STRING)
.build();

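// Expected native scan query: the SQL LIKE predicate plans to a LikeDimFilter,
// and floor(TIME_PARSE(...) to day) becomes the virtual column "v0".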
final ScanQuery expectedQuery =
newScanQueryBuilder().dataSource(
new ExternalDataSource(
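// The same file is listed twice so that each of the two workers can be
// assigned one copy to read.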
new LocalInputSource(null, null, ImmutableList.of(toRead.getAbsoluteFile(), toRead.getAbsoluteFile())),
new JsonInputFormat(null, null, null, null, null),
RowSignature.builder()
.add("timestamp", ColumnType.STRING)
.add("page", ColumnType.STRING)
.add("user", ColumnType.STRING)
.build()
)
).eternityInterval().virtualColumns(
new ExpressionVirtualColumn(
"v0",
"timestamp_floor(timestamp_parse(\"timestamp\",null,'UTC'),'P1D',null,'UTC')",
ColumnType.LONG,
CalciteTests.createExprMacroTable()
)
).columns("user", "v0").filters(new LikeDimFilter("user", "%bot%", null, null))
.context(defaultScanQueryContext(multipleWorkerContext, RowSignature.builder()
.add(
"user",
ColumnType.STRING
)
.add(
"v0",
ColumnType.LONG
)
.build()))
.build();

SelectTester selectTester = testSelectQuery()
.setSql("SELECT\n"
+ " floor(TIME_PARSE(\"timestamp\") to day) AS __time,\n"
+ " user\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [" + toReadAsJson + "," + toReadAsJson + "],\"type\":\"local\"}',\n"
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n"
+ " )\n"
+ ") where user like '%bot%'")
.setExpectedRowSignature(rowSignature)
.setQueryContext(multipleWorkerContext)
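// Each matching row appears twice because both copies of the input file are scanned.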
.setExpectedResultRows(ImmutableList.of(
new Object[]{1466985600000L, "Lsjbot"},
new Object[]{1466985600000L, "Lsjbot"},
new Object[]{1466985600000L, "Beau.bot"},
new Object[]{1466985600000L, "Beau.bot"},
new Object[]{1466985600000L, "Lsjbot"},
new Object[]{1466985600000L, "Lsjbot"}
))
.setExpectedMSQSpec(
MSQSpec
.builder()
.query(expectedQuery)
.columnMappings(new ColumnMappings(
ImmutableList.of(
new ColumnMapping("v0", "__time"),
new ColumnMapping("user", "user")
)
))
.tuningConfig(MSQTuningConfig.defaultConfig())
.destination(isDurableStorageDestination()
? DurableStorageMSQDestination.INSTANCE
: TaskReportMSQDestination.INSTANCE)
.build()
)
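// Stage 0, worker 0: reads one 20-row copy of the sample file and emits the
// 3 rows whose user matches '%bot%'.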
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(20).bytes(toRead.length()).files(1).totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(3).frames(1),
0, 0, "output"
)
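// With page-size limits the shuffled rows land in two partitions (1 row and
// 2 rows); otherwise a single partition holds all 3 rows.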
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with()
.rows(isPageSizeLimited() ? new long[]{1L, 2L} : new long[]{3L})
.frames(isPageSizeLimited() ? new long[]{1L, 1L} : new long[]{1L}),
0, 0, "shuffle"
)
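// Stage 0, worker 1: identical counters for the second copy of the file.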
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(20).bytes(toRead.length()).files(1).totalFiles(1),
0, 1, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(3).frames(1),
0, 1, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with()
.rows(isPageSizeLimited() ? new long[]{1L, 2L} : new long[]{3L})
.frames(isPageSizeLimited() ? new long[]{1L, 1L} : new long[]{1L}),
0, 1, "shuffle"
);
// Counter checks for the result stage, which is present only when results are paginated (isPageSizeLimited()).
if (isPageSizeLimited()) {
selectTester = selectTester.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(2).frames(1),
1, 0, "input0"
).setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(2).frames(1),
1, 0, "output"
);
selectTester = selectTester.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(0, 4).frames(0, 1),
1, 1, "input0"
).setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(0, 4).frames(0, 1),
1, 1, "output"
);
}
selectTester.verifyResults();
}

@Test
public void testIncorrectSelectQuery()
{
