Limit pages size to a configurable limit #14994

Merged: 12 commits, Oct 12, 2023
1 change: 1 addition & 0 deletions docs/multi-stage-query/reference.md
@@ -246,6 +246,7 @@ The following table lists the context parameters for the MSQ task engine:
| `durableShuffleStorage` | SELECT, INSERT, REPLACE <br /><br />Whether to use durable storage for shuffle mesh. To use this feature, configure durable storage at the server level using `druid.msq.intermediate.storage.enable=true`. If durable storage is not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error. <br /><br /> | `false` |
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on fault tolerance mode. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
| `selectDestination` | SELECT<br /><br /> Controls where the final result of the select query is written. <br />Use `taskReport` (the default) to write select results to the task report. <b>This is not scalable, since the task report size explodes for large result sets.</b> <br/>Use `durableStorage` to write results to a durable storage location. <b>For large result sets, it's recommended to use `durableStorage`.</b> To configure durable storage, see the [durable storage](#durable-storage) section. | `taskReport` |
| `rowsPerPage` | SELECT<br /><br />The number of rows per page to target. The actual number of rows per page may be somewhat higher or lower than this number. In most cases, use the default. <br />This property takes effect only when `selectDestination` is set to `durableStorage`. See the example below. | 100000 |
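For example, a minimal sketch of a query context that targets smaller pages when writing SELECT results to durable storage (the 50000 value is illustrative, not a recommendation):

import com.google.common.collect.ImmutableMap;
import java.util.Map;

// rowsPerPage only takes effect because selectDestination is durableStorage.
Map<String, Object> context = ImmutableMap.of(
    "selectDestination", "durableStorage",
    "rowsPerPage", 50000
);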

## Joins

@@ -1556,8 +1556,13 @@ private static QueryDefinition makeQueryDefinition(
shuffleSpecFactory = ShuffleSpecFactories.singlePartition();
queryToPlan = querySpec.getQuery();
} else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) {
// we add a final stage which generates one partition per worker.
shuffleSpecFactory = ShuffleSpecFactories.globalSortWithMaxPartitionCount(tuningConfig.getMaxNumWorkers());

shuffleSpecFactory = (clusterBy, aggregate) ->
new GlobalSortTargetSizeShuffleSpec(
clusterBy,
MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context()),
aggregate
);
queryToPlan = querySpec.getQuery();
} else {
throw new ISE("Unsupported destination [%s]", querySpec.getDestination());
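The change above replaces the old one-partition-per-worker shuffle with a target-size shuffle, so the number of result pages now tracks the result size rather than the worker count. A minimal sketch of the resulting page-count math (illustrative only; the real partitioning lives in GlobalSortTargetSizeShuffleSpec, and actual page sizes may vary around the target):

// Ceiling division: how many pages a result of totalRows rows yields at a given rowsPerPage target.
static int expectedPageCount(long totalRows, int rowsPerPage)
{
    return (int) Math.max(1, (totalRows + rowsPerPage - 1) / rowsPerPage);
}
// expectedPageCount(250_000, 100_000) == 3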
@@ -1635,22 +1640,25 @@ private static QueryDefinition makeQueryDefinition(
return queryDef;
} else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) {

// attaching new query results stage always.
// attaching a new query results stage only if the final stage sorts during shuffle, so that results are ordered.
StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition();
final QueryDefinitionBuilder builder = QueryDefinition.builder();
for (final StageDefinition stageDef : queryDef.getStageDefinitions()) {
builder.add(StageDefinition.builder(stageDef));
}

builder.add(StageDefinition.builder(queryDef.getNextStageNumber())
.inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
.maxWorkerCount(tuningConfig.getMaxNumWorkers())
.signature(finalShuffleStageDef.getSignature())
.shuffleSpec(null)
.processorFactory(new QueryResultFrameProcessorFactory())
);
if (finalShuffleStageDef.doesSortDuringShuffle()) {
final QueryDefinitionBuilder builder = QueryDefinition.builder();
for (final StageDefinition stageDef : queryDef.getStageDefinitions()) {
Contributor: nit: could use org.apache.druid.msq.kernel.QueryDefinitionBuilder#addAll(org.apache.druid.msq.kernel.QueryDefinition) here
builder.add(StageDefinition.builder(stageDef));
}
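In sketch form, the refactor that comment points at (assuming addAll copies every stage builder from the given QueryDefinition and returns the builder, as the comment suggests):

final QueryDefinitionBuilder builder = QueryDefinition.builder().addAll(queryDef);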

return builder.build();
builder.add(StageDefinition.builder(queryDef.getNextStageNumber())
.inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
.maxWorkerCount(tuningConfig.getMaxNumWorkers())
.signature(finalShuffleStageDef.getSignature())
.shuffleSpec(null)
.processorFactory(new QueryResultFrameProcessorFactory())
);
return builder.build();
} else {
return queryDef;
}
} else {
throw new ISE("Unsupported destination [%s]", querySpec.getDestination());
}
@@ -21,11 +21,11 @@


import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.Objects;

/**
@@ -39,6 +39,14 @@ public class PageInformation
@Nullable
private final Long sizeInBytes;

// The worker field should not flow to users of the SqlStatementResource API, since users should not care about workers
@Nullable
private final Integer worker;

// The partition field should not flow to users of the SqlStatementResource API, since users should not care about partitions
@Nullable
private final Integer partition;

@JsonCreator
public PageInformation(
@JsonProperty("id") long id,
@@ -49,8 +57,27 @@ public PageInformation(
this.id = id;
this.numRows = numRows;
this.sizeInBytes = sizeInBytes;
this.worker = null;
this.partition = null;
}


public PageInformation(
long id,
Long numRows,
Long sizeInBytes,
Integer worker,
Integer partition
)
{
this.id = id;
this.numRows = numRows;
this.sizeInBytes = sizeInBytes;
this.worker = worker;
this.partition = partition;
}
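A quick illustration of the two constructors (values made up): the JSON-creator constructor leaves worker and partition as null, while the new one pins a page to a (worker, partition) pair.

PageInformation reportPage = new PageInformation(0, 100L, 1024L);        // worker/partition default to null
PageInformation durablePage = new PageInformation(0, 100L, 1024L, 1, 0); // worker 1, partition 0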


@JsonProperty
public long getId()
{
@@ -74,6 +101,20 @@ public Long getSizeInBytes()
}


@Nullable
@JsonIgnore
public Integer getWorker()
{
return worker;
}

@Nullable
@JsonIgnore
public Integer getPartition()
{
return partition;
}
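Because the new getters are annotated with @JsonIgnore, worker and partition stay internal to the server. A sketch of the expected serialization behavior (assuming a plain Jackson ObjectMapper):

// assumes com.fasterxml.jackson.databind.ObjectMapper; writeValueAsString throws JsonProcessingException
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(new PageInformation(0, 100L, 1024L, 1, 0));
// json contains "id", "numRows" and "sizeInBytes", but no "worker" or "partition" keys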

@Override
public boolean equals(Object o)
{
@@ -87,13 +128,13 @@ public boolean equals(Object o)
return id == that.id && Objects.equals(numRows, that.numRows) && Objects.equals(
sizeInBytes,
that.sizeInBytes
);
) && Objects.equals(worker, that.worker) && Objects.equals(partition, that.partition);
}

@Override
public int hashCode()
{
return Objects.hash(id, numRows, sizeInBytes);
return Objects.hash(id, numRows, sizeInBytes, worker, partition);
}

@Override
@@ -103,20 +144,8 @@ public String toString()
"id=" + id +
", numRows=" + numRows +
", sizeInBytes=" + sizeInBytes +
", worker=" + worker +
", partition=" + partition +
'}';
}

public static Comparator<PageInformation> getIDComparator()
{
return new PageComparator();
}

public static class PageComparator implements Comparator<PageInformation>
{
@Override
public int compare(PageInformation s1, PageInformation s2)
{
return Long.compare(s1.getId(), s2.getId());
}
}
}
@@ -489,7 +489,7 @@ private Response buildNonOkResponse(DruidException exception)
}

@SuppressWarnings("ReassignedVariable")
private Optional<ResultSetInformation> getSampleResults(
private Optional<ResultSetInformation> getResultSetInformation(
String queryId,
String dataSource,
SqlStatementState sqlStatementState,
@@ -598,7 +598,7 @@ private Optional<SqlStatementResult> getStatementStatus(String queryId, String c
taskResponse.getStatus().getCreatedTime(),
signature.orElse(null),
taskResponse.getStatus().getDuration(),
withResults ? getSampleResults(
withResults ? getResultSetInformation(
queryId,
msqControllerTask.getDataSource(),
sqlStatementState,
@@ -737,11 +737,16 @@ private Optional<Yielder<Object[]>> getResultYielder(
|| selectedPageId.equals(pageInformation.getId()))
.map(pageInformation -> {
try {
if (pageInformation.getWorker() == null || pageInformation.getPartition() == null) {
throw DruidException.defensive(
"Worker or partition number is null for page id [%d]",
pageInformation.getId()
);
}
return new FrameChannelSequence(standardImplementation.openChannel(
finalStage.getId(),
(int) pageInformation.getId(),
(int) pageInformation.getId()
// previously, partition number was assumed equal to worker number, so the page id was passed for both
pageInformation.getWorker(),
pageInformation.getPartition()
));
}
catch (Exception e) {
@@ -105,6 +105,9 @@ public class MultiStageQueryContext
public static final String CTX_ROWS_PER_SEGMENT = "rowsPerSegment";
static final int DEFAULT_ROWS_PER_SEGMENT = 3000000;

public static final String CTX_ROWS_PER_PAGE = "rowsPerPage";
static final int DEFAULT_ROWS_PER_PAGE = 100000;

public static final String CTX_ROWS_IN_MEMORY = "rowsInMemory";
// Lower than the default to minimize the impact of per-row overheads that are not accounted for by
// OnheapIncrementalIndex. For example: overheads related to creating bitmaps during persist.
@@ -206,6 +209,15 @@ public static int getRowsPerSegment(final QueryContext queryContext)
);
}

public static int getRowsPerPage(final QueryContext queryContext)
{
return queryContext.getInt(
CTX_ROWS_PER_PAGE,
DEFAULT_ROWS_PER_PAGE
);
}
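A hedged usage sketch of the new accessor (assuming QueryContext.of(Map) and com.google.common.collect.ImmutableMap are available, as elsewhere in Druid):

int rowsPerPage = MultiStageQueryContext.getRowsPerPage(
    QueryContext.of(ImmutableMap.of(MultiStageQueryContext.CTX_ROWS_PER_PAGE, 50000))
);
// rowsPerPage == 50000; when the key is absent, it falls back to DEFAULT_ROWS_PER_PAGE (100000)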


public static MSQSelectDestination getSelectDestination(final QueryContext queryContext)
{
return QueryContexts.getAsEnum(
@@ -143,8 +143,13 @@ public static SqlStatementState getSqlStatementState(TaskStatusPlus taskStatusPl
* <ol>
* <li>{@link DataSourceMSQDestination} a single page is returned which adds all the counters of {@link SegmentGenerationProgressCounter.Snapshot}</li>
* <li>{@link TaskReportMSQDestination} a single page is returned which adds all the counters of {@link ChannelCounters}</li>
* <li>{@link DurableStorageMSQDestination} a page is returned for each worker which has generated output rows. The list is sorted on page id.
* If the worker generated 0 rows, we do not populate a page for it. {@link PageInformation#id} is equal to the worker number</li>
* <li>{@link DurableStorageMSQDestination} a page is returned for each (partition, worker) combination which has generated output rows. The pages are populated in the following order:
* <ul>
* <li>For each partition from 0 to N</li>
* <li>For each worker from 0 to M</li>
* <li>If the number of rows for that (partition, worker) combination is 0, no page is populated for it</li>
* </ul>
* This traversal order preserves the record ordering of the results.
* </ol>
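* For example (an illustrative sketch, not from the source): with 2 partitions and 2 workers,
* pages are added in the order (partition 0, worker 0), (partition 0, worker 1),
* (partition 1, worker 0), (partition 1, worker 1); any combination with 0 rows is skipped,
* so page ids stay dense while the record order of the results is preserved.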
*/
public static Optional<List<PageInformation>> populatePageList(
@@ -155,9 +160,9 @@ public static Optional<List<PageInformation>> populatePageList(
if (msqTaskReportPayload.getStages() == null || msqTaskReportPayload.getCounters() == null) {
return Optional.empty();
}
int finalStage = msqTaskReportPayload.getStages().getStages().size() - 1;
MSQStagesReport.Stage finalStage = getFinalStage(msqTaskReportPayload);
CounterSnapshotsTree counterSnapshotsTree = msqTaskReportPayload.getCounters();
Map<Integer, CounterSnapshots> workerCounters = counterSnapshotsTree.snapshotForStage(finalStage);
Map<Integer, CounterSnapshots> workerCounters = counterSnapshotsTree.snapshotForStage(finalStage.getStageNumber());
if (workerCounters == null || workerCounters.isEmpty()) {
return Optional.empty();
}
@@ -193,27 +198,56 @@ public static Optional<List<PageInformation>> populatePageList(
}

} else if (msqDestination instanceof DurableStorageMSQDestination) {
List<PageInformation> pageList = new ArrayList<>();
for (Map.Entry<Integer, CounterSnapshots> counterSnapshots : workerCounters.entrySet()) {
long rows = 0L;
long size = 0L;
QueryCounterSnapshot queryCounterSnapshot = counterSnapshots.getValue().getMap().getOrDefault("output", null);
if (queryCounterSnapshot != null && queryCounterSnapshot instanceof ChannelCounters.Snapshot) {
rows += Arrays.stream(((ChannelCounters.Snapshot) queryCounterSnapshot).getRows()).sum();
size += Arrays.stream(((ChannelCounters.Snapshot) queryCounterSnapshot).getBytes()).sum();
}
// do not populate a page if the worker generated 0 rows.
if (rows != 0L) {
pageList.add(new PageInformation(counterSnapshots.getKey(), rows, size));
}
}
pageList.sort(PageInformation.getIDComparator());
return Optional.of(pageList);

return populatePagesForDurableStorageDestination(finalStage, workerCounters);
} else {
return Optional.empty();
}
}

private static Optional<List<PageInformation>> populatePagesForDurableStorageDestination(
MSQStagesReport.Stage finalStage,
Map<Integer, CounterSnapshots> workerCounters
)
{
// figure out number of partitions and number of workers
int totalPartitions = finalStage.getPartitionCount();
int totalWorkerCount = finalStage.getWorkerCount();

if (totalPartitions == -1) {
throw DruidException.defensive("Expected partition count to be set for stage[%d]", finalStage.getStageNumber());
}
if (totalWorkerCount == -1) {
throw DruidException.defensive("Expected worker count to be set for stage[%d]", finalStage.getStageNumber());
}


List<PageInformation> pages = new ArrayList<>();
for (int partitionNumber = 0; partitionNumber < totalPartitions; partitionNumber++) {
for (int workerNumber = 0; workerNumber < totalWorkerCount; workerNumber++) {
CounterSnapshots workerCounter = workerCounters.get(workerNumber);

if (workerCounter != null && workerCounter.getMap() != null) {
QueryCounterSnapshot channelCounters = workerCounter.getMap().get("output");

if (channelCounters instanceof ChannelCounters.Snapshot) {
long rows = 0L;
long size = 0L;

if (((ChannelCounters.Snapshot) channelCounters).getRows().length > partitionNumber) {
rows += ((ChannelCounters.Snapshot) channelCounters).getRows()[partitionNumber];
size += ((ChannelCounters.Snapshot) channelCounters).getBytes()[partitionNumber];
}
if (rows != 0L) {
pages.add(new PageInformation(pages.size(), rows, size, workerNumber, partitionNumber));
}
}
}
}
}
return Optional.of(pages);
}

public static Optional<SqlStatementResult> getExceptionPayload(
String queryId,
TaskStatusResponse taskResponse,
@@ -336,6 +370,7 @@ public static MSQStagesReport.Stage getFinalStage(MSQTaskReportPayload msqTaskRe
}
return null;
}

public static Map<String, Object> getQueryExceptionDetails(Map<String, Object> payload)
{
return getMap(getMap(payload, "status"), "errorReport");
@@ -109,6 +109,7 @@ public class MSQSelectTest extends MSQTestBase
public static final Map<String, Object> QUERY_RESULTS_WITH_DURABLE_STORAGE_CONTEXT =
ImmutableMap.<String, Object>builder()
.putAll(DURABLE_STORAGE_MSQ_CONTEXT)
.put(MultiStageQueryContext.CTX_ROWS_PER_PAGE, 2)
.put(
MultiStageQueryContext.CTX_SELECT_DESTINATION,
MSQSelectDestination.DURABLESTORAGE.getName().toLowerCase(Locale.ENGLISH)
@@ -215,16 +216,6 @@ public void testSelectOnFoo()
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "shuffle"
)
Contributor: Why are we removing the assertion?

Contributor (author): It was more of an in-flux thing. Fixed it.
.setExpectedResultRows(ImmutableList.of(
new Object[]{1L, ""},
new Object[]{1L, "10.1"},
@@ -341,16 +332,6 @@ public void testSelectOnFooDuplicateColumnNames()
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "shuffle"
)
.setExpectedResultRows(ImmutableList.of(
new Object[]{1L, ""},
new Object[]{1L, "10.1"},