
Query Execution in Tests

Paul Rogers edited this page Jun 13, 2022 · 4 revisions
  • Planner produces a PlannerResult
  • Which holds a Supplier<Sequence<Object[]>> which wraps query execution.
  • The supplier is a lambda, defined in planWithDruidConvention(), that contains the planner and DruidQueryRel.
  • The lambda does a final "sanity check" of resources, then calls DruidRel<?>.run().
  • DruidRel<?>.run() converts the DruidRel to a Druid query, in the form of DruidQuery, which seems to be the thing that does the query translation.
  • And then gets the query maker from the planner context and calls runQuery().
  • NativeQueryMaker.runQuery() obtains the Druid query and an ExecutionPlan, then calls execute().
  • ExecutionPlan holds the query, row order, new fields and new types (whatever those are).
  • execute() creates a QueryLifecycle to run the query by calling QueryLifecycle.runSimple(). This method also uses the Hook to record the actual final query.
  • execute() uses a QueryToolChest to get the signature and convert rows.
  • runSimple() manages some state, checks authorization again, and calls execute() to run the query.
  • QueryLifecycle.execute() creates a ResponseContext, wraps the query in a QueryPlus (including the QuerySegmentWalker) and calls run().
  • QueryPlus.run() calls Query.getRunner() to get the QueryRunner, which is produced by DataSourceAnalysis.
  • DataSourceAnalysis is a complex beast, and handles the odd way Druid does joins, etc.
  • getBaseQuerySegmentSpec then obtains the segment spec (intervals) for the target datasource.
  • getRunner() combines the spec with the walker in MultipleIntervalSegmentSpec.lookup() and calls QuerySegmentWalker.getQueryRunnerForIntervals().
  • This calls ClientQuerySegmentWalker.getQueryRunnerForIntervals(). This walker is "for the Broker process".
  • After much computation, returns a QuerySwappingQueryRunner to run on the "cluster".
  • Input to the above is a `` created in ClientQuerySegmentWalker.decorateClusterRunner(). This does "planning" by creating a stack of query runners packaged in a `FluentQueryRunner`.
  • QueryPlus.run() then calls QueryRunner.run().
  • QuerySwappingQueryRunner.run() invokes a base runner, which is a FluentQueryRunner.
  • FluentQueryRunner.run() invokes its base runner, which is CPUTimeMetricQueryRunner.
  • CPUTimeMetricQueryRunner measures CPU, then invokes ResultLevelCachingQueryRunner.
  • ResultLevelCachingQueryRunner notices we are not fetching from, or writing to, the cache, so just calls FinalizeResultsQueryRunner.
  • FinalizeResultsQueryRunner finalizes results, then calls UnionQueryRunner.
  • UnionQueryRunner notices that the test query has no union, so calls its input.
  • The input is a lambda defined in ScanQueryQueryToolChest.preMergeQueryDecoration() which just calls its input.
  • SetAndVerifyContextQueryRunner adjusts timeout and scatter/gather bytes in the context. No execution part. Just calls its input.
  • RetryQueryRunner handles segment retries and merges results from exchanges. This runner handles retries; its child does the initial "wave" of distribution.
  • TestClusterQuerySegmentWalker.getQueryRunnerForIntervals() lambda uses DataSourceAnalysis to work out the datasource, then works out the segments for the datasource given the query intervals. This then calls getQueryRunnerForSegments() which builds a new tree of runners that includes post-merge, merge, pre-merge and table runners. Uses a "conglomerate" to find the query runner factory for a query class, which is just a map from class to factory. For our test query, the factory is ScanQueryRunnerFactory. Part of this work uses JoinableFactoryWrapper, which does certain join-level query optimizations, though it does nothing for our test query. Then, we get a list of "windowed segments" (WindowedSegment) which is a pair of (segment, interval).
  • The interesting bit is then in TestClusterQuerySegmentWalker.makeTableRunner() which converts the list of segment windows into a set of SpecificSegmentQueryRunner instances. (This adds a FinalizeResultsQueryRunner, but one was already added by the caller.) Each has as input a ReferenceCountingSegmentQueryRunner. QueryRunnerFactory handles "the nitty-gritty details of a query". (Question: do we group segments by server? Seems not.)
  • The per-segment queries run in parallel in a QueryProcessingPool, though tests use DirectExecutorService which runs the items sequentially in a single thread. Thus, the SpecificSegmentQueryRunner runs in a thread managed by the processing pool. This is where the scatter parallelism comes in on the Broker side.
  • The result is wrapped in a query runner which ...
  • TestClusterQuerySegmentWalker.getQueryRunnerForSegments() lambda rewrites queries to identify the specific segments. It also handles scheduling this part of the query for execution if a scheduler is present. The QueryScheduler holds the query until it can be run, then invokes the corresponding child runner and returns its sequence. Thus, the main thread may pause here awaiting execution. Execution itself is handled via a lazy sequence which is created outside of the scheduler, but read by the downstream (and thus started) only after the wait.
  • At this point, control will bubble back up to the top and will continue downward again only when the caller reads from the sequence (at which time the downstream runners will have converted to operators, if they can.)
  • FinalizeResultsQueryRunner does nothing.
  • ScanQueryQueryToolChest.mergeResults lambda handles limit and offset (merging is just a union.)
  • ScanQueryQueryToolChest.preMergeQueryDecoration() lambda does nothing.
  • FinalizeResultsQueryRunner does nothing.
  • ScanQueryQueryToolChest.mergeResults lambda as above.
  • ScanQueryRunnerFactory.mergeRunners lambda does the actual merge (as a concatenation in this case.)
  • SpecificSegmentQueryRunner renames the thread and catches SegmentMissingException from upstream and writes it into the response context.
  • ReferenceCountingSegmentQueryRunner does what it sounds like.
  • ScanQueryRunnerFactory.ScanQueryRunner is the leaf runner which invokes the scan query engine.
  • QueryLifecycle.execute() returns a QueryLifecycle.QueryResponse object.
  • runSimple() calls QueryResponse.getResults() to obtain the results as a Sequence.
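
The shape of the walk above can be sketched in a few lines. This is a toy model, not Druid code: `Runner`, `passThrough()` and `leaf()` are hypothetical stand-ins for QueryRunner and friends, and a `Supplier` stands in for a lazy Sequence. It shows the two points made in the list: most runners simply call their input, and no rows are read until the caller reads the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

class RunnerStackSketch {
    // Hypothetical stand-in for QueryRunner: run() returns a lazy result.
    interface Runner {
        Supplier<List<String>> run(String query, List<String> trace);
    }

    // A runner with no execution part: records the visit, then calls its input.
    static Runner passThrough(String name, Runner input) {
        return (query, trace) -> {
            trace.add(name);
            return input.run(query, trace);
        };
    }

    // A leaf runner: the "scan" is deferred until the result is read.
    static Runner leaf(String name, List<String> rows) {
        return (query, trace) -> {
            trace.add(name);
            return () -> {
                trace.add(name + ":read");
                return new ArrayList<>(rows);
            };
        };
    }

    // Builds the stack leaf-first, the way the walker nests runners.
    static Runner buildStack() {
        Runner scan = leaf("scan", Arrays.asList("row1", "row2"));
        Runner union = passThrough("union", scan);
        Runner finalizer = passThrough("finalize", union);
        return passThrough("cpuTime", finalizer);
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        Supplier<List<String>> results = buildStack().run("SELECT ...", trace);
        System.out.println("after run(): " + trace);
        System.out.println("rows: " + results.get());
        System.out.println("after read: " + trace);
    }
}
```

Calling run() visits every runner from root to leaf, but the `scan:read` entry only appears in the trace once the result is read.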

Notes:

  • Spend time to fully understand the DataSourceAnalysis: it seems to be core to the whole show.
  • This whole area is complex and seems to require untangling (later).
  • Understand the QuerySegmentWalker mechanism. Looks like this does quite a bit of "planning" for running queries.
  • Much of what seems to be done in the above two is a key part of planning. Understanding the above is non-trivial.
  • Somewhere in the above the FluentQueryRunner was called to build up a stack of query runners. This is the thing to convert to operators.
  • QuerySwappingQueryRunner swaps a rewritten query, but does not seem to be optimized away if the two queries are identical.
  • There is no materialization of the stack of query runners: no "toString" no way to grab the runners to inspect in tests. Could we change this?
  • The "extra" query runners are not that costly as they simply pass along the sequence from the input, so they add no per-row overhead.
  • Converting RetryQueryRunner will be a bit of a project. Do tests exist?
  • Query laning strategy seems oddly placed: the query starts running and is queued at the point of scatter. Understand better. It is under the retry runner, meaning that each retry could get queued and delayed separately. Very odd.
  • At present, after the call to open on the top set of operators, the top of the stack has a WrappingSequence, ConcatSequence, MappedSequence and WrappingSequence. Can these be replaced with operators?
  • The test stack seems to have redundant elements, which produces redundant limit operators. Probably because the query runner stack is impossible to visualize.
  • QueryPlanner.runSpecificSegment does not handle the case of a missing segment (catching SegmentMissingException and placing the segment in the response context.) Should it? Or, did I handle this elsewhere? Are there two versions? One for Broker, one for Historical?
  • In the conversion to operators, the scatter/gather operation should not be handled within an operator: it should be part of query run planning done earlier. This means this bit of the flow won't be a simple conversion.
  • ScanQueryQueryToolChest.resultArraySignature discusses limitations of array-based results. Basically, Druid cannot handle "schema changes."
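
On the "no way to grab the runners" note: one possible direction, sketched here as an assumption rather than a proposal for the actual API, is for each runner to expose its name and its input so a test can walk the stack. `NamedRunner` and `describe()` are hypothetical; Druid's QueryRunner has nothing like this today.

```java
import java.util.ArrayList;
import java.util.List;

class InspectableStackSketch {
    // Hypothetical runner that remembers its name and its input runner.
    static class NamedRunner {
        final String name;
        final NamedRunner input; // null for the leaf runner

        NamedRunner(String name, NamedRunner input) {
            this.name = name;
            this.input = input;
        }

        // Walks from this runner (the root) down to the leaf, collecting names.
        List<String> describe() {
            List<String> names = new ArrayList<>();
            for (NamedRunner r = this; r != null; r = r.input) {
                names.add(r.name);
            }
            return names;
        }

        @Override
        public String toString() {
            return String.join(" -> ", describe());
        }
    }

    public static void main(String[] args) {
        NamedRunner leaf = new NamedRunner("ScanQueryRunner", null);
        NamedRunner retry = new NamedRunner("RetryQueryRunner", leaf);
        NamedRunner root = new NamedRunner("FinalizeResultsQueryRunner", retry);
        System.out.println(root);
    }
}
```

This only works for runners that are real classes. Runners defined as lambdas (several appear in the stack above) would need a wrapper to carry a name.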

Higher-Level Abstraction Version

The general structure is:

  • Calcite parses SQL, creates a logical plan, which Druid converts to a native query.
  • The native query is then "parsed" and "planned" by constructing a set of query runners.
  • The query runners then do task-specific "planning" and create an executor, wrapped in a sequence, to do the work. While doing the "planning", they alter the query passed down to the children, so that the runners are not associated with a "plan", but rather are each handed a revised native query by their "parent" (downstream) runner. Thus, runners operate from root (most downstream) to leaf (most upstream).
  • Sequences do the execution as part of following the sequence protocol.
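
The "revised native query handed to the child" idea can be sketched as follows. Everything here is hypothetical: `SketchQuery` is a toy immutable query with only a datasource and a context map, and `withDefaultTimeout()` loosely mimics what a runner like SetAndVerifyContextQueryRunner is described as doing (adjusting the timeout in the context), without claiming to match its real logic.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class QueryRewriteSketch {
    // Hypothetical immutable "native query": a datasource plus a context map.
    static final class SketchQuery {
        final String dataSource;
        final Map<String, Object> context;

        SketchQuery(String dataSource, Map<String, Object> context) {
            this.dataSource = dataSource;
            this.context = Collections.unmodifiableMap(new HashMap<>(context));
        }

        // Returns a revised copy; the original query is left untouched.
        SketchQuery withContext(String key, Object value) {
            Map<String, Object> revised = new HashMap<>(context);
            revised.put(key, value);
            return new SketchQuery(dataSource, revised);
        }
    }

    interface Runner {
        String run(SketchQuery query);
    }

    // Parent runner: revises the query, then hands the revised copy to its child.
    static Runner withDefaultTimeout(long timeoutMs, Runner child) {
        return query -> {
            SketchQuery revised = query.context.containsKey("timeout")
                ? query
                : query.withContext("timeout", timeoutMs);
            return child.run(revised);
        };
    }

    // Leaf runner: reports which query it actually received.
    static Runner leaf() {
        return query -> query.dataSource + " timeout=" + query.context.get("timeout");
    }

    public static void main(String[] args) {
        SketchQuery original = new SketchQuery("foo", new HashMap<>());
        System.out.println(withDefaultTimeout(300_000L, leaf()).run(original));
    }
}
```

The leaf never sees the original query, only what its parent chose to pass down, which is why the "plan" ends up spread across the query and the runners.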

This means we need to understand four different aspects:

  • Calcite-to-native
  • Native-to-query-runner
  • Query-runner-to-execution
  • Execution

The goal is to reduce this down to:

  • Calcite-to-logical-plan
  • Logical-to-physical plan
  • Operators for execution

The current approach places the query plan in both the native query and its associated set of query runners. The NG approach is to put all of this in a plan, which operators then execute. Short-term, we use runners as the "plan", allowing the runners to generate the operators (based on information in both the runner and native query). Longer term, that same information would be in the physical plan instead.
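
To make "operators then execute" concrete, here is a minimal sketch of an operator pipeline. The `Operator` interface below (open returns an iterator, close releases resources) is an assumption made for illustration; the actual Query NG operator protocol may differ. The `concat()` and `limit()` helpers are toy analogues of a concatenating merge and a scan limit, not the real ConcatOperator or ScanResultLimitOperator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class OperatorSketch {
    // Hypothetical operator protocol, assumed for this sketch only.
    interface Operator {
        Iterator<String> open();
        void close();
    }

    static Operator source(List<String> rows) {
        return new Operator() {
            public Iterator<String> open() { return rows.iterator(); }
            public void close() { }
        };
    }

    // Streams each input in turn; an input is opened only when it is reached.
    static Operator concat(List<Operator> inputs) {
        return new Operator() {
            public Iterator<String> open() {
                Iterator<Operator> children = inputs.iterator();
                return new Iterator<String>() {
                    private Iterator<String> current = Collections.emptyIterator();
                    public boolean hasNext() {
                        while (!current.hasNext() && children.hasNext()) {
                            current = children.next().open();
                        }
                        return current.hasNext();
                    }
                    public String next() {
                        if (!hasNext()) throw new NoSuchElementException();
                        return current.next();
                    }
                };
            }
            public void close() { for (Operator in : inputs) in.close(); }
        };
    }

    // Stops after maxRows rows without reading further from its input.
    static Operator limit(long maxRows, Operator input) {
        return new Operator() {
            public Iterator<String> open() {
                Iterator<String> in = input.open();
                return new Iterator<String>() {
                    private long returned = 0;
                    public boolean hasNext() { return returned < maxRows && in.hasNext(); }
                    public String next() {
                        if (!hasNext()) throw new NoSuchElementException();
                        returned++;
                        return in.next();
                    }
                };
            }
            public void close() { input.close(); }
        };
    }

    // Drains an operator into a list, always closing it afterwards.
    static List<String> toList(Operator op) {
        List<String> out = new ArrayList<>();
        try {
            Iterator<String> it = op.open();
            while (it.hasNext()) out.add(it.next());
        } finally {
            op.close();
        }
        return out;
    }

    public static void main(String[] args) {
        Operator plan = limit(3, concat(Arrays.asList(
            source(Arrays.asList("a", "b")), source(Arrays.asList("c", "d")))));
        System.out.println(toList(plan));
    }
}
```

Unlike the runner stack, the whole pipeline here is an explicit data structure that exists before anything runs, which is the property the plan-based approach is after.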

Revisit the detailed analysis with the above structure in mind:

  • ...
  • The QuerySegmentWalker is responsible for working out how to access data in segments. Datasources can be inline, broadcast, local or remote. QuerySegmentWalker.getQueryRunnerForIntervals() creates a query runner which knows how to access the target segments wherever they happen to reside. It is the first step in "planning" a native query.
  • Since the segment-access strategy may depend on the native query type, a query-specific QueryToolChest is obtained from the QueryToolChestWarehouse to provide the type-specific information.
  • ResultLevelCachingQueryRunner uses flags to indicate if caching is needed. This can be converted to distinct operators. (In fact, that was done in the earlier "pipe" branch.)
  • UnionQueryRunner handles unions, even if a query has no unions. Could be optimized away.
  • A query will hit multiple segments, each on a separate node. It appears that a separate request is made to the data node for each of the segments (requests don't seem to be batched in a single request.) Any of these can report the segment missing, so a retry mechanism (RetryQueryRunner) handles retrying of segments which are reported missing on the first pass.
  • Merging is done on both the Broker and the data node (historical) side. Sometimes this is a concatenation, other times it is a keyed merge.
  • After merge, aggregates may need to be finalized.
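
The remarks about ResultLevelCachingQueryRunner and UnionQueryRunner both point at the same idea: decide at plan time which steps are needed, and leave the rest out. A rough sketch, assuming a hypothetical `QueryInfo` holder and using operator names as plain strings (the names `FromCache` and `ToCache` are borrowed from the earlier "pipe" branch mentioned above; nothing here is real Druid API):

```java
import java.util.ArrayList;
import java.util.List;

class PlanTimePruningSketch {
    // Hypothetical summary of the query properties that drive planning.
    static final class QueryInfo {
        final boolean hasUnion;
        final boolean useCache;
        final boolean populateCache;

        QueryInfo(boolean hasUnion, boolean useCache, boolean populateCache) {
            this.hasUnion = hasUnion;
            this.useCache = useCache;
            this.populateCache = populateCache;
        }
    }

    // Returns operator names, root first, including only steps that do work.
    static List<String> plan(QueryInfo query) {
        List<String> operators = new ArrayList<>();
        if (query.useCache) {
            operators.add("FromCache");
        }
        if (query.populateCache) {
            operators.add("ToCache");
        }
        if (query.hasUnion) {
            operators.add("Union");
        }
        operators.add("Scan");
        return operators;
    }

    public static void main(String[] args) {
        System.out.println(plan(new QueryInfo(false, false, false)));
        System.out.println(plan(new QueryInfo(true, true, true)));
    }
}
```

For the simple test query (no union, no caching) the plan collapses to the leaf alone, where the runner stack still carries the no-op runners.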

Details of a Scan Query in Test Mode

Starting with QueryPlus.run(), the stack of query runners, and the Query NG replacements, are:

| Classic | Query NG | Comments |
| --- | --- | --- |
| QuerySwappingQueryRunner | -- | No contents, no operator. |
| FluentQueryRunner | -- | No contents, no operator. |
| ResultLevelCachingQueryRunner | ? | Convert using From/ToCache from pipe |
| CPUTimeMetricQueryRunner | CpuMetricOperator | Done |
| FinalizeResultsQueryRunner | TransformOperator | Done |
| ScanQueryQueryToolChest.preMergeQueryDecoration() lambda | -- | Does nothing |
| UnionQueryRunner | ? | Does nothing for simple queries, not converted. |
| SetAndVerifyContextQueryRunner | -- | Plan-time only, no operator. |
| RetryQueryRunner | ? | Not yet converted. |
| TestClusterQuerySegmentWalker.getQueryRunnerForIntervals lambda | ? | Plans scatter operation |
| TestClusterQuerySegmentWalker.getQueryRunnerForSegments lambda | ? | Does nothing in test; handles scheduling if enabled. |
| FinalizeResultsQueryRunner | TransformOperator | Done |
| ScanQueryQueryToolChest.mergeResults() lambda | ScanResultLimitOperator, ScanResultOffsetOperator | If needed |
| ScanQueryQueryToolChest.preMergeQueryDecoration() | -- | Does nothing. |
| FinalizeResultsQueryRunner | TransformOperator | Done |
| ScanQueryQueryToolChest.mergeResults() lambda | As above | If needed |
| ScanQueryRunnerFactory.mergeRunners | ConcatOperator | Concat for this query. |
| SpecificSegmentQueryRunner | ThreadLabelOperator | Done |
| ReferenceCountingSegmentQueryRunner | SegmentLockOperator | Done |
| ScanQueryRunner | ScanQueryOperator | Done. Leaf operator. |

Distribution Mechanism

Druid is based on a two-level scatter-gather process which works as follows:

  • TestClusterQuerySegmentWalker.getQueryRunnerForSegments() determines the distribution list of SegmentReferences, which is provided not as a list, but rather as a function.
  • This is then fed into a query runner stack with the makeTableRunner() as the leaf-most query runner:
    final QueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<>(
        toolChest.postMergeQueryDecoration(
            toolChest.mergeResults(
                toolChest.preMergeQueryDecoration(
                    makeTableRunner(toolChest, factory, getSegmentsForTable(dataSourceName, specs), segmentMapFn)
                )
            )
        ),
        toolChest
    );
  • Inside of makeTableRunner() is another query runner stack:
    return new FinalizeResultsQueryRunner<>(
        toolChest.mergeResults(
            factory.mergeRunners(
                Execs.directExecutor(),
                FunctionalIterable
                    .create(segmentsList)
                    .transform(
                        segment ->
                            new SpecificSegmentQueryRunner<>(
                                new ReferenceCountingSegmentQueryRunner<>(
                                    factory,
                                    segmentMapFn.apply(segment.getSegment()),
                                    segment.getDescriptor()
                                ),
                                new SpecificSegmentSpec(segment.getDescriptor())
                            )
                    )
            )
        ),
        toolChest
    );

The above says, starting from the bottom:

  • Use the given factory to create the actual distribution query runner.
  • Wrap that in a ReferenceCountingSegmentQueryRunner which locks the segment in memory (in the test host).
  • Wrap that in a SpecificSegmentQueryRunner
  • Do the above for each segment to be queried.
  • Merge these runners, in this case, in the same thread using Execs.directExecutor()
  • Merge the results using a query-specific runner given by the tool chest.

Since another merge is done below, it seems there are two levels of merges? Or, the test path presumes things work, so does the same steps that the production code does above the retry query runner?

Then:

  • RetryQueryRunner invokes a base runner (which one?) that partitions the query by segment, and sends a request per segment to a data node.
  • RetryQueryRunner gathers the resulting sequences, waiting for all servers to report.
  • RetryQueryRunner uses a merging sequence to combine results.

It may be that the requested segment is missing from the target data node. In this case:

  • The segment walker provides a retry function to RetryQueryRunner. For tests, it is TestClusterQuerySegmentWalker.getQueryRunnerForSegments() defined as:
  <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs);
  • The data node returns a query context that contains a (potentially truncated) list of missing segments.
  • RetryQueryRunner waits for all servers to report.
  • RetryQueryRunner gathers the list of missing segments, and runs the list through the same process as above to launch another scatter/gather wave.
  • RetryQueryRunner adds the resulting sequences to the list to merge.
  • RetryQueryRunner repeats the process until a wave reports no missing segments, or until a retry limit.
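
The retry loop described above reduces to a small amount of control flow. The sketch below is an illustration under assumptions, not the RetryQueryRunner implementation: `WaveResult`, `runWithRetries()` and `flakyCluster()` are hypothetical, rows are strings, and the choice to throw when retries run out is just one possible policy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

class RetrySketch {
    // Hypothetical outcome of one scatter/gather wave.
    static final class WaveResult {
        final List<String> rows;
        final List<String> missingSegments;

        WaveResult(List<String> rows, List<String> missingSegments) {
            this.rows = rows;
            this.missingSegments = missingSegments;
        }
    }

    // Runs waves until one reports no missing segments or the retry limit is hit.
    static List<String> runWithRetries(
        List<String> segments,
        Function<List<String>, WaveResult> runWave,
        int maxRetries
    ) {
        List<String> merged = new ArrayList<>();
        List<String> pending = segments;
        for (int attempt = 0; attempt <= maxRetries && !pending.isEmpty(); attempt++) {
            WaveResult result = runWave.apply(pending);
            merged.addAll(result.rows);       // add this wave's results to the merge
            pending = result.missingSegments; // only these go into the next wave
        }
        if (!pending.isEmpty()) {
            // Policy choice made for this sketch only.
            throw new IllegalStateException("segments still missing: " + pending);
        }
        return merged;
    }

    // Test double: each listed segment is reported missing the first time only.
    static Function<List<String>, WaveResult> flakyCluster(Set<String> missingOnce) {
        Set<String> stillMissing = new HashSet<>(missingOnce);
        return segments -> {
            List<String> rows = new ArrayList<>();
            List<String> missing = new ArrayList<>();
            for (String segment : segments) {
                if (stillMissing.remove(segment)) {
                    missing.add(segment);
                } else {
                    rows.add("row-from-" + segment);
                }
            }
            return new WaveResult(rows, missing);
        };
    }

    public static void main(String[] args) {
        List<String> segments = Arrays.asList("s1", "s2", "s3");
        System.out.println(runWithRetries(segments, flakyCluster(Collections.singleton("s2")), 2));
    }
}
```

Each retry wave covers only the segments reported missing by the previous wave, so the waves shrink, and the rows from later waves are appended after the rows from earlier ones.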

Taking this up a level, there seems to be a non-obvious structure.

  • The query runners define an implicit physical plan, with distribution as one of the steps, and with per-slice runners down to the leaf.
  • The segment walker is responsible for not just defining the set of segments, but also for planning the distribution: the per-slice sequence stack, and the merger of the results.
  • A separate mechanism, the RetryQueryRunner, handles missing segments by retrying the above distribution with a smaller set of segments. RetryQueryRunner produces a sequence of sequences: one from each retry "batch."