Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MSQ controller: Support in-memory shuffles; towards JVM reuse. #16168

Merged
merged 21 commits into from
May 1, 2024

Conversation

gianm
Copy link
Contributor

@gianm gianm commented Mar 19, 2024

This patch contains two controller changes that make progress towards a lower-latency MSQ.

These are both larger changes, but I developed them together and it was most straightforward to put them into a single PR rather than separating into multiple PRs, especially given they both involved changes in common files like ControllerImpl and ControllerContext.

Key classes:

These files are the most interesting IMO.

  • Controller
  • ControllerImpl
  • ControllerQueryKernel
  • ControllerQueryKernelConfig
  • ControllerQueryKernelUtils (especially computeStageGroups)
  • ControllerContext
  • MSQWorkerTaskLauncher

Key changes:

First, support for in-memory shuffles. The main feature of in-memory shuffles, as far as the controller is concerned, is that they are not fully buffered. That means that whenever a producer stage uses in-memory output, its consumer must run concurrently. The controller determines which stages run concurrently, and when they start and stop.

"Leapfrogging" allows any chain of sort-based stages to use in-memory shuffles even if we can only run two stages at once. For example, in a linear chain of stages 0 -> 1 -> 2 where all do sort-based shuffles, we can use in-memory shuffling for each one while only running two at once. (When stage 1 is done reading input and about to start writing its output, we can stop 0 and start 2.)

  1. New OutputChannelMode enum attached to WorkOrder that tells workers
    whether stage output should be in memory (MEMORY), or use local or durable
    storage.

  2. New logic in the ControllerQueryKernel to determine which stages can use
    in-memory shuffling (ControllerUtils#computeStageGroups) and to launch them
    at the appropriate time (ControllerQueryKernel#createNewKernels).

  3. New doneReadingInput method on Controller (passed down to the stage kernels)
    which allows stages to transition to POST_READING even if they are not
    gathering statistics. This is important because it enables "leapfrogging"
    for HASH_LOCAL_SORT shuffles, and for GLOBAL_SORT shuffles with 1 partition.

  4. Moved result-reading from ControllerContext#writeReports to new QueryListener
    interface, which ControllerImpl feeds results to row-by-row while the query
    is still running. Important so we can read query results from the final
    stage using an in-memory channel.

  5. New class ControllerQueryKernelConfig holds configs that control kernel
    behavior (such as whether to pipeline, maximum number of concurrent stages,
    etc). Generated by the ControllerContext.

Second, a refactor towards running workers in persistent JVMs that are able to cache data across queries. This is helpful because I believe we'll want to reuse JVMs and cached data for latency reasons.

  1. Move creation of WorkerManager and TableInputSpecSlicer to the
    ControllerContext, rather than ControllerImpl. This allows managing workers and
    work assignment differently when JVMs are reusable.

  2. Lift the Controller Jersey resource out from ControllerChatHandler to a
    reusable resource ControllerResource.

  3. Move memory introspection to a MemoryIntrospector interface, and introduce
    ControllerMemoryParameters that uses it. This makes it easier to run MSQ in
    process types other than Indexer and Peon.

Both of these areas will have follow-ups that make similar changes on the worker side.

This patch contains two controller changes that make progress towards a
lower-latency MSQ.

First, support for in-memory shuffles. The main feature of in-memory shuffles,
as far as the controller is concerned, is that they are not fully buffered. That
means that whenever a producer stage uses in-memory output, its consumer must run
concurrently. The controller determines which stages run concurrently, and when
they start and stop.

"Leapfrogging" allows any chain of sort-based stages to use in-memory shuffles
even if we can only run two stages at once. For example, in a linear chain of
stages 0 -> 1 -> 2 where all do sort-based shuffles, we can use in-memory shuffling
for each one while only running two at once. (When stage 1 is done reading input
and about to start writing its output, we can stop 0 and start 2.)

1) New OutputChannelMode enum attached to WorkOrders that tells workers
   whether stage output should be in memory (MEMORY), or use local or durable
   storage.

2) New logic in the ControllerQueryKernel to determine which stages can use
   in-memory shuffling (ControllerUtils#computeStageGroups) and to launch them
   at the appropriate time (ControllerQueryKernel#createNewKernels).

3) New "doneReadingInput" method on Controller (passed down to the stage kernels)
   which allows stages to transition to POST_READING even if they are not
   gathering statistics. This is important because it enables "leapfrogging"
   for HASH_LOCAL_SORT shuffles, and for GLOBAL_SORT shuffles with 1 partition.

4) Moved result-reading from ControllerContext#writeReports to new QueryListener
   interface, which ControllerImpl feeds results to row-by-row while the query
   is still running. Important so we can read query results from the final
   stage using an in-memory channel.

5) New class ControllerQueryKernelConfig holds configs that control kernel
   behavior (such as whether to pipeline, maximum number of concurrent stages,
   etc). Generated by the ControllerContext.

Second, a refactor towards running workers in persistent JVMs that are able to
cache data across queries. This is helpful because I believe we'll want to reuse
JVMs and cached data for latency reasons.

1) Move creation of WorkerManager and TableInputSpecSlicer to the
   ControllerContext, rather than ControllerImpl. This allows managing workers and
   work assignment differently when JVMs are reusable.

2) Lift the Controller Jersey resource out from ControllerChatHandler to a
   reusable resource.

3) Move memory introspection to a MemoryIntrospector interface, and introduce
   ControllerMemoryParameters that uses it. This makes it easier to run MSQ in
   process types other than Indexer and Peon.

Both of these areas will have follow-ups that make similar changes on the
worker side.
@github-actions github-actions bot added Area - Batch Ingestion Area - Querying Area - Dependencies Area - Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Mar 19, 2024
@Consumes(MediaType.APPLICATION_JSON)
public Response httpPostWorkerError(
final MSQErrorReport errorReport,
@PathParam("taskId") final String taskId,

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'taskId' is never used.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was just moved; the parameter is unused in master as well

@Consumes(MediaType.APPLICATION_JSON)
public Response httpPostPartialKeyStatistics(
final Object partialKeyStatisticsObject,
@PathParam("queryId") final String queryId,

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'queryId' is never used.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was just moved; the parameter is unused in master as well

@@ -132,9 +72,9 @@
}

@JsonProperty("results")
public Yielder<Object[]> getResultYielder()
public List<Object[]> getResults()

Check notice

Code scanning / CodeQL

Exposing internal representation Note

getResults exposes the internal representation stored in field results. The value may be modified
after this call to getResults
.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We trust the caller to not modify the arrays here.

Copy link
Contributor

@LakshSingla LakshSingla left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! I am leaving a partial review, primarily centered around the computeStageGroups method. My understanding is incomplete, and there seem to be some assumptions baked into the code, which if clarified, would make the code easier to reason about for someone unfamiliar with the logic.

*/
public class StageGroup
{
private final List<StageId> stageIds;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that there's an implicit contract that the flow of the data between the stage group is linear (A -> B -> C ...) , and cannot be branched (
A
|
B
|
C D
| /
E
)
We should probably mention that. Unrelated, but is it subject to change in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is required to be linear at this point (mostly for simplicity). It could be changed in the future. I have added a comment.

}

/**
* List of stage IDs in this group.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned previously, there's an implicit relation between the stages in the list, and [A, B, C] is not the same as [A, C, B]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true. I have added a comment.

/**
* Utilties for {@link ControllerQueryKernel}.
*/
public class ControllerUtils
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: ControllerQueryKernelUtils seems appropriate. There are many classes prefixed with Controller.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. I have changed it.

return null;
}

private static void removeStageFlow(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Javadoc about preconditions and postconditions of the method, and what modifications it makes to the input maps.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added javadocs.

// Two things happening here:
// 1) Stages cannot both stream and broadcast their output. This is because when streaming, we can only
// support a single reader.
// 2) Stages can only receive a single streamed input. This isn't strictly necessary, but it simplifies the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the signifcance of streamed input? There is probably some gap in my understanding of streamed, but would we call the data lying in durable storage ready to be read as streamed? Logically the stage can stream the output in that case, and per my understanding, this code would allow that to happen, but I wouldn't call it "streamed" input.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I really meant this to refer to output channel mode MEMORY. I have tried to make it more clear and have removed the word "stream". This comment now reads:

      // Two things happening here:
      //   1) Stages cannot use output mode MEMORY when broadcasting. This is because when using output mode MEMORY, we
      //      can only support a single reader.
      //   2) Downstream stages can only have a single input stage with output mode MEMORY. This isn't strictly
      //      necessary, but it simplifies the logic around concurrently launching stages.

I have also renamed various variables, like canStream to canUseMemoryOutput.


// 2) Pick some stage that can stream its output, and run that as well as all ready-to-run dependents.
StageId currentStageId = null;
for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an implicit assumption that the inflow's keySet is sorted, which is true since it is actually a TreeMap, however isn't reflected here. Perhaps we should tighten the contract that the inflow map is a TreeMap, instead of a Map (which can be a HashMap as well, which wouldn't work well with this logic)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, as far as I know, this logic doesn't assume the keySet is sorted. It's nice if it is, since it means the execution happens in a consistent order each time this method is run, but I don't think it's required for correctness. Unless I'm missing something.

At any rate, we do use sorted sets, because it is nice for the order to be consistent. There is no harm in tightening up the contract, so I did, to SortedSet.

} else {
final StageGroup priorGroup = stageGroups.get(stageGroups.size() - 1);
if (priorGroup.lastStageOutputChannelMode() == OutputChannelMode.MEMORY) {
// Prior group must run concurrently with this group.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this defy the logic of stage groups - each group runs individually, and all the stages within a group run simultanously. If the prior group runs concurrently with this group, shouldn't they be coalesced into a single group?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a few paragraphs of class-level javadoc to discuss how "leapfrogging" works- that is the scenario where this can happen. I also added a mention here:

          // Prior group must run concurrently with this group. (Can happen when leapfrogging; see class-level Javadoc.)


final int maxStageGroupSizeAllowingForDownstreamConsumer;
if (queryDef.getStageDefinition(currentStageId).doesSortDuringShuffle()) {
// When the current group sorts, there's a pipeline break, so we can "leapfrog": close the prior group
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's this notion of a group sorting, that is tied to the stage's sorting.

When is a group considered to be sorting, when all, first, or the last of the stages sort? Logically, it should be first, but there's no indication in the code that implies that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The final stage is the only one that can possibly sort, because of the check below that closes off the stage group if currentStageId "doesSortDuringShuffle()". I think the logic here would apply if any stage in the group sorted, though. (If any stage sorts, that causes a pipeline break that means the prior stage group would no longer be needed.)

final StageGroup priorGroup = stageGroups.get(stageGroups.size() - 1);
if (priorGroup.lastStageOutputChannelMode() == OutputChannelMode.MEMORY) {
// Prior group must run concurrently with this group.
maxStageGroupSize = config.getMaxConcurrentStages() - priorGroup.size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this ever be 0, and break the logic. Partially related to the question posed above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. I added a comment with a reason why:

          // Note: priorGroup.size() is strictly less than config.getMaxConcurrentStages(), because the prior group
          // would have its size limited by maxStageGroupSizeAllowingForDownstreamConsumer below.

if (queryDef.getStageDefinition(currentStageId).doesSortDuringShuffle()) {
// When the current group sorts, there's a pipeline break, so we can "leapfrog": close the prior group
// before starting the downstream group.
maxStageGroupSizeAllowingForDownstreamConsumer = config.getMaxConcurrentStages() - 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can something like following happen:
a) Prior's group ouptut type is OutputChannelMode.MEMORY (and prior + this group must run together)
b) The group sorts, therefore 'maxStageGroupSizeAllowingForDownstreamConsumer' = maxConcurrentStages - 1
c) The actual number of stages running simultaneously will be (priorGroup.size() + maxConcurrentStages - 1), which can be greater than maxConcurrentStages.

If not, what's there to prevent it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's prevented by the fact that when there's a pipeline break (i.e. some stage that sorts) in a stage group, the upstream stage group is closed before the downstream stage group is started. So the priorGroup.size() and the 1 will not be happening simultaneously.

I extended the comment to say:

          // When the current group sorts, there's a pipeline break, so we can leapfrog: close the prior group before
          // starting the downstream group. In this case, we only need to reserve a single concurrent-stage slot for
          // a downstream consumer.

The new class-level javadoc should help shine light on this too.

Copy link
Contributor

@abhishekagarwal87 abhishekagarwal87 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR looks good to me. I was curious how folks would tune the value of maxConcurrentStages? Based on available memory?

* This may be an expensive operation. For example, the production implementation {@link MemoryIntrospectorImpl}
* estimates size of all lookups as part of computing this value.
*/
long jvmMemoryRequiredForUsableMemory(long usableMemory);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
long jvmMemoryRequiredForUsableMemory(long usableMemory);
long computeJVMMemoryRequiredForUsableMemory(long usableMemory);

the suggest name makes it clearer about the potential cost of this operation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, I changed the name.

Copy link
Contributor

@LakshSingla LakshSingla left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the addressed comments! The PR LGTM. I have added a few more questions, but nothing major.

Comment on lines 53 to 54
* On workers, this is the maximum number of {@link Worker} that run simultaneously. See
* {@link WorkerMemoryParameters} for how memory is divided among and within {@link WorkOrder} run by a worker.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems slightly off - a single query can be run simultaneously on multiple tasks. numQueriesInJvm should technically be 1, but by the definition in the Javadoc, it might return > 1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's really meant to mean "within one JVM". So in a task it's always 1. I will update it to be clearer.

Comment on lines +47 to +50
/**
* Peons may have more than one processing thread, but we currently only use one of them.
*/
private static final int NUM_PROCESSING_THREADS = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this hasn't changed from the original MSQ design, but is there any reason why we use a single thread in the Peon? Why are the other threads not utilized?

Also, in the indexer, we use all the threads, but as per my reasoning, we should use a single thread, per process in the indexer (since other processes share that). What's the fallacy in my thinking?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the Peons, I think we can start using more threads. I am planning on changing that in a future patch, probably the follow-up to this patch that does the Worker changes.

For the Indexer, I'm not sure the current resource model actually makes sense. It probably needs some adjustment. For now, I am just keeping it the way it is.

}

@Provides
@LazySingleton
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's a singleton, we should probably rename the method to what resource this bouncer guards. Else there can be confusion as to whether we want to use the global bouncer or create one locally.
Digging into the code, it seems to be guarding the number of the processors that can run concurrently, so something like makeProcessorBouncer seems non-ambiguous.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds reasonable. I changed the name.

@gianm gianm merged commit 5d1950d into apache:master May 1, 2024
88 checks passed
@gianm gianm deleted the msq-controller-refactor branch May 1, 2024 04:30
gianm added a commit to gianm/druid that referenced this pull request Jul 24, 2024
This patch is a follow-up to apache#16168, adding worker-side support for
in-memory shuffles. Changes include:

1) Worker-side code now respects the same context parameter "maxConcurrentStages"
   that was added to the controller in apache#16168. The parameter remains undocumented
   for now, to give us a chance to more fully develop and test this functionality.

1) WorkerImpl is broken up into WorkerImpl, RunWorkOrder, and RunWorkOrderListener
   to improve readability.

2) WorkerImpl has a new StageOutputHolder + StageOutputReader concept, which
   abstract over memory-based or file-based stage results.

3) RunWorkOrder is updated to create in-memory stage output channels when
   instructed to.

4) ControllerResource is updated to add /doneReadingInput/, so the controller
   can tell when workers that sort, but do not gather statistics, are done reading
   their inputs.

5) WorkerMemoryParameters is updated to consider maxConcurrentStages.

Additionally, WorkerChatHandler is split into WorkerResource, so as to match
ControllerChatHandler and ControllerResource.
gianm added a commit that referenced this pull request Jul 31, 2024
* MSQ worker: Support in-memory shuffles.

This patch is a follow-up to #16168, adding worker-side support for
in-memory shuffles. Changes include:

1) Worker-side code now respects the same context parameter "maxConcurrentStages"
   that was added to the controller in #16168. The parameter remains undocumented
   for now, to give us a chance to more fully develop and test this functionality.

1) WorkerImpl is broken up into WorkerImpl, RunWorkOrder, and RunWorkOrderListener
   to improve readability.

2) WorkerImpl has a new StageOutputHolder + StageOutputReader concept, which
   abstract over memory-based or file-based stage results.

3) RunWorkOrder is updated to create in-memory stage output channels when
   instructed to.

4) ControllerResource is updated to add /doneReadingInput/, so the controller
   can tell when workers that sort, but do not gather statistics, are done reading
   their inputs.

5) WorkerMemoryParameters is updated to consider maxConcurrentStages.

Additionally, WorkerChatHandler is split into WorkerResource, so as to match
ControllerChatHandler and ControllerResource.

* Updates for static checks, test coverage.

* Fixes.

* Remove exception.

* Changes from review.

* Address static check.

* Changes from review.

* Improvements to docs and method names.

* Update comments, add test.

* Additional javadocs.

* Fix throws.

* Fix worker stopping in tests.

* Fix stuck test.
sreemanamala pushed a commit to sreemanamala/druid that referenced this pull request Aug 6, 2024
* MSQ worker: Support in-memory shuffles.

This patch is a follow-up to apache#16168, adding worker-side support for
in-memory shuffles. Changes include:

1) Worker-side code now respects the same context parameter "maxConcurrentStages"
   that was added to the controller in apache#16168. The parameter remains undocumented
   for now, to give us a chance to more fully develop and test this functionality.

1) WorkerImpl is broken up into WorkerImpl, RunWorkOrder, and RunWorkOrderListener
   to improve readability.

2) WorkerImpl has a new StageOutputHolder + StageOutputReader concept, which
   abstract over memory-based or file-based stage results.

3) RunWorkOrder is updated to create in-memory stage output channels when
   instructed to.

4) ControllerResource is updated to add /doneReadingInput/, so the controller
   can tell when workers that sort, but do not gather statistics, are done reading
   their inputs.

5) WorkerMemoryParameters is updated to consider maxConcurrentStages.

Additionally, WorkerChatHandler is split into WorkerResource, so as to match
ControllerChatHandler and ControllerResource.

* Updates for static checks, test coverage.

* Fixes.

* Remove exception.

* Changes from review.

* Address static check.

* Changes from review.

* Improvements to docs and method names.

* Update comments, add test.

* Additional javadocs.

* Fix throws.

* Fix worker stopping in tests.

* Fix stuck test.
@kfaraz kfaraz added this to the 31.0.0 milestone Oct 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants