
Query Lifecycle


Here we walk through a typical scan query to get a sense of the steps involved and the various players.

The query arrives in QueryResource.doPost() where it is authorized, etc. A QueryLifecycle manages the query itself, while QueryResource passes HTTP parameters to the lifecycle, converts the query results to JSON, and handles errors. QueryResource obtains most of its information about the rest of the system via Guice bindings.

The Druid code is cleverly designed so that much of the query lifecycle is agnostic about whether it runs on the broker or in a data node. This makes describing the query a bit difficult: large parts are identical, other parts differ.

A Druid query has roughly three parts:

  1. A description of the query (the "native query"),
  2. Planning what is to be done (in the oddly-named QueryRunners),
  3. Doing it via the execution stack.

This breakdown and these names are not actually used in Druid; we adopt them here to help the explanation. Planning itself is split into two phases we call "early" and "main". The resulting phases are thus:

  • The "early plan phase" covers the part which is independent (mostly) of the specific kind of query. This phase deals with the data source, segments, and obtaining the query-specific mechanisms. The early phase constructs a "stack" of query runners.
  • Then, in the "main" planning phase, the query runners (really, query planners) assemble sequences that will do the work.
  • Finally, in the execution phase, the sequences start reading, transforming, and delivering data.

The phases repeat on each node: once on the broker, then again on every data node that runs some part of the query. We'll walk through all three phases: first on the broker, then on the historical node.

Broker

The broker is responsible for scattering queries to the data nodes, gathering the results, and merging them. The merge operation is also done on the historical nodes and so this part of the query is very similar across the two nodes. The parts before and after the merge differ, however.

Query Definition

A Druid query comprises multiple parts:

  • The native query
  • A query context of name/value pairs (stored as part of the native query)
  • An authentication result which says which actions are legal for a given user

The result from running a query has two parts:

  • The data, in the form of a Sequence
  • A ResponseContext, with the non-data information about the query

A QueryPlus abstraction wraps:

  • The native query
  • User identity
  • Metrics

QueryPlus is a holder for the query, along with two other items: metrics and identity. The query itself is meant to be serialized to JSON and provides a catch-all "context" field for other data. However, the metrics and identity should not be serialized, so they can't be part of the context, and are thus held outside the query in the QueryPlus. The QueryPlus is passed throughout the plan (QueryRunner) phase.
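
To make the separation concrete, here is a minimal sketch of the idea (a hand-rolled stand-in, not the real QueryPlus class, which is generic and richer): the query is the serializable part, while identity and metrics ride alongside it.

```java
// Simplified sketch of the QueryPlus idea; names are illustrative, not the
// actual Druid API.
public class QueryPlusSketch {
  private final Object query;     // the native query: the part serialized to JSON
  private final String identity;  // who is running the query: never serialized
  private final Object metrics;   // per-query metrics: never serialized

  private QueryPlusSketch(Object query, String identity, Object metrics) {
    this.query = query;
    this.identity = identity;
    this.metrics = metrics;
  }

  // Wrap a native query; identity and metrics are attached later.
  public static QueryPlusSketch wrap(Object query) {
    return new QueryPlusSketch(query, null, null);
  }

  // Immutable "with" style: each call returns a new holder.
  public QueryPlusSketch withIdentity(String identity) {
    return new QueryPlusSketch(query, identity, metrics);
  }

  public QueryPlusSketch withMetrics(Object metrics) {
    return new QueryPlusSketch(query, identity, metrics);
  }
}
```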

Submit a Query

Native queries arrive in one of two ways:

  • As a native query, which includes authorization and other setup steps,
  • Translated from a SQL query, where authorization and setup has already been done.

Both paths call QueryLifecycle.execute(), which runs (actually, plans) the query; a simplified sketch follows the list:

  • Wrap the query and identity in a QueryPlus,
  • Run the query (see below)
  • Return a QueryResponse that combines the query results (actually, a mechanism to start execution and generate results) and the ResponseContext: extra "out of band" information returned to the client about the query.
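
A minimal sketch of that flow, with stand-in types in place of the real lifecycle classes, might look like this:

```java
// Illustrative sketch of the execute() flow above; all types here are
// simplified stand-ins, not the real Druid classes.
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LifecycleSketch {
  record QueryPlus(Object nativeQuery, String identity) {}
  record QueryResponse(List<Object> results, Map<String, Object> responseContext) {}

  // Stand-in for the QuerySegmentWalker-driven planning step.
  interface Walker {
    List<Object> run(QueryPlus queryPlus, Map<String, Object> responseContext);
  }

  static QueryResponse execute(Object nativeQuery, String identity, Walker walker) {
    QueryPlus queryPlus = new QueryPlus(nativeQuery, identity);     // 1. wrap query + identity
    Map<String, Object> responseContext = new HashMap<>();          // "out of band" info
    List<Object> results = walker.run(queryPlus, responseContext);  // 2. "run" = plan the result sequence
    return new QueryResponse(results, responseContext);             // 3. combine both for the caller
  }
}
```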

Segment Resolution Phase

QueryPlus.run(QuerySegmentWalker, ResponseContext) starts the process with the entire set of information that describes a query:

  • QueryPlus
    • Native Query (Query)
    • User identity
    • Metrics (to be added later)
  • QuerySegmentWalker: which translates data source intervals into segments
  • ResponseContext to hold non-data query state.

QueryPlus.run(QuerySegmentWalker, ResponseContext)

QueryPlus.run() rearranges the arguments and calls a query-specific top-level runner. Then, the code calls QueryRunner.run(QueryPlus, ResponseContext) to invoke that runner. (Again, a "query runner" is really a "query planner".) The run call returns a Sequence which, when read, actually runs the query.

Query.getRunner(QuerySegmentWalker)

One might expect that calling Query.getRunner() would return a runner specific to a query type. In fact, that's not what happens. Instead, this step determines how to access the segments associated with the query, using a QuerySegmentSpec obtained from a DataSourceAnalysis.

DataSourceAnalysis accepts the data source and the query segment spec (QuerySegmentSpec) and returns an analysis of them. DataSourceAnalysis handles three kinds of data sources: a query (something like a SQL view?), a join, or a simple data source. We'll assume our query is a simple data source. In this case, it simply gives us the intervals from the query for this data source in the form of a QuerySegmentSpec.

The code then calls QuerySegmentSpec.lookup(Query, QuerySegmentWalker) to obtain a QueryRunner for the intervals.

In the typical case of multiple segments, MultipleSpecificSegmentSpec.lookup(Query, QuerySegmentWalker) calls QuerySegmentWalker.getQueryRunnerForSegments(Query, List<SegmentDescriptor>) to get a QueryRunner for those segments.
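
In simplified form, the delegation looks roughly like this (stand-in types, not the real Druid interfaces, which are generic and carry more context):

```java
// Simplified sketch of the lookup delegation described above.
import java.util.List;

public class LookupSketch {
  interface QueryRunner { Iterable<Object> run(Object queryPlus); }

  record SegmentDescriptor(String interval, String version, int partition) {}

  interface QuerySegmentWalker {
    QueryRunner getQueryRunnerForSegments(Object query, List<SegmentDescriptor> segments);
  }

  // The "multiple specific segments" spec simply hands its segment list to the walker.
  record MultipleSpecificSegmentSpec(List<SegmentDescriptor> descriptors) {
    QueryRunner lookup(Object query, QuerySegmentWalker walker) {
      return walker.getQueryRunnerForSegments(query, descriptors);
    }
  }
}
```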

Segment lookup is quite complex. Some cases handled:

  • Global (AKA "broadcast") segments available on all nodes.
  • Segments on remote nodes (e.g. historicals)

The method then rewrites the query into its "globalized" form (which is the only kind supported in the SQL "Calcite Tests").

Query-Specific Query Runner

Once the segments are identified, the QuerySegmentWalker does the actual work of obtaining a query-specific runner. It calls QueryToolChestWarehouse.getToolChest(Query), which routes the request to the QueryRunnerFactoryConglomerate (that is, a collection of factories), which typically looks up the QueryRunnerFactory for the given query class. QueryRunnerFactory produces both a QueryRunner and a QueryToolChest (as well as handling some merge tasks).
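
Conceptually, the conglomerate is little more than a map from the native query class to its factory. A sketch of that lookup, with stand-in types:

```java
// Sketch of the factory lookup performed by the conglomerate; stand-in types,
// not the real Druid signatures.
import java.util.Map;

public class ConglomerateSketch {
  interface QueryRunnerFactory { Object createRunner(Object segment); Object getToolchest(); }

  // One factory registered per native query class (scan, timeseries, groupBy, ...).
  private final Map<Class<?>, QueryRunnerFactory> factories;

  ConglomerateSketch(Map<Class<?>, QueryRunnerFactory> factories) {
    this.factories = factories;
  }

  QueryRunnerFactory findFactory(Object query) {
    // Route by the concrete query class, exactly as described above.
    return factories.get(query.getClass());
  }
}
```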

Note: the above may not be entirely right. Actually, it seems that the query toolchest is fetched multiple times: the above is just one of those times; this may not be the "main" fetch.

Note: It would seem that combining segment resolution and query-specific runner resolution combines concerns which should be separate. This is likely the result of design drift over time, but is an opportunity for simplification.

In the broker, ClientQuerySegmentWalker implements the QuerySegmentWalker interface; in the test framework, TestClusterQuerySegmentWalker does. The steps for the test framework are:

  • Obtain a full description of the data source. For a regular Druid datasource, the result seems to be just the name we started with.
  • Resolve the QueryRunnerFactory. Make sure the query is supported.
  • Analyze the data source using DataSourceAnalysis.
  • Assert that the table must be concrete.
  • Obtain the tool chest from the factory.
  • Tasks associated with joins (details omitted in this summary since we focus on a single-table query)
  • Define the upper-stack of query runners.

Top-Most Query Runners

The top-most stack of query runners handles the "gather" portion of the "scatter/gather" query mechanism. In general:

  • Merge results
  • Issue "scatter" messages to data nodes

In detail:

  • QuerySwappingQueryRunner (perhaps only for tests?)
  • FinalizeResultsQueryRunner
  • QueryToolChest.postMergeQueryDecoration
  • QueryToolChest.mergeResults
  • table runner stack: see below

Table Runner

The table runner manages the "scatter" portion of the query via a VersionedIntervalTimeline which identifies which available segments cover which time intervals. The first case is simple: if there are no (loaded) segments for a table, then there is nothing to query. Otherwise, the code identifies the segment(s) covered by each interval in the query. This produces a List<WindowedSegment>. Each WindowedSegment is a (ReferenceCountingSegment, Interval) pair: that is, a segment and the interval to query within that segment.
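
A sketch of that scatter bookkeeping, assuming a much-simplified timeline (the real VersionedIntervalTimeline also handles segment versions and partial overlaps):

```java
// Sketch of building the per-segment work list; the timeline and segment types
// are simplified stand-ins for VersionedIntervalTimeline and friends.
import java.util.ArrayList;
import java.util.List;

public class TableRunnerSketch {
  record Interval(long startMillis, long endMillis) {}
  record Segment(String id) {}
  record WindowedSegment(Segment segment, Interval interval) {}

  interface Timeline {
    // All segments whose data overlaps the given interval.
    List<Segment> lookup(Interval interval);
  }

  static List<WindowedSegment> planScatter(Timeline timeline, List<Interval> queryIntervals) {
    List<WindowedSegment> work = new ArrayList<>();
    for (Interval interval : queryIntervals) {
      for (Segment segment : timeline.lookup(interval)) {
        // Pair each segment with the interval to scan within it.
        work.add(new WindowedSegment(segment, interval));
      }
    }
    return work; // an empty list means there is nothing to query (the NoopQueryRunner case)
  }
}
```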

Note: At least in the test path, the code seems to assume that every query interval is covered by a segment: there is no obvious path to handle the case where a query asks for an interval that matches no segments.

With the list of segments in hand, the code creates either a NoopQueryRunner if there are no segments, or another stack of query runners. See below for the test stack:

  • FinalizeResultsQueryRunner
  • QueryToolChest.mergeResults
  • QueryRunnerFactory.mergeRunners (marked as deprecated)
  • For each segment:
    • SpecificSegmentQueryRunner
    • ReferenceCountingSegmentQueryRunner

Note: There is redundancy in the two above query runner stacks. The redundancy seems harmless and may occur only in the test path.

The ReferenceCountingSegmentQueryRunner locks each segment. It then uses QueryRunnerFactory.createRunner() to create the query-specific query runner.

Test Stack

The Calcite test framework is a bit of a mixture of Broker and Historical query runners, with test-specific bits thrown into the mix. For a Scan query, the stack is roughly:

For the test framework TestClusterQuerySegmentWalker:

  • QuerySwappingQueryRunner (perhaps only for tests?)
  • FinalizeResultsQueryRunner
  • QueryToolChest.postMergeQueryDecoration
  • QueryToolChest.mergeResults

In TestClusterQuerySegmentWalker.makeTableRunner():

  • FinalizeResultsQueryRunner
  • QueryToolChest.mergeResults
  • QueryRunnerFactory.mergeRunners (marked as deprecated)
  • For each segment:
    • SpecificSegmentQueryRunner
    • ReferenceCountingSegmentQueryRunner

Above the per-segment runners sits QueryRunnerFactory.mergeRunners(). This method is typically complex and introduces multiple new levels of query runners and sequences. For the scan query:

  • Returns an inline QueryRunner which:
    • Concatenates results, if no ordering
    • Sorts the segment results, if needed
    • Does an n-way merge, if there is ordering.
    • Applies the limit, if any

(TODO) QueryToolChest.mergeResults() - For the scan query:

  • Inline query runner which adds
    • Inline limit sequence
    • Inline offset sequence

From QueryRunnerFactory.createRunner() and a scan query:

  • ScanQueryRunner

Note: The above can be very confusing because the logic appears in multiple places:

  • SegmentWalker
  • QueryRunnerFactory
  • QueryToolChest

QueryRunner Builder Calls

  • QueryLifecycle.execute()
  • QueryPlus.run(QuerySegmentWalker, ResponseContext)
  • BaseQuery.getRunner(QuerySegmentWalker)
  • SegmentSpec.lookup(Query, QuerySegmentWalker)
  • QuerySegmentWalker.getQueryRunnerForSegments(Query, Iterable<SegmentDescriptor>)
  • ClientQuerySegmentWalker.getQueryRunnerForSegments(Query, Iterable<SegmentDescriptor>)
  • TestClusterQuerySegmentWalker.getQueryRunnerForSegments(Query, Iterable<SegmentDescriptor>)
  • TestClusterQuerySegmentWalker.makeTableRunner(...)
  • ...
  • QueryRunnerFactory.mergeRunners()

The tortuous call path seems to be because of the need to return a QueryRunner rather than information used to create a query runner.

Once this call is done, then execution bubbles back up and we call QueryRunner.run() on the query runners produced.

Broker Stack

The stack of Broker query runners includes, from top to bottom:

  • ResponseContextMetricsInjectingQueryRunner
  • QuerySwappingQueryRunner

Then, the code uses FluentQueryRunnerBuilder to create more of the stack (this part is a bit hazy):

  • ResultLevelCachingQueryRunner
  • SetAndVerifyContextQueryRunner
  • RetryQueryRunner
  • Lambda from CachingClusteredClient.getQueryRunnerForIntervals()
  • UnionQueryRunner
  • Query-specific per-merge runner from QueryToolChest.preMergeQueryDecoration()
  • Query-specific merge runner from QueryToolChest.mergeResults()
  • FinalizeResultsQueryRunner
  • Query-specific post-process runner from QueryToolChest.postMergeQueryDecoration
  • CPUTimeMetricQueryRunner

The stack now unwinds back to QueryPlus.run(), which has just received the root QueryRunner. The code now calls run() on this query runner to run (actually, plan) the query.

Query Processing Pool

The above was written assuming that the entire query runs in a single thread, as occurs in the scan query (and in the test stack). However, most queries use an async model. QueryProcessingPool is the class that manages the pool of threads.

  • ForwardingQueryProcessingPool is a wrapper for single-threaded execution.

QueryRunnerFactory.mergeRunners(QueryProcessingPool, Iterable<QueryRunner<T>>) accepts the per-segment query runner list. This method then creates another stack of query runners to handle the scatter/merge operation.
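
The async shape is roughly the following: each per-segment runner is submitted to the pool and the results are gathered as they complete. This is a sketch with stand-in types (the real mergeRunners works with Sequences and Druid's own processing pool), not the actual implementation:

```java
// Sketch of a pool-based mergeRunners; simplified types, not Druid's code.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class MergeRunnersSketch {
  interface QueryRunner { List<Object> run(Object queryPlus); }

  static QueryRunner mergeRunners(ExecutorService pool, List<QueryRunner> perSegmentRunners) {
    return queryPlus -> {
      // Scatter: run each per-segment runner on the processing pool.
      List<Future<List<Object>>> futures = new ArrayList<>();
      for (QueryRunner runner : perSegmentRunners) {
        futures.add(pool.submit(() -> runner.run(queryPlus)));
      }
      // Gather: concatenate results (a real merge would order or combine them).
      List<Object> merged = new ArrayList<>();
      for (Future<List<Object>> f : futures) {
        try {
          merged.addAll(f.get());
        } catch (InterruptedException | ExecutionException e) {
          throw new RuntimeException(e);
        }
      }
      return merged;
    };
  }
}
```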


Main "Plan" Phase

We now start running the QueryRunner stack. The QueryRunner interface is the standard way that queries are run: query execution is a deeply-nested stack of QueryRunner.run() calls. Most of the discussion from here down will follow that stack. Though the function is called run(), it turns out that a QueryRunner mostly plans: it creates the thing that will later run the query. Most query runners have one or more other query runners as "children." (The code variously uses the terms "base runner", "delegate", "runner", etc. Here, we'll just use "child" in the sense that the runners form a tree similar to a typical query DAG, though Druid does not call the collection of query runners by that term.)

That "thing" is a Sequence: a conceptual ordered list of items which is materialized on demand. A Sequence is like a list, but without an actual materialized backing store. It is also like an iterator, but without access to the items. See [Sequences and Yielders] for more information. For now, just realize that QueryRunner.run() actually works out the nested set of Sequences which, when drawn upon, will run the query. There is no direct correspondence between QueryRunners and Sequences: some query runners simply pass along an existing sequence, others make many levels of sequences.

This part of the planning process mixes generic and query-specific bits. We already saw above that the "early" phase created the list of query runners. Now, the query runners create the Sequences that will run the query. For a scan query, the steps are:

ResponseContextMetricsInjectingQueryRunner is an experimental way to return metrics in the response footer.

QuerySwappingQueryRunner rewrites the query for some form of inlining.

FluentQueryRunner is a vestige of the build process, described above, that simply turns around and calls its "base runner".

ResultLevelCachingQueryRunner handles the case where this exact query was run previously and the results were cached. If so, those cached results are returned, bypassing the remainder of the query runner stack.

CPUTimeMetricQueryRunner measures the amount of time taken by its child query runner. It runs its child, then wraps the child sequence in a new sequence that measures time. Here we come face-to-face with the fact that the run() method only plans, and that actual running is done by the returned Sequence. This is why the sequence is instrumented, not the child query runner.
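
The pattern looks roughly like this (stand-in types; timing and metric reporting greatly simplified):

```java
// Sketch of the "instrument the sequence, not the runner" pattern.
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

public class TimedRunnerSketch {
  interface QueryRunner { Iterator<Object> run(Object queryPlus); }

  static QueryRunner cpuTimeMetricRunner(QueryRunner child, AtomicLong accumulatedNanos) {
    return queryPlus -> {
      // run() merely plans: wrap the child's iterator so the timing happens
      // later, while rows are actually being pulled.
      Iterator<Object> childRows = child.run(queryPlus);
      return new Iterator<Object>() {
        public boolean hasNext() {
          long start = System.nanoTime();
          try { return childRows.hasNext(); }
          finally { accumulatedNanos.addAndGet(System.nanoTime() - start); }
        }
        public Object next() {
          long start = System.nanoTime();
          try { return childRows.next(); }
          finally { accumulatedNanos.addAndGet(System.nanoTime() - start); }
        }
      };
    };
  }
}
```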

FinalizeResultsQueryRunner is a row-by-row transform of the final results and is like the "project operator" of other query engines.

Lambda from ScanQueryQueryToolChest.mergeResults() which, despite its name, does not merge on the broker. Instead, a possible offset and limit are applied. Suppose our query has a limit. Then the limit is applied by wrapping the child query runner in a Sequence driven by the ScanQueryLimitRowIterator class.

As a result, this is as far down the stack as we go in this pass: we return the transforming Sequence, which will run the next query runner in the chain once we start pulling from the returned sequence. (See below.) Here we'll just continue with the next child even though there is quite a bit of return-and-descend code to get to this next runner.

UnionQueryRunner handles potential UNION operations across tables. Simple queries have no union, and so this runner simply passes control to its child.

Lambda from ScanQueryQueryToolChest.preMergeQueryDecoration which, in this case, does nothing other than call its child.

SetAndVerifyContextQueryRunner which rewrites the query to apply a timeout and byte limit, then passes this new query to its child.

RetryQueryRunner which handles the case where a segment is missing from the expected data node. It creates a Sequence whose child runners send rewritten native queries to the data nodes reported earlier. However, things may change and those nodes may report back that the segments are no longer available. This runner looks up the new locations of those segments and retries the query on those nodes. This runner again introduces a deferred sequence, so control bubbles back up the stack until we ask for the first row, at which time we create a RetryingSequenceIterator to iterate over the retries. The hasNext() method of this iterator calls the child runner. Once the retries are complete, we create a MergeSequence to merge the results. (That is, the merge happens here, not in the method above which claims to do the merge.)

CachingClusteredClient which provides an endpoint for a data node. There is a call per data node. Each call produces a...

SpecificQueryRunnable which is dynamically created by the above call (that is, not created in the "early" stage) to query a specific set of segments. It first obtains the timeline for the data source to determine which segments (files) are needed to serve the query. For each, it checks if partial query results already exist in the broker-side cache. For those without cached results, it then groups files by the nodes which host those files. It assigns a priority to the query and assigns the query to a lane. It creates a concatenation Sequence (specifically, a LazySequence) to combine results from the cache and from data nodes. The result is another deferred sequence which causes another return-and-descend cycle.

LazySequence creates a driver for each node on demand by calling CachingClusteredClient.addSequencesFromServer. This method delegates to BrokerServerView to locate the target data node. QueryableDruidServer represents each node and provides the QueryRunner for that node. In this case it is a DirectDruidClient.

DirectDruidClient forms a REST request to the target node, which contains the rewritten query as payload. The response is read via an HttpResponseHandler which is handed to HttpClient to run in a thread separate from the one issuing the request. Thus, the DirectDruidClient.run() call simply forms the HTTP request, defines a response handler, hands the request to another thread, and returns a Sequence over a JsonParserIterator which reads from the HTTP response in the new thread. As a result, we see a quick series of calls to DirectDruidClient.run() to launch the clients, after which we return with the resulting Sequence that will return results as read.

Once the above is done, the stack unwinds and we return from QueryLifecycle.execute() with a QueryResponse which holds the outer-most sequence which will return our data.

Execution

Conceptually, data flows from the many HTTP clients back up the tree of sequences we've just discussed. At runtime, however, the mechanism is based on a "pull" metaphor. The HTTP request handler for the query (QueryResource.doPost()) writes the sequence results to a JSON encoder by asking for each result (event or batch of events) from the top-level sequence. That sequence satisfies the request by reading from its own input sequences, and typically does some transformation, as defined above.

Thus, we have data nodes sending data to the Broker HTTP client threads, which place it into a queue that is read, on demand, by the JsonParserIterator within the leaf-most sequence. This stack of sequences is anonymous: the sequences don't have names or, in many cases, even identities. Looking at the call stack, we see lambdas and generic Sequence functions, not a clear stack of operators. (The query profile seeks to make this chain clear.)
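
In sketch form, the consumer loop in the HTTP handler is just a pull loop over the top-level sequence (stand-in types; the real code streams JSON via Yielders rather than using an iterator):

```java
// Sketch of the pull-driven consumption loop; not QueryResource itself.
import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;

public class ConsumerSketch {
  static void writeResults(Iterator<Object> topLevelSequence, OutputStream out) throws IOException {
    // Each next() pulls a row (or batch) through the whole stack of sequences,
    // all the way down to the per-data-node HTTP readers.
    while (topLevelSequence.hasNext()) {
      Object row = topLevelSequence.next();
      out.write((row.toString() + "\n").getBytes()); // the real code streams JSON instead
    }
    out.flush();
  }
}
```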

In the broker, the chain of sequences, created above, looks a bit like this:

Consumer (`QueryResource.doPost()`)
|
Transform (AKA "project", from `FinalizeResultsQueryRunner`)
|
Limit (from `ScanQueryQueryToolChest.mergeResults()`)
|
Merge (from `RetryQueryRunner`)
|
HTTP client (to data node) + JSON decoder (Multiple of these)

Thus, we see that the many QueryRunners reduce to a smaller set of "operators" (not a Druid term). The above is for a simple query and ignores things like UNION tasks, which would introduce another merge phase.

Historical

Early "Plan" Phase

The outer steps of the "early" plan phase are identical on the broker and historical. The key difference is the implementation of the QuerySegmentWalker interface. On the historical node, this is implemented by the ServerManager: "Query handler for Historical processes". ServerManager.getQueryRunnerForSegments() first uses the query class as a key to look up the QueryRunnerFactory for that query. From that we get the QueryToolChest for the query. We also repeat the DataSourceAnalysis done earlier.

Next, getQueryRunnerForSegments() uses the SegmentManager.getTimeline(analysis) to get the VersionedIntervalTimeline for the data source. The timeline holds the segments available on this historical node. The timeline could be empty (segments have been removed from the node, say), in which case a special ReportTimelineMissingSegmentQueryRunner is returned. In the normal case, buildQueryRunnerForSegment() creates a stack of query runners for each requested segment. These include (from outermost to innermost):

  • ResponseContextMetricsInjectingQueryRunner (added later)
  • QuerySwappingQueryRunner (added later)
  • ResultLevelCachingQueryRunner (added later)
  • FinalizeResultsQueryRunner (added later)
  • SetAndVerifyContextQueryRunner
  • CPUTimeMetricQueryRunner
  • PerSegmentOptimizingQueryRunner
  • SpecificSegmentQueryRunner
  • MetricsEmittingQueryRunner
  • BySegmentQueryRunner
  • CachingQueryRunner
  • MetricsEmittingQueryRunner (again)
  • ReferenceCountingSegmentQueryRunner

In the stack, the top two are materialized at this time; the others are created later on demand. Since the set of segments on the node is subject to change, the process can fail at multiple places, and a ReportTimelineMissingSegmentQueryRunner is returned instead. The query-specific query runner is created below the last level of the above stack.

Here it is worth noting that Druid mostly avoids traditional classes and prefers to use closures to capture state. So, the query runner created above keeps the list of intervals as state in closure variables, not as member variables. If you look at the resulting query runner in the debugger, it is a bit hard to see the target segment(s).
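
For example, a runner built in this style keeps its state in the enclosing closure rather than in named fields (an illustration, not Druid code):

```java
// Illustration of state captured in a closure rather than in member variables.
import java.util.Iterator;
import java.util.List;

public class ClosureRunnerSketch {
  interface QueryRunner { Iterator<Object> run(Object queryPlus); }

  static QueryRunner runnerFor(List<String> intervals, String segmentId) {
    // 'intervals' and 'segmentId' live only in this closure: a debugger shows a
    // lambda object, not a class with named fields for the target segment.
    return queryPlus -> List.<Object>of("scan " + segmentId + " over " + intervals).iterator();
  }
}
```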

Main "Plan" Phase

The stack now unwinds back to QueryPlus.run(), which has just received the root QueryRunner. The code now calls run() on this query runner to run (actually, plan) the query.

CPUTimeMetricQueryRunner as described above.

FinalizeResultsQueryRunner as described above.

Note that it is hard to debug this next step: Eclipse does not allow setting a breakpoint inside the lambda inside the map call. It can be handy to instead use the debugger to inspect the child runner and then set a breakpoint in its run() function. This can be tricky when the query runner is itself a lambda!

Lambda from ScanQueryQueryToolChest.mergeResults() which limits results and is the same as we saw in the broker.

Lambda from ScanQueryFactory.mergeRunners() which does the actual merging. Merging here means merging results from the various segments (files, indexes) managed on this historical. Merging can be done in three different ways, depending on ordering and data size. Each does its work by wrapping the Sequence from the child in the proper merging sequence:

  • Concatenation using the Sequences.concat() function.
  • Sorting using the StableLimitingSorter.
  • N-way merge using the MergeSequence.

All three techniques again do delayed execution of their child runners. Here it is particularly difficult to determine the child runners and to step into them.
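
In outline, the strategy selection described above looks like this (a simplification over plain lists; the real code operates on Sequences and a guaranteed-stable sorter):

```java
// Outline of the three merge strategies; simplified, not the real
// ScanQueryRunnerFactory logic.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ScanMergeSketch {
  static List<Object> merge(List<List<Object>> perSegmentResults,
                            Comparator<Object> ordering,   // null means "no ordering"
                            long limit) {
    List<Object> out = new ArrayList<>();
    if (ordering == null) {
      // 1. No ordering: simple concatenation of segment results.
      perSegmentResults.forEach(out::addAll);
    } else {
      // 2./3. Ordering requested: sort or n-way merge (collapsed into a sort here).
      perSegmentResults.forEach(out::addAll);
      out.sort(ordering);
    }
    // Apply the limit, if any.
    return limit > 0 && limit < out.size() ? out.subList(0, (int) limit) : out;
  }
}
```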

The ScanQueryFactory.mergeRunners() accepted a list of SegmentDescriptors that describe the segments (files, indexes) to scan. The merge sequences above are implemented by mapping from the SegmentDescriptor to a query runner for that segment. This is another spot which is very hard to follow in the debugger: the query runners are created dynamically and we can't easily step into the code that does that work. Instead, we have to set a breakpoint in the outer method and see what code created the iterator over query runners that the merge code executes.

???

ReferenceCountingSegmentQueryRunner locks the target segment by incrementing its reference count, which ensures the segment remains hosted by the historical for the duration of the query. It also reports the concurrency corner case in which the segment was on this node when the Broker checked, but is not on the node now. The resulting "missing segment" report is what triggers the retry in the broker-side retry query runner.

ScanQueryRunnerFactory.ScanQueryRunner which is an adapter between the QueryRunner protocol and the ScanQueryEngine that does the actual work. This runner is called only during the execution phase, which we discuss below.

Once the above is done, the stack unwinds and we return from QueryLifecycle.execute() with a QueryResponse which holds the outer-most sequence.

Execution Phase

As on the broker, at execution time we start pulling from the stack of Sequences we've created. For the historical, the leaf operation is a scan of an index, giving us an "operator" stack something like this:

Consumer (`QueryResource.doPost` on historical)
|
Limit (from `ScanQueryQueryToolChest.mergeResults()`)
|
Concat (from lambda from `ScanQueryFactory.mergeRunners()`)
|
Scan (`ScanQueryEngine.process`)

In this case, most of the action is in that "Scan" operator. The scan occurs against an index (segment, file) using a Cursor as created in QueryableIndexStorageAdapter.makeCursors. Each cursor returns one Sequence of results. (It is not clear when one index would return multiple cursors, so there may be only one in the normal case.) Internally, Scan does its own merge, so the last step expands to:

...
|
Concat
|
Cursor

To optimize the scan, filters are pushed down into the cursor and implemented as bitmaps that identify the target rows. These rows are then materialized from the individual columns.
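
As a rough illustration of the idea (using java.util.BitSet as a stand-in for Druid's compressed bitmap indexes):

```java
// Rough illustration of bitmap-based filtering; BitSet stands in for Druid's
// compressed bitmap indexes.
import java.util.BitSet;

public class BitmapFilterSketch {
  // Each value of a dictionary-encoded column has a bitmap of the rows containing it.
  static BitSet matchingRows(BitSet rowsWithValueA, BitSet rowsWithValueB) {
    BitSet result = (BitSet) rowsWithValueA.clone();
    result.and(rowsWithValueB);   // AND of two filter conditions
    return result;                // only these row offsets are materialized from the columns
  }
}
```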
