MSQ controller: Support in-memory shuffles; towards JVM reuse. #16168
Conversation
This patch contains two controller changes that make progress towards a lower-latency MSQ. These are both larger changes, but I developed them together and it was most straightforward to put them into a single PR rather than separating them, especially given they both involve changes in common files like ControllerImpl and ControllerContext.

First, support for in-memory shuffles. The main feature of in-memory shuffles, as far as the controller is concerned, is that they are not fully buffered. That means that whenever a producer stage uses in-memory output, its consumer must run concurrently. The controller determines which stages run concurrently, and when they start and stop. "Leapfrogging" allows any chain of sort-based stages to use in-memory shuffles even if we can only run two stages at once. For example, in a linear chain of stages 0 -> 1 -> 2 where all do sort-based shuffles, we can use in-memory shuffling for each one while only running two at once. (When stage 1 is done reading input and about to start writing its output, we can stop 0 and start 2.)

1) New OutputChannelMode enum attached to WorkOrders that tells workers whether stage output should be in memory (MEMORY), or use local or durable storage.
2) New logic in the ControllerQueryKernel to determine which stages can use in-memory shuffling (ControllerUtils#computeStageGroups) and to launch them at the appropriate time (ControllerQueryKernel#createNewKernels).
3) New "doneReadingInput" method on Controller (passed down to the stage kernels) which allows stages to transition to POST_READING even if they are not gathering statistics. This is important because it enables "leapfrogging" for HASH_LOCAL_SORT shuffles, and for GLOBAL_SORT shuffles with 1 partition.
4) Moved result-reading from ControllerContext#writeReports to a new QueryListener interface, to which ControllerImpl feeds results row-by-row while the query is still running. Important so we can read query results from the final stage using an in-memory channel.
5) New class ControllerQueryKernelConfig holds configs that control kernel behavior (such as whether to pipeline, maximum number of concurrent stages, etc). Generated by the ControllerContext.

Second, a refactor towards running workers in persistent JVMs that are able to cache data across queries. This is helpful because I believe we'll want to reuse JVMs and cached data for latency reasons.

1) Move creation of WorkerManager and TableInputSpecSlicer to the ControllerContext, rather than ControllerImpl. This allows managing workers and work assignment differently when JVMs are reusable.
2) Lift the Controller Jersey resource out from ControllerChatHandler to a reusable resource, ControllerResource.
3) Move memory introspection to a MemoryIntrospector interface, and introduce ControllerMemoryParameters that uses it. This makes it easier to run MSQ in process types other than Indexer and Peon.

Both of these areas will have follow-ups that make similar changes on the worker side.
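The OutputChannelMode described above could be sketched roughly as follows. This is a hedged illustration: the constant names, the set of storage modes, and the `requiresConcurrentConsumer` helper are assumptions for the sketch, not necessarily what the patch ships.

```java
// Hypothetical sketch of the OutputChannelMode enum described in the PR text.
// The real class may differ in constant names and serialization details.
public enum OutputChannelMode
{
  // Stage output is held in memory. Because in-memory output is not fully
  // buffered, the consumer stage must run concurrently with the producer.
  MEMORY,

  // Stage output is written to local disk on the worker.
  LOCAL_STORAGE,

  // Stage output is written to durable (e.g. cloud) storage.
  DURABLE_STORAGE;

  // MEMORY is the only mode that forces producer and consumer to overlap.
  public boolean requiresConcurrentConsumer()
  {
    return this == MEMORY;
  }
}
```

This captures the core controller-side distinction: MEMORY constrains scheduling, while the storage-backed modes allow the consumer to start any time after the producer finishes.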
...nsions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
Fixed
@Consumes(MediaType.APPLICATION_JSON)
public Response httpPostWorkerError(
    final MSQErrorReport errorReport,
    @PathParam("taskId") final String taskId,
Code scanning / CodeQL notice: Useless parameter
This code was just moved; the parameter is unused in master as well
@Consumes(MediaType.APPLICATION_JSON)
public Response httpPostPartialKeyStatistics(
    final Object partialKeyStatisticsObject,
    @PathParam("queryId") final String queryId,
Code scanning / CodeQL notice: Useless parameter
This code was just moved; the parameter is unused in master as well
...e-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
Fixed
@@ -132,9 +72,9 @@
  }

  @JsonProperty("results")
- public Yielder<Object[]> getResultYielder()
+ public List<Object[]> getResults()
Code scanning / CodeQL notice: Exposing internal representation after this call to getResults
We trust the caller to not modify the arrays here.
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
Fixed
Thanks for the PR! I am leaving a partial review, primarily centered around the computeStageGroups method. My understanding is incomplete, and there seem to be some assumptions baked into the code which, if clarified, would make the code easier to reason about for someone unfamiliar with the logic.
 */
public class StageGroup
{
  private final List<StageId> stageIds;
It seems that there's an implicit contract that the flow of the data between the stage group is linear (A -> B -> C ...), and cannot be branched:

A
|
B
|
C D
| /
E

We should probably mention that. Unrelated, but is it subject to change in the future?
Yes, it is required to be linear at this point (mostly for simplicity). It could be changed in the future. I have added a comment.
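The linearity contract discussed here could be checked mechanically. A minimal sketch, assuming stage IDs are simplified to ints and a consumers map is available; the helper and its names are hypothetical, not the patch's actual API:

```java
import java.util.List;
import java.util.Map;

// Illustrative check of the StageGroup linear-chain contract: each non-final
// stage must have exactly one downstream consumer, which is the next stage
// in the list. Names are hypothetical, not the patch's actual code.
public final class StageGroupLinearity
{
  /**
   * @param stageIds  stages in execution order
   * @param consumers map from stage ID to the IDs of stages reading its output
   */
  public static boolean isLinearChain(
      final List<Integer> stageIds,
      final Map<Integer, List<Integer>> consumers
  )
  {
    for (int i = 0; i < stageIds.size() - 1; i++) {
      final List<Integer> downstream = consumers.getOrDefault(stageIds.get(i), List.of());
      // Each non-final stage must feed exactly the next stage in the group.
      if (downstream.size() != 1 || !downstream.get(0).equals(stageIds.get(i + 1))) {
        return false;
      }
    }
    return true;
  }
}
```

Under this check, [A, B, C] with A -> B -> C passes, while any branch (a stage with two consumers) fails, matching the diagram in the comment above.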
}

/**
 * List of stage IDs in this group.
As mentioned previously, there's an implicit relation between the stages in the list, and [A, B, C] is not the same as [A, C, B]
That's true. I have added a comment.
/**
 * Utilities for {@link ControllerQueryKernel}.
 */
public class ControllerUtils
nit: ControllerQueryKernelUtils seems appropriate. There are many classes prefixed with Controller.
That makes sense. I have changed it.
  return null;
}

private static void removeStageFlow(
nit: Javadoc about preconditions and postconditions of the method, and what modifications it makes to the input maps.
I have added javadocs.
// Two things happening here:
// 1) Stages cannot both stream and broadcast their output. This is because when streaming, we can only
//    support a single reader.
// 2) Stages can only receive a single streamed input. This isn't strictly necessary, but it simplifies the
What is the significance of streamed input? There is probably some gap in my understanding of streamed, but would we call data lying in durable storage, ready to be read, "streamed"? Logically the stage can stream the output in that case, and per my understanding, this code would allow that to happen, but I wouldn't call it "streamed" input.
Ah, I really meant this to refer to output channel mode MEMORY. I have tried to make it more clear and have removed the word "stream". This comment now reads:

// Two things happening here:
// 1) Stages cannot use output mode MEMORY when broadcasting. This is because when using output mode MEMORY, we
//    can only support a single reader.
// 2) Downstream stages can only have a single input stage with output mode MEMORY. This isn't strictly
//    necessary, but it simplifies the logic around concurrently launching stages.

I have also renamed various variables, like canStream to canUseMemoryOutput.
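The two rules in the reworded comment could be expressed as a single predicate, roughly as below. This is a sketch with hypothetical names; the real check lives inside ControllerUtils#computeStageGroups rather than in a standalone class:

```java
// Illustrative sketch of the two MEMORY-output eligibility rules discussed
// above. Names and structure are hypothetical, not the actual patch code.
public final class MemoryOutputRules
{
  /**
   * Rule 1: a stage cannot use output mode MEMORY when broadcasting, because
   * MEMORY output supports only a single reader.
   * Rule 2: a downstream stage can have at most one input stage with output
   * mode MEMORY, which simplifies the concurrent-launch logic.
   *
   * @param broadcastsOutput          whether this stage broadcasts its output
   * @param consumerMemoryInputCount  MEMORY inputs the consumer already has
   */
  public static boolean canUseMemoryOutput(
      final boolean broadcastsOutput,
      final int consumerMemoryInputCount
  )
  {
    return !broadcastsOutput && consumerMemoryInputCount == 0;
  }
}
```

Either condition failing forces the stage back to local or durable storage output, which has no single-reader restriction.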
// 2) Pick some stage that can stream its output, and run that as well as all ready-to-run dependents.
StageId currentStageId = null;
for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {
There's an implicit assumption that the inflow's keySet is sorted, which is true since it is actually a TreeMap, however isn't reflected here. Perhaps we should tighten the contract that the inflow map is a TreeMap, instead of a Map (which can be a HashMap as well, which wouldn't work well with this logic)
Hmm, as far as I know, this logic doesn't assume the keySet is sorted. It's nice if it is, since it means the execution happens in a consistent order each time this method is run, but I don't think it's required for correctness. Unless I'm missing something.
At any rate, we do use sorted sets, because it is nice for the order to be consistent. There is no harm in tightening up the contract, so I did, to SortedSet.
} else {
  final StageGroup priorGroup = stageGroups.get(stageGroups.size() - 1);
  if (priorGroup.lastStageOutputChannelMode() == OutputChannelMode.MEMORY) {
    // Prior group must run concurrently with this group.
Doesn't this defy the logic of stage groups: each group runs individually, and all the stages within a group run simultaneously. If the prior group runs concurrently with this group, shouldn't they be coalesced into a single group?
I added a few paragraphs of class-level javadoc to discuss how "leapfrogging" works- that is the scenario where this can happen. I also added a mention here:
// Prior group must run concurrently with this group. (Can happen when leapfrogging; see class-level Javadoc.)
final int maxStageGroupSizeAllowingForDownstreamConsumer;
if (queryDef.getStageDefinition(currentStageId).doesSortDuringShuffle()) {
  // When the current group sorts, there's a pipeline break, so we can "leapfrog": close the prior group
There's this notion of a group sorting, which is tied to the stages' sorting. When is a group considered to be sorting: when all, the first, or the last of the stages sorts? Logically, it should be the first, but there's no indication in the code that implies that.
The final stage is the only one that can possibly sort, because of the check below that closes off the stage group if currentStageId "doesSortDuringShuffle()". I think the logic here would apply if any stage in the group sorted, though. (If any stage sorts, that causes a pipeline break that means the prior stage group would no longer be needed.)
final StageGroup priorGroup = stageGroups.get(stageGroups.size() - 1);
if (priorGroup.lastStageOutputChannelMode() == OutputChannelMode.MEMORY) {
  // Prior group must run concurrently with this group.
  maxStageGroupSize = config.getMaxConcurrentStages() - priorGroup.size();
Can this ever be 0 and break the logic? Partially related to the question posed above.
I don't think so. I added a comment with a reason why:
// Note: priorGroup.size() is strictly less than config.getMaxConcurrentStages(), because the prior group
// would have its size limited by maxStageGroupSizeAllowingForDownstreamConsumer below.
if (queryDef.getStageDefinition(currentStageId).doesSortDuringShuffle()) {
  // When the current group sorts, there's a pipeline break, so we can "leapfrog": close the prior group
  // before starting the downstream group.
  maxStageGroupSizeAllowingForDownstreamConsumer = config.getMaxConcurrentStages() - 1;
Can something like the following happen:
a) The prior group's output type is OutputChannelMode.MEMORY (and prior + this group must run together)
b) The group sorts, therefore maxStageGroupSizeAllowingForDownstreamConsumer = maxConcurrentStages - 1
c) The actual number of stages running simultaneously will be (priorGroup.size() + maxConcurrentStages - 1), which can be greater than maxConcurrentStages.
If not, what's there to prevent it?
It's prevented by the fact that when there's a pipeline break (i.e. some stage that sorts) in a stage group, the upstream stage group is closed before the downstream stage group is started. So the priorGroup.size() and the 1 will not be happening simultaneously.
I extended the comment to say:
// When the current group sorts, there's a pipeline break, so we can leapfrog: close the prior group before
// starting the downstream group. In this case, we only need to reserve a single concurrent-stage slot for
// a downstream consumer.
The new class-level javadoc should help shine light on this too.
The PR looks good to me. I was curious how folks would tune the value of maxConcurrentStages? Based on available memory?
 * This may be an expensive operation. For example, the production implementation {@link MemoryIntrospectorImpl}
 * estimates size of all lookups as part of computing this value.
 */
long jvmMemoryRequiredForUsableMemory(long usableMemory);
Suggested change:
- long jvmMemoryRequiredForUsableMemory(long usableMemory);
+ long computeJVMMemoryRequiredForUsableMemory(long usableMemory);
The suggested name makes it clearer about the potential cost of this operation.
Sounds good, I changed the name.
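For illustration only, a computation of this shape might look like the sketch below. The usable fraction and the lookup term here are assumptions made up for the sketch; the actual MemoryIntrospectorImpl logic is not reproduced:

```java
// Hypothetical sketch of translating "usable" memory back into the JVM heap
// that must be provisioned. Assumes a fixed usable fraction plus memory held
// by lookups; the real MemoryIntrospectorImpl differs. The lookup-size term
// is why this can be an expensive call, as the Javadoc warns.
public final class MemoryEstimate
{
  private static final double USABLE_FRACTION = 0.75; // assumed, not Druid's value

  public static long computeJvmMemoryRequiredForUsableMemory(
      final long usableMemory,
      final long lookupFootprint
  )
  {
    // Invert the usable fraction, then add memory held by lookups.
    return (long) Math.ceil(usableMemory / USABLE_FRACTION) + lookupFootprint;
  }
}
```

With the assumed 0.75 fraction, asking for 750 MB of usable memory with no lookups would require a 1000 MB heap.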
Thanks for the addressed comments! The PR LGTM. I have added a few more questions, but nothing major.
 * On workers, this is the maximum number of {@link Worker} that run simultaneously. See
 * {@link WorkerMemoryParameters} for how memory is divided among and within {@link WorkOrder} run by a worker.
It seems slightly off - a single query can be run simultaneously on multiple tasks. numQueriesInJvm should technically be 1, but by the definition in the Javadoc, it might return > 1.
It's really meant to mean "within one JVM". So in a task it's always 1. I will update it to be clearer.
/**
 * Peons may have more than one processing thread, but we currently only use one of them.
 */
private static final int NUM_PROCESSING_THREADS = 1;
I think this hasn't changed from the original MSQ design, but is there any reason why we use a single thread in the Peon? Why are the other threads not utilized?
Also, in the Indexer, we use all the threads, but per my reasoning we should use a single thread per process in the Indexer (since other processes share them). What's the fallacy in my thinking?
For the Peons, I think we can start using more threads. I am planning on changing that in a future patch, probably the follow-up to this patch that does the Worker changes.
For the Indexer, I'm not sure the current resource model actually makes sense. It probably needs some adjustment. For now, I am just keeping it the way it is.
}

@Provides
@LazySingleton
Since it's a singleton, we should probably rename the method to reflect what resource this bouncer guards. Otherwise there can be confusion as to whether we want to use the global bouncer or create one locally. Digging into the code, it seems to be guarding the number of processors that can run concurrently, so something like makeProcessorBouncer seems non-ambiguous.
Sounds reasonable. I changed the name.
This patch is a follow-up to apache#16168, adding worker-side support for in-memory shuffles. Changes include: 1) Worker-side code now respects the same context parameter "maxConcurrentStages" that was added to the controller in apache#16168. The parameter remains undocumented for now, to give us a chance to more fully develop and test this functionality. 2) WorkerImpl is broken up into WorkerImpl, RunWorkOrder, and RunWorkOrderListener to improve readability. 3) WorkerImpl has a new StageOutputHolder + StageOutputReader concept, which abstracts over memory-based or file-based stage results. 4) RunWorkOrder is updated to create in-memory stage output channels when instructed to. 5) ControllerResource is updated to add /doneReadingInput/, so the controller can tell when workers that sort, but do not gather statistics, are done reading their inputs. 6) WorkerMemoryParameters is updated to consider maxConcurrentStages. Additionally, WorkerChatHandler is split into WorkerResource, so as to match ControllerChatHandler and ControllerResource.