From 90564dc07b7ad3ea3bbbf02c032248d4230b4abb Mon Sep 17 00:00:00 2001 From: cryptoe Date: Fri, 15 Sep 2023 11:30:09 +0530 Subject: [PATCH 01/10] Initial patch for page size limiting --- docs/multi-stage-query/reference.md | 1 + .../apache/druid/msq/exec/ControllerImpl.java | 40 ++- .../druid/msq/sql/entity/PageInformation.java | 63 +++- .../sql/resources/SqlStatementResource.java | 15 +- .../msq/util/MultiStageQueryContext.java | 12 + .../msq/util/SqlStatementResourceHelper.java | 75 ++-- .../apache/druid/msq/exec/MSQSelectTest.java | 21 +- .../sql/entity/ResultSetInformationTest.java | 2 +- .../sql/entity/SqlStatementResultTest.java | 2 +- .../apache/druid/msq/test/MSQTestBase.java | 47 ++- .../util/SqlStatementResourceHelperTest.java | 334 ++++++++++++++++++ .../java/org/apache/druid/frame/Frame.java | 2 +- 12 files changed, 517 insertions(+), 97 deletions(-) create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index 6236b6545258..c758715f7709 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -246,6 +246,7 @@ The following table lists the context parameters for the MSQ task engine: | `durableShuffleStorage` | SELECT, INSERT, REPLACE

Whether to use durable storage for shuffle mesh. To use this feature, configure the durable storage at the server level using `druid.msq.intermediate.storage.enable=true`). If these properties are not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error.

| `false` | | `faultTolerance` | SELECT, INSERT, REPLACE

Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` | | `selectDestination` | SELECT

Controls where the final result of the select query is written.
Use `taskReport`(the default) to write select results to the task report. This is not scalable since task reports size explodes for large results
Use `durableStorage` to write results to durable storage location. For large results sets, its recommended to use `durableStorage` . To configure durable storage see [`this`](#durable-storage) section. | `taskReport` | +| `rowsPerPage` | SELECT

The number of rows per page to target. The actual number of rows per page may be somewhat higher or lower than this number. In most cases, use the default.
This property comes into effect only when `selectDestination` is set to `durableStorage` | 100000 | ## Joins diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index d883a587e9b5..2e987c76b575 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1556,8 +1556,13 @@ private static QueryDefinition makeQueryDefinition( shuffleSpecFactory = ShuffleSpecFactories.singlePartition(); queryToPlan = querySpec.getQuery(); } else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) { - // we add a final stage which generates one partition per worker. - shuffleSpecFactory = ShuffleSpecFactories.globalSortWithMaxPartitionCount(tuningConfig.getMaxNumWorkers()); + + shuffleSpecFactory = (clusterBy, aggregate) -> + new GlobalSortTargetSizeShuffleSpec( + clusterBy, + MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context()), + aggregate + ); queryToPlan = querySpec.getQuery(); } else { throw new ISE("Unsupported destination [%s]", querySpec.getDestination()); @@ -1635,22 +1640,25 @@ private static QueryDefinition makeQueryDefinition( return queryDef; } else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) { - // attaching new query results stage always. + // attaching new query results stage if the final stage does sort during shuffle so that results are ordered. StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition(); - final QueryDefinitionBuilder builder = QueryDefinition.builder(); - for (final StageDefinition stageDef : queryDef.getStageDefinitions()) { - builder.add(StageDefinition.builder(stageDef)); - } - - builder.add(StageDefinition.builder(queryDef.getNextStageNumber()) - .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) - .maxWorkerCount(tuningConfig.getMaxNumWorkers()) - .signature(finalShuffleStageDef.getSignature()) - .shuffleSpec(null) - .processorFactory(new QueryResultFrameProcessorFactory()) - ); + if (finalShuffleStageDef.doesSortDuringShuffle()) { + final QueryDefinitionBuilder builder = QueryDefinition.builder(); + for (final StageDefinition stageDef : queryDef.getStageDefinitions()) { + builder.add(StageDefinition.builder(stageDef)); + } - return builder.build(); + builder.add(StageDefinition.builder(queryDef.getNextStageNumber()) + .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) + .maxWorkerCount(tuningConfig.getMaxNumWorkers()) + .signature(finalShuffleStageDef.getSignature()) + .shuffleSpec(null) + .processorFactory(new QueryResultFrameProcessorFactory()) + ); + return builder.build(); + } else { + return queryDef; + } } else { throw new ISE("Unsupported destination [%s]", querySpec.getDestination()); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/PageInformation.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/PageInformation.java index 6db1f371af7c..f50716f108cd 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/PageInformation.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/PageInformation.java @@ -21,11 +21,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import javax.annotation.Nullable; -import java.util.Comparator; import java.util.Objects; /** @@ -39,6 +39,14 @@ public class PageInformation @Nullable private final Long sizeInBytes; + // Worker field should not flow to the users of SqlStatementResource API since users should not care about worker + @Nullable + private final Integer worker; + + // Partition field should not flow to the users of SqlStatementResource API since users should not care about partitions + @Nullable + private final Integer partition; + @JsonCreator public PageInformation( @JsonProperty("id") long id, @@ -49,8 +57,27 @@ public PageInformation( this.id = id; this.numRows = numRows; this.sizeInBytes = sizeInBytes; + this.worker = null; + this.partition = null; } + + public PageInformation( + long id, + Long numRows, + Long sizeInBytes, + Integer worker, + Integer partition + ) + { + this.id = id; + this.numRows = numRows; + this.sizeInBytes = sizeInBytes; + this.worker = worker; + this.partition = partition; + } + + @JsonProperty public long getId() { @@ -74,6 +101,20 @@ public Long getSizeInBytes() } + @Nullable + @JsonIgnore + public Integer getWorker() + { + return worker; + } + + @Nullable + @JsonIgnore + public Integer getPartition() + { + return partition; + } + @Override public boolean equals(Object o) { @@ -87,13 +128,13 @@ public boolean equals(Object o) return id == that.id && Objects.equals(numRows, that.numRows) && Objects.equals( sizeInBytes, that.sizeInBytes - ); + ) && Objects.equals(worker, that.worker) && Objects.equals(partition, that.partition); } @Override public int hashCode() { - return Objects.hash(id, numRows, sizeInBytes); + return Objects.hash(id, numRows, sizeInBytes, worker, partition); } @Override @@ -103,20 +144,8 @@ public String toString() "id=" + id + ", numRows=" + numRows + ", sizeInBytes=" + sizeInBytes + + ", worker=" + worker + + ", partition=" + partition + '}'; } - - public static Comparator getIDComparator() - { - return new PageComparator(); - } - - public static class PageComparator implements Comparator - { - @Override - public int compare(PageInformation s1, PageInformation s2) - { - return Long.compare(s1.getId(), s2.getId()); - } - } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java index 1a1acaa008e5..f1fdcc4e0e3f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java @@ -489,7 +489,7 @@ private Response buildNonOkResponse(DruidException exception) } @SuppressWarnings("ReassignedVariable") - private Optional getSampleResults( + private Optional getResultSetInformation( String queryId, String dataSource, SqlStatementState sqlStatementState, @@ -598,7 +598,7 @@ private Optional getStatementStatus(String queryId, String c taskResponse.getStatus().getCreatedTime(), signature.orElse(null), taskResponse.getStatus().getDuration(), - withResults ? getSampleResults( + withResults ? getResultSetInformation( queryId, msqControllerTask.getDataSource(), sqlStatementState, @@ -737,11 +737,16 @@ private Optional> getResultYielder( || selectedPageId.equals(pageInformation.getId())) .map(pageInformation -> { try { + if (pageInformation.getWorker() == null || pageInformation.getPartition() == null) { + throw DruidException.defensive( + "Worker or partition number is null for page id [%d]", + pageInformation.getId() + ); + } return new FrameChannelSequence(standardImplementation.openChannel( finalStage.getId(), - (int) pageInformation.getId(), - (int) pageInformation.getId() - // we would always have partition number == worker number + pageInformation.getWorker(), + pageInformation.getPartition() )); } catch (Exception e) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index 265f5eae0fe1..b1445745e86b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -105,6 +105,9 @@ public class MultiStageQueryContext public static final String CTX_ROWS_PER_SEGMENT = "rowsPerSegment"; static final int DEFAULT_ROWS_PER_SEGMENT = 3000000; + public static final String CTX_ROWS_PER_PAGE = "rowsPerPage"; + static final int DEFAULT_ROWS_PER_PAGE = 100000; + public static final String CTX_ROWS_IN_MEMORY = "rowsInMemory"; // Lower than the default to minimize the impact of per-row overheads that are not accounted for by // OnheapIncrementalIndex. For example: overheads related to creating bitmaps during persist. @@ -206,6 +209,15 @@ public static int getRowsPerSegment(final QueryContext queryContext) ); } + public static int getRowsPerPage(final QueryContext queryContext) + { + return queryContext.getInt( + CTX_ROWS_PER_PAGE, + DEFAULT_ROWS_PER_PAGE + ); + } + + public static MSQSelectDestination getSelectDestination(final QueryContext queryContext) { return QueryContexts.getAsEnum( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java index 08bc3dc54d93..c0f282288c21 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java @@ -143,8 +143,13 @@ public static SqlStatementState getSqlStatementState(TaskStatusPlus taskStatusPl *
    *
  1. {@link DataSourceMSQDestination} a single page is returned which adds all the counters of {@link SegmentGenerationProgressCounter.Snapshot}
  2. *
  3. {@link TaskReportMSQDestination} a single page is returned which adds all the counters of {@link ChannelCounters}
  4. - *
  5. {@link DurableStorageMSQDestination} a page is returned for each worker which has generated output rows. The list is sorted on page Id. - * If the worker generated 0 rows, we do no populated a page for it. {@link PageInformation#id} is equal to the worker number
  6. + *
  7. {@link DurableStorageMSQDestination} a page is returned for each partition, worker which has generated output rows. The pages are populated in the following order: + *
      + *
    • For each partition from 0 to N
    • + *
    • For each worker from 0 to M
    • + *
    • If num rows for that partition,worker combination is 0, create a page
    • + * so that we maintain the record ordering. + *
    *
*/ public static Optional> populatePageList( @@ -155,9 +160,9 @@ public static Optional> populatePageList( if (msqTaskReportPayload.getStages() == null || msqTaskReportPayload.getCounters() == null) { return Optional.empty(); } - int finalStage = msqTaskReportPayload.getStages().getStages().size() - 1; + MSQStagesReport.Stage finalStage = getFinalStage(msqTaskReportPayload); CounterSnapshotsTree counterSnapshotsTree = msqTaskReportPayload.getCounters(); - Map workerCounters = counterSnapshotsTree.snapshotForStage(finalStage); + Map workerCounters = counterSnapshotsTree.snapshotForStage(finalStage.getStageNumber()); if (workerCounters == null || workerCounters.isEmpty()) { return Optional.empty(); } @@ -193,27 +198,56 @@ public static Optional> populatePageList( } } else if (msqDestination instanceof DurableStorageMSQDestination) { - List pageList = new ArrayList<>(); - for (Map.Entry counterSnapshots : workerCounters.entrySet()) { - long rows = 0L; - long size = 0L; - QueryCounterSnapshot queryCounterSnapshot = counterSnapshots.getValue().getMap().getOrDefault("output", null); - if (queryCounterSnapshot != null && queryCounterSnapshot instanceof ChannelCounters.Snapshot) { - rows += Arrays.stream(((ChannelCounters.Snapshot) queryCounterSnapshot).getRows()).sum(); - size += Arrays.stream(((ChannelCounters.Snapshot) queryCounterSnapshot).getBytes()).sum(); - } - // do not populate a page if the worker generated 0 rows. - if (rows != 0L) { - pageList.add(new PageInformation(counterSnapshots.getKey(), rows, size)); - } - } - pageList.sort(PageInformation.getIDComparator()); - return Optional.of(pageList); + + return populatePagesForDurableStorageDestination(finalStage, workerCounters); } else { return Optional.empty(); } } + private static Optional> populatePagesForDurableStorageDestination( + MSQStagesReport.Stage finalStage, + Map workerCounters + ) + { + // figure out number of partitions and number of workers + int totalPartitions = finalStage.getPartitionCount(); + int totalWorkerCount = finalStage.getWorkerCount(); + + if (totalPartitions == -1) { + throw DruidException.defensive("Expected partition count to be set for stage[%d]", finalStage); + } + if (totalWorkerCount == -1) { + throw DruidException.defensive("Expected worker count to be set for stage[%d]", finalStage); + } + + + List pages = new ArrayList<>(); + for (int partitionNumber = 0; partitionNumber < totalPartitions; partitionNumber++) { + for (int workerNumber = 0; workerNumber < totalWorkerCount; workerNumber++) { + CounterSnapshots workerCounter = workerCounters.get(workerNumber); + + if (workerCounter != null && workerCounter.getMap() != null) { + QueryCounterSnapshot channelCounters = workerCounter.getMap().get("output"); + + if (channelCounters != null && channelCounters instanceof ChannelCounters.Snapshot) { + long rows = 0L; + long size = 0L; + + if (((ChannelCounters.Snapshot) channelCounters).getRows().length > partitionNumber) { + rows += ((ChannelCounters.Snapshot) channelCounters).getRows()[partitionNumber]; + size += ((ChannelCounters.Snapshot) channelCounters).getBytes()[partitionNumber]; + } + if (rows != 0L) { + pages.add(new PageInformation(pages.size(), rows, size, workerNumber, partitionNumber)); + } + } + } + } + } + return Optional.of(pages); + } + public static Optional getExceptionPayload( String queryId, TaskStatusResponse taskResponse, @@ -336,6 +370,7 @@ public static MSQStagesReport.Stage getFinalStage(MSQTaskReportPayload msqTaskRe } return null; } + public static Map getQueryExceptionDetails(Map payload) { return getMap(getMap(payload, "status"), "errorReport"); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index ae0bfa71f1e1..606d9243df1c 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -109,6 +109,7 @@ public class MSQSelectTest extends MSQTestBase public static final Map QUERY_RESULTS_WITH_DURABLE_STORAGE_CONTEXT = ImmutableMap.builder() .putAll(DURABLE_STORAGE_MSQ_CONTEXT) + .put(MultiStageQueryContext.CTX_ROWS_PER_PAGE, 2) .put( MultiStageQueryContext.CTX_SELECT_DESTINATION, MSQSelectDestination.DURABLESTORAGE.getName().toLowerCase(Locale.ENGLISH) @@ -215,16 +216,6 @@ public void testSelectOnFoo() .with().totalFiles(1), 0, 0, "input0" ) - .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher - .with().rows(6).frames(1), - 0, 0, "output" - ) - .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher - .with().rows(6).frames(1), - 0, 0, "shuffle" - ) .setExpectedResultRows(ImmutableList.of( new Object[]{1L, ""}, new Object[]{1L, "10.1"}, @@ -341,16 +332,6 @@ public void testSelectOnFooDuplicateColumnNames() .with().totalFiles(1), 0, 0, "input0" ) - .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher - .with().rows(6).frames(1), - 0, 0, "output" - ) - .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher - .with().rows(6).frames(1), - 0, 0, "shuffle" - ) .setExpectedResultRows(ImmutableList.of( new Object[]{1L, ""}, new Object[]{1L, "10.1"}, diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/ResultSetInformationTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/ResultSetInformationTest.java index ce84ac91fd40..0d3ca30b0f5a 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/ResultSetInformationTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/ResultSetInformationTest.java @@ -66,7 +66,7 @@ public void sanityTest() throws JsonProcessingException MAPPER.readValue(MAPPER.writeValueAsString(RESULTS), ResultSetInformation.class).hashCode() ); Assert.assertEquals( - "ResultSetInformation{numTotalRows=1, totalSizeInBytes=1, resultFormat=object, records=null, dataSource='ds', pages=[PageInformation{id=0, numRows=null, sizeInBytes=1}]}", + "ResultSetInformation{numTotalRows=1, totalSizeInBytes=1, resultFormat=object, records=null, dataSource='ds', pages=[PageInformation{id=0, numRows=null, sizeInBytes=1, worker=null, partition=null}]}", RESULTS.toString() ); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/SqlStatementResultTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/SqlStatementResultTest.java index 0434c89ce193..03c017b7442d 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/SqlStatementResultTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/SqlStatementResultTest.java @@ -86,7 +86,7 @@ public void sanityTest() throws JsonProcessingException + " createdAt=2023-05-31T12:00:00.000Z," + " sqlRowSignature=[ColumnNameAndTypes{colName='_time', sqlTypeName='TIMESTAMP', nativeTypeName='LONG'}, ColumnNameAndTypes{colName='alias', sqlTypeName='VARCHAR', nativeTypeName='STRING'}, ColumnNameAndTypes{colName='market', sqlTypeName='VARCHAR', nativeTypeName='STRING'}]," + " durationInMs=100," - + " resultSetInformation=ResultSetInformation{numTotalRows=1, totalSizeInBytes=1, resultFormat=object, records=null, dataSource='ds', pages=[PageInformation{id=0, numRows=null, sizeInBytes=1}]}," + + " resultSetInformation=ResultSetInformation{numTotalRows=1, totalSizeInBytes=1, resultFormat=object, records=null, dataSource='ds', pages=[PageInformation{id=0, numRows=null, sizeInBytes=1, worker=null, partition=null}]}," + " errorResponse={error=druidException, errorCode=QueryNotSupported, persona=USER, category=UNCATEGORIZED, errorMessage=QueryNotSupported, context={}}}", SQL_STATEMENT_RESULT.toString() ); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 3ce2d18e40de..0bb240d17ae6 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -107,6 +107,7 @@ import org.apache.druid.msq.shuffle.input.DurableStorageInputChannelFactory; import org.apache.druid.msq.sql.MSQTaskQueryMaker; import org.apache.druid.msq.sql.MSQTaskSqlEngine; +import org.apache.druid.msq.sql.entity.PageInformation; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.msq.util.SqlStatementResourceHelper; import org.apache.druid.query.DruidProcessingConfig; @@ -199,6 +200,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -1321,6 +1323,7 @@ public Pair, List>> } else { MSQControllerTask msqControllerTask = indexingServiceClient.getMSQControllerTask(controllerId); + final MSQSpec spec = msqControllerTask.getQuerySpec(); final List rows; @@ -1329,24 +1332,36 @@ public Pair, List>> } else { StageDefinition finalStage = Objects.requireNonNull(SqlStatementResourceHelper.getFinalStage( payload)).getStageDefinition(); - Closer closer = Closer.create(); - InputChannelFactory inputChannelFactory = DurableStorageInputChannelFactory.createStandardImplementation( - controllerId, - localFileStorageConnector, - closer, - true + + Optional> pages = SqlStatementResourceHelper.populatePageList( + payload, + spec.getDestination() ); - rows = new FrameChannelSequence(inputChannelFactory.openChannel( - finalStage.getId(), - 0, - 0 - )).flatMap(frame -> SqlStatementResourceHelper.getResultSequence( - msqControllerTask, - finalStage, - frame, - objectMapper - )).withBaggage(closer).toList(); + if (!pages.isPresent()) { + throw new ISE("Query no results found"); + } + + rows = new ArrayList<>(); + for (PageInformation pageInformation : pages.get()) { + Closer closer = Closer.create(); + InputChannelFactory inputChannelFactory = DurableStorageInputChannelFactory.createStandardImplementation( + controllerId, + localFileStorageConnector, + closer, + true + ); + rows.addAll(new FrameChannelSequence(inputChannelFactory.openChannel( + finalStage.getId(), + pageInformation.getWorker(), + pageInformation.getPartition() + )).flatMap(frame -> SqlStatementResourceHelper.getResultSequence( + msqControllerTask, + finalStage, + frame, + objectMapper + )).withBaggage(closer).toList()); + } } if (rows == null) { throw new ISE("Query successful but no results found"); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java new file mode 100644 index 000000000000..419896e619d8 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.util; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import org.apache.datasketches.memory.Memory; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.counters.CounterSnapshots; +import org.apache.druid.msq.counters.CounterSnapshotsTree; +import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination; +import org.apache.druid.msq.indexing.report.MSQStagesReport; +import org.apache.druid.msq.indexing.report.MSQStatusReport; +import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; +import org.apache.druid.msq.indexing.report.MSQTaskReportTest; +import org.apache.druid.msq.sql.entity.PageInformation; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayDeque; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.TreeMap; + +public class SqlStatementResourceHelperTest +{ + + private static final Logger log = new Logger(SqlStatementResourceHelperTest.class); + + @Test + public void testDistinctPartitionsOnEachWorker() + { + CounterSnapshotsTree counterSnapshots = new CounterSnapshotsTree(); + ChannelCounters worker0 = createChannelCounters(new int[]{0, 3, 6}); + ChannelCounters worker1 = createChannelCounters(new int[]{1, 4, 4, 7, 9, 10, 13}); + ChannelCounters worker2 = createChannelCounters(new int[]{2, 5, 8, 11, 14}); + + counterSnapshots.put(0, 0, new CounterSnapshots(ImmutableMap.of("output", worker0.snapshot()))); + counterSnapshots.put(0, 1, new CounterSnapshots(ImmutableMap.of("output", worker1.snapshot()))); + counterSnapshots.put(0, 2, new CounterSnapshots(ImmutableMap.of("output", worker2.snapshot()))); + + MSQTaskReportPayload payload = new MSQTaskReportPayload(new MSQStatusReport( + TaskState.SUCCESS, + null, + new ArrayDeque<>(), + null, + 0, + 1, + 2 + ), MSQStagesReport.create( + MSQTaskReportTest.QUERY_DEFINITION, + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableMap.of(0, 3), + ImmutableMap.of(0, 15) + ), counterSnapshots, null); + + Optional> pages = SqlStatementResourceHelper.populatePageList( + payload, + DurableStorageMSQDestination.instance() + ); + validatePages(pages.get(), createValidationMap(worker0, worker1, worker2)); + } + + @Test + public void testOnePartitionOnEachWorker() + { + CounterSnapshotsTree counterSnapshots = new CounterSnapshotsTree(); + ChannelCounters worker0 = createChannelCounters(new int[]{0}); + ChannelCounters worker1 = createChannelCounters(new int[]{1}); + ChannelCounters worker2 = createChannelCounters(new int[]{2}); + ChannelCounters worker3 = createChannelCounters(new int[]{4}); + + counterSnapshots.put(0, 0, new CounterSnapshots(ImmutableMap.of("output", worker0.snapshot()))); + counterSnapshots.put(0, 1, new CounterSnapshots(ImmutableMap.of("output", worker1.snapshot()))); + counterSnapshots.put(0, 2, new CounterSnapshots(ImmutableMap.of("output", worker2.snapshot()))); + counterSnapshots.put(0, 3, new CounterSnapshots(ImmutableMap.of("output", worker3.snapshot()))); + + MSQTaskReportPayload payload = new MSQTaskReportPayload(new MSQStatusReport( + TaskState.SUCCESS, + null, + new ArrayDeque<>(), + null, + 0, + 1, + 2 + ), MSQStagesReport.create( + MSQTaskReportTest.QUERY_DEFINITION, + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableMap.of(0, 4), + ImmutableMap.of(0, 4) + ), counterSnapshots, null); + + Optional> pages = SqlStatementResourceHelper.populatePageList( + payload, + DurableStorageMSQDestination.instance() + ); + validatePages(pages.get(), createValidationMap(worker0, worker1, worker2)); + } + + + @Test + public void testCommonPartitionsOnEachWorker() + { + CounterSnapshotsTree counterSnapshots = new CounterSnapshotsTree(); + ChannelCounters worker0 = createChannelCounters(new int[]{0, 1, 2, 3, 8, 9}); + ChannelCounters worker1 = createChannelCounters(new int[]{1, 4, 12}); + ChannelCounters worker2 = createChannelCounters(new int[]{20}); + ChannelCounters worker3 = createChannelCounters(new int[]{2, 2, 5, 6, 7, 9, 15}); + + counterSnapshots.put(0, 0, new CounterSnapshots(ImmutableMap.of("output", worker0.snapshot()))); + counterSnapshots.put(0, 1, new CounterSnapshots(ImmutableMap.of("output", worker1.snapshot()))); + counterSnapshots.put(0, 2, new CounterSnapshots(ImmutableMap.of("output", worker2.snapshot()))); + counterSnapshots.put(0, 3, new CounterSnapshots(ImmutableMap.of("output", worker3.snapshot()))); + + MSQTaskReportPayload payload = new MSQTaskReportPayload(new MSQStatusReport( + TaskState.SUCCESS, + null, + new ArrayDeque<>(), + null, + 0, + 1, + 2 + ), MSQStagesReport.create( + MSQTaskReportTest.QUERY_DEFINITION, + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableMap.of(0, 4), + ImmutableMap.of(0, 21) + ), counterSnapshots, null); + + Optional> pages = + SqlStatementResourceHelper.populatePageList(payload, DurableStorageMSQDestination.instance()); + validatePages(pages.get(), createValidationMap(worker0, worker1, worker2, worker3)); + } + + + @Test + public void testNullChannelCounters() + { + CounterSnapshotsTree counterSnapshots = new CounterSnapshotsTree(); + ChannelCounters worker0 = createChannelCounters(new int[0]); + ChannelCounters worker1 = createChannelCounters(new int[]{1, 4, 12}); + ChannelCounters worker2 = createChannelCounters(new int[]{20}); + ChannelCounters worker3 = createChannelCounters(new int[]{2, 2, 5, 6, 7, 9, 15}); + + counterSnapshots.put(0, 0, new CounterSnapshots(Maps.newHashMap())); + counterSnapshots.put(0, 1, new CounterSnapshots(ImmutableMap.of("output", worker1.snapshot()))); + counterSnapshots.put(0, 2, new CounterSnapshots(ImmutableMap.of("output", worker2.snapshot()))); + counterSnapshots.put(0, 3, new CounterSnapshots(ImmutableMap.of("output", worker3.snapshot()))); + + MSQTaskReportPayload payload = new MSQTaskReportPayload(new MSQStatusReport( + TaskState.SUCCESS, + null, + new ArrayDeque<>(), + null, + 0, + 1, + 2 + ), MSQStagesReport.create( + MSQTaskReportTest.QUERY_DEFINITION, + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableMap.of(0, 4), + ImmutableMap.of(0, 21) + ), counterSnapshots, null); + + Optional> pages = SqlStatementResourceHelper.populatePageList( + payload, + DurableStorageMSQDestination.instance() + ); + validatePages(pages.get(), createValidationMap(worker0, worker1, worker2, worker3)); + } + + + @Test + public void testConsecutivePartitionsOnEachWorker() + { + CounterSnapshotsTree counterSnapshots = new CounterSnapshotsTree(); + ChannelCounters worker0 = createChannelCounters(new int[]{0, 1, 2}); + ChannelCounters worker1 = createChannelCounters(new int[]{3, 4, 5}); + ChannelCounters worker2 = createChannelCounters(new int[]{6, 7, 8}); + ChannelCounters worker3 = createChannelCounters(new int[]{9, 10, 11, 12}); + + counterSnapshots.put(0, 0, new CounterSnapshots(ImmutableMap.of("output", worker0.snapshot()))); + counterSnapshots.put(0, 1, new CounterSnapshots(ImmutableMap.of("output", worker1.snapshot()))); + counterSnapshots.put(0, 2, new CounterSnapshots(ImmutableMap.of("output", worker2.snapshot()))); + counterSnapshots.put(0, 3, new CounterSnapshots(ImmutableMap.of("output", worker3.snapshot()))); + + MSQTaskReportPayload payload = new MSQTaskReportPayload(new MSQStatusReport( + TaskState.SUCCESS, + null, + new ArrayDeque<>(), + null, + 0, + 1, + 2 + ), MSQStagesReport.create( + MSQTaskReportTest.QUERY_DEFINITION, + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableMap.of(0, 4), + ImmutableMap.of(0, 13) + ), counterSnapshots, null); + + Optional> pages = SqlStatementResourceHelper.populatePageList( + payload, + DurableStorageMSQDestination.instance() + ); + validatePages(pages.get(), createValidationMap(worker0, worker1, worker2, worker3)); + } + + + private void validatePages( + List pageList, + Map>> partitionToWorkerToRowsBytes + ) + { + int currentPage = 0; + for (Map.Entry>> partitionWorker : partitionToWorkerToRowsBytes.entrySet()) { + for (Map.Entry> workerRowsBytes : partitionWorker.getValue().entrySet()) { + PageInformation pageInformation = pageList.get(currentPage); + Assert.assertEquals(currentPage, pageInformation.getId()); + Assert.assertEquals(workerRowsBytes.getValue().lhs, pageInformation.getNumRows()); + Assert.assertEquals(workerRowsBytes.getValue().rhs, pageInformation.getSizeInBytes()); + Assert.assertEquals(partitionWorker.getKey(), pageInformation.getPartition()); + Assert.assertEquals(workerRowsBytes.getKey(), pageInformation.getWorker()); + log.debug(pageInformation.toString()); + currentPage++; + } + } + Assert.assertEquals(currentPage, pageList.size()); + } + + private Map>> createValidationMap( + ChannelCounters... workers + ) + { + if (workers == null || workers.length == 0) { + return Maps.newHashMap(); + } else { + Map>> partitionToWorkerToRowsBytes = new TreeMap<>(); + for (int worker = 0; worker < workers.length; worker++) { + ChannelCounters.Snapshot workerCounter = workers[worker].snapshot(); + for (int partition = 0; workerCounter != null && partition < workerCounter.getRows().length; partition++) { + Map> workerMap = partitionToWorkerToRowsBytes.computeIfAbsent( + partition, + k -> new TreeMap<>() + ); + + if (workerCounter.getRows()[partition] != 0) { + workerMap.put( + worker, + new Pair<>( + workerCounter.getRows()[partition], + workerCounter.getBytes()[partition] + ) + ); + } + + } + } + return partitionToWorkerToRowsBytes; + } + } + + + private ChannelCounters createChannelCounters(int[] partitions) + { + if (partitions == null || partitions.length == 0) { + return new ChannelCounters(); + } + ChannelCounters channelCounters = new ChannelCounters(); + int prev = -1; + for (int current : partitions) { + if (prev > current) { + throw new IllegalArgumentException("Channel numbers should be in increasing order"); + } + channelCounters.addFrame(current, new TestFrame(current * 10 + 1, 100)); + prev = current; + } + return channelCounters; + } + + public static class TestFrame extends Frame + { + + int numRows; + int numBytes; + + public TestFrame(int numRows, int numBytes) + { + super(Memory.wrap(new byte[0]), FrameType.COLUMNAR, numBytes, numRows, 1, false); + this.numRows = numRows; + this.numBytes = numBytes; + } + + @Override + public long numBytes() + { + return numBytes; + } + + @Override + public int numRows() + { + return numRows; + } + } +} diff --git a/processing/src/main/java/org/apache/druid/frame/Frame.java b/processing/src/main/java/org/apache/druid/frame/Frame.java index 8185190a28da..752f034ce157 100644 --- a/processing/src/main/java/org/apache/druid/frame/Frame.java +++ b/processing/src/main/java/org/apache/druid/frame/Frame.java @@ -105,7 +105,7 @@ public class Frame private final int numRegions; private final boolean permuted; - private Frame(Memory memory, FrameType frameType, long numBytes, int numRows, int numRegions, boolean permuted) + protected Frame(Memory memory, FrameType frameType, long numBytes, int numRows, int numRegions, boolean permuted) { this.memory = memory; this.frameType = frameType; From 6cd5b36d26733c2e8be75119c2936153d20cab71 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Mon, 9 Oct 2023 13:44:53 +0530 Subject: [PATCH 02/10] Upstream merge things. --- .../msq/util/SqlStatementResourceHelperTest.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java index 419896e619d8..ca0ae7f4b617 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java @@ -69,7 +69,8 @@ public void testDistinctPartitionsOnEachWorker() null, 0, 1, - 2 + 2, + null ), MSQStagesReport.create( MSQTaskReportTest.QUERY_DEFINITION, ImmutableMap.of(), @@ -106,7 +107,8 @@ public void testOnePartitionOnEachWorker() null, 0, 1, - 2 + 2, + null ), MSQStagesReport.create( MSQTaskReportTest.QUERY_DEFINITION, ImmutableMap.of(), @@ -144,7 +146,8 @@ public void testCommonPartitionsOnEachWorker() null, 0, 1, - 2 + 2, + null ), MSQStagesReport.create( MSQTaskReportTest.QUERY_DEFINITION, ImmutableMap.of(), @@ -180,7 +183,8 @@ public void testNullChannelCounters() null, 0, 1, - 2 + 2, + null ), MSQStagesReport.create( MSQTaskReportTest.QUERY_DEFINITION, ImmutableMap.of(), @@ -218,7 +222,8 @@ public void testConsecutivePartitionsOnEachWorker() null, 0, 1, - 2 + 2, + null ), MSQStagesReport.create( MSQTaskReportTest.QUERY_DEFINITION, ImmutableMap.of(), From be9b736e3512ea061d64794dbb794ab1c02f1126 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Wed, 11 Oct 2023 18:13:19 +0530 Subject: [PATCH 03/10] Addressing review comments. --- .../apache/druid/msq/exec/ControllerImpl.java | 20 ++++------- .../msq/querykit/ShuffleSpecFactories.java | 14 ++++++++ .../apache/druid/msq/exec/MSQSelectTest.java | 28 +++++++++++++++ .../util/SqlStatementResourceHelperTest.java | 35 +++++-------------- .../java/org/apache/druid/frame/Frame.java | 2 +- 5 files changed, 58 insertions(+), 41 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 00ea8f6da9e6..a308d983eeef 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -145,7 +145,6 @@ import org.apache.druid.msq.input.table.DataSegmentWithLocation; import org.apache.druid.msq.input.table.TableInputSpec; import org.apache.druid.msq.input.table.TableInputSpecSlicer; -import org.apache.druid.msq.kernel.GlobalSortTargetSizeShuffleSpec; import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.kernel.QueryDefinitionBuilder; import org.apache.druid.msq.kernel.StageDefinition; @@ -1663,12 +1662,7 @@ private static QueryDefinition makeQueryDefinition( final ShuffleSpecFactory shuffleSpecFactory; if (MSQControllerTask.isIngestion(querySpec)) { - shuffleSpecFactory = (clusterBy, aggregate) -> - new GlobalSortTargetSizeShuffleSpec( - clusterBy, - tuningConfig.getRowsPerSegment(), - aggregate - ); + shuffleSpecFactory = ShuffleSpecFactories.getGlobalSortWithTargetSize(tuningConfig.getRowsPerSegment()); if (!columnMappings.hasUniqueOutputColumnNames()) { // We do not expect to hit this case in production, because the SQL validator checks that column names @@ -1693,13 +1687,9 @@ private static QueryDefinition makeQueryDefinition( shuffleSpecFactory = ShuffleSpecFactories.singlePartition(); queryToPlan = querySpec.getQuery(); } else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) { - - shuffleSpecFactory = (clusterBy, aggregate) -> - new GlobalSortTargetSizeShuffleSpec( - clusterBy, - MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context()), - aggregate - ); + shuffleSpecFactory = ShuffleSpecFactories.getGlobalSortWithTargetSize( + MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context()) + ); queryToPlan = querySpec.getQuery(); } else { throw new ISE("Unsupported destination [%s]", querySpec.getDestination()); @@ -1801,6 +1791,8 @@ private static QueryDefinition makeQueryDefinition( } } + + private static DataSchema generateDataSchema( MSQSpec querySpec, RowSignature querySignature, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java index 971aa9b7e0c7..d28439c0f8e0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java @@ -20,6 +20,7 @@ package org.apache.druid.msq.querykit; import org.apache.druid.msq.kernel.GlobalSortMaxCountShuffleSpec; +import org.apache.druid.msq.kernel.GlobalSortTargetSizeShuffleSpec; import org.apache.druid.msq.kernel.MixShuffleSpec; /** @@ -53,4 +54,17 @@ public static ShuffleSpecFactory globalSortWithMaxPartitionCount(final int parti { return (clusterBy, aggregate) -> new GlobalSortMaxCountShuffleSpec(clusterBy, partitions, aggregate); } + + /** + * Factory that produces globally sorted partitions of a target size. + */ + public static ShuffleSpecFactory getGlobalSortWithTargetSize(int targetSize) + { + return (clusterBy, aggregate) -> + new GlobalSortTargetSizeShuffleSpec( + clusterBy, + targetSize, + aggregate + ); + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index c86ed35c0cfc..29b02a2c5398 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -219,6 +219,18 @@ public void testSelectOnFoo() .with().totalFiles(1), 0, 0, "input0" ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with() + .rows(isPageSizeLimited() ? new long[]{1, 2, 3} : new long[]{6}) + .frames(isPageSizeLimited() ? new long[]{1, 1, 1} : new long[]{1}), + 0, 0, "shuffle" + ) .setExpectedResultRows(ImmutableList.of( new Object[]{1L, ""}, new Object[]{1L, "10.1"}, @@ -334,6 +346,17 @@ public void testSelectOnFooDuplicateColumnNames() CounterSnapshotMatcher .with().totalFiles(1), 0, 0, "input0" + ).setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with() + .rows(isPageSizeLimited() ? new long[]{1, 2, 3} : new long[]{6}) + .frames(isPageSizeLimited() ? new long[]{1, 1, 1} : new long[]{1}), + 0, 0, "shuffle" ) .setExpectedResultRows(ImmutableList.of( new Object[]{1L, ""}, @@ -2411,4 +2434,9 @@ public boolean isDurableStorageDestination() { return QUERY_RESULTS_WITH_DURABLE_STORAGE.equals(contextName) || QUERY_RESULTS_WITH_DEFAULT_CONTEXT.equals(context); } + + public boolean isPageSizeLimited() + { + return QUERY_RESULTS_WITH_DURABLE_STORAGE.equals(contextName); + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java index ca0ae7f4b617..e94241e74d9e 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java @@ -21,9 +21,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import org.apache.datasketches.memory.Memory; import org.apache.druid.frame.Frame; -import org.apache.druid.frame.FrameType; import org.apache.druid.indexer.TaskState; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.logger.Logger; @@ -36,6 +34,7 @@ import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; import org.apache.druid.msq.indexing.report.MSQTaskReportTest; import org.apache.druid.msq.sql.entity.PageInformation; +import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -305,35 +304,19 @@ private ChannelCounters createChannelCounters(int[] partitions) if (prev > current) { throw new IllegalArgumentException("Channel numbers should be in increasing order"); } - channelCounters.addFrame(current, new TestFrame(current * 10 + 1, 100)); + channelCounters.addFrame(current, createFrame(current * 10 + 1, 100L)); prev = current; } return channelCounters; } - public static class TestFrame extends Frame - { - - int numRows; - int numBytes; - - public TestFrame(int numRows, int numBytes) - { - super(Memory.wrap(new byte[0]), FrameType.COLUMNAR, numBytes, numRows, 1, false); - this.numRows = numRows; - this.numBytes = numBytes; - } - @Override - public long numBytes() - { - return numBytes; - } - - @Override - public int numRows() - { - return numRows; - } + private Frame createFrame(int numRows, long numBytes) + { + Frame frame = EasyMock.mock(Frame.class); + EasyMock.expect(frame.numRows()).andReturn(numRows).anyTimes(); + EasyMock.expect(frame.numBytes()).andReturn(numBytes).anyTimes(); + EasyMock.replay(frame); + return frame; } } diff --git a/processing/src/main/java/org/apache/druid/frame/Frame.java b/processing/src/main/java/org/apache/druid/frame/Frame.java index 752f034ce157..8185190a28da 100644 --- a/processing/src/main/java/org/apache/druid/frame/Frame.java +++ b/processing/src/main/java/org/apache/druid/frame/Frame.java @@ -105,7 +105,7 @@ public class Frame private final int numRegions; private final boolean permuted; - protected Frame(Memory memory, FrameType frameType, long numBytes, int numRows, int numRegions, boolean permuted) + private Frame(Memory memory, FrameType frameType, long numBytes, int numRows, int numRegions, boolean permuted) { this.memory = memory; this.frameType = frameType; From 65cea5c0c1f5d796a5daf5d0a33768ff01b8a6a1 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Wed, 11 Oct 2023 18:16:20 +0530 Subject: [PATCH 04/10] Addressing review comments. --- .../src/test/java/org/apache/druid/msq/test/MSQTestBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index ba47987040fb..2fc2489ab7c6 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -1375,7 +1375,7 @@ public Pair, List>> ); if (!pages.isPresent()) { - throw new ISE("Query no results found"); + throw new ISE("No query results found"); } rows = new ArrayList<>(); From cb4101f5acf47d2e6d41bf4ab736be39f7af7224 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Wed, 11 Oct 2023 19:52:07 +0530 Subject: [PATCH 05/10] Fixing forbidden check --- .../druid/msq/util/SqlStatementResourceHelperTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java index e94241e74d9e..9ace9ef193ff 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java @@ -39,6 +39,7 @@ import org.junit.Test; import java.util.ArrayDeque; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -170,7 +171,7 @@ public void testNullChannelCounters() ChannelCounters worker2 = createChannelCounters(new int[]{20}); ChannelCounters worker3 = createChannelCounters(new int[]{2, 2, 5, 6, 7, 9, 15}); - counterSnapshots.put(0, 0, new CounterSnapshots(Maps.newHashMap())); + counterSnapshots.put(0, 0, new CounterSnapshots(new HashMap<>())); counterSnapshots.put(0, 1, new CounterSnapshots(ImmutableMap.of("output", worker1.snapshot()))); counterSnapshots.put(0, 2, new CounterSnapshots(ImmutableMap.of("output", worker2.snapshot()))); counterSnapshots.put(0, 3, new CounterSnapshots(ImmutableMap.of("output", worker3.snapshot()))); @@ -265,7 +266,7 @@ private Map>> createValidationMap( ) { if (workers == null || workers.length == 0) { - return Maps.newHashMap(); + return new HashMap<>(); } else { Map>> partitionToWorkerToRowsBytes = new TreeMap<>(); for (int worker = 0; worker < workers.length; worker++) { From 79a4eb0d5d5d8e9f68fea5d76526ad02db34fb79 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Wed, 11 Oct 2023 20:40:28 +0530 Subject: [PATCH 06/10] Fixing checkstyle. --- .../apache/druid/msq/util/SqlStatementResourceHelperTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java index 9ace9ef193ff..806bd8ebe988 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java @@ -20,7 +20,6 @@ package org.apache.druid.msq.util; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; import org.apache.druid.frame.Frame; import org.apache.druid.indexer.TaskState; import org.apache.druid.java.util.common.Pair; From 6766b346fd8d0e17069101b0751c5adebfea1191 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Wed, 11 Oct 2023 21:28:27 +0530 Subject: [PATCH 07/10] Adding some more tests. --- .../SqlMSQStatementResourcePostTest.java | 41 +++++++++++++++---- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java index 415e36a02d49..eaef7065db08 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java @@ -307,6 +307,7 @@ public void testWithDurableStorage() throws IOException { Map context = defaultAsyncContext(); context.put(MultiStageQueryContext.CTX_SELECT_DESTINATION, MSQSelectDestination.DURABLESTORAGE.getName()); + context.put(MultiStageQueryContext.CTX_ROWS_PER_PAGE, 2); SqlStatementResult sqlStatementResult = (SqlStatementResult) resource.doPost( new SqlQuery( @@ -321,6 +322,12 @@ public void testWithDurableStorage() throws IOException SqlStatementResourceTest.makeOkRequest() ).getEntity(); + Assert.assertEquals(ImmutableList.of( + new PageInformation(0, 1L, 75L, 0, 0), + new PageInformation(1, 2L, 121L, 0, 1), + new PageInformation(2, 3L, 164L, 0, 2) + ), sqlStatementResult.getResultSetInformation().getPages()); + assertExpectedResults( "{\"cnt\":1,\"dim1\":\"\"}\n" + "{\"cnt\":1,\"dim1\":\"10.1\"}\n" @@ -335,23 +342,33 @@ public void testWithDurableStorage() throws IOException ResultFormat.OBJECTLINES.name(), SqlStatementResourceTest.makeOkRequest() ), - objectMapper); + objectMapper + ); assertExpectedResults( - "{\"cnt\":1,\"dim1\":\"\"}\n" - + "{\"cnt\":1,\"dim1\":\"10.1\"}\n" - + "{\"cnt\":1,\"dim1\":\"2\"}\n" - + "{\"cnt\":1,\"dim1\":\"1\"}\n" + "{\"cnt\":1,\"dim1\":\"\"}\n\n", + resource.doGetResults( + sqlStatementResult.getQueryId(), + 0L, + ResultFormat.OBJECTLINES.name(), + SqlStatementResourceTest.makeOkRequest() + ), + objectMapper + ); + + assertExpectedResults( + "{\"cnt\":1,\"dim1\":\"1\"}\n" + "{\"cnt\":1,\"dim1\":\"def\"}\n" + "{\"cnt\":1,\"dim1\":\"abc\"}\n" + "\n", resource.doGetResults( sqlStatementResult.getQueryId(), - 0L, + 2L, ResultFormat.OBJECTLINES.name(), SqlStatementResourceTest.makeOkRequest() ), - objectMapper); + objectMapper + ); } @Test @@ -457,7 +474,12 @@ public void testResultFormatWithParamInSelect() throws IOException ))); } - private byte[] createExpectedResultsInFormat(ResultFormat resultFormat, List resultsList, List rowSignature, ObjectMapper jsonMapper) throws Exception + private byte[] createExpectedResultsInFormat( + ResultFormat resultFormat, + List resultsList, + List rowSignature, + ObjectMapper jsonMapper + ) throws Exception { ByteArrayOutputStream os = new ByteArrayOutputStream(); try (final ResultFormat.Writer writer = resultFormat.createFormatter(os, jsonMapper)) { @@ -466,7 +488,8 @@ private byte[] createExpectedResultsInFormat(ResultFormat resultFormat, List Date: Wed, 11 Oct 2023 21:43:10 +0530 Subject: [PATCH 08/10] Review --- .../main/java/org/apache/druid/msq/exec/ControllerImpl.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index a308d983eeef..3dc2e099c5e8 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1771,10 +1771,7 @@ private static QueryDefinition makeQueryDefinition( StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition(); if (finalShuffleStageDef.doesSortDuringShuffle()) { final QueryDefinitionBuilder builder = QueryDefinition.builder(); - for (final StageDefinition stageDef : queryDef.getStageDefinitions()) { - builder.add(StageDefinition.builder(stageDef)); - } - + builder.addAll(queryDef); builder.add(StageDefinition.builder(queryDef.getNextStageNumber()) .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) .maxWorkerCount(tuningConfig.getMaxNumWorkers()) From 00e08297d1997c24dd0d894d43520656fe3923ba Mon Sep 17 00:00:00 2001 From: cryptoe Date: Thu, 12 Oct 2023 11:32:17 +0530 Subject: [PATCH 09/10] Adding multiWorker test case. --- .../apache/druid/msq/exec/MSQSelectTest.java | 144 +++++++++++++++++- 1 file changed, 143 insertions(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index 29b02a2c5398..d54808baac32 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -62,6 +62,7 @@ import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.query.filter.LikeDimFilter; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; @@ -1253,7 +1254,7 @@ public void testGroupByOrderByAggregationWithLimitAndOffset() } @Test - public void testExternSelect1() throws IOException + public void testExternGroupBy() throws IOException { final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json"); final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath()); @@ -1339,6 +1340,147 @@ public void testExternSelect1() throws IOException .verifyResults(); } + + @Test + public void testExternSelectWithMultipleWorkers() throws IOException + { + Map multipleWorkerContext = new HashMap<>(context); + multipleWorkerContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 3); + + final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json"); + final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath()); + + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("user", ColumnType.STRING) + .build(); + + final ScanQuery expectedQuery = + newScanQueryBuilder().dataSource( + new ExternalDataSource( + new LocalInputSource(null, null, ImmutableList.of(toRead.getAbsoluteFile(), toRead.getAbsoluteFile())), + new JsonInputFormat(null, null, null, null, null), + RowSignature.builder() + .add("timestamp", ColumnType.STRING) + .add("page", ColumnType.STRING) + .add("user", ColumnType.STRING) + .build() + ) + ).eternityInterval().virtualColumns( + new ExpressionVirtualColumn( + "v0", + "timestamp_floor(timestamp_parse(\"timestamp\",null,'UTC'),'P1D',null,'UTC')", + ColumnType.LONG, + CalciteTests.createExprMacroTable() + ) + ).columns("user", "v0").filters(new LikeDimFilter("user", "%bot%", null, null)) + .context(defaultScanQueryContext(multipleWorkerContext, RowSignature.builder() + .add( + "user", + ColumnType.STRING + ) + .add( + "v0", + ColumnType.LONG + ) + .build())) + .build(); + + SelectTester selectTester = testSelectQuery() + .setSql("SELECT\n" + + " floor(TIME_PARSE(\"timestamp\") to day) AS __time,\n" + + " user\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{ \"files\": [" + toReadAsJson + "," + toReadAsJson + "],\"type\":\"local\"}',\n" + + " '{\"type\": \"json\"}',\n" + + " '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n" + + " )\n" + + ") where user like '%bot%'") + .setExpectedRowSignature(rowSignature) + .setQueryContext(multipleWorkerContext) + .setExpectedResultRows(ImmutableList.of( + new Object[]{1466985600000L, "Lsjbot"}, + new Object[]{1466985600000L, "Lsjbot"}, + new Object[]{1466985600000L, "Beau.bot"}, + new Object[]{1466985600000L, "Beau.bot"}, + new Object[]{1466985600000L, "Lsjbot"}, + new Object[]{1466985600000L, "Lsjbot"} + )) + .setExpectedMSQSpec( + MSQSpec + .builder() + .query(expectedQuery) + .columnMappings(new ColumnMappings( + ImmutableList.of( + new ColumnMapping("v0", "__time"), + new ColumnMapping("user", "user") + ) + )) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(isDurableStorageDestination() + ? DurableStorageMSQDestination.INSTANCE + : TaskReportMSQDestination.INSTANCE) + .build() + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(3).frames(1), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with() + .rows(isPageSizeLimited() ? new long[]{1L, 2L} : new long[]{3L}) + .frames(isPageSizeLimited() ? new long[]{1L, 1L} : new long[]{1L}), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1), + 0, 1, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(3).frames(1), + 0, 1, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with() + .rows(isPageSizeLimited() ? new long[]{1L, 2L} : new long[]{3L}) + .frames(isPageSizeLimited() ? new long[]{1L, 1L} : new long[]{1L}), + 0, 1, "shuffle" + ); + // adding result stage counter checks + if (isPageSizeLimited()) { + selectTester = selectTester.setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(2).frames(1), + 1, 0, "input0" + ).setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(2).frames(1), + 1, 0, "output" + ); + selectTester = selectTester.setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(0, 4).frames(0, 1), + 1, 1, "input0" + ).setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(0, 4).frames(0, 1), + 1, 1, "output" + ); + } + selectTester.verifyResults(); + } + @Test public void testIncorrectSelectQuery() { From 2f0fba8a73b2ecae1a1d7d4692d3435be3277fc6 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Thu, 12 Oct 2023 12:43:27 +0530 Subject: [PATCH 10/10] Adding more tests. --- .../apache/druid/msq/exec/MSQSelectTest.java | 30 +++--- .../SqlMSQStatementResourcePostTest.java | 94 +++++++++++++++++++ .../apache/druid/msq/test/MSQTestBase.java | 8 +- 3 files changed, 115 insertions(+), 17 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index d54808baac32..000e3e7ebe79 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -1373,7 +1373,7 @@ public void testExternSelectWithMultipleWorkers() throws IOException ColumnType.LONG, CalciteTests.createExprMacroTable() ) - ).columns("user", "v0").filters(new LikeDimFilter("user", "%bot%", null, null)) + ).columns("user", "v0").filters(new LikeDimFilter("user", "%ot%", null, null)) .context(defaultScanQueryContext(multipleWorkerContext, RowSignature.builder() .add( "user", @@ -1396,7 +1396,7 @@ public void testExternSelectWithMultipleWorkers() throws IOException + " '{\"type\": \"json\"}',\n" + " '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n" + " )\n" - + ") where user like '%bot%'") + + ") where user like '%ot%'") .setExpectedRowSignature(rowSignature) .setQueryContext(multipleWorkerContext) .setExpectedResultRows(ImmutableList.of( @@ -1405,7 +1405,11 @@ public void testExternSelectWithMultipleWorkers() throws IOException new Object[]{1466985600000L, "Beau.bot"}, new Object[]{1466985600000L, "Beau.bot"}, new Object[]{1466985600000L, "Lsjbot"}, - new Object[]{1466985600000L, "Lsjbot"} + new Object[]{1466985600000L, "Lsjbot"}, + new Object[]{1466985600000L, "TaxonBot"}, + new Object[]{1466985600000L, "TaxonBot"}, + new Object[]{1466985600000L, "GiftBot"}, + new Object[]{1466985600000L, "GiftBot"} )) .setExpectedMSQSpec( MSQSpec @@ -1430,14 +1434,14 @@ public void testExternSelectWithMultipleWorkers() throws IOException ) .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher - .with().rows(3).frames(1), + .with().rows(5).frames(1), 0, 0, "output" ) .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher .with() - .rows(isPageSizeLimited() ? new long[]{1L, 2L} : new long[]{3L}) - .frames(isPageSizeLimited() ? new long[]{1L, 1L} : new long[]{1L}), + .rows(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 2L} : new long[]{5L}) + .frames(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L} : new long[]{1L}), 0, 0, "shuffle" ) .setExpectedCountersForStageWorkerChannel( @@ -1447,34 +1451,34 @@ public void testExternSelectWithMultipleWorkers() throws IOException ) .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher - .with().rows(3).frames(1), + .with().rows(5).frames(1), 0, 1, "output" ) .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher .with() - .rows(isPageSizeLimited() ? new long[]{1L, 2L} : new long[]{3L}) - .frames(isPageSizeLimited() ? new long[]{1L, 1L} : new long[]{1L}), + .rows(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 2L} : new long[]{5L}) + .frames(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L} : new long[]{1L}), 0, 1, "shuffle" ); // adding result stage counter checks if (isPageSizeLimited()) { selectTester = selectTester.setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher - .with().rows(2).frames(1), + .with().rows(2, 0, 2).frames(1, 0, 1), 1, 0, "input0" ).setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher - .with().rows(2).frames(1), + .with().rows(2, 0, 2).frames(1, 0, 1), 1, 0, "output" ); selectTester = selectTester.setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher - .with().rows(0, 4).frames(0, 1), + .with().rows(0, 2, 0, 4).frames(0, 1, 0, 1), 1, 1, "input0" ).setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher - .with().rows(0, 4).frames(0, 1), + .with().rows(0, 2, 0, 4).frames(0, 1, 0, 1), 1, 1, "output" ); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java index eaef7065db08..6650c7785555 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java @@ -39,6 +39,7 @@ import org.apache.druid.msq.sql.entity.ResultSetInformation; import org.apache.druid.msq.sql.entity.SqlStatementResult; import org.apache.druid.msq.test.MSQTestBase; +import org.apache.druid.msq.test.MSQTestFileUtils; import org.apache.druid.msq.test.MSQTestOverlordServiceClient; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.ExecutionMode; @@ -55,6 +56,7 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -371,6 +373,98 @@ public void testWithDurableStorage() throws IOException ); } + + @Test + public void testMultipleWorkersWithPageSizeLimiting() throws IOException + { + Map context = defaultAsyncContext(); + context.put(MultiStageQueryContext.CTX_SELECT_DESTINATION, MSQSelectDestination.DURABLESTORAGE.getName()); + context.put(MultiStageQueryContext.CTX_ROWS_PER_PAGE, 2); + context.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 3); + + final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json"); + final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath()); + + + SqlStatementResult sqlStatementResult = (SqlStatementResult) resource.doPost( + new SqlQuery( + "SELECT\n" + + " floor(TIME_PARSE(\"timestamp\") to day) AS __time,\n" + + " user\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{ \"files\": [" + toReadAsJson + "," + toReadAsJson + "],\"type\":\"local\"}',\n" + + " '{\"type\": \"json\"}',\n" + + " '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n" + + " )\n" + + ") where user like '%ot%'", + null, + false, + false, + false, + context, + null + ), + SqlStatementResourceTest.makeOkRequest() + ).getEntity(); + + Assert.assertEquals(ImmutableList.of( + new PageInformation(0, 2L, 128L, 0, 0), + new PageInformation(1, 2L, 132L, 1, 1), + new PageInformation(2, 2L, 128L, 0, 2), + new PageInformation(3, 4L, 228L, 1, 3) + ), sqlStatementResult.getResultSetInformation().getPages()); + + + List> rows = new ArrayList<>(); + rows.add(ImmutableList.of(1466985600000L, "Lsjbot")); + rows.add(ImmutableList.of(1466985600000L, "Lsjbot")); + rows.add(ImmutableList.of(1466985600000L, "Beau.bot")); + rows.add(ImmutableList.of(1466985600000L, "Beau.bot")); + rows.add(ImmutableList.of(1466985600000L, "Lsjbot")); + rows.add(ImmutableList.of(1466985600000L, "Lsjbot")); + rows.add(ImmutableList.of(1466985600000L, "TaxonBot")); + rows.add(ImmutableList.of(1466985600000L, "TaxonBot")); + rows.add(ImmutableList.of(1466985600000L, "GiftBot")); + rows.add(ImmutableList.of(1466985600000L, "GiftBot")); + + + Assert.assertEquals(rows, SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults( + sqlStatementResult.getQueryId(), + null, + ResultFormat.ARRAY.name(), + SqlStatementResourceTest.makeOkRequest() + ))); + + Assert.assertEquals(rows.subList(0, 2), SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults( + sqlStatementResult.getQueryId(), + 0L, + ResultFormat.ARRAY.name(), + SqlStatementResourceTest.makeOkRequest() + ))); + + Assert.assertEquals(rows.subList(2, 4), SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults( + sqlStatementResult.getQueryId(), + 1L, + ResultFormat.ARRAY.name(), + SqlStatementResourceTest.makeOkRequest() + ))); + + Assert.assertEquals(rows.subList(4, 6), SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults( + sqlStatementResult.getQueryId(), + 2L, + ResultFormat.ARRAY.name(), + SqlStatementResourceTest.makeOkRequest() + ))); + + Assert.assertEquals(rows.subList(6, 10), SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults( + sqlStatementResult.getQueryId(), + 3L, + ResultFormat.ARRAY.name(), + SqlStatementResourceTest.makeOkRequest() + ))); + } + @Test public void testResultFormat() throws Exception { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 2fc2489ab7c6..3caf1c4ed574 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -1350,16 +1350,12 @@ public Pair, List>> } MSQTaskReportPayload payload = getPayloadOrThrow(controllerId); - verifyCounters(payload.getCounters()); - verifyWorkerCount(payload.getCounters()); - if (payload.getStatus().getErrorReport() != null) { throw new ISE("Query %s failed due to %s", sql, payload.getStatus().getErrorReport().toString()); } else { MSQControllerTask msqControllerTask = indexingServiceClient.getMSQControllerTask(controllerId); - final MSQSpec spec = msqControllerTask.getQuerySpec(); final List rows; @@ -1410,6 +1406,10 @@ public Pair, List>> } log.info("Found spec: %s", objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(spec)); + + verifyCounters(payload.getCounters()); + verifyWorkerCount(payload.getCounters()); + return new Pair<>(spec, Pair.of(payload.getResults().getSignature(), rows)); } }