-
Notifications
You must be signed in to change notification settings - Fork 0
Query Profile
A query profile is a collection of information about a single query used to diagnose performance issues for both developers and users. Query profiles are a standard feature of Drill, Impala (Cloudera version), Presto and other query tools. When aggregated, the information in the profile can show query access patterns needed to optimize performance of a cluster.
Each profile generally includes the query, the query plan and metrics about query execution. Metrics are unique to each product and reflect how the product operates internally. Our goal here is to design a query profile specifically for Druid.
Druid offers blindly fast performance, but only if the system is properly tuned for the data and queries. Historically, tuning Druid was more of an art than a science because one had to observe performance, know what to look for in data and queries, know how Druid operates internally, and thus know which changes might be beneficial. Though many experts developed the necessary skills, newer users are often left with trial and error since they've not yet developed a feel for how Druid works.
One way to address this issue is to make the performance drivers explicit: to show the areas that take time. One could use code-level solutions such as flame graphs, but our goal here is not to optimize the code, but rather the application. Knowing that function foo()
takes x% of the time might tell an expert that something is amiss, but it is unlikely to provide useful knowledge to the average user.
Similarly, measuring query time is useful to tell us where we stand, but just knowing query run time does not tell us why the query takes that amount of time, or what we can do about it.
Our premise here is that if we know, at an application level, why some part of a query takes time, then we are in a better position to make application-level changes that improves performance. Such changes typically include:
- Revisions to the data structure: roll-up "grain", number of dimensions, number of tables, etc.
- Partitioning: a form of top-level indexing
- Segment sizes which influence the cost of full table scans
- Distribution (the number of data nodes), which allows us to do more work in parallel.
Experience with query profiles in other products has shown that detailed query-level information is essential to visualizing, understanding, and improving each of the above factors.
The goal here is simply to gather the required information and make it available via a new REST call. We leave it to others to add analysis, visualization or other tools on top of the basic query profile mechanism.
Druid is a distributed system that offers queries as one of its services. As with any good REST API: each API call does one thing well. Thus, Druid has a collection of native queries, each of which does one kind of operation. Druid also offers a SQL layer that converts SQL queries into one of the native Druid queries. As a result, Druid does not use the classic "DAG"-based query execution model, but rather a series of RPC calls, each of which does an entire query, but with a fixed set of query-related operations.
A typical query profile uses the DAG structure to organize data. The Drill page linked above shows a visualization of the DAG, and the metrics gathered for each node (operator) within the DAG. Since Druid has a different model, the profile will be structured differently. Though Druid does not make a query DAG explicit, the Druid code implicitly operates as a set of steps which form a kind of "hard-coded DAG." For this reason, we will gather metrics according to the native query structure, but present them in the query profile in the form of a DAG. One advantage of such a structure is that the DAG for a query is determined by the query, and is fairly stable, while the native query implementation is subject to constant revision to increase performance. Thus, a DAG structure allows the profile to change more slowly than the actual code.
The profile presented to the user is JSON object that can be serialized into Java or other languages. The top-level object includes the following top-level fields:
- Query type: Native or SQL
- Query ID of the top-level query
- Status: succeeded or failed.
- Tables: list of tables which the query accesses
- Kind: Kind of data source
- Name: Name of the data source
- Dimension: list of dimension which the query accesses
- Measures: list of measures which the query accesses
- Intervals: list of intervals (rounded to segment boundaries) which the query accesses
- Root slice (see below)
- Attributes: open-ended map of key/value pairs to be used for any purpose and for information which does not fit above.
The other fields depend on whether the query is SQL or native. If SQL:
- SQL query text (if SQL)
- Has wildcard:
true
if the statement is of the formSELECT *
,false
if columns are listed explicitly. - Query plan
If native:
- Native query text (if native)
A distributed query plan consists of a set of logical operators that describe what is to be done. Then, a set of physical operators are distributed across the network to do the work. One or more physical operator is grouped in a fragment which executes on a node. Thus three an be many concurrent physical operators for each logical operator. In a full DAG-based engine, multiple exchanges shift data from one fragment to another. In Druid, the native query is roughly the equivalent of a fragment, and exchanges are limited to a single level scatter/gather process between data nodes (historical, peons) and the broker. Operators are implicit within the implementation of each native query. Here, we generalize Druid's structure into a DAG form.
The first level of structure for query metrics is the slice. For Druid a slice is either the "Jetty thread" in the broker, or a native query on a data node. For data nodes, a slice is implemented by a set of fragments: actually worker threads that do work.
A slice is called that because it contains a "slice" of the overall DAG. Again, in fully distributed system, multiple slices occur: data can flow across many exchanges during a single query. In Druid, there are two slices: the broker and a native query. Each slice contains a "sub-DAG" with its own slice-local root and leaf nodes. All slices have a Sender as its root. Broker slices have Receivers as its leaf, while data nodes have Scans at the leaf.
Slice fields include:
- Kind: root, native query kind
- Role: broker, data node
- Fragments: instances running on a node.
Fragment fields include:
- Host name: The host (and port) on which the fragment runs
- ID: the query ID assigned to the sub-query
- Status: succeeded or failed
- Time
- Rows
- Sender: the sender operator at the root of the fragment sub-DAG
The execution metrics present the query as a set of operators, defined below. The operators include:
- Kind: the kind of operator
- Rows: the number of rows produced by the operator
- Total Time: the wall-clock time for the operator and all its children
- Time: the wall-clock time for the one operator (total time minus the total time of any children)
The set of operators includes:
- Sender: the operator which sends results to the client (top-level query) or broker (data node queries)
- Child: the operator which sends data to the sender
- Merge: the operator which receives data from a subquery and merges it
- Children: the list of subqueries which send data to the merge node
- Segment metadata scan
- Segment read time
- Segment scan
- Open time: time to read the segment header, etc. (Details needed)
- Open bytes
- Blocks read
- Total scan time
- Total scan bytes
- Columns
- Name
- Bytes read
- I/O time (or total time to read, including decode)
(More TBD)
A "scan" operator is one which reads data from one of Druid's several data sources. Each kind of scan has its own unique characteristics.
Data nodes send results to the broker using the "response trailer" mechanism which Gian designed. That is, the data nodes gather metrics for a native query, and return them to the broker after the data portion of the response. The structure used within each data node is different than the above: it is an intermediate form more closely tied to the structure of the native query.
As per Gian's design, the data node returns the trailer only when requested. The broker (when profiles are enabled) will request the trailer. Druid clients can also optionally request a trailer. But, by default, no trailer is sent so that behavior is backward-compatible with existing clients who work with data nodes directly.
The broker manages each query and is extended to gather the response trailers from each data node. The information is merged with broker-level information to create the user form of the profile.
The profile is not returned with the query, doing so would add unwanted complexity to the many existing Druid clients. Further, most clients would not know what to do with the profile even if they received it.
Instead, the broker maintains a disk-based cache of profiles. At a high level:
- The broker plans the query, scatters subqueries, gathers results and trailers
- The broker sends data to the client, and gathers trailers into a "pre-profile" object
- After the last data byte is sent, before closing the connection, the broker places the "pre-profile" onto an internal queue
- The broker sends the final response to the client
- A worker thread picks up the pre-profile object, generates the user-level profile, and writes it to disk
- The worker may also prune old profiles, if configured to do so.
A new API, /v2/profile/<id>
, requests a profile, which is returned as JSON. The flow here is designed to avoid race conditions, especially if a client runs a query and immediately asks for its profile.
- First check the profile queue. If found in the queue, force (or wait for the completion of) generation of the user profile, and remove the entry from the queue.
- Check the disk cache for the profile.
- Copy the disk file contents to the client.
Note that this API is meant for short-term, immediate access to a query profile. In a production system, the broker should not store profiles for more than a brief time: minutes to hours. After that, to conserve resources, profiles are either deleted (because the probability of any single profile being requested drops exponentially with time), or are gathered and sent to some other system for aggregation or storage. Once the profile is removed, it is no longer available from the broker. It may be available from some other, third-party system.
This does create an awkward gap in coverage: during the time that the profile is removed from the broker, before it becomes available in that third-party system. The thought is that if a user wants to profile a single query, they will run the query, then immediately (with a few minutes) request the profile from the broker. Longer term, profiles are used in the aggregate, or to diagnose issues, and so the "gap" is less of an issue.
The profile queue is flushed asynchronously to ensure that that queries don't wait for the profile to be written to disk. As a result, profiles may be pending writes when the server exits. To ensure that no profiles are lost, the queue must flush all remaining profiles to disk on shutdown.
BrokerQueryResource
is the broker-specific handler for query requests. Extends QueryResource
. Use this as the lever to create broker-specific query profile handling. Have a profile handler in the base that does nothing. Replace it in broker with one that does the fancy stuff.
BrokerServerView
appears to be the place to hold the implementation of the query profile manager.
The Graceful
interface adds a shutdown feature to handlers. Implement this on BrokerQueryResource
to return a Future
which will flush the profile queue.
The ResponseContext
is used to gather profile data. The context is merged automatically. To retain specific values, we use a map in which each server and segment has an entry, and we merge maps to preserve individual keys. Thus we have:
ResponseContext["Fragment"] -> "Broker" | "DataNode"
ResponseContext["HostName"] -> <host name>
ResponseContext["PreProfile"] -> {<fragment>: {<host name>: {<segment>: <pre-profile>}}}
ResponseContext["NativeProfile"] -> <pre-profile>
The "Fragment"
and "HostName"
values are local-only, and are filled in as they become known. To speed up access within a single native query, we cache the native query pre-profile in "NativeProfile"
.
Each native query has an associated pre-profile object which gathers stats.