Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MSQ controller: Support in-memory shuffles; towards JVM reuse. #16168

Merged
merged 21 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions extensions-core/multi-stage-query/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@
<artifactId>datasketches-memory</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil-core</artifactId>
Expand Down Expand Up @@ -288,6 +293,13 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-indexing-service</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-sql</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,95 +19,95 @@

package org.apache.druid.msq.exec;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.client.ControllerChatHandler;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;

import javax.annotation.Nullable;
import java.util.List;

/**
* Interface for the controller of a multi-stage query.
* Interface for the controller of a multi-stage query. Each Controller is specific to a particular query.
*
* @see WorkerImpl the production implementation
*/
public interface Controller
{
/**
* POJO for capturing the status of a controller task that is currently running.
*/
class RunningControllerStatus
{
private final String id;

@JsonCreator
public RunningControllerStatus(String id)
{
this.id = id;
}

@JsonProperty("id")
public String getId()
{
return id;
}
}

/**
* Unique task/query ID for the batch query run by this controller.
*
* Controller IDs must be globally unique. For tasks, this is the task ID from {@link MSQControllerTask#getId()}.
*/
String id();

/**
* The task which this controller runs.
*/
MSQControllerTask task();
String queryId();

/**
* Runs the controller logic in the current thread. Surrounding classes provide the execution thread.
*/
TaskStatus run() throws Exception;
void run(QueryListener listener) throws Exception;

/**
* Terminate the query DAG upon a cancellation request.
* Terminate the controller upon a cancellation request. Causes a concurrently-running {@link #run} method in
* a separate thread to cancel all outstanding work and exit.
*/
void stopGracefully();
void stop();

// Worker-to-controller messages

/**
* Accepts a {@link PartialKeyStatisticsInformation} and updates the controller key statistics information. If all key
* statistics have been gathered, enqueues the task with the {@link WorkerSketchFetcher} to generate partiton boundaries.
* This is intended to be called by the {@link ControllerChatHandler}.
*
* @see ControllerClient#postPartialKeyStatistics(StageId, int, PartialKeyStatisticsInformation)
*/
void updatePartialKeyStatisticsInformation(
int stageNumber,
int workerNumber,
Object partialKeyStatisticsInformationObject
);

/**
* Sent by workers when they finish reading their input, in cases where they would not otherwise be calling
* {@link #updatePartialKeyStatisticsInformation(int, int, Object)}.
*
* @see ControllerClient#postDoneReadingInput(StageId, int)
*/
void updatePartialKeyStatisticsInformation(int stageNumber, int workerNumber, Object partialKeyStatisticsInformationObject);
void doneReadingInput(int stageNumber, int workerNumber);

/**
* System error reported by a subtask. Note that the errors are organized by
* taskId, not by query/stage/worker, because system errors are associated
* with a task rather than a specific query/stage/worker execution context.
*
* @see ControllerClient#postWorkerError(String, MSQErrorReport)
*/
void workerError(MSQErrorReport errorReport);

/**
* System warning reported by a subtask. Indicates that the worker has encountered a non-lethal error. Worker should
* continue its execution in such a case. If the worker wants to report an error and stop its execution,
* please use {@link Controller#workerError}
*
* @see ControllerClient#postWorkerWarning(List)
*/
void workerWarning(List<MSQErrorReport> errorReports);

/**
* Periodic update of {@link CounterSnapshots} from subtasks.
*
* @see ControllerClient#postCounters(String, CounterSnapshotsTree)
*/
void updateCounters(String taskId, CounterSnapshotsTree snapshotsTree);

/**
* Reports that results are ready for a subtask.
*
* @see ControllerClient#postResultsComplete(StageId, int, Object)
*/
void resultsComplete(
String queryId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;

Expand All @@ -43,6 +44,21 @@ void postPartialKeyStatistics(
PartialKeyStatisticsInformation partialKeyStatisticsInformation
) throws IOException;

/**
* Client side method to tell the controller that a particular stage and worker is done reading its input.
*
* The main purpose of this call is to let the controller know when it can stop running the input stage. This helps
* execution roll smoothly from stage to stage during pipelined execution. For backwards-compatibility reasons
* (this is a newer method, only really useful when pipelining), this call should be skipped if the query is not
* pipelining stages.
*
* Only used when {@link StageDefinition#doesSortDuringShuffle()} and *not*
* {@link StageDefinition#mustGatherResultKeyStatistics()}. When the stage gathers result key statistics, workers
* call {@link #postPartialKeyStatistics(StageId, int, PartialKeyStatisticsInformation)} instead, which has the same
* effect of telling the controller that the worker is done reading its input.
*/
void postDoneReadingInput(StageId stageId, int workerNumber) throws IOException;

/**
* Client-side method to update the controller with counters for a particular stage and worker. The controller uses
* this to compile live reports, track warnings generated etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,44 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Injector;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.server.DruidNode;

/**
* Context used by multi-stage query controllers.
*
* Useful because it allows test fixtures to provide their own implementations.
* Context used by multi-stage query controllers. Useful because it allows test fixtures to provide their own
* implementations.
*/
public interface ControllerContext
{
ServiceEmitter emitter();
/**
* Configuration for {@link org.apache.druid.msq.kernel.controller.ControllerQueryKernel}.
*/
ControllerQueryKernelConfig queryKernelConfig(MSQSpec querySpec, QueryDefinition queryDef);

/**
* Callback from the controller implementation to "register" the controller. Used in the indexing task implementation
* to set up the task chat web service.
*/
void registerController(Controller controller, Closer closer);

/**
* JSON-enabled object mapper.
*/
ObjectMapper jsonMapper();

/**
* Emit a metric using a {@link ServiceEmitter}.
*/
void emitMetric(String metric, Number value);

/**
* Provides a way for tasks to request injectable objects. Useful because tasks are not able to request injection
* at the time of server startup, because the server doesn't know what tasks it will be running.
Expand All @@ -51,32 +71,33 @@ public interface ControllerContext
DruidNode selfNode();

/**
* Provide access to the Coordinator service.
* Provides an {@link InputSpecSlicer} that slices {@link TableInputSpec} into {@link SegmentsInputSlice}.
*/
CoordinatorClient coordinatorClient();
InputSpecSlicer newTableInputSpecSlicer();

/**
* Provide access to segment actions in the Overlord.
* Provide access to segment actions in the Overlord. Only called for ingestion queries, i.e., where
* {@link MSQSpec#getDestination()} is {@link org.apache.druid.msq.indexing.destination.DataSourceMSQDestination}.
*/
TaskActionClient taskActionClient();

/**
* Provides services about workers: starting, canceling, obtaining status.
*
* @param queryId query ID
* @param querySpec query spec
* @param queryKernelConfig config from {@link #queryKernelConfig(MSQSpec, QueryDefinition)}
* @param workerFailureListener listener that receives callbacks when workers fail
*/
WorkerManagerClient workerManager();

/**
* Callback from the controller implementation to "register" the controller. Used in the indexing task implementation
* to set up the task chat web service.
*/
void registerController(Controller controller, Closer closer);
WorkerManager newWorkerManager(
String queryId,
MSQSpec querySpec,
ControllerQueryKernelConfig queryKernelConfig,
WorkerFailureListener workerFailureListener
);

/**
* Client for communicating with workers.
*/
WorkerClient taskClientFor(Controller controller);
/**
* Writes controller task report.
*/
void writeReports(String controllerTaskId, TaskReport.ReportMap reports);
WorkerClient newWorkerClient();
}
Loading
Loading