diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index 5e80e318b8c8..9ec50de0a9b1 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -248,7 +248,7 @@ The following table lists the context parameters for the MSQ task engine:
| `selectDestination` | SELECT<br /><br />Controls where the final result of the select query is written. Use `taskReport` (the default) to write select results to the task report. This is not scalable, since the task report size explodes for large result sets. Use `durableStorage` to write results to a durable storage location. For large result sets, it is recommended to use `durableStorage`. To configure durable storage, see [`this`](#durable-storage) section. | `taskReport` |
| `waitTillSegmentsLoad` | INSERT, REPLACE<br /><br />If set, the ingest query waits for the generated segments to be loaded before exiting; otherwise, the ingest query exits without waiting. The task and live reports contain information about the status of loading segments if this flag is set. This ensures that any queries made after the ingestion exits include results from the ingestion. The drawback is that the controller task stalls until the segments are loaded. | `false` |
| `includeSegmentSource` | SELECT, INSERT, REPLACE<br /><br />Controls the sources that are queried for results, in addition to the segments present on deep storage. Can be `NONE` or `REALTIME`. If this value is `NONE`, only non-realtime (published and used) segments are downloaded from deep storage. If this value is `REALTIME`, results are also included from realtime tasks. | `NONE` |
-
+| `rowsPerPage` | SELECT<br /><br />The number of rows per page to target. The actual number of rows per page may be somewhat higher or lower than this number. In most cases, use the default. This property comes into effect only when `selectDestination` is set to `durableStorage`. | 100000 |
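
For orientation (not part of this patch; the class name is made up), a minimal sketch of the context a client might attach to a SELECT query so that results are written to durable storage in pages of roughly 50,000 rows:

```java
import java.util.Map;

public class SelectPagingContextSketch
{
  public static void main(String[] args)
  {
    // Query context a client might send with a SELECT query. The keys are the parameters
    // documented in the table above; rowsPerPage only takes effect because
    // selectDestination is set to durableStorage.
    Map<String, Object> context = Map.of(
        "selectDestination", "durableStorage",
        "rowsPerPage", 50_000
    );
    System.out.println(context);
  }
}
```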
## Joins
Joins in multi-stage queries use one of two algorithms based on what you set the [context parameter](#context-parameters) `sqlJoinAlgorithm` to:
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 6f46007d93c0..3dc2e099c5e8 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -145,7 +145,6 @@
import org.apache.druid.msq.input.table.DataSegmentWithLocation;
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.msq.input.table.TableInputSpecSlicer;
-import org.apache.druid.msq.kernel.GlobalSortTargetSizeShuffleSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
import org.apache.druid.msq.kernel.StageDefinition;
@@ -1663,12 +1662,7 @@ private static QueryDefinition makeQueryDefinition(
final ShuffleSpecFactory shuffleSpecFactory;
if (MSQControllerTask.isIngestion(querySpec)) {
- shuffleSpecFactory = (clusterBy, aggregate) ->
- new GlobalSortTargetSizeShuffleSpec(
- clusterBy,
- tuningConfig.getRowsPerSegment(),
- aggregate
- );
+ shuffleSpecFactory = ShuffleSpecFactories.getGlobalSortWithTargetSize(tuningConfig.getRowsPerSegment());
if (!columnMappings.hasUniqueOutputColumnNames()) {
// We do not expect to hit this case in production, because the SQL validator checks that column names
@@ -1693,8 +1687,9 @@ private static QueryDefinition makeQueryDefinition(
shuffleSpecFactory = ShuffleSpecFactories.singlePartition();
queryToPlan = querySpec.getQuery();
} else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) {
- // we add a final stage which generates one partition per worker.
- shuffleSpecFactory = ShuffleSpecFactories.globalSortWithMaxPartitionCount(tuningConfig.getMaxNumWorkers());
+ shuffleSpecFactory = ShuffleSpecFactories.getGlobalSortWithTargetSize(
+ MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())
+ );
queryToPlan = querySpec.getQuery();
} else {
throw new ISE("Unsupported destination [%s]", querySpec.getDestination());
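
A back-of-envelope illustration (numbers invented, plain Java) of what this change does to the shape of durable-storage results: previously the final shuffle produced one partition per worker; now it targets `rowsPerPage` rows per partition.

```java
public class PageCountSketch
{
  public static void main(String[] args)
  {
    long totalResultRows = 1_050_000L;
    int rowsPerPage = 100_000;   // default of the rowsPerPage context parameter
    int maxNumWorkers = 2;

    // Old behaviour for durable-storage results: one partition per worker, regardless of size.
    System.out.println("globalSortWithMaxPartitionCount -> ~" + maxNumWorkers + " partitions");

    // New behaviour: partition count follows the data volume. The target-size spec aims for
    // rowsPerPage rows per partition, so the page count is roughly ceil(totalRows / rowsPerPage);
    // actual pages can hold somewhat more or fewer rows than the target.
    long approxPages = (totalResultRows + rowsPerPage - 1) / rowsPerPage;  // 11
    System.out.println("getGlobalSortWithTargetSize -> ~" + approxPages + " partitions");
  }
}
```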
@@ -1772,27 +1767,29 @@ private static QueryDefinition makeQueryDefinition(
return queryDef;
} else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) {
- // attaching new query results stage always.
+ // Attach a new query-results stage only if the final stage sorts during its shuffle, so that the results are ordered.
StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition();
- final QueryDefinitionBuilder builder = QueryDefinition.builder();
- for (final StageDefinition stageDef : queryDef.getStageDefinitions()) {
- builder.add(StageDefinition.builder(stageDef));
+ if (finalShuffleStageDef.doesSortDuringShuffle()) {
+ final QueryDefinitionBuilder builder = QueryDefinition.builder();
+ builder.addAll(queryDef);
+ builder.add(StageDefinition.builder(queryDef.getNextStageNumber())
+ .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
+ .maxWorkerCount(tuningConfig.getMaxNumWorkers())
+ .signature(finalShuffleStageDef.getSignature())
+ .shuffleSpec(null)
+ .processorFactory(new QueryResultFrameProcessorFactory())
+ );
+ return builder.build();
+ } else {
+ return queryDef;
}
-
- builder.add(StageDefinition.builder(queryDef.getNextStageNumber())
- .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
- .maxWorkerCount(tuningConfig.getMaxNumWorkers())
- .signature(finalShuffleStageDef.getSignature())
- .shuffleSpec(null)
- .processorFactory(new QueryResultFrameProcessorFactory())
- );
-
- return builder.build();
} else {
throw new ISE("Unsupported destination [%s]", querySpec.getDestination());
}
}
+
+
private static DataSchema generateDataSchema(
MSQSpec querySpec,
RowSignature querySignature,
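
To make the new branch easier to follow, here is a toy restatement of the gating logic above (plain Java stand-ins, not the controller's real types): the extra query-results stage is appended only when the final stage sorts during its shuffle; otherwise the query definition is returned untouched.

```java
public class ResultsStageGateSketch
{
  public static void main(String[] args)
  {
    // Stand-in for finalShuffleStageDef.doesSortDuringShuffle().
    boolean finalStageSortsDuringShuffle = true;
    // Stand-in for queryDef.getFinalStageDefinition().getStageNumber().
    int finalStageNumber = 3;

    if (finalStageSortsDuringShuffle) {
      // Stand-in for queryDef.getNextStageNumber(): the appended stage reads the final
      // stage's output, keeps its signature, and applies no further shuffle.
      int resultsStageNumber = finalStageNumber + 1;
      System.out.println("append results stage " + resultsStageNumber + " reading stage " + finalStageNumber);
    } else {
      System.out.println("no results stage needed; return queryDef as-is");
    }
  }
}
```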
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java
index 971aa9b7e0c7..d28439c0f8e0 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java
@@ -20,6 +20,7 @@
package org.apache.druid.msq.querykit;
import org.apache.druid.msq.kernel.GlobalSortMaxCountShuffleSpec;
+import org.apache.druid.msq.kernel.GlobalSortTargetSizeShuffleSpec;
import org.apache.druid.msq.kernel.MixShuffleSpec;
/**
@@ -53,4 +54,17 @@ public static ShuffleSpecFactory globalSortWithMaxPartitionCount(final int parti
{
return (clusterBy, aggregate) -> new GlobalSortMaxCountShuffleSpec(clusterBy, partitions, aggregate);
}
+
+ /**
+ * Factory that produces globally sorted partitions of a target size.
+ */
+ public static ShuffleSpecFactory getGlobalSortWithTargetSize(int targetSize)
+ {
+ return (clusterBy, aggregate) ->
+ new GlobalSortTargetSizeShuffleSpec(
+ clusterBy,
+ targetSize,
+ aggregate
+ );
+ }
}
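
A usage sketch for the new factory, assuming `ShuffleSpecFactory`'s functional method is `build(clusterBy, aggregate)` as the lambda above suggests, and that `ClusterBy.none()` is available as an empty stand-in key:

```java
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.msq.kernel.ShuffleSpec;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;

public class TargetSizeShuffleSketch
{
  public static void main(String[] args)
  {
    // The controller picks the target from the query context (rowsPerSegment for ingestion,
    // rowsPerPage for durable-storage SELECTs) and builds the factory once.
    ShuffleSpecFactory factory = ShuffleSpecFactories.getGlobalSortWithTargetSize(100_000);

    // The query kit later supplies the cluster-by key and whether the shuffle aggregates;
    // an empty key is used here only to exercise the call.
    ShuffleSpec spec = factory.build(ClusterBy.none(), false);

    // spec is a GlobalSortTargetSizeShuffleSpec targeting 100,000 rows per partition.
    System.out.println(spec);
  }
}
```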
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/PageInformation.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/PageInformation.java
index 6db1f371af7c..f50716f108cd 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/PageInformation.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/PageInformation.java
@@ -21,11 +21,11 @@
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
-import java.util.Comparator;
import java.util.Objects;
/**
@@ -39,6 +39,14 @@ public class PageInformation
@Nullable
private final Long sizeInBytes;
+ // The worker field should not flow to users of the SqlStatementResource API, since users do not need to know about workers.
+ @Nullable
+ private final Integer worker;
+
+ // The partition field should not flow to users of the SqlStatementResource API, since users do not need to know about partitions.
+ @Nullable
+ private final Integer partition;
+
@JsonCreator
public PageInformation(
@JsonProperty("id") long id,
@@ -49,8 +57,27 @@ public PageInformation(
this.id = id;
this.numRows = numRows;
this.sizeInBytes = sizeInBytes;
+ this.worker = null;
+ this.partition = null;
}
+
+ public PageInformation(
+ long id,
+ Long numRows,
+ Long sizeInBytes,
+ Integer worker,
+ Integer partition
+ )
+ {
+ this.id = id;
+ this.numRows = numRows;
+ this.sizeInBytes = sizeInBytes;
+ this.worker = worker;
+ this.partition = partition;
+ }
+
+
@JsonProperty
public long getId()
{
@@ -74,6 +101,20 @@ public Long getSizeInBytes()
}
+ @Nullable
+ @JsonIgnore
+ public Integer getWorker()
+ {
+ return worker;
+ }
+
+ @Nullable
+ @JsonIgnore
+ public Integer getPartition()
+ {
+ return partition;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -87,13 +128,13 @@ public boolean equals(Object o)
return id == that.id && Objects.equals(numRows, that.numRows) && Objects.equals(
sizeInBytes,
that.sizeInBytes
- );
+ ) && Objects.equals(worker, that.worker) && Objects.equals(partition, that.partition);
}
@Override
public int hashCode()
{
- return Objects.hash(id, numRows, sizeInBytes);
+ return Objects.hash(id, numRows, sizeInBytes, worker, partition);
}
@Override
@@ -103,20 +144,8 @@ public String toString()
"id=" + id +
", numRows=" + numRows +
", sizeInBytes=" + sizeInBytes +
+ ", worker=" + worker +
+ ", partition=" + partition +
'}';
}
-
- public static Comparator<PageInformation> getIDComparator()
- {
- return new PageComparator();
- }
-
- public static class PageComparator implements Comparator<PageInformation>
- {
- @Override
- public int compare(PageInformation s1, PageInformation s2)
- {
- return Long.compare(s1.getId(), s2.getId());
- }
- }
}
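
A quick serialization check (a sketch using a bare Jackson `ObjectMapper`; the expected output is approximate) of the contract the two new fields rely on: `worker` and `partition` are carried internally but `@JsonIgnore`d, so the JSON returned to SQL statements API users still exposes only `id`, `numRows`, and `sizeInBytes`.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.msq.sql.entity.PageInformation;

public class PageInformationJsonSketch
{
  public static void main(String[] args) throws Exception
  {
    // Five-argument constructor added above: id, numRows, sizeInBytes, worker, partition.
    PageInformation page = new PageInformation(0, 100_000L, 1_048_576L, 1, 0);

    // worker and partition are @JsonIgnore'd, so they stay out of the API response body;
    // the output is expected to look like {"id":0,"numRows":100000,"sizeInBytes":1048576}.
    System.out.println(new ObjectMapper().writeValueAsString(page));
  }
}
```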
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java
index dd4e08403006..91145985ee13 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java
@@ -504,7 +504,7 @@ private Response buildNonOkResponse(DruidException exception)
}
@SuppressWarnings("ReassignedVariable")
- private Optional<ResultSetInformation> getSampleResults(
+ private Optional<ResultSetInformation> getResultSetInformation(
String queryId,
String dataSource,
SqlStatementState sqlStatementState,
@@ -617,7 +617,7 @@ private Optional getStatementStatus(
taskResponse.getStatus().getCreatedTime(),
signature.orElse(null),
taskResponse.getStatus().getDuration(),
- withResults ? getSampleResults(
+ withResults ? getResultSetInformation(
queryId,
msqControllerTask.getDataSource(),
sqlStatementState,
@@ -782,11 +782,16 @@ private Optional<Yielder<Object[]>> getResultYielder(
|| selectedPageId.equals(pageInformation.getId()))
.map(pageInformation -> {
try {
+ if (pageInformation.getWorker() == null || pageInformation.getPartition() == null) {
+ throw DruidException.defensive(
+ "Worker or partition number is null for page id [%d]",
+ pageInformation.getId()
+ );
+ }
return new FrameChannelSequence(standardImplementation.openChannel(
finalStage.getId(),
- (int) pageInformation.getId(),
- (int) pageInformation.getId()
- // we would always have partition number == worker number
+ pageInformation.getWorker(),
+ pageInformation.getPartition()
));
}
catch (Exception e) {
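
The removed assumption (`partition number == worker number`) is replaced by explicit per-page worker and partition numbers. A toy enumeration (invented row counts; the real code derives this from counter snapshots) of the partition-major, worker-minor page ordering described in the `SqlStatementResourceHelper` javadoc further down:

```java
public class PageOrderingSketch
{
  public static void main(String[] args)
  {
    // rows[worker][partition]: invented counts for 2 workers and 2 result partitions.
    long[][] rows = {
        {120_000, 0},        // worker 0 only produced rows for partition 0
        {80_000, 95_000},    // worker 1 produced rows for partitions 0 and 1
    };

    int pageId = 0;
    // Pages are laid out partition-major, worker-minor, only for combinations that actually
    // produced output rows; each page records its (worker, partition) pair so the resource
    // can open exactly that channel.
    for (int partition = 0; partition < 2; partition++) {
      for (int worker = 0; worker < rows.length; worker++) {
        if (rows[worker][partition] > 0) {
          System.out.println("page " + (pageId++) + " -> worker " + worker + ", partition " + partition);
        }
      }
    }
    // page 0 -> worker 0, partition 0
    // page 1 -> worker 1, partition 0
    // page 2 -> worker 1, partition 1
  }
}
```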
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 613fac6203c2..77b11a287687 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -115,6 +115,9 @@ public class MultiStageQueryContext
public static final String CTX_ROWS_PER_SEGMENT = "rowsPerSegment";
static final int DEFAULT_ROWS_PER_SEGMENT = 3000000;
+ public static final String CTX_ROWS_PER_PAGE = "rowsPerPage";
+ static final int DEFAULT_ROWS_PER_PAGE = 100000;
+
public static final String CTX_ROWS_IN_MEMORY = "rowsInMemory";
// Lower than the default to minimize the impact of per-row overheads that are not accounted for by
// OnheapIncrementalIndex. For example: overheads related to creating bitmaps during persist.
@@ -238,6 +241,15 @@ public static int getRowsPerSegment(final QueryContext queryContext)
);
}
+ public static int getRowsPerPage(final QueryContext queryContext)
+ {
+ return queryContext.getInt(
+ CTX_ROWS_PER_PAGE,
+ DEFAULT_ROWS_PER_PAGE
+ );
+ }
+
+
public static MSQSelectDestination getSelectDestination(final QueryContext queryContext)
{
return QueryContexts.getAsEnum(
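
How the new getter resolves the value, in a small sketch (assuming the `QueryContext.of` and `QueryContext.empty()` factories are available):

```java
import java.util.Map;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;

public class RowsPerPageResolutionSketch
{
  public static void main(String[] args)
  {
    // Explicit context value wins.
    QueryContext withValue = QueryContext.of(Map.of(MultiStageQueryContext.CTX_ROWS_PER_PAGE, 50_000));
    System.out.println(MultiStageQueryContext.getRowsPerPage(withValue));          // 50000

    // Otherwise the 100,000-row default (DEFAULT_ROWS_PER_PAGE) applies.
    System.out.println(MultiStageQueryContext.getRowsPerPage(QueryContext.empty())); // 100000
  }
}
```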
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
index 86aed98f063e..9481fc60541b 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
@@ -143,8 +143,13 @@ public static SqlStatementState getSqlStatementState(TaskStatusPlus taskStatusPl
* <ol>
* <li>{@link DataSourceMSQDestination} a single page is returned which adds all the counters of {@link SegmentGenerationProgressCounter.Snapshot}</li>
* <li>{@link TaskReportMSQDestination} a single page is returned which adds all the counters of {@link ChannelCounters}</li>
- * <li>{@link DurableStorageMSQDestination} a page is returned for each worker which has generated output rows. The list is sorted on page Id.
- * If the worker generated 0 rows, we do no populated a page for it. {@link PageInformation#id} is equal to the worker number</li>
+ * <li>{@link DurableStorageMSQDestination} a page is returned for each partition, worker combination which has generated output rows. The pages are populated in the following order:
+ * <ul>
+ * <li>For each partition from 0 to N</li>
+ * <li>For each worker from 0 to M</li>
+ * <li>If num rows for that partition, worker combination is 0, create a page