Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
adarshsanjeev committed Oct 5, 2023
1 parent 8ca4c2b commit e910f0b
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 8 deletions.
2 changes: 1 addition & 1 deletion docs/multi-stage-query/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ The following table lists the context parameters for the MSQ task engine:
| `durableShuffleStorage` | SELECT, INSERT, REPLACE <br /><br />Whether to use durable storage for shuffle mesh. To use this feature, configure the durable storage at the server level using `druid.msq.intermediate.storage.enable=true`). If these properties are not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error. <br /><br /> | `false` |
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
| `selectDestination` | SELECT<br /><br /> Controls where the final result of the select query is written. <br />Use `taskReport`(the default) to write select results to the task report. <b> This is not scalable since task reports size explodes for large results </b> <br/>Use `durableStorage` to write results to durable storage location. <b>For large results sets, its recommended to use `durableStorage` </b>. To configure durable storage see [`this`](#durable-storage) section. | `taskReport` |
| `segmentLoadWait` | INSERT, REPLACE<br /><br /> Whether the controller should wait for segments to be loaded before exiting. If this is true, the controller queries the broker and waits till the segments created (if any) have been loaded by the load rules. The controller also provides this information in the live reports and task reports. If this is false, the controller exits immediately after finishing the query. | `false` |
| `waitTillSegmentsLoad` | INSERT, REPLACE<br /><br /> If set, the ingest query waits for the generated segment to be loaded before exiting, else the ingest query exits without waiting. The task and live reports contain the information about the status of loading segments if this flag is set. This will ensure that any future queries made after the ingestion exits will include results from the ingestion. The drawback is that the controller task will stall till the segments are loaded. | `false` |

## Joins

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,8 @@ public TaskStatus runTask(final Closer closer)
if (shouldWaitForSegmentLoad && segmentLoadWaiter != null) {
// If successful, there are segments created and segment load is enabled, segmentLoadWaiter should wait
// for them to become available.
log.info("Controller will now wait for segments to be loaded. The query has already finished executing,"
+ " and results will be included once the segments are loaded, even if this query is cancelled now.");
segmentLoadWaiter.waitForSegmentsToLoad();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public void waitForSegmentsToLoad()
final AtomicReference<Boolean> hasAnySegmentBeenLoaded = new AtomicReference<>(false);
try {
FutureUtils.getUnchecked(executorService.submit(() -> {
long lastLogMillis = -TimeUnit.MINUTES.toMillis(1);
try {
while (!(hasAnySegmentBeenLoaded.get() && versionLoadStatusReference.get().isLoadingComplete())) {
// Check the timeout and exit if exceeded.
Expand All @@ -159,11 +160,14 @@ public void waitForSegmentsToLoad()
return;
}

log.debug(
"Fetching segment load status for datasource[%s] from broker for segment versions[%s]",
datasource,
versionsInClauseString
);
if (runningMillis - lastLogMillis >= TimeUnit.MINUTES.toMillis(1)) {
lastLogMillis = runningMillis;
log.info(
"Fetching segment load status for datasource[%s] from broker for segment versions[%s]",
datasource,
versionsInClauseString
);
}

// Fetch the load status from the broker
VersionLoadStatus loadStatus = fetchLoadStatusFromBroker();
Expand Down Expand Up @@ -240,7 +244,10 @@ private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception
request.setContent(MediaType.APPLICATION_JSON, objectMapper.writeValueAsBytes(sqlQuery));
String response = brokerClient.sendQuery(request);

if (response.trim().isEmpty()) {
if (response == null) {
// Unable to query broker
return new VersionLoadStatus(0, 0, 0, 0, totalSegmentsGenerated);
} else if (response.trim().isEmpty()) {
// If no segments are returned for a version, all segments have been dropped by a drop rule.
return new VersionLoadStatus(0, 0, 0, 0, 0);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class MultiStageQueryContext

public static final String CTX_FAULT_TOLERANCE = "faultTolerance";
public static final boolean DEFAULT_FAULT_TOLERANCE = false;
public static final String CTX_SEGMENT_LOAD_WAIT = "segmentLoadWait";
public static final String CTX_SEGMENT_LOAD_WAIT = "waitTillSegmentsLoad";
public static final boolean DEFAULT_SEGMENT_LOAD_WAIT = false;
public static final String CTX_MAX_INPUT_BYTES_PER_WORKER = "maxInputBytesPerWorker";

Expand Down

0 comments on commit e910f0b

Please sign in to comment.