Query Profile
A query profile is a collection of information about a single query used to diagnose performance issues for both developers and users. Query profiles are a standard feature of Drill, Impala (Cloudera version), Presto and other query tools. When aggregated, the information in the profile can show query access patterns needed to optimize performance of a cluster.
Each profile generally includes the query, the query plan and metrics about query execution. Metrics are unique to each product and reflect how the product operates internally. Our goal here is to design a query profile specifically for Druid.
Profiles in other products are used in two distinct ways. First, they help answer the perennial user question: "why is my query slow?" Second, they help developers understand query performance to identify further opportunities for improvement.
See an example profile here.
Druid offers blindingly fast performance, but only if the system is properly tuned for the data and queries. Historically, tuning Druid was more of an art than a science because we were missing critical information (what actually happens within a query). Instead, one had to observe performance, know what to look for in data and queries, know how Druid operates internally, and thus know which changes might be beneficial. Then, one could intuitively pick a change to improve performance, try it, and repeat the cycle to verify the result or seek further improvements. Though many experts developed the necessary skills to know what to look for, newer users are often left with trial and error since they've not yet developed a feel for how Druid works.
A profile is distinct from the metrics which Druid produces today. Metrics report on the performance of the cluster as a whole and are designed to be aggregated by time, by node, etc. A profile, by contrast, is meant to explain the performance drivers, in detail, for one specific query. The structure of the profile differs depending on the query (in Druid, depending on the native query type). Overall metrics tell us how many queries per second the system handles and the average query latency. A profile explains exactly why query x took the time it did: the amount of data processed, the way it was processed, and so on.
The profile is also not a flame graph. A flame graph shows the execution times within a Java process. While flame graphs help developers understand their code, they are not of great use to users since users don't know the mapping from query operations to Java implementation. A profile describes the query at the level of query operations, not code implementation. Our goal here is not to optimize the code, but rather the application. Knowing that function foo() takes x% of the time might tell an expert that something is amiss, but it is unlikely to provide useful knowledge to the average user.
One way to address this issue is to make the performance drivers explicit: to show which operations within a query take time. Users will not optimize the code, but rather will change query structure, data layout or cluster configuration to improve performance: the query profile presents query details in those terms.
Similarly, measuring query time is useful to tell us where we stand, but just knowing query run time does not tell us why the query takes that amount of time, or what we can do about it.
Our premise here is that if we know, at an application level, why some part of a query takes time, then we are in a better position to make application-level changes that improve performance. For example, we want to know:
- How many segments were hit? Why? (Was a query phrased in a way that Druid must search all segments, or did it have a tight time range that let Druid select a small subset of segments?)
- Did the query make use of partitioning? Why or why not? (Ideally, most queries will have filters on the columns used for partitioning. If not, then perhaps the partitioning selection is not aligned with query usage.)
- How many rows were scanned? (If the query scans far more rows than those returned, then it is likely that the query uses filters which Druid cannot optimize. Perhaps a new column can be added that would allow index lookup instead of scanning rows.)
- How many rows were aggregated? (If the query scans many rows to aggregate down to a far smaller set of rows, then perhaps the aggregation should be done once at ingest time rather than for every query.)
- And so on. Once one realizes we can ask these questions, the community will suggest many more.
Users already make use of available metrics to make decisions about how to improve performance. As suggested above, query-level details provide more precise information where before experts relied on more general signals. This information empowers the user to make performance revisions such as:
- Revisions to the data structure: roll-up "grain", number of dimensions, number of tables, etc.
- Partitioning: a form of top-level indexing
- Segment sizes which influence the cost of full table scans
- Distribution (the number of data nodes), which allows us to do more work in parallel.
Experience with query profiles in other products has shown that detailed query-level information is essential to visualizing, understanding, and improving each of the above factors.
Query profiles in other products are the product of many years of effort: people use the profile to answer questions, realize something is missing, and modify the code to capture that information. We cannot hope to compress all that learning into this single project. So, we have more modest goals.
- Define the basic query profile structure, and gather metrics for some of the most basic query operations as a proof-of-concept.
- Provide a mechanism to transfer summary information about a query from the data node to the Broker, or from the Broker to the client. This builds on a "response trailer" mechanism which Gian prototyped.
- Since most clients won't know what to make of a query profile, provide a way for the Broker to persist profiles. This project will persist profiles on local disk, but will provide an extension point to add mechanisms to write profiles to a log collection system or other destination.
- A typical usage pattern is to run a SQL query from some favorite query tool, then inspect the results. So, this project will also offer a new REST call that can retrieve a profile, given a query ID, from the local profile storage.
We leave it to later projects to add metrics for the wide range of query operations that Druid provides. We also defer to others to add analysis, visualization or other tools on top of the basic query profile mechanism.
The profile is intended for users, and provides information that can drive performance decisions. The profile preserves the essence of query execution, while abstracting away those implementation details which are not relevant to user decisions. Rather than reinvent the profile, we start with knowledge of what has worked well in the products mentioned above. For example, the Drill page linked above shows a visualization of the DAG, and the metrics gathered for each node (operator) within the DAG. The Impala query profile has a similar (though far more detailed) structure. This project will not provide such a visualization. However, the visualization comes from a JSON-structured profile organized by operator as shown above. The goal here is to provide a similar profile for Druid.
The profile must capture three essential levels of information:
- The (user submitted) query as a whole, especially for SQL queries where a planning step converts the SQL query to a native query.
- The scatter/gather model used to distribute the query.
- The detailed transformations of the data needed to implement the query, since these transformations are the primary within-fragment cost drivers.
Druid is a distributed system that offers queries as one of its services. As with any good REST API, each API call does one thing well. Thus, Druid has a collection of native queries, each of which does one kind of operation. Druid also offers a SQL layer that converts SQL queries into one of the native Druid queries.
A Druid query has two main parts. At the top level is a set of REST RPC calls that scatter the query across data nodes. Within each node, Druid executes queries using a unique functional design that combines query planning and execution in a set of (mostly) stateless functions.
A typical distributed query plan consists of a set of logical operators that describe what is to be done. Each logical operator typically corresponds to one of the well-known "relational operators" from relational database theory: scan, filter, join, sort, union, etc. Then, a set of physical operators is distributed across the network to do the actual work. One or more physical operators are grouped in a fragment which executes on a node. Thus there can be many concurrent physical operators for each logical operator. In a full DAG-based engine, multiple exchanges shift data from one fragment to another.
A typical query profile mimics this DAG structure to organize performance data for a query. The profile is also a DAG made up of fragments and operators, with performance metrics for each.
In Druid, the native query is roughly the equivalent of a fragment, and exchanges are limited to a single level scatter/gather process between data nodes (historical, peons) and the broker.
Operators are implicit within the implementation of each native query. Druid's query engine is based on a functional programming (FP) model, not a DAG model. So our first challenge is to decide how to structure the Druid query profile. One choice is to mimic the Druid call structure, perhaps reporting statistics for each QueryRunner.run() function. (A Druid query is simply a chain of calls to a large variety of these methods.) That is, perhaps the query profile can be structured like a flame graph with focus on the time spent in each run() method.
However, a closer inspection reveals that the QueryRunner.run() method is more like a QueryPlanner.plan() method: each builds up the actual execution mechanism. There are more runners than there are execution steps. The runners represent the specific way that Druid is coded, not an essential description of the steps required to execute the query. The bulk of the work (transforms) occurs as side effects of requests to get the next item from a Guava Sequence. Thus, a flame graph will not reveal anything about the aspects of the query which the user can control (data layout, query design, cluster configuration, etc.), but will instead tell us how Druid is implemented (which is not something that the user can control).
Instead, we want to focus on cost drivers: the actual steps (transforms) that take time. In Druid, these are represented using a Sequence with a large number of additional components added via various ad-hoc mechanisms. As it turns out, each time Druid creates a new Sequence more-or-less maps to a transform: the only reason to create a new sequence is to merge, filter, sort or otherwise transform data from the input sequence. Thus, Sequences are closer to the level at which we want to gather detailed statistics.
Still, a Sequence is a data delivery vehicle. Our interest is in the actual transform, which sits outside the sequence. In Druid, such transforms are not reified (that is, not represented as objects). Instead, they are represented by a series of lambda functions created with the QueryRunner.run() call but executed as the Sequence is read. These transforms have no name in Druid; they are just bits of code. Yet, these are what we want to instrument, so we'll call them (virtual) operators.
By making this move, we are now in a position to leverage the operator structure typical of a query profile. The profile is thus a tree (DAG) of (virtual) operators, each of which represents a bit of code in Druid that creates or transforms a Sequence as defined in some QueryRunner.run() method. The unusual details of Druid's execution are abstracted into patterns that are familiar (and relevant) to a data engineer who knows how queries typically work in Drill, Impala, Presto/Trino, Spark, Flink, etc. One advantage of such a structure is that the DAG for a query is determined by the query, and is fairly stable, while the native query implementation is subject to constant revision to increase performance. Thus, a DAG structure allows the profile to change more slowly than the actual code.
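The idea of a (virtual) operator wrapped around a lazily-read sequence can be sketched in Python, using generators as a stand-in for Druid's Sequence. This is only an illustration of the technique: the `OperatorStats` class, its field names and the `instrument` helper are assumptions made for this sketch and are not part of Druid.

```python
import time
from dataclasses import dataclass
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


@dataclass
class OperatorStats:
    """Metrics for one (virtual) operator; names are illustrative only."""
    kind: str
    rows: int = 0
    total_time_ns: int = 0  # time spent producing rows, including upstream work


def instrument(source: Iterable[T], stats: OperatorStats) -> Iterator[T]:
    """Wrap a lazy sequence so that every pull is counted and timed."""
    it = iter(source)
    while True:
        start = time.perf_counter_ns()
        try:
            row = next(it)
        except StopIteration:
            stats.total_time_ns += time.perf_counter_ns() - start
            return
        stats.total_time_ns += time.perf_counter_ns() - start
        stats.rows += 1
        yield row


# A two-step pipeline: a scan followed by a filter. Nothing runs until the
# result is read, mirroring how transforms execute as a Sequence is consumed.
scan_stats = OperatorStats("scan")
filter_stats = OperatorStats("filter")

scan = instrument(range(10), scan_stats)
filtered = instrument((row for row in scan if row % 2 == 0), filter_stats)

results = list(filtered)
```

After the final line, `scan_stats.rows` reflects the ten rows read and `filter_stats.rows` the five rows that survived the filter, which is the kind of per-operator row count the profile aims to report.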
The profile presented to the user is a JSON object that can be deserialized in Java or other languages. We must define the structure of that JSON.
We've said we need to show information for the three main concepts: query, scatter/gather, and transforms. We've noted we must map from Druid's FP design to the essential query structure relevant to the user. This gives rise to a profile structure with three levels: query, fragments and operators.
First, we have to understand a bit about how Druid queries operate in the large. A query arrives (typically) at a Broker (perhaps via the Router). If the query is SQL, the Broker translates the SQL into one or more native queries. If the query is already native, then we skip the SQL step. The Broker then determines the data nodes which hold data for that query, rewrites the query specifically for each node, and scatters the query to each data node. The data node then receives the query and does the necessary work. The key fact to notice is that the client could send a native query directly to a data node: there is no real difference between the internal query execution path for the broker and data nodes.
We map this unusual structure into typical distributed-system concepts. First, the point of entry into Druid is the (client) query: the query as sent from the client. This could be a SQL query, a native query sent to the Broker from a client, or a native query sent from the client to a specific data node. The query profile represents the "query" as a whole.
As explained above, Druid uses a scatter-gather pattern to execute a query. This is simply a two-level version of the fragment structure typical of distributed engines. So, we use the term fragment to refer to each scatter step. Thus a query (as defined here) consists of one or more fragments, each of which is represented by a rewritten native query executing on a data node. Note that, from the perspective of the data node, a query is a query: it does not matter if it is a client query or a fragment (often called a "subquery" in Druid). The profile, however, does make this distinction: a Druid subquery is a fragment, a (non-sub)query is a client query.
Then, as explained earlier, we map Druid's functional programming based query engine into a set of operators, where each operator represents a point in the execution where data is read, or one sequence of results is transformed into another.
This gives the following basic structure:
- query (as submitted by the client)
  - fragments (the scatter stage, each fragment is a subquery)
    - operators (the transforms within the subquery)
The actual structure is a bit more subtle. The Broker does not simply issue a series of subqueries (fragments), it also combines them. As a result, the query is actually a set of Broker-side operators, one of which sends the query to a data node. Thus, the actual structure is:
- query (as submitted by the client)
  - operators (the transforms within the query)
And, one of those operators is an exchange which handles one specific scatter/gather data operation. An exchange has two parts: a sender on the data node side, and a receiver on the broker side. So:
- receiver operator
  - fragment (a subquery)
    - operators
For native queries, the query and the fragment are basically the same (since the code, for the most part, handles the two cases identically.) But, for a SQL query, we have additional steps such as planning. This gives the final structure:
* query (as submitted by the user)
  * query-level information (SQL text, query plan, etc.)
  * root fragment
    * merge operators, etc.
      * receiver
        * fragment ...
          * operator ...
            * scan (read data from a segment, etc.)
Or, in quasi-BNF form:
query: query-attribs, fragment
fragment: fragment-attribs, operator
operator: merge | receiver | scan | ...
receiver: receiver-attribs, fragment
The set of operators depends on the specific query: a simple query may have very few operator levels, a complex query may have many.
The actual set of operators will emerge as we implement the profile: defining them is a bit of an art since we must invent them as we go along.
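To make the query / fragment / operator nesting concrete, here is a small Python sketch of what such a profile might look like once parsed from JSON, along with a walker that lists each operator and the node it ran on. Every key name here ("fragment", "operator", "children", "kind", "node"), the node names and the SQL text are made up for illustration; the actual JSON layout is still to be defined.

```python
import json

# Hypothetical profile following the quasi-BNF: a query holds a root fragment,
# a fragment holds an operator, and a receiver operator holds a child fragment.
profile = {
    "sql": "SELECT COUNT(*) FROM example_table",
    "fragment": {
        "node": "broker",
        "operator": {
            "kind": "merge",
            "children": [
                {
                    "kind": "receiver",
                    "fragment": {"node": "historical-1", "operator": {"kind": "scan"}},
                },
                {
                    "kind": "receiver",
                    "fragment": {"node": "historical-2", "operator": {"kind": "scan"}},
                },
            ],
        },
    },
}


def walk_operator(op, node, depth):
    """Yield (depth, node, kind) for an operator and everything beneath it."""
    yield (depth, node, op["kind"])
    for child in op.get("children", []):
        yield from walk_operator(child, node, depth + 1)
    if "fragment" in op:  # a receiver: descend into the fragment it executed
        yield from walk_fragment(op["fragment"], depth + 1)


def walk_fragment(fragment, depth=0):
    """Yield (depth, node, kind) for every operator within a fragment."""
    yield from walk_operator(fragment["operator"], fragment["node"], depth)


operators = list(walk_fragment(profile["fragment"]))
as_json = json.dumps(profile, indent=2)
```

The walker shows why a tree-shaped profile is convenient: a tool can recover the scatter/gather layout (which operators ran on the Broker, which on each data node) without knowing anything about the Druid code paths involved.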
With the structure defined, we can now turn to the individual objects. The profile is a JSON-serializable set of Java objects created as the query executes. (See a later ticket for the details of how that mechanism works.) The objects follow the structure above. Each object holds information such as:
- The way that Druid implemented an operation (when Druid has a choice and that choice is based on something in the query or fragment.)
- The number of rows processed. (Data engineers think in terms of rows, so rows are a familiar metric.)
- The time taken.
The profile need not repeat information readily available in the native query or in the table schema, unless that information is critical to understanding a cost driver. (For example, knowing the cardinality of one dimension in one fragment is helpful for understanding the selectivity of a filter on that dimension.)
Rather than spell out the details of the profile here, we leave that to an example and detailed comments in the Java query profile objects.
The fields listed here are conceptual: the names are descriptive, and we often use a single entry here for things that may take multiple fields in the actual JSON. Later revisions will fine-tune this text as the detailed structure takes shape.
TBD
Possible fields:
- Tables: list of tables which the query accesses
  - Kind: Kind of data source
  - Name: Name of the data source
- Dimensions: list of dimensions which the query accesses
- Measures: list of measures which the query accesses
- Intervals: list of intervals (rounded to segment boundaries) which the query accesses
- SQL query text (if SQL)
- Has wildcard: true if the statement is of the form SELECT *, false if columns are listed explicitly.
- Query plan
When a native query arrives at a data node, we treat it as a fragment. For now, we also treat a native query as a fragment even if it arrives at the Broker.
Internally, a fragment corresponds to a "Jetty thread" in a Druid service: the one that receives a query on the /druid/v2 endpoint.
The fields of the fragment object include:
- Node: the host name and service of the node on which the fragment ran.
- Query: the native query, which includes the query ID and data source.
- Status: succeeded or failed.
- Columns: the union of all columns used anywhere within the query (retrieved, as filters, etc.)
- Wildcard: indicates if the above list of columns comes from an explicit list, or was expanded. (Important when tracking column usage.)
- Performance: metrics for the entire fragment such as run time, number of rows, number of bytes, etc.
- Root operator: the top-most transform within the fragment. See below for details.
- Attributes: open-ended map of key/value pairs to be used for any purpose and for information which does not fit above.
The above makes the following assumptions:
- The location of the query ID within the native query is fixed, so we need not repeat that ID at the fragment level.
- Each native query processes a single data source, so the entire query is about that one data source.
- The data source is also at a fixed, known location in the native query, and thus need not be repeated at the fragment level.
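As a rough sketch of the fragment object and the assumptions just listed, the Python dataclass below stores the native query once and derives the query ID and data source from it rather than repeating them. The class and field names are illustrative only. The sketch also assumes the query ID sits under `context.queryId` and the data source under a top-level `dataSource` key holding a plain string; treat both locations as assumptions to be checked against the native query format.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List


@dataclass
class FragmentProfile:
    """Illustrative fragment object; not the actual Druid class."""
    node: str                      # host name and service
    query: Dict[str, Any]          # the native query, as received
    status: str                    # "succeeded" or "failed"
    columns: List[str]             # union of all columns used
    wildcard: bool                 # True if the column list was expanded
    performance: Dict[str, int]    # run time, rows, bytes, ...
    root_operator: Dict[str, Any]  # top-most transform in the fragment
    attributes: Dict[str, Any] = field(default_factory=dict)

    @property
    def query_id(self) -> str:
        # Assumed location of the query ID within the native query.
        return self.query["context"]["queryId"]

    @property
    def data_source(self) -> str:
        # Assumed location of the (single) data source.
        return self.query["dataSource"]


frag = FragmentProfile(
    node="historical-1:8083",
    query={
        "queryType": "scan",
        "dataSource": "wikipedia",
        "context": {"queryId": "abc-123"},
    },
    status="succeeded",
    columns=["__time", "page"],
    wildcard=False,
    performance={"timeMs": 42, "rows": 1000},
    root_operator={"kind": "scan"},
)

serialized = asdict(frag)
```

Because `query_id` and `data_source` are computed properties, they do not appear in the serialized form: the ID and data source are stored only once, inside the native query.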
Notes:
- Is there a way to differentiate between a native query that acts as a subquery vs. one that acts as a user-provided query? If so, then we can wrap the client-provided native query in a Native Query object.
- If so, then we need not repeat the native query in each fragment: we can list it once in the Native Query object, then only list the rewrites in each fragment (assuming we know that, say, only the intervals or other details change.)
We've said that an operator is a user-level abstraction that describes some collection of code in Druid that transforms data. We've noted that such transforms in Druid are not reified (represented as objects), but are instead simply collections of functions, closures, lambdas and other programming knick-knacks needed to perform the desired transform. Thus, the set of operators here is an attempt to identify and name the transforms. The set of operators is a design decision, not a simple 1:1 mapping to the (non-existent) Druid operators.
Every operator has the following fields:
- Kind: the kind of operator
- Rows: the number of rows produced by the operator
- Total Time: the wall-clock time for the operator and all its children
- Time: the wall-clock time for the one operator (total time minus the total time of any children)
Because of the way Druid is coded, it is very difficult to extract information as simple as the row count and execution time. So, we proceed on a best-effort basis: filling in those numbers that we can gather, deferring to later those that require substantial extra work to obtain.
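The relationship between Total Time and Time can be shown with a short Python sketch that fills in each operator's own time from the totals. The key names (`totalTimeMs`, `timeMs`, `children`) and the numbers are assumptions for illustration. Note that the subtraction follows the definition given above and so implicitly assumes that children do not run in parallel with one another.

```python
def add_self_times(op):
    """Annotate an operator tree with self time: total minus children's totals."""
    children = op.get("children", [])
    for child in children:
        add_self_times(child)
    op["timeMs"] = op["totalTimeMs"] - sum(c["totalTimeMs"] for c in children)
    return op


# Example tree: a merge over two scans, with made-up numbers.
merge = {
    "kind": "merge",
    "rows": 50,
    "totalTimeMs": 120,
    "children": [
        {"kind": "scan", "rows": 400, "totalTimeMs": 70},
        {"kind": "scan", "rows": 300, "totalTimeMs": 30},
    ],
}

add_self_times(merge)
```

Here the merge itself accounts for 20 ms (120 minus 70 and 30), while each leaf scan's self time equals its total time.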
The set of operators includes:
- Receiver: the operator that drives a single scatter/gather RPC call on the sender side
- Sender: the operator which sends results to the client (top-level query) or broker (data node queries)
  - Child: the operator which sends data to the sender
- Merge: the operator which receives data from a subquery and merges it
  - Children: the list of subqueries which send data to the merge node
- Segment metadata scan
  - Segment read time
- Segment scan
  - Open time: time to read the segment header, etc. (Details needed)
  - Open bytes
  - Blocks read
  - Total scan time
  - Total scan bytes
  - Columns
    - Name
    - Bytes read
    - I/O time (or total time to read, including decode)

(More TBD)
A "scan" operator is one which reads data from one of Druid's several data sources. Each kind of scan has its own unique characteristics.
The Broker executes a query by distributing (scattering) fragments (subqueries) to data nodes (historicals and real-time), then receiving (gathering) the results. The Broker runs fragments in parallel, each managed by a DirectDruidClient instance. The "receiver" operator abstracts out this structure into the part that the user cares about: what was distributed and how much data came back?
Scatter/gather information can be gathered from Druid metrics by aggregating over each of the data nodes, and aggregating the query/node/* metrics, then assigning the results to the data nodes. (This has been prototyped in Python: doable, but not trivial.)
The receiver gives the same information, but already organized by data node. This allows us to see, for example, if there is skew in response times or row counts across nodes. If there is, then perhaps we need to rebalance the cluster, add nodes, or check if some particular node is impaired.
Ideally, we'd see that every data node returns about the same (small) number of rows. If a data node would return no data, then the Broker should have pruned that node using segment and partition information. If we see many data nodes returning zero rows, then perhaps we want to ensure the query has the proper time range and consider changes to partitioning.
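With per-node receiver information in hand, a skew check becomes a few lines of code. The Python sketch below flags data nodes that returned zero rows and computes the ratio of the largest per-node row count to the mean. The input shape and the key names (`node`, `rows`, `zeroRowNodes`, `maxToMeanRows`) are hypothetical, as are the sample values.

```python
from statistics import mean


def receiver_skew(receivers):
    """Summarize row-count skew across the receivers of one query."""
    rows = [r["rows"] for r in receivers]
    avg = mean(rows) if rows else 0
    return {
        # Nodes that did work but contributed nothing: candidates for pruning.
        "zeroRowNodes": [r["node"] for r in receivers if r["rows"] == 0],
        # 1.0 means perfectly even; larger values mean one node dominates.
        "maxToMeanRows": (max(rows) / avg) if avg else 0.0,
    }


receivers = [
    {"node": "historical-1", "rows": 100},
    {"node": "historical-2", "rows": 100},
    {"node": "historical-3", "rows": 0},
    {"node": "historical-4", "rows": 600},
]

report = receiver_skew(receivers)
```

In this made-up example one node returns three times the mean row count and another returns nothing, which would prompt a look at cluster balance and at the query's time range or partitioning.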
A receiver executes a fragment. That fragment appears in JSON as a child of the receiver node. The receiver itself adds additional information such as the total execution time on the Broker side, any timeout errors, etc.
Note that many distributed query engines pair a receiver operator on the "downstream" side of the DAG with a "sender" operator on the "upstream" side. However, in Druid, all requests are REST (request/response) based, so the HTTP response handler plays the "sender" role. The fragment represents an HTTP request, so we simply fold the sender concept into our definition of fragment.
Much of the Druid query "pipeline" (stack of FP functions that give rise to Sequences) deals with combining results. Depending on the query, combinations can take the form of:
- Concatenate: simply combine the result sets one after another
- Merge: given a set of ordered inputs, merge them to produce a combined, ordered result
- Sort: given a set of unordered inputs, sort then merge them to produce a combined, ordered result
The costs of these strategies differ greatly, so the user would prefer to use concatenation if possible, else a merge, and only resort to an expensive sort if no other choice is available.
We model each of these transforms as a separate operator, each with metrics relevant to that operator.
Note that, at present, it is fiendishly difficult to extract times and row counts from the code that implements these operators. Solving this issue is a work in progress.
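The three combining strategies can be illustrated with Python's standard library, using small lists in place of per-segment or per-node result sets. This is a generic illustration of the techniques, not a description of how Druid implements them.

```python
import heapq
from itertools import chain

# Two result sets that are each already ordered.
ordered_a = [1, 4, 7]
ordered_b = [2, 5, 8]

# Concatenate: append one result set after another; no comparisons at all.
concatenated = list(chain(ordered_a, ordered_b))

# Merge: inputs are already ordered, so a streaming k-way merge suffices.
merged = list(heapq.merge(ordered_a, ordered_b))

# Sort: inputs are unordered, so each must be sorted before merging.
unordered_a = [7, 1, 4]
unordered_b = [8, 2, 5]
sorted_then_merged = list(heapq.merge(sorted(unordered_a), sorted(unordered_b)))
```

Concatenation does no comparisons, the merge does a small amount of work per row and can stream its output, and the sort must hold each input in full before it can emit anything. That ordering is why the profile should record which strategy was used.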
The power of Druid comes from the segment file structure: the ability to find data quickly using dimension indexes. Thus, a key part of the profile is to describe how well Druid was able to use these features. The kinds of information we want to gather include:
- The scan strategy: vectorized or not
- Indexes used to prune the query
- Number of columns retrieved
- Ideally, the number of bytes read per column
- Total number of rows materialized (since materialization is costly)
- Number of rows discarded by non-indexed "pushed down" filters
- Final number of rows returned from the scan
- Time to do the scan
- Ideally, I/O costs to read the segment file (time, bytes, etc.)
If we understand the scan, we can tune our query or data. For example, if we see that our filters discard a large number of rows, we can ask if there is a way to convert that filter into an index lookup. For example, if we are checking a range (which requires a filter step), perhaps we can change it to a list of values (which can be resolved using an index). If we are using a regex to find a value (which requires a filter step), then perhaps we can pull out the relevant value on ingest into another dimension so the query can do an index lookup instead. And so on.
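One way a tool might act on these scan metrics is to compute the fraction of materialized rows that the non-indexed filters threw away. The Python sketch below does this; the key names (`rowsMaterialized`, `rowsReturned`) and the sample numbers are hypothetical.

```python
def filter_discard_ratio(scan):
    """Fraction of materialized rows discarded by non-indexed filters."""
    materialized = scan["rowsMaterialized"]
    if materialized == 0:
        return 0.0
    return (materialized - scan["rowsReturned"]) / materialized


# Made-up scan metrics: a million rows materialized, a thousand returned.
scan = {"kind": "scan", "rowsMaterialized": 1_000_000, "rowsReturned": 1_000}

ratio = filter_discard_ratio(scan)
```

A ratio close to 1, as in this example, suggests that most of the scan's work was wasted on rows that a filter later rejected, which is the signal that an index lookup or an ingest-time dimension might help.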
If we can see per-column details (bytes read and I/O cost), we can perhaps identify costly columns and either remove them from queries where performance is critical, or find a way to store the columns more efficiently. For example, if we are tracking web requests, and we store each URL, then we are unlikely to get much dictionary compression. If, however, the query wants just the "asset" part (web page, say), then we can tear apart the URL on ingest and store the asset separately so that we get better compression and thus lower I/O costs.
Knowing the I/O costs will tell us if we have a disk bottleneck. For example, a low cost means that we have fast storage and sufficient memory that the OS can cache our segment files. A high I/O cost tells us that we are not getting much benefit from the OS file cache and we need to increase memory (or add more nodes.) A very high I/O cost may tell us that we need to upgrade the storage used for the local segment cache.
Druid is large, with many QueryRunner implementations. We can't expect to instrument them all in one go. Instead, we provide a fall-back. Any code path that is not yet implemented will be represented in the profile as an "unknown" operator: there is something there, we just don't know what yet (that is, that operation is opaque).
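The fall-back can be pictured as a lookup with a default. In the Python sketch below, the runner class names and the mapping itself are invented for illustration; they are not actual Druid classes.

```python
# Hypothetical mapping from instrumented code paths to operator kinds.
KNOWN_OPERATORS = {
    "ExampleScanRunner": "scan",
    "ExampleMergeRunner": "merge",
}


def operator_for(runner_class_name):
    """Return a profile operator, falling back to 'unknown' when unmapped."""
    kind = KNOWN_OPERATORS.get(runner_class_name, "unknown")
    op = {"kind": kind}
    if kind == "unknown":
        # Keep a hint about what was there so it can be instrumented later.
        op["source"] = runner_class_name
    return op


known = operator_for("ExampleScanRunner")
opaque = operator_for("NotYetInstrumentedRunner")
```

Recording the source of an unknown operator is a design choice made for this sketch: it keeps the profile honest about gaps while hinting at what to instrument next.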
The query profile builds on the "response trailer" mechanism which Gian designed. The response trailer introduces a new request header that asks that the results be returned in the form of a JSON object that has one field for the data, and another for the trailer. The trailer is designed so that it can contain data about the query as a whole, after all data is sent, but before the HTTP request is closed. The trailer is not sent (and the original array-form of results is used instead) if the new request header is omitted.
The Broker and data nodes use this mechanism to gather metrics for a native query, and return them to the client as one of the trailer fields. The Broker is extended to handle the response trailer, and will assemble the fragments into the overall query profile as explained above. (Note: in practice, there will likely be some option to enable and disable profiles globally: there is no need to gather them if a particular site has no use for them.)
The client is a different issue. No existing client is aware of the new request header option, nor of the response trailer. Adding the trailer changes the REST JSON response from an array of rows to an object. Thus, clients must be changed if they wish to use the trailer (and thus the query profile).
However, taking a step back, it is clear that most clients wouldn't know what to do with a profile even if they received one. Your average JDBC client, BI tool, pydruid client, or SQL editor has no mechanism to do anything useful with a profile. Certainly some might be extended, but not all.
Still, it is useful to allow a client to request the trailer, if only for experimentation and testing. (Indeed, given Druid's design, it would be hard to prevent a client from requesting the trailer.)
Below we'll discuss an alternative way for the client to work with profiles.
The query profile builds on earlier work to add a response trailer to a query. The response trailer changes the JSON result from an array of rows to an object which contains fields for the rows and the trailer. This change must not impact rolling upgrades. We must handle two cases:
- Upgrade from pre-profile nodes to profile-enabled nodes.
- Upgrade from one profile-enabled version to another.
The solutions below should allow for rolling upgrades with any combination of (new|old) → (new|old) versions.
In a Druid cluster, any node can be upgraded in any order. (Verify.) So, we must handle the potential protocol issues.
As per Gian's design, the data node returns the trailer only when requested. But, by default, no trailer is sent. This becomes the basis of our initial upgrade strategy.
Druid will ship with query profiles turned off. Thus, even if a data node is upgraded to one that can produce a profile, that profile will simply be discarded and not sent to the broker. This means that we can upgrade either the broker or data node first.
- A newer data node will gather the profile, but will not send it unless the required request header is present. Since old brokers won't include the header, a newer data node will return the "traditional" response so that the cluster operates properly.
- A newer broker will ship with query profiles turned off. Since the newer broker will not include the trailer-request header, we operate in the same mode as above, and so it does not matter if data nodes are older or the same version as the broker.
- If an over-eager user upgrades the broker and immediately enables query profiles, then the broker will send the trailer-request header, but the old data nodes will ignore it and return the traditional response. The broker must be prepared to receive this traditional response and fill in a "stub" dummy fragment for the missing profile. Once the data nodes are upgraded, the newer trailer response will become available, and the query profiles will contain the full fragment information for those nodes.
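The last case, in which a newer Broker must accept either response shape, can be sketched as follows in Python. The field names `results`, `trailer` and `profile`, and the shape of the stub fragment, are placeholders: the actual trailer format is defined by the response trailer work and is not specified here.

```python
import json


def parse_data_node_response(body, node):
    """Return (rows, fragment_profile) for either response shape."""
    payload = json.loads(body)
    if isinstance(payload, list):
        # Traditional response from an older data node: a bare array of rows.
        # Fill in a stub so the overall profile still has an entry for the node.
        return payload, {"node": node, "stub": True}
    # Trailer-style response: an object holding the rows and the trailer.
    rows = payload["results"]
    fragment = payload.get("trailer", {}).get("profile")
    if fragment is None:
        fragment = {"node": node, "stub": True}
    return rows, fragment


old_body = '[{"page": "a"}]'
new_body = json.dumps({
    "results": [{"page": "a"}],
    "trailer": {"profile": {"node": "historical-2", "operator": {"kind": "scan"}}},
})

old_rows, old_fragment = parse_data_node_response(old_body, "historical-1")
new_rows, new_fragment = parse_data_node_response(new_body, "historical-2")
```

Both calls yield the same rows; only the fragment differs, being a stub for the older node and the full profile for the upgraded one.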
The next consideration is to upgrade from an older profile-supporting version to a newer one. In this case, the structure of the JSON objects may change: the data node may add additional fields that the receiver must handle. This is an issue if the Broker deserializes profile objects sent from data nodes: deserialization will fail if the JSON structure changes.
We can expect subsequent releases to add detail to the profile as we gain experience. We may, for example, add a new `foo` field, or introduce a new operator type, or perhaps remove a no-longer-needed `bar` field. In all such cases, using JSON deserialization is problematic: the data node cannot return a changed structure unless it knows that the client can deserialize that structure. In the extreme, this requires both the sender and receiver to be of the same version, or that the profile structure never changes.
One solution would be to know the version of the node to which a message is sent, and provide the profile in a form compatible with that node. Such an approach is difficult, however. First, Druid is based on the HTTP protocol, and there is no good way to learn the version of the request sender from a REST request. We might obtain the version from ZK instead: each node registers its version along with its identity. However, the version written into ZK would likely be the (vendor-specific) software version, not the profile JSON version. Also, Druid nodes are identified by (host, port) pairs, but the HTTP request does not include the port number.
Further, the profile is likely to be complex, with fields that evolve along with the query engine implementation. Converting that information from one version to another is likely to be complex and error prone.
To avoid this problem, we can specify that the receiver (the node that receives the trailer) should make no assumptions about the structure of the profile: only that it is some form of JSON. This can be done if the JSON is deserialized as a generic map rather than as a specific Java object.
Such an approach is fine if we do not require any Druid server software to interpret the profile, only to create it. We leave interpretation to some other system: one which can handle changes in the profile details.
Open Issue: the above approach suggests that the profile contains one fragment node for each data node. This is probably fine for small systems, but may be unworkable for larger clusters. In that case, we'd want to merge the results in the Broker (just as we merge the actual data), which does require that the broker deserialize the returned profile JSON.
We've just pushed version compatibility issues to the consumer of the profile. Now let's discuss the consumer's perspective.
Given the two upgrade strategies, the query profile may be incomplete or inconsistent from the consumer's perspective:
- A query may have missing fragments (during the initial upgrade)
- A query may have a "top portion" (from the Broker) at a different version level than the fragments received from data nodes.
Since queries are plentiful, and upgrades infrequent, the simplest consumer strategy is simply to discard such incomplete profiles.
One possible improvement is to define a profile format version number distinct from the Druid version number. We increment the number each time the format changes. The format version appears as a field within the fragment object.
With this, we can easily tell if all fragments (including the top-level "root" fragment) have the same format version, and what that version is. A consumer can ignore profiles with mixed versions.
In fact, the broker could help: it could discard mixed-version profiles, perhaps simply logging a message noting this decision. If the broker does the pruning, then the consumer need only consider the top-most version number, and expect that the entire profile is at that version.
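A minimal sketch of the mixed-version check follows, written against generic maps so that it does not depend on any particular profile class. The `formatVersion` field name and the `ProfileVersionCheck` class are assumptions made for illustration; the design above does not name the field.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the "formatVersion" field name is an assumption.
final class ProfileVersionCheck
{
  // Returns true only when the root and every fragment carry the same
  // format version. A fragment with no version (such as a stub) counts
  // as a mismatch.
  static boolean isUniform(Map<String, ?> root, List<? extends Map<String, ?>> fragments)
  {
    Object rootVersion = root.get("formatVersion");
    if (rootVersion == null) {
      return false;
    }
    for (Map<String, ?> fragment : fragments) {
      if (!rootVersion.equals(fragment.get("formatVersion"))) {
        return false;
      }
    }
    return true;
  }
}
```

Either the broker or the consumer could apply such a check. If the broker applies it, the consumer only needs to read the version on the root.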
The broker manages each query and is extended to gather the response trailers from each data node. The information is merged with broker-level information to create the user form of the profile.
We've noted that the profile is not typically returned with the query since doing so would add unwanted complexity to the many existing Druid clients. Further, most clients would not know what to do with the profile even if they received it.
Instead, the broker splits the profile from the query, and provides an extendable mechanism to consume the profiles.
Apache Druid provides two mechanisms: discard and disk-based cache. The discard mechanism simply throws away the profile, and is the default choice on new installs.
Apache Druid can make no assumptions about the user's cluster other than that local storage is available. So, we offer a disk-backed profile cache similar to those provided by Drill and Impala. At a high level:
- The broker runs the query, returns the data, and assembles the profile.
- After the last data byte is sent, before closing the connection, the broker places the profile onto an internal queue.
- The broker sends the final response to the client
- A worker thread picks up the profile object and writes it to disk
- The worker may also prune old profiles, if configured to do so.
Once profiles are on disk, they can be collected by an external system, or just used in place when needed.
The profile queue is flushed asynchronously to ensure that queries don't wait for the profile to be written to disk. As a result, profiles may be pending writes when the server exits. To ensure that no profiles are lost, the queue must flush all remaining profiles to disk on shutdown.
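The asynchronous write and the flush on shutdown could be sketched as below. This is not the prototype's implementation: `ProfileDiskWriter`, the one-file-per-query layout and the 30-second drain limit are all assumptions, and the profile is assumed to be serialized to a JSON string already.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: class and method names are illustrative, not Druid's.
final class ProfileDiskWriter implements AutoCloseable
{
  private final Path dir;
  // A single worker thread drains the executor's internal queue in order.
  private final ExecutorService worker = Executors.newSingleThreadExecutor();

  ProfileDiskWriter(Path dir)
  {
    this.dir = dir;
  }

  // Called on the query thread: enqueues the write and returns immediately.
  void submit(String queryId, String profileJson)
  {
    worker.execute(() -> {
      try {
        Files.write(dir.resolve(queryId + ".json"), profileJson.getBytes(StandardCharsets.UTF_8));
      }
      catch (IOException e) {
        // A real implementation would log the failure and carry on.
        throw new UncheckedIOException(e);
      }
    });
  }

  // Called on shutdown: stops accepting profiles, then waits for pending writes.
  @Override
  public void close()
  {
    worker.shutdown();
    try {
      if (!worker.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("pending profiles were not flushed in time");
      }
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while flushing profiles", e);
    }
  }
}
```

`ExecutorService.shutdown()` lets already-queued tasks finish, which is what gives the "flush everything on exit" behavior here. Pruning of old profiles is left out of the sketch.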
A new API, `/v2/profile/<id>`, requests a profile, which is returned as JSON. The flow here is designed to avoid race conditions, especially if a client runs a query and immediately asks for its profile.
- First check the profile queue. If found in the queue, force (or wait for the completion of) generation of the user profile, and remove the entry from the queue.
- Check the disk cache for the profile.
- Copy the disk file contents to the client.
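The lookup order above can be sketched as follows. The names (`ProfileLookup`, `enqueue`, `flush`, `find`) are hypothetical, the pending queue is modeled as a simple map, and a single coarse lock stands in for whatever synchronization the real implementation would use.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the lookup order; names are illustrative.
final class ProfileLookup
{
  private final Map<String, String> pending = new HashMap<>();
  private final Path dir;

  ProfileLookup(Path dir)
  {
    this.dir = dir;
  }

  // Adds a finished profile to the pending set (a stand-in for the write queue).
  synchronized void enqueue(String queryId, String profileJson)
  {
    pending.put(queryId, profileJson);
  }

  // Writes one pending profile, if present. Both the background worker and
  // find() would call this; the lock makes a second caller wait for the write.
  synchronized void flush(String queryId)
  {
    String json = pending.remove(queryId);
    if (json == null) {
      return;
    }
    try {
      Files.write(fileFor(queryId), json.getBytes(StandardCharsets.UTF_8));
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  Optional<String> find(String queryId)
  {
    flush(queryId);                      // step 1: force any queued write to finish
    Path file = fileFor(queryId);
    if (!Files.isRegularFile(file)) {    // step 2: check the disk cache
      return Optional.empty();
    }
    try {                                // step 3: return the file contents
      return Optional.of(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  private Path fileFor(String queryId)
  {
    return dir.resolve(queryId + ".json");
  }
}
```

Because the queue check and the disk write share one lock, a request that arrives while the worker is mid-write waits for the file rather than reporting a miss.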
Note that this API is meant for short-term, immediate access to a query profile. In a production system, the broker should not store profiles for more than a brief time: minutes to hours. After that, to conserve resources, profiles are either deleted (because the probability of any single profile being requested drops exponentially with time), or are gathered and sent to some other system for aggregation or storage. Once the profile is removed, it is no longer available from the broker. It may be available from some other, third-party system.
This does create an awkward gap in coverage: the time after the profile is removed from the broker and before it becomes available in that third-party system. The thought is that if a user wants to profile a single query, they will run the query, then immediately (within a few minutes) request the profile from the broker. Longer term, profiles are used in the aggregate, or to diagnose issues, and so the "gap" is less of an issue.
Details of the design are available in the prototype branch. Here we discuss several higher-level design decisions.
The query profile is made up of a set of Jackson-serializable Java objects defined in the `org.apache.druid.query.profile` package. Each is designed to be as simple as possible, using objects easily interpretable by consumers, which we assume to be in Python or third-party Java code. Fields are omitted aggressively if not set.
Eventually the operator profiles may want to move into the package that defines the "operator" (that is, the bit of code where the operator profile is created.)
At query run time, the profile is built up as a tree, from the top down. The profile tree represents state, and this becomes the key implementation issue: Druid is based on a (semi-)stateless functional programming model. Our task is to find chinks in Druid's stateless FP model where we can gather the stateful profile information.
It is helpful to start with an understanding of how Druid queries operate. There are two key concepts:
- `QueryRunner.run()` creates an FP-style composition of functions that "run" the query. The method takes a query and returns a `Sequence` of results. However, despite the name, this function does not actually run the query; it only creates the mechanisms that will do so.
- Guava `Sequence` is a non-materialized collection of objects (batches of rows in Druid) that materializes objects as requested. The `Sequence` in Druid does the actual work of scanning segments, combining result sets, etc.
Thus `QueryRunner.run()` should be thought of as "`QueryPlanner.plan()`" while `Sequence` can be thought of as an unusual form of query operator. Each request for results from a `Sequence` invokes a large number of functions which generate the data by reading data or applying a transformation to the results of other `Sequence`s. The entire assembly is meant to be stateless and opaque.
Most of the actual transformation code is in the form of lambda functions, anonymous inner classes, and variables provided via closures, all of which make it very difficult indeed to get at the state we want to observe.
The most direct solution would be to modify `QueryRunner.run()` to explicitly accept (or return) the objects needed for the query profile. However, this class is marked as an API, and has many dozens of implementations, many as anonymous inner classes. Thus, the direct solution is too intrusive. We need a "back door."
Fortunately for us, Druid has three chinks in its stateless FP armor. The first is `ResponseContext`:
public interface QueryRunner<T>
{
  Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext);
}
The `QueryPlus` would seem like a logical candidate to gather query-level information. However, it is meant to be immutable, and is constantly rewritten down the `run()` call stack, making it a poor choice for our needs.
The `ResponseContext` is primarily used to return query-level results to the client. A single instance is passed down the entire call stack. We extend it to help us build the profile tree.
The second is `QueryMetrics`:
public FilterAnalysis analyzeFilter(
@Nullable final Filter filter,
ColumnSelectorBitmapIndexSelector indexSelector,
@Nullable QueryMetrics queryMetrics
)
`QueryMetrics` is often passed into lower-level bits of code that don't have access to the `ResponseContext`, such as the example above. It is used to gather metrics sent to an external metrics system. We extend it to capture some of those metrics for the query profile, and to also build up the profile tree.
The third back-door is the use of closures. Druid has many functions that do not accept either of the two "back doors". In some cases we can simply extend the function signature. In other cases, the function signature is marked as part of the public API, and so we cannot extend it. Here, we exploit the fact that, in Druid code, most of those functions are created inline, so that state can "leak in" via closures. This gives a back-door way to "inject" an operator profile into a lower-level function that does not accept one of the above two stateful back-doors.
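The closure technique can be illustrated with a small, self-contained sketch. Nothing here is Druid code: `ClosureBackDoor`, `ScanProfile` and `upperCaser` are hypothetical, and `UnaryOperator<String>` stands in for any function type whose signature we are not allowed to change.

```java
import java.util.Locale;
import java.util.function.UnaryOperator;

// Hypothetical sketch: ScanProfile and the factory method are illustrative names.
final class ClosureBackDoor
{
  // A tiny stand-in for an operator profile.
  static final class ScanProfile
  {
    long rows;
  }

  // The returned function has a fixed signature that cannot carry a profile.
  // The profile "leaks in" through the closure instead.
  static UnaryOperator<String> upperCaser(ScanProfile profile)
  {
    return row -> {
      profile.rows++;  // state updated as a side effect of the transform
      return row.toUpperCase(Locale.ROOT);
    };
  }
}
```

The caller that creates the function keeps a reference to the profile, so it can read the row count after the function has been applied, even though the function itself neither accepts nor returns it.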
The response context appears to have evolved over time. It probably started as a set of fields to return in the header. Later, additional internal-only fields were added. Now, we add trailer-only fields.
The response context operates in a scatter/gather pattern. Individual sub-queries (Druid has no good name for these) populate the context at the level of either multiple fragments or individual fragments. Then, higher levels gather and merge the individual contexts. Some of the merging is done within a process, some is done across processes.
When a server exchange occurs, the context can be truncated to fit a header size limit. When that is done, information is discarded. For the profile, we want to retain all information, so we must be careful when we pull information from the response context to avoid pulling truncated information. Because of this limitation, it turns out to be clearer not to use response context information, but instead to gather information directly into the profile objects.
Still, the `ResponseContext` class is the Swiss Army knife for this project. It gathers fields which are returned in the response trailer: the profile becomes one of those fields. And, since it provides one of the three stateful back-doors, we add methods to build up the operator tree.
Each `QueryRunner.run()` that builds a transform also creates an instance of an operator profile designed for that transform. That profile is then passed into whatever code will do the work (via one of the other back doors). Further, the profile is then pushed onto an operator profile stack in the `ResponseContext`.
Most `run()` functions turn around and call `run()` on one or more child `QueryRunner`s. In this case, those children have pushed their own operator profiles onto the stack in `ResponseContext`. So, to complete the tree, the parent `run()` method "harvests" its child profiles from the `ResponseContext` stack and places them in some slot in its own profile.
This mechanism works as long as all the profile reads and writes to `ResponseContext` happen within a single thread. Interestingly, it does not matter if the updates to the operator profile happen in other threads as long as only one thread writes to each profile object. That is, once the profile is created in the main thread, the profile can be updated in a separate worker thread with no concurrency issues. (Profile objects are written, but never read except at the end for serialization.)
The (awkward) flow is thus:
// Get the usual suspects inside each runner
QueryPlus<?> queryPlus = ...
QueryRunner<?> runner = ...
ResponseContext responseContext = ...
// Create a profile for this operation
MyProfile profile = new MyProfile(...);
responseContext.pushProfile(profile);
// Run (really, "build") the child operation
runner.run(queryPlus, responseContext);
// Pop the child's profile. (Also a version for multiple children)
profile.child = responseContext.popProfile();
// Fill in other interesting details for this operation
profile.something = ...
Runners that do nothing "statsworthy" (they just build structure but don't define a transform) leave the child node unchanged and add no new profile node.
To handle the initial state, when most code paths are not yet instrumented, a call to the `pop()` method will return the `OpaqueOperator` (see above) to mark that code path, yet still provide a structurally-valid profile.
The top level `QueryLifecycle` pops the top-most operator to place as the root for that fragment.
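The push/pop protocol can be modeled in isolation as shown below. This is a sketch under stated assumptions, not the prototype code: `ProfileStack`, `popChildOf`, `popRoot` and the `Runner` interface are hypothetical stand-ins for the `ResponseContext` methods and `QueryRunner`, and the rule "if the parent is still on top of the stack, the child pushed nothing" is one assumed way to detect an un-instrumented child.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch; only the push/pop idea comes from the text.
final class ProfileStackSketch
{
  static final class OperatorProfile
  {
    final String type;
    final List<OperatorProfile> children = new ArrayList<>();

    OperatorProfile(String type)
    {
      this.type = type;
    }
  }

  // Stand-in for the profile stack held by the response context.
  static final class ProfileStack
  {
    private final Deque<OperatorProfile> stack = new ArrayDeque<>();

    void push(OperatorProfile profile)
    {
      stack.push(profile);
    }

    // If the parent is still on top, the child pushed nothing, so return
    // an opaque placeholder to keep the tree structurally valid.
    OperatorProfile popChildOf(OperatorProfile parent)
    {
      if (stack.isEmpty() || stack.peek() == parent) {
        return new OperatorProfile("opaque");
      }
      return stack.pop();
    }

    // Called once at the top level to obtain the fragment root.
    OperatorProfile popRoot()
    {
      return stack.isEmpty() ? new OperatorProfile("opaque") : stack.pop();
    }
  }

  // Stand-in for QueryRunner: "running" only builds structure.
  interface Runner
  {
    void run(ProfileStack context);
  }

  // Wraps an optional child with an instrumented operator of the given type.
  static Runner instrumented(String type, Runner child)
  {
    return context -> {
      OperatorProfile profile = new OperatorProfile(type);
      context.push(profile);
      if (child != null) {
        child.run(context);
        profile.children.add(context.popChildOf(profile));
      }
    };
  }
}
```

A pass-through runner that simply calls its child without pushing anything is invisible in the resulting tree, which matches the "nothing statsworthy" case described above.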
Let's be frank: this is an ugly design. But, given the constraints of the public APIs and the (semi-)stateless FP code within Druid, it has the lone virtue of actually working to allow us to build the profile.
The `QueryMetrics` class seems like it should be a handy tool for gathering profile data. However, design limitations prevent that use. `QueryMetrics` is shared by all code in the query, but we want to gather per-transform metrics. The class is tailored to writing data into a metrics system: it does not gather, retain or aggregate data on its own. Context is provided by a set of dimensions, but that context does not give us per-operator visibility. It is too closely tied to the idea of emitting metrics to a metrics system and is not well suited to gathering metrics to build up a profile: the API is simply the wrong shape for this task.
Instead, we use `QueryMetrics` primarily as a back door to pass an operator profile back from code that has no other way to introduce state. This is done by adding the `ResponseContext` to the `QueryMetrics`. Since `QueryMetrics` is a public API, we cannot add a new method. Instead, we create wrapper classes and static methods to do the required work.
Further, `QueryMetrics` can be `null` if metrics are not enabled, and so is not available to carry the operator profile out of a function. In this case, we have a different wrapper that implements the functions we need, but just discards values passed in to the other functions.
Druid should have this concept anyway to avoid the many "if null" checks required by the current scheme. The place to create the wrapper would be `DruidMetrics.makeRequestMetrics()`. (We leave this as an exercise for later.)
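The wrapper idea can be sketched with a deliberately tiny stand-in interface. `MetricsSink` and `ProfileCarryingMetrics` are hypothetical; the real `QueryMetrics` interface has a far larger API, and the prototype's wrapper classes may be organized differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for QueryMetrics, reduced to a single method.
interface MetricsSink
{
  void reportRows(long rows);
}

// Hypothetical wrapper: always able to carry profile state, and forwards
// metric calls only when a real sink exists.
final class ProfileCarryingMetrics implements MetricsSink
{
  private final MetricsSink delegate;  // null when metrics are disabled
  private final Map<String, Object> profile = new LinkedHashMap<>();

  ProfileCarryingMetrics(MetricsSink delegate)
  {
    this.delegate = delegate;
  }

  @Override
  public void reportRows(long rows)
  {
    if (delegate != null) {
      delegate.reportRows(rows);  // otherwise the value is simply discarded
    }
  }

  // The back door: lower-level code records profile details here.
  Map<String, Object> profile()
  {
    return profile;
  }
}
```

Code that receives such a wrapper never needs an "if null" check: metric calls are safe whether or not metrics are enabled, and the profile is available in both cases.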
Another role for `QueryMetrics` is to point to places in the code where developers thought it useful to gather metrics. Though it seems we can't piggy-back on top of this class, we can add code to the same spot to gather the same data in the profile format.
`QueryResource` is the HTTP handler for query requests. It is extended to return the response footer if requested.
`QueryLifecycle` manages the overall query and reports results. It contains most of the information needed to create the fragment profile, but is missing some information available only in the `BrokerQueryResource`.
`QueryLifecycle` is revised to more cleanly manage the two existing and two new output destinations: logs and metrics (existing), and the query profile and response trailer (new). A new `LifecycleStats` class gathers the metrics-like information and emits it to the four destinations.
- The `pydruid` client, which is managed by the Druid project, should be extended to capture the profile, to allow easier debugging and to give users a way to exploit the profile.