-
Notifications
You must be signed in to change notification settings - Fork 0
Query Structure
A typical query engine has two "phases": planning and execution. Planning converts an input query (typically SQL) into a set of relational operators. Execution wires up a set of operator implementations according to the plan, then running data through the resulting pipeline.
Druid does basically the same idea, but it its own unique way. For native queries, planning is done while running. Druid does not have operators in the traditional sense, but instead has a rather complex Function Programming (FP) model that, at the end of the day, accomplishes the same task.
This page tries to summarize the Druid design to help us understand the code.
The key thing to note is that Druid is, fundamentally, a distributed system build on the micro-services model. Druid consists of a set of nodes, each of which does a specific task, and most of which can be replicated to give a scale-out architecture. The query architecture evolved from REST API RPC design principles, not from the more traditional query engine theory. Thus, for those of us from a query background, it is an interesting challenge to map from the stateless FP RPC approach to the traditional data pipeline approach.
A native query in Druid is just a REST "resource" (API) configured in the Jetty web server. Druid uses Guice to wire up the components. Queries are defined by QueryResource
, and the broker-specific query resource is BrokerQueryResource
. Guice configuration is done in CliBroker
.
According to the documentation: "Druid's native query language is JSON over HTTP." So, our focus here are native queries. The basic flow is:
- Queries arrive at the broker at the
/druid/v2
endpoint which distributes queries by segment to data nodes. - Data nodes (historical, middleManager) run the query (one per segment)?
- Broker merges results.
Interestingly, the same Jetty handler serves both broker and data node queries.
The first step is to understand how Druid is structured. Many engines form queries as a pipeline of operators. Druid, instead, offers a set of pre-defined query types, each which performs some set of operations and returns results. Thus, we must understand each of these separately, though there is significant overlap.
The query classes are structured around the native query types: there is a query, factory and runner for each. The native queries are documented in Queries/Native queries. Also, to understand the attributes within each query type, see Queries/Native query components.
Various classes combine to run a query. Druid is based on the idea of a defined set of native queries; each of which runs an entire query.
-
Query<Result>
with the base classBaseQuery<Result>
- the Jackson serializable description of the query set over the wire and documented in the Druid documentation. -
QueryRunner<Query<Result>>
- executable form of the query which does the required processing and returns a result set. -
QueryRunnerFactory<Result, QueryType extends Query<Result>>
- Holds the query. When given a segment, returns a query runner for that query and segment. -
QueryLifecycle
- Tracks the query as it executes within a service. -
QueryPlus<Result>
- Wrapper around the query which also includes server-side properties such as the authentication identity, response context, and aQuerySegmentWalker
. -
QueryToolChest<Result, QueryType extends Query<Result>>
- Shared, service-specific implementation of the query. -
MapQueryToolChestWarehouse
- A global map from query type to the tool chest for that query. -
ResponseContext
- Response context shared by all query runners in the query runner stack (see below) and returned to the node which requested the query. Defines a known set of response values. -
QueryResponse
- Holds the query results and theResponseContext
.
The simplest place to get started is with the SegmentMetadataQuery
.
The basic flow on a historical node:
- Broker sends a
Query
subclass to a data node. - Message handled by
QueryResource.doPost()
- Query is given as an input stream.
- Creates a
QueryLifecycle
- Calls
readQuery
to deserialize the query, which invokes the query description constructor. - Initializes the query lifecycle with the query.
- Rewrites the query with a context given as a
Map
- Creates the shared, query-specific
QueryToolChest
which effectively maps from the description to the execution code.
- Rewrites the query with a context given as a
- If debug logging is enabled, log the rewritten query.
- Authorize the query
- Call
QueryLifecycle.execute()
to execute the query- Creates the
QueryPlus
- Calls
QueryPlus.run()
to run the query, which immediately calls the topQueryRunner.run()
. (See below) - Creates a
QueryResponse
with the results.
- Creates the
- If the
ETAG
is set, emit logs and metrics for the query. These are for the query as a whole, separate from metrics emitted by the query runner. Also, logs the native query itself, along with query stats. - JSON-serialize the results back to the caller.
-
QueryPlus.run()
executes the query- Locate the data source using
DataSourceAnalysis
: this mechanism handles nested queries, joins etc. See the Javadoc for that class for details. - Obtain the segment spec. (See
DataSourceAnalysis.getBaseQuerySegmentSpec()
.) - Call
MultipleSpecificSegmentSpec.lookup()
to create a query runner per segment. - The above delegates to
QuerySegmentWalker
to walk the segments and create query runners. - The actual implementation is in
ServerManager
, - Call
run()
on the top-most query runner. (See below)
- Locate the data source using
- (What) creates a thread to execute the query.
- The
QueryRunner
processes each segment and returns the results as anIterable
It seems that results are accumulated in a sequence, which is bubbled back up the query runner call stack.
Open questions:
- Appears that if the query hits one segment, the query runs in the Jetty thread that processes the REST call. What happens if there are multiple segments?
- Are results returned in pipeline form? Or, if we return a zillion rows, must they all be accumulated in memory?
-
FluentQueryRunnerBuilder
- builds the stack below, but just passes through at execution time -
ResultLevelCachingQueryRunner
= short-circuits execution to retrieve results from the cache if available -
CPUTimeMetricQueryRunner
- Measures CPU time taken by the child runner. Sets theCPU_CONSUMED_NANOS
response key with the value. -
FinalizeResultsQueryRunner
- Applies the "post compute manipulator function". -
ResultMergeQueryRunner
- Merges results from multiple segments using query-specific comparison and merge functions. -
SpecificSegmentQueryRunner
- CatchesSegmentMissingException
from child and adds the segment to theMISSING_SEGMENTS
response key. -
MetricsEmittingQueryRunner
- Computes run time. Emits theQueryMetrics<?>
the metrics output source. -
ReferenceCountingSegmentQueryRunner
- Adds reference count to the segment, handles failure of segment is not available. - Query-specific runner
The code appears to create a collection of results on the QueryRunner.run()
call. In fact, it creates a Sequence
which returns results, and which has events for the first and last rows. It is those events where the actual work occurs.
A query toolbox is a Query
, but one with implementation details added such as:
- Merge results, including functions for aggregation
- Create query metrics
- Caching strategy
- Filters
This class appears to be a form of "fixed DAG": the query perhaps goes though a known set of operations, with the tool chest providing the implementation for each operator. Thus, there is no overall query executor provided here; perhaps that executor is assumed and shared?
A client sends queries to the broker, or to any data (queryable) node. The broker looks at the intervals to determine the set of segments required for the query. The broker then issues another, focused query to the data node with that segment. From the data node's perspective, a query is a query whether it comes from a client or the broker. So, Druid has two kinds of queries:
- A (top level) query: A query sent from a client to the broker.
- A subquery: a query that runs on a data node as requested by another Druid node (such as the broker).
A subquery is such not from the perspective of the data node, but rather from the perspective of the sender: the sender is obligated to merge the results from multiple subqueries.
Other query engines have different terms to avoid overloading the "query" term:
- Query: a request submitted by a client. (One of the meanings of "query" in Druid.)
- Fragment: a part of a query plan which can be distributed. (A native query produced by a SQL query, say.)
- Slice: an instance of a fragment running on a specific node. (A "subquery" in Druid terms.)
Druid nodes use the DirectDruidClient
class to launch a subquery. This class is actually a kind of QueryRunner<T>
, meaning it looks to the rest of Druid just like any other query runner; it just happens to send the query over the wire to another node.
The query metrics class is a bit of a kitchen sink: its job is to provide information about the query (such as whether it has filters) and also to accept metrics about query execution (such as the number of bytes written.) The one task seems query-specific, the other seems emitter-specific.
The class provides a number of methods, such as numDimensions()
which have to be called from within the implementation itself, typically from query()
. In other cases, information is passed into the query from outside.
The metrics instance is created at multiple times. One time is in QueryPlus.withQueryMetrics()
, while another time is in QueryLifecycle.emitLogsAndMetrics()
. It seems that this idea was added later in the development cycle and is kind of hacked in.
All work of the query is done within an anonymous inner class in SegmentMetadataQueryRunnerFactory.createRunner()
. The actual work is well-defined: the factory is given a segment; the runner simply picks though the metadata on that segment to fill in the results.