Query Execution in Tests
- Planner produces a `PlannerResult`, which holds a `Supplier<Sequence<Object[]>>` that wraps query execution.
- The supplier is a lambda, defined in `planWithDruidConvention()`, that captures the planner and `DruidQueryRel`.
- The lambda does a final "sanity check" of resources, then calls `DruidRel<?>.run()`.
- `DruidRel<?>.run()` converts the `DruidRel` to a Druid query, in the form of `DruidQuery`, which appears to be what performs the query translation.
- It then gets the query maker from the planner context and calls `runQuery()`.
- `NativeQueryMaker.runQuery()` obtains the Druid query and an `ExecutionPlan`, then calls `execute()`.
- `ExecutionPlan` holds the query, row order, new fields and new types (whatever those are).
- `execute()` creates a `QueryLifecycle` to run the query by calling `QueryLifecycle.runSimple()`. This method also uses the `Hook` to record the actual final query.
- `execute()` uses a `QueryToolChest` to get the signature and convert rows.
- `runSimple()` manages some state, checks authorization again, and calls `execute()` to run the query.
- `QueryLifecycle.execute()` creates a `ResponseContext`, wraps the query in a `QueryPlus` (including the `QuerySegmentWalker`) and calls `run()`.
- `QueryPlus.run()` calls `Query.getRunner()` to get the `QueryRunner`, which is produced by `DataSourceAnalysis`.
- `DataSourceAnalysis` is a complex beast, and handles the odd way Druid does joins, etc.
- `getBaseQuerySegmentSpec` then obtains the segment spec (intervals) for the target datasource.
- `getRunner()` combines the spec with the walker in `MultipleIntervalSegmentSpec.lookup()`, which calls `QuerySegmentWalker.getQueryRunnerForIntervals()`.
- This calls `ClientQuerySegmentWalker.getQueryRunnerForIntervals()`. This walker is "for the Broker process".
- After much computation, it returns a `QuerySwappingQueryRunner` to run on the "cluster".
- Input to the above is a `` created in `ClientQuerySegmentWalker.decorateClusterRunner()`. This does "planning" by creating a stack of query runners packaged in a `FluentQueryRunner`.
- `QueryPlus.run()` then calls `QueryRunner.run()`.
- `QuerySwappingQueryRunner.run()` invokes a base runner, which is a `FluentQueryRunner`.
- `FluentQueryRunner.run()` invokes its base runner, which is `CPUTimeMetricQueryRunner`.
- `CPUTimeMetricQueryRunner` measures CPU, then invokes `ResultLevelCachingQueryRunner`.
- `ResultLevelCachingQueryRunner` notices that we are neither fetching from nor writing to the cache, so it just calls `FinalizeResultsQueryRunner`.
- `FinalizeResultsQueryRunner` finalizes results, then calls `UnionQueryRunner`.
- `UnionQueryRunner` notices that the test query has no union, so it calls its input.
- The input is a lambda defined in `ScanQueryQueryToolChest.preMergeQueryDecoration()`, which just calls its input.
- `SetAndVerifyContextQueryRunner` adjusts the timeout and scatter/gather bytes in the context. It has no execution role; it just calls its input.
- `RetryQueryRunner` handles segment retries and merges results from exchanges. This runner handles the retries; its child does the initial "wave" of distribution.
- The `TestClusterQuerySegmentWalker.getQueryRunnerForIntervals()` lambda uses `DataSourceAnalysis` to work out the datasource, then works out the segments for that datasource given the query intervals. It then calls `getQueryRunnerForSegments()`, which builds a new tree of runners that includes post-merge, merge, pre-merge and table runners. It uses a "conglomerate", which is just a map from query class to factory, to find the query runner factory for the query. For our test query, the factory is `ScanQueryRunnerFactory`. Part of this work uses `JoinableFactoryWrapper`, which does certain join-level query optimizations, though it does nothing for our test query. Then we get a list of "windowed segments" (`WindowedSegment`), each of which is a (segment, interval) pair.
- The interesting bit is then in `TestClusterQuerySegmentWalker.makeTableRunner()`, which converts the list of segment windows into a set of `SpecificSegmentQueryRunner` instances. (This adds a `FinalizeResultsQueryRunner`, but one was already added by the caller.) Each has a `ReferenceCountingSegmentQueryRunner` as its input. `QueryRunnerFactory` handles "the nitty-gritty details of a query". (Question: do we group segments by server? Seems not.)
- The per-segment queries run in parallel in a `QueryProcessingPool`, though tests use `DirectExecutorService`, which runs the items sequentially in a single thread. Thus, the `SpecificSegmentQueryRunner` runs in a thread managed by the processing pool. This is where the scatter parallelism comes in on the Broker side.
- The result is wrapped in a query runner which ...
- The `TestClusterQuerySegmentWalker.getQueryRunnerForSegments()` lambda rewrites queries to identify the specific segments. It also handles scheduling this part of the query for execution if a scheduler is present. The `QueryScheduler` holds the query until it can be run, then invokes the corresponding child runner and returns its sequence. Thus, the main thread may pause here awaiting execution. Execution itself is handled via a lazy sequence, which is created outside of the scheduler but is read by the downstream runners (and thus started) only after the wait.
- At this point, control bubbles back up to the top. It continues downward again only when the caller reads from the sequence (by which time the downstream runners will have been converted to operators, where possible).
- `FinalizeResultsQueryRunner` does nothing.
- The `ScanQueryQueryToolChest.mergeResults` lambda handles limit and offset (merging is just a union).
- The `ScanQueryQueryToolChest.preMergeQueryDecoration()` lambda does nothing.
- `FinalizeResultsQueryRunner` does nothing.
- The `ScanQueryQueryToolChest.mergeResults` lambda behaves as above.
- The `ScanQueryRunnerFactory.mergeRunners` lambda does the actual merge (as a concatenation in this case).
- `SpecificSegmentQueryRunner` renames the thread, catches `SegmentMissingException` from upstream, and records the missing segment in the response context.
- `ReferenceCountingSegmentQueryRunner` does what its name suggests: it holds a reference to the segment while the query reads it.
- `ScanQueryRunnerFactory.ScanQueryRunner` is the leaf runner, which invokes the scan query engine.
- `QueryLifecycle.execute()` returns a `QueryLifecycle.QueryResponse` object.
- `runSimple()` calls `QueryResponse.getResults()` to obtain the results as a `Sequence`.
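The walk-through shows a pattern. On the way down, most runners only assemble a pipeline, and no rows move until the caller reads the returned sequence. Below is a minimal Java sketch of that pattern. `Runner`, `leaf()`, `passThrough()` and `mapping()` are hypothetical stand-ins, not Druid's `QueryRunner` API, and a plain `Iterable` plays the role of Druid's lazy `Sequence`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of a runner stack: run() only builds a lazy pipeline;
// the leaf does its work when the result is iterated.
final class LazyRunnerSketch {
  /** Records what happened, in order. */
  static final List<String> TRACE = new ArrayList<>();

  interface Runner {
    Iterable<Integer> run(String query);
  }

  /** Leaf runner, standing in for the scan engine. Rows are produced only on iteration. */
  static Runner leaf(List<Integer> rows) {
    return query -> {
      TRACE.add("leaf.run(" + query + ")");
      return () -> {
        TRACE.add("leaf.scan");
        return rows.iterator();
      };
    };
  }

  /** A runner that "just calls its input": it returns the child's sequence untouched. */
  static Runner passThrough(String name, Runner base) {
    return query -> {
      TRACE.add(name + ".run");
      return base.run(query);
    };
  }

  /** A runner that transforms each row lazily, e.g. a finalizing step. */
  static Runner mapping(String name, Runner base, Function<Integer, Integer> fn) {
    return query -> {
      TRACE.add(name + ".run");
      Iterable<Integer> input = base.run(query);
      return () -> {
        Iterator<Integer> it = input.iterator();
        return new Iterator<Integer>() {
          @Override public boolean hasNext() { return it.hasNext(); }
          @Override public Integer next() { return fn.apply(it.next()); }
        };
      };
    };
  }

  static List<Integer> drain(Iterable<Integer> seq) {
    List<Integer> out = new ArrayList<>();
    for (Integer row : seq) {
      out.add(row);
    }
    return out;
  }

  public static void main(String[] args) {
    Runner stack = passThrough("swapping",
        mapping("finalize", passThrough("union", leaf(List.of(1, 2, 3))), x -> x * 10));
    Iterable<Integer> seq = stack.run("scan");
    System.out.println(TRACE.contains("leaf.scan")); // false: run() only assembled the pipeline
    System.out.println(drain(seq));                  // [10, 20, 30]
    System.out.println(TRACE.contains("leaf.scan")); // true: reading the sequence did the work
  }
}
```

A pass-through runner, such as `UnionQueryRunner` for a query with no union, corresponds to `passThrough()`. It returns its child's sequence as-is, which is why such runners add no per-row cost.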
Notes:
- Spend time to fully understand `DataSourceAnalysis`: it seems to be central to the whole show.
- This whole area is complex and seems to require untangling (later).
- Understand the `QuerySegmentWalker` mechanism. It looks like it does quite a bit of "planning" for running queries.
- Much of what the above two seem to do is a key part of planning. Understanding it is non-trivial.
- Somewhere in the above, the `FluentQueryRunner` was called to build up a stack of query runners. This is the thing to convert to operators.
- `QuerySwappingQueryRunner` swaps in a rewritten query, but does not seem to be optimized away if the two queries are identical.
- There is no materialization of the stack of query runners: no `toString()`, and no way to grab the runners to inspect them in tests. Could we change this?
- The "extra" query runners are not that costly as they simply pass along the sequence from the input, so they add no per-row overhead.
- Converting `RetryQueryRunner` will be a bit of a project. Do tests exist?
- The query laning strategy seems oddly placed: the query starts running and is queued at the point of scatter. Understand this better. The laning sits under the retry runner, which means each retry could be queued and delayed separately. Very odd.
- At present, after `open` is called on the top set of operators, the top of the stack has a `WrappingSequence`, `ConcatSequence`, `MappedSequence` and `WrappingSequence`. Can these be replaced with operators?
- The test stack seems to have redundant elements, which produce redundant limit operators. This is probably because the query runner stack is impossible to visualize.
- `QueryPlanner.runSpecificSegment` does not handle the case of a missing segment (catching `SegmentMissingException` and placing the segment in the response context). Should it? Or did I handle this elsewhere? Are there two versions, one for the Broker and one for the Historical?
- In the conversion to operators, the scatter/gather operation should not be handled within an operator: it should be part of the query run planning done earlier. This means this part of the flow won't be a simple conversion.
- `ScanQueryQueryToolChest.resultArraySignature` discusses limitations of array-based results. Basically, Druid cannot handle "schema changes."
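The notes above ask whether the runner stack could be materialized for inspection. One possible shape is a node that renders itself as an indented tree, assuming each runner could expose its inputs. The `Node` class and its `explain()` method below are hypothetical; nothing like them exists on the current runners.

```java
import java.util.List;

// Hypothetical sketch: a runner-like node that can describe the stack beneath it.
final class RunnerTreeSketch {
  static final class Node {
    final String name;
    final List<Node> inputs;

    Node(String name, Node... inputs) {
      this.name = name;
      this.inputs = List.of(inputs);
    }

    /** Renders this node and its inputs as an indented tree, one runner per line. */
    String explain() {
      StringBuilder sb = new StringBuilder();
      explain(sb, 0);
      return sb.toString();
    }

    private void explain(StringBuilder sb, int depth) {
      sb.append("  ".repeat(depth)).append(name).append('\n');
      for (Node input : inputs) {
        input.explain(sb, depth + 1);
      }
    }

    @Override
    public String toString() {
      return explain();
    }
  }

  public static void main(String[] args) {
    Node stack = new Node("FinalizeResultsQueryRunner",
        new Node("mergeResults",
            new Node("SpecificSegmentQueryRunner", new Node("ScanQueryRunner")),
            new Node("SpecificSegmentQueryRunner", new Node("ScanQueryRunner"))));
    System.out.print(stack);
  }
}
```

A rendering like this could be asserted on in tests, which would also make redundant elements, such as duplicated limit steps, easy to spot.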
The general structure is:
- Calcite parses SQL and creates a logical plan, which Druid converts to a native query.
- The native query is then "parsed" and "planned" by constructing a set of query runners.
- The query runners then do task-specific "planning" and create an executor, wrapped in a sequence, to do the work. While "planning", they alter the query passed down to their children. As a result, the runners are not associated with a "plan"; instead, each is handed a revised native query by its "parent" (downstream) runner. Thus, runners operate from root (most downstream) to leaf (most upstream).
- Sequences do the execution as part of following the sequence protocol.
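The sketch below illustrates this root-to-leaf "planning": each runner hands its child a revised query rather than consulting a shared plan. The `Query` class, the `timeout` and `segment` context keys, and both runner factories are hypothetical simplifications. They are only loosely modeled on what `SetAndVerifyContextQueryRunner` and the per-segment rewrite do.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: each runner rewrites the (immutable) query before passing it down,
// so the leaf sees the query as revised by every runner above it.
final class QueryRewriteSketch {
  /** Immutable stand-in for a native query: a datasource plus a context map. */
  static final class Query {
    final String dataSource;
    final Map<String, Object> context;

    Query(String dataSource, Map<String, Object> context) {
      this.dataSource = dataSource;
      this.context = Map.copyOf(context);
    }

    Query withContextValue(String key, Object value) {
      Map<String, Object> copy = new HashMap<>(context);
      copy.put(key, value);
      return new Query(dataSource, copy);
    }
  }

  interface Runner {
    List<String> run(Query query);
  }

  /** Adds a default timeout if the query lacks one ("timeout" is a hypothetical key). */
  static Runner setDefaultTimeout(long millis, Runner child) {
    return query -> child.run(
        query.context.containsKey("timeout") ? query : query.withContextValue("timeout", millis));
  }

  /** Narrows the query to one segment before passing it on ("segment" is a hypothetical key). */
  static Runner forSegment(String segmentId, Runner child) {
    return query -> child.run(query.withContextValue("segment", segmentId));
  }

  /** Leaf: reports the query it was finally handed. */
  static final Runner LEAF = query -> List.of(
      query.dataSource,
      String.valueOf(query.context.get("timeout")),
      String.valueOf(query.context.get("segment")));

  public static void main(String[] args) {
    Runner stack = setDefaultTimeout(30_000L, forSegment("wiki_2024-01-01", LEAF));
    System.out.println(stack.run(new Query("wiki", Map.of()))); // [wiki, 30000, wiki_2024-01-01]
  }
}
```

The sketch stores no plan anywhere: the "plan" exists only as the chain of rewrites, which is why it is hard to inspect.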
This means we need to understand four different aspects:
- Calcite-to-native
- Native-to-query-runner
- Query-runner-to-execution
- Execution
The goal is to reduce this down to:
- Calcite-to-logical-plan
- Logical-to-physical plan
- Operators for execution
The current approach places the query plan in both the native query and its associated set of query runners. The NG approach is to put all of this in a plan, which operators then execute. Short term, we use the runners as the "plan", allowing the runners to generate the operators (based on information in both the runner and the native query). Longer term, that same information would be in the physical plan instead.
Revisit the detailed analysis with the above structure in mind:
- ...
- The `QuerySegmentWalker` is responsible for working out how to access data in segments. Datasources can be inline, broadcast, local or remote. `QuerySegmentWalker.getQueryRunnerForIntervals()` creates a query runner which knows how to access the target segments wherever they happen to reside. It is the first step in "planning" a native query.
- Since the segment-access strategy may depend on the native query type, a query-specific `QueryToolChest` is obtained from the `QueryToolChestWarehouse` to provide the type-specific information.
- `ResultLevelCachingQueryRunner` uses flags to indicate whether caching is needed. This can be converted to distinct operators. (In fact, that was done in the earlier "pipe" branch.)
- `UnionQueryRunner` handles unions, even if a query has no unions. It could be optimized away.
- A query will hit multiple segments, possibly each on a separate node. It appears that a separate request is made to the data node for each of the segments (requests don't seem to be batched into a single request). Any of these can report the segment missing, so a retry mechanism (`RetryQueryRunner`) retries segments that are reported missing on the first pass.
- Merging is done on both the Broker and the data node (Historical) side. Sometimes this is a concatenation; other times it is a keyed merge.
- After merge, aggregates may need to be finalized.
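The "conglomerate" is described above as just a map from query class to runner factory, and the `QueryToolChestWarehouse` plays a similar role for tool chests. Below is a minimal sketch of such a class-keyed registry. The `Query`, `ScanQuery`, `TimeseriesQuery` and `Factory` types are hypothetical stand-ins for the Druid ones.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical class-keyed registry: query class -> per-type factory.
final class RegistrySketch {
  interface Query {}
  static final class ScanQuery implements Query {}
  static final class TimeseriesQuery implements Query {}

  /** Stand-in for a per-query-type runner factory or tool chest. */
  interface Factory<Q extends Query> {
    String describe(Q query);
  }

  private final Map<Class<? extends Query>, Factory<?>> factories = new HashMap<>();

  <Q extends Query> RegistrySketch register(Class<Q> type, Factory<Q> factory) {
    factories.put(type, factory);
    return this;
  }

  /** Finds the factory for the query's concrete class; fails loudly if none is registered. */
  @SuppressWarnings("unchecked")
  <Q extends Query> Factory<Q> findFactory(Q query) {
    Factory<?> factory = factories.get(query.getClass());
    if (factory == null) {
      throw new IllegalArgumentException("No factory for " + query.getClass().getSimpleName());
    }
    return (Factory<Q>) factory; // safe because register() pairs each class with its own factory
  }

  public static void main(String[] args) {
    RegistrySketch registry = new RegistrySketch()
        .register(ScanQuery.class, q -> "scan factory")
        .register(TimeseriesQuery.class, q -> "timeseries factory");
    ScanQuery scan = new ScanQuery();
    System.out.println(registry.findFactory(scan).describe(scan)); // scan factory
  }
}
```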
Starting with `QueryPlus.run()`, the stack of query runners and their Query NG replacements are:
| Classic | Query NG | Comments |
|---|---|---|
| `QuerySwappingQueryRunner` | -- | No contents, no operator. |
| `FluentQueryRunner` | -- | No contents, no operator. |
| `ResultLevelCachingQueryRunner` | ? | Convert using From/ToCache from pipe. |
| `CPUTimeMetricQueryRunner` | `CpuMetricOperator` | Done |
| `FinalizeResultsQueryRunner` | `TransformOperator` | Done |
| `ScanQueryQueryToolChest.preMergeQueryDecoration()` lambda | -- | Does nothing. |
| `UnionQueryRunner` | ? | Does nothing for simple queries; not converted. |
| `SetAndVerifyContextQueryRunner` | -- | Plan-time only, no operator. |
| `RetryQueryRunner` | ? | Not yet converted. |
| `TestClusterQuerySegmentWalker.getQueryRunnerForIntervals` lambda | ? | Plans the scatter operation. |
| `TestClusterQuerySegmentWalker.getQueryRunnerForSegments` lambda | ? | Does nothing in test; handles scheduling if enabled. |
| `FinalizeResultsQueryRunner` | `TransformOperator` | Done |
| `ScanQueryQueryToolChest.mergeResults()` lambda | `ScanResultLimitOperator`, `ScanResultOffsetOperator` | If needed |
| `ScanQueryQueryToolChest.preMergeQueryDecoration()` | -- | Does nothing. |
| `FinalizeResultsQueryRunner` | `TransformOperator` | Done |
| `ScanQueryQueryToolChest.mergeResults()` lambda | As above | If needed |
| `ScanQueryRunnerFactory.mergeRunners` | `ConcatOperator` | Concat for this query. |
| `SpecificSegmentQueryRunner` | `ThreadLabelOperator` | Done |
| `ReferenceCountingSegmentQueryRunner` | `SegmentLockOperator` | Done |
| `ScanQueryRunner` | `ScanQueryOperator` | Done. Leaf operator. |
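The Query NG column replaces nested runners with operators that pull rows from their inputs. As a rough sketch, the concat, offset and limit steps for this scan query could look like the code below. It assumes a simple pull model in which a plain `java.util.Iterator` stands in for the real operator interface. `concat`, `offset` and `limit` are hypothetical helpers, not the `ConcatOperator` or `ScanResult*Operator` classes themselves.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical pull-based operators; plain Iterators stand in for the real Operator interface.
final class OperatorSketch {
  /** Concat: reads each input to exhaustion, in order (a concatenating merge). */
  static <T> Iterator<T> concat(List<Iterator<T>> inputs) {
    return new Iterator<T>() {
      private int index = 0;

      @Override
      public boolean hasNext() {
        // Skip exhausted inputs until one has a row or none remain.
        while (index < inputs.size()) {
          if (inputs.get(index).hasNext()) {
            return true;
          }
          index++;
        }
        return false;
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return inputs.get(index).next();
      }
    };
  }

  /** Offset: lazily discards the first {@code offset} rows on the first read. */
  static <T> Iterator<T> offset(Iterator<T> input, long offset) {
    return new Iterator<T>() {
      private long toSkip = offset;

      @Override
      public boolean hasNext() {
        while (toSkip > 0 && input.hasNext()) {
          input.next();
          toSkip--;
        }
        return input.hasNext();
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return input.next();
      }
    };
  }

  /** Limit: returns at most {@code limit} rows and stops reading its input after that. */
  static <T> Iterator<T> limit(Iterator<T> input, long limit) {
    return new Iterator<T>() {
      private long remaining = limit;

      @Override
      public boolean hasNext() {
        return remaining > 0 && input.hasNext();
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        remaining--;
        return input.next();
      }
    };
  }

  static <T> List<T> drain(Iterator<T> rows) {
    List<T> out = new ArrayList<>();
    rows.forEachRemaining(out::add);
    return out;
  }

  public static void main(String[] args) {
    // Two per-segment inputs, concatenated, then OFFSET 1 LIMIT 3.
    Iterator<Integer> plan = limit(
        offset(concat(List.of(List.of(1, 2).iterator(), List.of(3, 4, 5).iterator())), 1), 3);
    System.out.println(drain(plan)); // [2, 3, 4]
  }
}
```

Each "if needed" entry in the table corresponds to simply omitting the `offset` or `limit` wrapper when the query does not ask for it.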
Druid is based on a two-level scatter-gather process which works as follows:
- `TestClusterQuerySegmentWalker.getQueryRunnerForSegments()` determines the distribution list as `SegmentReference`s, provided not as a list but as a function.
- This is then fed into a query runner stack with `makeTableRunner()` as the leaf-most query runner:
```java
final QueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<>(
    toolChest.postMergeQueryDecoration(
        toolChest.mergeResults(
            toolChest.preMergeQueryDecoration(
                makeTableRunner(toolChest, factory, getSegmentsForTable(dataSourceName, specs), segmentMapFn)
            )
        )
    ),
    toolChest
);
```
- Inside `makeTableRunner()` is another query runner stack:
```java
return new FinalizeResultsQueryRunner<>(
    toolChest.mergeResults(
        factory.mergeRunners(
            Execs.directExecutor(),
            FunctionalIterable
                .create(segmentsList)
                .transform(
                    segment ->
                        new SpecificSegmentQueryRunner<>(
                            new ReferenceCountingSegmentQueryRunner<>(
                                factory,
                                segmentMapFn.apply(segment.getSegment()),
                                segment.getDescriptor()
                            ),
                            new SpecificSegmentSpec(segment.getDescriptor())
                        )
                )
        )
    ),
    toolChest
);
```
The above says, starting from the bottom:
- Use the given `factory` to create the actual distribution query runner.
- Wrap that in a `ReferenceCountingSegmentQueryRunner`, which locks the segment in memory (in the test host).
- Wrap that in a `SpecificSegmentQueryRunner`.
- Do the above for each segment to be queried.
- Merge these runners, in this case in the same thread, using `Execs.directExecutor()`.
- Merge the results using a query-specific runner given by the tool chest.
Since another merge is done below, does this mean there are two levels of merges? Or does the test path presume things work, and so perform the same steps that the production code does above the retry query runner?
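The `makeTableRunner()` shape can be sketched in plain Java: pin each segment, run it, then concatenate the per-segment results on some executor. Everything here is hypothetical. `Segment` stands in for a reference-counted segment, and `Runnable::run` stands in for `Execs.directExecutor()`. Results are eager lists rather than lazy sequences, so the reference is released as soon as the rows are copied. The real runner presumably has to hold its reference until its sequence is closed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of per-segment runners merged by concatenation.
final class TableRunnerSketch {
  /** Stand-in for a reference-counted segment. */
  static final class Segment {
    final String id;
    final List<Integer> rows;
    final AtomicInteger refCount = new AtomicInteger();

    Segment(String id, List<Integer> rows) {
      this.id = id;
      this.rows = rows;
    }
  }

  /** Like a reference-counting runner: pin the segment for the duration of the read. */
  static List<Integer> runOnSegment(Segment segment) {
    segment.refCount.incrementAndGet();
    try {
      return new ArrayList<>(segment.rows);
    } finally {
      segment.refCount.decrementAndGet();
    }
  }

  /** Runs each per-segment task on the executor; concatenates results in segment order. */
  static List<Integer> mergeRunners(Executor executor, List<Segment> segments) {
    List<CompletableFuture<List<Integer>>> futures = new ArrayList<>();
    for (Segment segment : segments) {
      futures.add(CompletableFuture.supplyAsync(() -> runOnSegment(segment), executor));
    }
    List<Integer> merged = new ArrayList<>();
    for (CompletableFuture<List<Integer>> future : futures) {
      merged.addAll(future.join());
    }
    return merged;
  }

  public static void main(String[] args) {
    List<Segment> segments =
        List.of(new Segment("seg1", List.of(1, 2)), new Segment("seg2", List.of(3)));
    // Runnable::run executes each task immediately in the calling thread (a "direct" executor).
    System.out.println(mergeRunners(Runnable::run, segments)); // [1, 2, 3]
    // A pool runs the per-segment tasks in parallel; the merge order is still segment order.
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      System.out.println(mergeRunners(pool, segments)); // [1, 2, 3]
    } finally {
      pool.shutdown();
    }
  }
}
```

Swapping the executor is the only difference between the sequential test path and a parallel processing pool in this sketch.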
Then:
- `RetryQueryRunner` invokes a base runner that partitions the query by segment and sends a request per segment to a data node.
- `RetryQueryRunner` gathers the resulting sequences, waiting for all servers to report.
- `RetryQueryRunner` uses a merging sequence to combine the results.
It may be that the requested segment is missing from the target data node. In this case:
- The segment walker provides a retry function to `RetryQueryRunner`. For tests, it is `TestClusterQuerySegmentWalker.getQueryRunnerForSegments()`, defined as:
```java
<T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs);
```
- The data node returns a response context that contains a (potentially truncated) list of missing segments.
- `RetryQueryRunner` waits for all servers to report.
- `RetryQueryRunner` gathers the list of missing segments and runs the list through the same process as above to launch another scatter/gather wave.
- `RetryQueryRunner` adds the resulting sequences to the list to merge.
- `RetryQueryRunner` repeats the process until a wave reports no missing segments, or until it reaches a retry limit.
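The retry loop above can be sketched as a function over waves. This is a hypothetical simplification, not `RetryQueryRunner`. `WaveResult` stands in for the rows plus the missing-segments list from the response context, and results are eager lists. Throwing once `maxRetries` is exhausted is an assumption made for the sketch, not a description of what Druid does at the limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

// Hypothetical sketch of retry "waves" over missing segments.
final class RetrySketch {
  /** Outcome of one scatter/gather wave: rows plus segments reported missing. */
  static final class WaveResult {
    final List<String> rows;
    final Set<String> missingSegments;

    WaveResult(List<String> rows, Set<String> missingSegments) {
      this.rows = rows;
      this.missingSegments = missingSegments;
    }
  }

  /**
   * Runs the first wave, then re-runs only the missing segments until a wave reports none.
   * Returns one result list per wave (a "sequence of sequences").
   */
  static List<List<String>> runWithRetries(
      Set<String> segments, Function<Set<String>, WaveResult> runWave, int maxRetries) {
    List<List<String>> perWave = new ArrayList<>();
    WaveResult result = runWave.apply(segments);
    perWave.add(result.rows);
    int retries = 0;
    while (!result.missingSegments.isEmpty()) {
      if (retries >= maxRetries) {
        // Assumption for this sketch: give up loudly at the retry limit.
        throw new IllegalStateException("Still missing after retries: " + result.missingSegments);
      }
      retries++;
      result = runWave.apply(result.missingSegments); // next wave covers only the missing ones
      perWave.add(result.rows);
    }
    return perWave;
  }

  public static void main(String[] args) {
    Set<String> missedOnce = new TreeSet<>();
    // Fake data node: "s2" is reported missing the first time it is requested, then found.
    Function<Set<String>, WaveResult> wave = segs -> {
      List<String> rows = new ArrayList<>();
      Set<String> missing = new TreeSet<>();
      for (String seg : segs) {
        if (seg.equals("s2") && missedOnce.add(seg)) {
          missing.add(seg);
        } else {
          rows.add("row from " + seg);
        }
      }
      return new WaveResult(rows, missing);
    };
    System.out.println(runWithRetries(new TreeSet<>(List.of("s1", "s2", "s3")), wave, 2));
    // [[row from s1, row from s3], [row from s2]]
  }
}
```

In this sketch, the final merge would simply concatenate the per-wave lists. A keyed merge would need to combine them instead.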
Taking this up a level, there seems to be a non-obvious structure.
- The query runners define an implicit physical plan, with distribution as one of the steps, and with per-slice runners down to the leaf.
- The segment walker is responsible not just for defining the set of segments, but also for planning the distribution: the per-slice sequence stack and the merger of the results.
- A separate mechanism, the `RetryQueryRunner`, handles missing segments by retrying the above distribution with a smaller set of segments. `RetryQueryRunner` produces a sequence of sequences: one from each retry "batch."