
Query Structure

Paul Rogers edited this page Oct 18, 2021 · 8 revisions

Overview

A typical query engine has two "phases": planning and execution. Planning converts an input query (typically SQL) into a set of relational operators. Execution wires up a set of operator implementations according to the plan, then runs data through the resulting pipeline.
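The two phases can be illustrated with a toy engine. This is a minimal sketch of the traditional approach only, not Druid code: the `Operator` interface, the pipe-separated "query" syntax, and the operator names `positive` and `double` are all invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical operator type for this sketch: transforms a batch of rows.
interface Operator {
    List<Integer> apply(List<Integer> rows);
}

final class TwoPhaseSketch {
    // Planning: convert a toy query string such as "positive | double"
    // into an ordered list of operators.
    static List<Operator> plan(String query) {
        List<Operator> operators = new ArrayList<>();
        for (String step : query.split("\\|")) {
            switch (step.trim()) {
                case "positive":
                    operators.add(rows -> rows.stream()
                            .filter(r -> r > 0)
                            .collect(Collectors.toList()));
                    break;
                case "double":
                    operators.add(rows -> rows.stream()
                            .map(r -> r * 2)
                            .collect(Collectors.toList()));
                    break;
                default:
                    throw new IllegalArgumentException("unknown operator: " + step.trim());
            }
        }
        return operators;
    }

    // Execution: run the input through each planned operator in order.
    static List<Integer> execute(List<Operator> operators, List<Integer> input) {
        List<Integer> rows = input;
        for (Operator op : operators) {
            rows = op.apply(rows);
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(execute(plan("positive | double"), List.of(-1, 2, 3)));
    }
}
```

Here the plan exists as data before any rows flow, which is the separation that Druid's native queries do not make explicitly.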

Druid follows basically the same idea, but in its own unique way. For native queries, planning is done while running. Druid does not have operators in the traditional sense, but instead has a rather complex Functional Programming (FP) model that, at the end of the day, accomplishes the same task.

This page tries to summarize the Druid design to help us understand the code.

The key thing to note is that Druid is, fundamentally, a distributed system built on the micro-services model. Druid consists of a set of nodes, each of which does a specific task, and most of which can be replicated to give a scale-out architecture. The query architecture evolved from REST API RPC design principles, not from the more traditional query engine theory. Thus, for those of us from a query background, it is an interesting challenge to map from the stateless FP RPC approach to the traditional data pipeline approach.

Server Structure

A native query in Druid is just a REST "resource" (API) configured in the Jetty web server. Druid uses Guice to wire up the components. Queries are defined by QueryResource, and the broker-specific query resource is BrokerQueryResource. Guice configuration is done in CliBroker.

Query Flow

According to the documentation: "Druid's native query language is JSON over HTTP." So, our focus here is native queries. The basic flow is:

Interestingly, the same Jetty handler serves both broker and data node queries.

Native Queries

The first step is to understand how Druid is structured. Many engines form queries as a pipeline of operators. Druid, instead, offers a set of pre-defined query types, each of which performs some set of operations and returns results. Thus, we must understand each of these separately, though there is significant overlap.

The query classes are structured around the native query types: there is a query, factory and runner for each. The native queries are documented in Queries/Native queries. Also, to understand the attributes within each query type, see Queries/Native query components.

Classes

Various classes combine to run a query. Druid is based on the idea of a defined set of native queries, each of which runs an entire query.

  • Query<Result> with the base class BaseQuery<Result> - the Jackson-serializable description of the query, sent over the wire and documented in the Druid documentation.
  • QueryRunner<Query<Result>> - executable form of the query which does the required processing and returns a result set.
  • QueryRunnerFactory<Result, QueryType extends Query<Result>> - Holds the query. When given a segment, returns a query runner for that query and segment.
  • QueryLifecycle - Tracks the query as it executes within a service.
  • QueryPlus<Result> - Wrapper around the query which also includes server-side properties such as the authentication identity, response context, and a QuerySegmentWalker.
  • QueryToolChest<Result, QueryType extends Query<Result>> - Shared, service-specific implementation of the query.
  • MapQueryToolChestWarehouse - A global map from query type to the tool chest for that query.
  • ResponseContext - Response context shared by all query runners in the query runner stack (see below) and returned to the node which requested the query. Defines a known set of response values.
  • QueryResponse - Holds the query results and the ResponseContext.
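The relationship between the query description, the factory, and the per-segment runner can be sketched with simplified stand-ins. Everything below is hypothetical and much smaller than Druid's real interfaces: a segment is modeled as a plain list of rows, results are a `List` rather than a `Sequence`, and the `RowCountQuery` type is invented for the example.

```java
import java.util.List;

// Hypothetical, simplified stand-ins; these are NOT Druid's real signatures.
interface SketchQuery<R> {
    String type();
}

interface SketchRunner<R> {
    List<R> run(SketchQuery<R> query);
}

interface SketchRunnerFactory<R, Q extends SketchQuery<R>> {
    // Given a segment (here just a list of rows), return a runner for it.
    SketchRunner<R> createRunner(List<String> segment);
}

// Invented query type: counts the rows in each segment.
final class RowCountQuery implements SketchQuery<Integer> {
    public String type() {
        return "rowCount";
    }
}

final class RowCountRunnerFactory implements SketchRunnerFactory<Integer, RowCountQuery> {
    public SketchRunner<Integer> createRunner(List<String> segment) {
        // The runner is bound to one segment and produces one result for it.
        return query -> List.of(segment.size());
    }
}

final class ClassesSketch {
    // One runner per segment; the caller combines the per-segment results.
    static int totalRows(List<List<String>> segments) {
        RowCountQuery query = new RowCountQuery();
        RowCountRunnerFactory factory = new RowCountRunnerFactory();
        int total = 0;
        for (List<String> segment : segments) {
            for (int count : factory.createRunner(segment).run(query)) {
                total += count;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(totalRows(List.of(List.of("a", "b"), List.of("c"))));
    }
}
```

The point of the sketch is the division of labor: the query is pure description, the factory binds it to a segment, and the runner does the work.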

The simplest place to get started is with the SegmentMetadataQuery.

Data Node Query Execution

The basic flow on a historical node:

  • Broker sends a Query subclass to a data node.
  • Message handled by QueryResource.doPost()
    • Query is given as an input stream.
    • Creates a QueryLifecycle
    • Calls readQuery to deserialize the query, which invokes the query description constructor.
    • Initializes the query lifecycle with the query.
      • Rewrites the query with a context given as a Map
      • Creates the shared, query-specific QueryToolChest which effectively maps from the description to the execution code.
    • If debug logging is enabled, log the rewritten query.
    • Authorize the query
    • Call QueryLifecycle.execute() to execute the query
      • Creates the QueryPlus
      • Calls QueryPlus.run() to run the query, which immediately calls the top QueryRunner.run(). (See below)
      • Creates a QueryResponse with the results.
    • If the ETAG is set, emit logs and metrics for the query. These are for the query as a whole, separate from metrics emitted by the query runner. Also, logs the native query itself, along with query stats.
    • JSON-serialize the results back to the caller.
  • QueryPlus.run() executes the query
    • Locate the data source using DataSourceAnalysis: this mechanism handles nested queries, joins etc. See the Javadoc for that class for details.
    • Obtain the segment spec. (See DataSourceAnalysis.getBaseQuerySegmentSpec().)
    • Call MultipleSpecificSegmentSpec.lookup() to create a query runner per segment.
    • The above delegates to QuerySegmentWalker to walk the segments and create query runners.
    • The actual implementation is in ServerManager.
    • Call run() on the top-most query runner. (See below)
  • (What) creates a thread to execute the query.
  • The QueryRunner processes each segment and returns the results as an Iterable

It seems that results are accumulated in a sequence, which is bubbled back up the query runner call stack.

Open questions:

  • It appears that if the query hits one segment, the query runs in the Jetty thread that processes the REST call. What happens if there are multiple segments?
  • Are results returned in pipeline form? Or, if we return a zillion rows, must they all be accumulated in memory?

Query Runner Stack

  • FluentQueryRunnerBuilder - builds the stack below, but just passes through at execution time
  • ResultLevelCachingQueryRunner - short-circuits execution to retrieve results from the cache if available
  • CPUTimeMetricQueryRunner - Measures CPU time taken by the child runner. Sets the CPU_CONSUMED_NANOS response key with the value.
  • FinalizeResultsQueryRunner - Applies the "post compute manipulator function".
  • ResultMergeQueryRunner - Merges results from multiple segments using query-specific comparison and merge functions.
  • SpecificSegmentQueryRunner - Catches SegmentMissingException from child and adds the segment to the MISSING_SEGMENTS response key.
  • MetricsEmittingQueryRunner - Computes run time. Emits the QueryMetrics<?> to the metrics output source.
  • ReferenceCountingSegmentQueryRunner - Adds a reference count to the segment, and handles failure if the segment is not available.
  • Query-specific runner
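The stack above is an instance of the decorator pattern: each runner wraps a child, adds one concern, and delegates. The following is a minimal sketch of that shape only. The `StackRunner` interface, the `elapsedNanos` key, and the `timed` and `sorted` decorators are invented for illustration and do not correspond to Druid's actual classes or response keys.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a query runner; not Druid's real interface.
interface StackRunner {
    List<Integer> run(Map<String, Object> responseContext);
}

final class RunnerStackSketch {
    // Innermost "query-specific" runner: returns fixed rows.
    static StackRunner base() {
        return ctx -> List.of(3, 1, 2);
    }

    // Decorator: measures the child's run time and records it in the shared context.
    static StackRunner timed(StackRunner child) {
        return ctx -> {
            long start = System.nanoTime();
            List<Integer> rows = child.run(ctx);
            ctx.put("elapsedNanos", System.nanoTime() - start);
            return rows;
        };
    }

    // Decorator: post-processes the child's results (here, by sorting a copy).
    static StackRunner sorted(StackRunner child) {
        return ctx -> {
            List<Integer> rows = new ArrayList<>(child.run(ctx));
            Collections.sort(rows);
            return rows;
        };
    }

    public static void main(String[] args) {
        Map<String, Object> ctx = new HashMap<>();
        StackRunner stack = sorted(timed(base()));
        System.out.println(stack.run(ctx));
        System.out.println(ctx.containsKey("elapsedNanos"));
    }
}
```

As in the list above, the outermost runner is called first, and the shared context plays the role that ResponseContext plays for the real stack.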

Sequences

The code appears to create a collection of results on the QueryRunner.run() call. In fact, it creates a Sequence which returns results, and which has events for the first and last rows. It is those events where the actual work occurs.
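The deferred behavior can be shown with a small lazy wrapper. This is a sketch of the general idea under simplifying assumptions, not Druid's Sequence API: the `lazy` helper and the `open`/`close` log entries are invented, and a plain `Iterable` stands in for the sequence.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

final class LazySequenceSketch {
    // Records the order of events so the laziness is observable.
    static final List<String> log = new ArrayList<>();

    // Returns an Iterable whose source is opened only when the first row is
    // requested, and which logs a "close" event once the last row is consumed.
    static Iterable<Integer> lazy(Supplier<Iterator<Integer>> source) {
        return () -> new Iterator<Integer>() {
            private Iterator<Integer> rows; // opened on first use
            private boolean closed;

            @Override
            public boolean hasNext() {
                if (rows == null) {
                    log.add("open");
                    rows = source.get();
                }
                boolean more = rows.hasNext();
                if (!more && !closed) {
                    closed = true;
                    log.add("close");
                }
                return more;
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return rows.next();
            }
        };
    }

    static List<String> demo() {
        log.clear();
        // Analogous to calling run(): nothing has been read yet.
        Iterable<Integer> seq = lazy(() -> List.of(1, 2).iterator());
        log.add("run() returned");
        for (int row : seq) {
            log.add("row " + row);
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The log shows that "run() returned" is recorded before "open": creating the sequence does no work, and the source is touched only when a consumer pulls the first row.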

QueryToolChest

A query tool chest is like a Query, but one with implementation details added such as:

  • Merge results, including functions for aggregation
  • Create query metrics
  • Caching strategy
  • Filters

This class appears to be a form of "fixed DAG": the query perhaps goes through a known set of operations, with the tool chest providing the implementation for each operator. Thus, there is no overall query executor provided here; perhaps that executor is assumed and shared?
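One way to picture the tool chest, together with the global map from query type to tool chest, is as a lookup table of per-query-type behavior that a shared executor consults. The sketch below is hypothetical: `SketchToolChest`, its two fields, and the `count` and `max` query types are invented, and the real QueryToolChest has many more responsibilities.

```java
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical bundle of per-query-type behavior; not Druid's QueryToolChest.
final class SketchToolChest {
    final BinaryOperator<Long> mergeFn; // how to combine two partial results
    final boolean cacheable;            // stand-in for a caching strategy

    SketchToolChest(BinaryOperator<Long> mergeFn, boolean cacheable) {
        this.mergeFn = mergeFn;
        this.cacheable = cacheable;
    }
}

final class ToolChestSketch {
    // Global map from query type to its tool chest (types invented for the sketch).
    static final Map<String, SketchToolChest> WAREHOUSE = Map.of(
            "count", new SketchToolChest(Long::sum, true),
            "max", new SketchToolChest((a, b) -> Math.max(a, b), false));

    // Shared "executor": the steps are fixed, and the tool chest supplies
    // the query-specific merge function.
    static long merge(String queryType, List<Long> perSegmentResults) {
        SketchToolChest chest = WAREHOUSE.get(queryType);
        if (chest == null) {
            throw new IllegalArgumentException("unknown query type: " + queryType);
        }
        return perSegmentResults.stream().reduce(chest.mergeFn).orElseThrow();
    }

    public static void main(String[] args) {
        System.out.println(merge("count", List.of(2L, 3L)));
        System.out.println(merge("max", List.of(2L, 7L, 3L)));
    }
}
```

In this reading, the generic code owns the control flow and the tool chest fills in the query-specific pieces, which matches the "fixed DAG" guess above.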

Subqueries

A client sends queries to the broker, or to any data (queryable) node. The broker looks at the intervals to determine the set of segments required for the query. The broker then issues another, focused query to the data node with that segment. From the data node's perspective, a query is a query whether it comes from a client or the broker. So, Druid has two kinds of queries:

  • A (top level) query: A query sent from a client to the broker.
  • A subquery: a query that runs on a data node as requested by another Druid node (such as the broker).

The term "subquery" reflects the sender's perspective, not the data node's: the sender is obligated to merge the results from multiple subqueries.

Other query engines have different terms to avoid overloading the "query" term:

  • Query: a request submitted by a client. (One of the meanings of "query" in Druid.)
  • Fragment: a part of a query plan which can be distributed. (A native query produced by a SQL query, say.)
  • Slice: an instance of a fragment running on a specific node. (A "subquery" in Druid terms.)

Druid nodes use the DirectDruidClient class to launch a subquery. This class is actually a kind of QueryRunner<T>, meaning it looks to the rest of Druid just like any other query runner; it just happens to send the query over the wire to another node.
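The idea that a remote call is just another runner can be sketched as follows. This is an illustration under stated assumptions, not DirectDruidClient itself: the `NodeRunner` interface is invented, and a plain `Function` stands in for the HTTP round trip to another node.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical runner interface; not Druid's QueryRunner<T>.
interface NodeRunner {
    List<Integer> run(String query);
}

final class SubquerySketch {
    // A runner that reads rows held locally.
    static NodeRunner local(List<Integer> segmentRows) {
        return query -> segmentRows;
    }

    // A runner that forwards the query elsewhere. The transport function is a
    // stand-in for sending the query over the wire and reading the response.
    static NodeRunner remote(Function<String, List<Integer>> transport) {
        return query -> transport.apply(query);
    }

    // The sender issues the query to every runner and merges the results.
    // It cannot tell, and does not care, which runners are remote.
    static List<Integer> scatterGather(String query, List<NodeRunner> runners) {
        List<Integer> merged = new ArrayList<>();
        for (NodeRunner runner : runners) {
            merged.addAll(runner.run(query));
        }
        Collections.sort(merged);
        return merged;
    }

    static List<Integer> demo() {
        NodeRunner here = local(List.of(5, 1));
        NodeRunner there = remote(query -> List.of(3)); // pretend network call
        return scatterGather("toy query", List.of(here, there));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The merge step in `scatterGather` is the sender's obligation described earlier: each subquery returns a partial result, and only the sender sees the whole.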

QueryMetrics

The query metrics class is a bit of a kitchen sink: its job is to provide information about the query (such as whether it has filters) and also to accept metrics about query execution (such as the number of bytes written). The first task seems query-specific; the second seems emitter-specific.

The class provides a number of methods, such as numDimensions(), which have to be called from within the implementation itself, typically from query(). In other cases, information is passed into the query from outside.

The metrics instance is created at multiple points: once in QueryPlus.withQueryMetrics(), and again in QueryLifecycle.emitLogsAndMetrics(). It seems that this idea was added later in the development cycle and is somewhat bolted on.

SegmentMetadataQuery

All work of the query is done within an anonymous inner class in SegmentMetadataQueryRunnerFactory.createRunner(). The actual work is well-defined: the factory is given a segment; the runner simply picks through the metadata on that segment to fill in the results.
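The shape of that code, a factory method that returns an anonymous runner closed over one segment, can be sketched as below. This is a hypothetical reduction, not the real SegmentMetadataQueryRunnerFactory: the `MetaRunner` interface is invented, and a segment is modeled as a map from column name to type name.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical runner interface for this sketch.
interface MetaRunner {
    Map<String, String> run();
}

final class MetadataFactorySketch {
    // The factory is handed a segment (modeled here as column name -> type)
    // and returns a runner bound to that segment.
    static MetaRunner createRunner(Map<String, String> segmentColumns) {
        return new MetaRunner() {
            @Override
            public Map<String, String> run() {
                // "Pick through" the segment metadata to build the result;
                // a sorted copy keeps the output order deterministic.
                return new TreeMap<>(segmentColumns);
            }
        };
    }

    public static void main(String[] args) {
        MetaRunner runner = createRunner(Map.of("page", "STRING", "__time", "LONG"));
        System.out.println(runner.run());
    }
}
```

Because the runner captures the segment at creation time, calling `run()` needs no further arguments in this sketch; the real runner also receives the query and response context.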
