Query Profile
A query profile is a collection of information about a single query used to diagnose performance issues for both developers and users. Query profiles are a standard feature of Drill, Impala (Cloudera version), Presto and other query tools. When aggregated, the information in the profile can show query access patterns needed to optimize performance of a cluster.
Each profile generally includes the query, the query plan and metrics about query execution. Metrics are unique to each product and reflect how the product operates internally. Our goal here is to design a query profile specifically for Druid.
See an example profile here.
Druid offers blindingly fast performance, but only if the system is properly tuned for the data and queries. Historically, tuning Druid was more of an art than a science because we were missing critical information (what actually happens within a query). Instead, one had to observe performance, know what to look for in data and queries, know how Druid operates internally, and thus know which changes might be beneficial. Then, one could intuitively know which change to make to improve performance, try it, and repeat the cycle to verify performance, or seek further improvements. Though many experts developed the necessary skills to know what to look for, newer users are often left with trial and error since they've not yet developed a feel for how Druid works.
One way to address this issue is to make the performance drivers explicit: to show which areas within a query take time. One could use code-level solutions such as flame graphs, but our goal here is not to optimize the code, but rather the application. Knowing that function foo() takes x% of the time might tell an expert that something is amiss, but it is unlikely to provide useful knowledge to the average user.
Similarly, measuring query time is useful to tell us where we stand, but just knowing query run time does not tell us why the query takes that amount of time, or what we can do about it.
Our premise here is that if we know, at an application level, why some part of a query takes time, then we are in a better position to make application-level changes that improve performance. For example, we want to know:
- How many segments were hit? Why? (Was a query phrased in a way that Druid must search all segments, or did it have a tight time range that let Druid select a small subset of segments?)
- Did the query make use of partitioning? Why or why not? (Ideally, most queries will have filters on the columns used for partitioning. If not, then perhaps the partitioning selection is not aligned with query usage.)
- How many rows were scanned? (If the query scans far more rows than those returned, then it is likely that the query uses filters which Druid cannot optimize. Perhaps a new column can be added that would allow index lookup instead of scanning rows.)
- How many rows were aggregated? (If the query scans many rows to aggregate down to a far smaller set of rows, then perhaps the aggregation should be done once at ingest time rather than for every query.)
- And so on. Once one realizes we can ask these questions, the community will suggest many more.
Such changes typically include:
- Revisions to the data structure: roll-up "grain", number of dimensions, number of tables, etc.
- Partitioning: a form of top-level indexing
- Segment sizes which influence the cost of full table scans
- Distribution (the number of data nodes), which allows us to do more work in parallel.
Experience with query profiles in other products has shown that detailed query-level information is essential to visualizing, understanding, and improving each of the above factors.
Query profiles in other products are the product of many years of effort: people use the profile to answer questions, realize something is missing, and modify the code to capture that information. We cannot hope to compress all that learning into this single project. So, we have more modest goals.
- Define the basic query profile structure, and gather metrics for some of the most basic query operations as a proof-of-concept.
- Provide a mechanism to transfer summary information about a query from the data node to the Broker, or from the Broker to the client. This builds on a "response trailer" mechanism which Gian prototyped.
- Since most clients won't know what to make of a query profile, provide a way for the Broker to persist profiles. This project will persist profiles on local disk, but will provide an extension point to add mechanisms to write profiles to a log collection system or other destination.
- A typical usage pattern is to run a SQL query from some favorite query tool, then inspect the results. So, this project will also offer a new REST call that can retrieve a profile, given a query ID, from the local profile storage.
We leave it to later projects to add metrics for the wide range of query operations that Druid provides. We also defer to others to add analysis, visualization or other tools on top of the basic query profile mechanism.
Druid is a distributed system that offers queries as one of its services. As with any good REST API, each API call does one thing well. Thus, Druid has a collection of native queries, each of which does one kind of operation. Druid also offers a SQL layer that converts SQL queries into one of the native Druid queries.
A Druid query has two main parts. At the top level are a set of REST RPC calls that scatter the query across data nodes. Within each node, Druid executes queries using a unique functional design that combines query planning and execution in a set of (mostly) stateless functions.
A typical query profile uses the DAG structure to organize data. The DAG is made up of "operators" where each typically corresponds to one of the well-known "relational operators" from relational database theory: scan, filter, join, sort, union, etc. Since most query engines use such a DAG for query execution, the profile simply mimics the execution structure.
Since Druid is based on a functional programming (FP) model, not a DAG model, our first challenge is to decide how to structure the Druid query profile. One choice is to mimic the Druid call structure, perhaps reporting statistics for each QueryRunner.run() function. (A Druid query is simply a chain of calls to a large variety of these methods.) However, a closer inspection reveals that the QueryRunner.run() method is more like a QueryPlanner.plan() method: each builds up the actual execution mechanism. There are more runners than there are execution steps. The runners represent the specific way that Druid is coded, not an essential description of the steps required to execute the query.
Instead, we want to focus on cost drivers: the actual steps (transforms) that take time. In Druid, these are represented using a Guava Sequence with a large number of additional components added via various ad-hoc mechanisms. As it turns out, each time Druid creates a new Sequence more-or-less maps to a transform: the only reason to create a new sequence is to merge, filter, sort or otherwise transform data from the input sequence. Thus, Sequences are closer to the level at which we want to gather detailed statistics.
Still, a Sequence is a data delivery vehicle. Our interest is in the actual transform, which sits outside the sequence. In Druid, such transforms are not reified (that is, represented as objects). Instead, they are represented by a series of lambda functions created with the QueryRunner.run() call but executed as the Sequence is read. These transforms have no name in Druid; they are just bits of code. Yet, these are what we want to instrument, so we'll call them (virtual) operators.
By making this move, we are now in a position to leverage the operator structure typical of a query profile. The profile is thus a tree (DAG) of (virtual) operators, each of which represents a bit of code in Druid that creates or transforms a Sequence as defined in some QueryRunner.run() method. The unusual details of Druid's execution are abstracted into patterns that are familiar (and relevant) to a data engineer who knows how queries typically work in Drill, Impala, Presto/Trino, Spark, Flink, etc. One advantage of such a structure is that the DAG for a query is determined by the query, and is fairly stable, while the native query implementation is subject to constant revision to increase performance. Thus, a DAG structure allows the profile to change more slowly than the actual code.
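To make the idea of a virtual operator concrete, here is a minimal Java sketch of a wrapper that records rows and elapsed time as data is pulled through it. It uses java.util.Iterator as a stand-in for Druid's Sequence, and the names VirtualOperatorStats and MeteredIterator are hypothetical, not existing Druid classes.

```java
import java.util.Arrays;
import java.util.Iterator;

/** Hypothetical stats holder for one virtual operator; not a Druid class. */
class VirtualOperatorStats {
    final String kind;
    long rows;       // rows produced by this operator
    long timeNanos;  // time spent pulling rows, including upstream work

    VirtualOperatorStats(String kind) {
        this.kind = kind;
    }
}

/** Wraps an input iterator and records row count and elapsed time as rows are read. */
class MeteredIterator<T> implements Iterator<T> {
    private final Iterator<T> input;
    private final VirtualOperatorStats stats;

    MeteredIterator(Iterator<T> input, VirtualOperatorStats stats) {
        this.input = input;
        this.stats = stats;
    }

    @Override
    public boolean hasNext() {
        long start = System.nanoTime();
        try {
            return input.hasNext();
        } finally {
            stats.timeNanos += System.nanoTime() - start;
        }
    }

    @Override
    public T next() {
        long start = System.nanoTime();
        try {
            T row = input.next();
            stats.rows++;  // counted only when a row was actually returned
            return row;
        } finally {
            stats.timeNanos += System.nanoTime() - start;
        }
    }
}
```

The stats object lives outside the data stream, which mirrors the point above: the transform, not the Sequence, is what the profile describes.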
The Drill page linked above shows a visualization of the DAG, and the metrics gathered for each node (operator) within the DAG. This project will not provide such a visualization. However, the visualization comes from a JSON-structured profile organized by operator as shown above. The goal here is to provide a similar profile for Druid.
The profile is structured at three levels: query, fragments and operators.
First, we have to understand a bit about how Druid queries operate in the large. A query arrives (typically) at a Broker (perhaps via the Router). If the query is SQL, the Broker translates the SQL into one or more native queries. If the query is already native, then we skip the SQL step. The Broker then determines the data nodes which hold data for that query, rewrites the query specifically for each node, and scatters the query to each data node. The data node then receives the query and does the necessary work. The key fact to notice is that the client could send a native query directly to a data node: there is no real difference between the internal query execution path for the broker and data nodes.
We map this unusual structure into typical distributed-system concepts. First, the point of entry into Druid is the (client) query: the query as sent from the client. This could be a SQL query, a native query sent to the Broker from a client, or a native query sent from the client to a specific data node. The query profile represents the "query" as a whole.
As explained above, Druid uses a scatter-gather pattern to execute a query. This is a simple version of the fragment structure typical of distributed engines. So, we use the term fragment to refer to each scatter step. Thus a query (as defined here) consists of one or more fragments, each of which is represented by a rewritten native query executing on a data node. Note that, from the perspective of the data node, a query is a query: it does not matter if it is a client query or a fragment (often called a "subquery" in Druid). The profile, however, does make this distinction: a subquery is a fragment, a non-subquery is a client query.
Then, as explained earlier, we map Druid's functional programming based query engine into a set of operators, where each operator represents a point in the execution where data is read, or one sequence of results is transformed into another.
This gives the following structure:
- query (as submitted by the client)
  - fragments (the scatter stage)
    - operators (the transforms within the query)
The actual structure is a bit more subtle. The Broker does not simply issue a series of subqueries (fragments); it also combines them. As a result, the query is actually a set of Broker-side operators, one of which sends the query to a data node. Thus, the actual structure is:
- query (as submitted by the client)
  - operators (the transforms within the query)
And, one of those operators is a "receiver" which handles one specific scatter/gather data node, so:
- receiver operator
  - fragment (a subquery)
    - operators
For native queries, the query and the fragment are basically the same (since the code, for the most part, handles the two cases identically.) But, for a SQL query, we have additional steps such as planning. This gives the final structure:
- query (as submitted by the user)
  - query-level information (SQL text, query plan, etc.)
  - root fragment
    - merge operators, etc.
      - receiver
        - fragment ...
The profile presented to the user is a JSON object that can be deserialized into Java or other languages. The top-level object includes the following fields:
- Query type: Native or SQL
- Query ID of the top-level query
- Status: succeeded or failed.
- Tables: list of tables which the query accesses
- Kind: Kind of data source
- Name: Name of the data source
- Dimensions: list of dimensions which the query accesses
- Measures: list of measures which the query accesses
- Intervals: list of intervals (rounded to segment boundaries) which the query accesses
- Root slice (see below)
- Attributes: open-ended map of key/value pairs to be used for any purpose and for information which does not fit above.
The other fields depend on whether the query is SQL or native. If SQL:
- SQL query text (if SQL)
- Has wildcard: true if the statement is of the form SELECT *, false if columns are listed explicitly.
- Query plan
If native:
- Native query text (if native)
A distributed query plan consists of a set of logical operators that describe what is to be done. Then, a set of physical operators are distributed across the network to do the work. One or more physical operators are grouped in a fragment which executes on a node. Thus there can be many concurrent physical operators for each logical operator. In a full DAG-based engine, multiple exchanges shift data from one fragment to another. In Druid, the native query is roughly the equivalent of a fragment, and exchanges are limited to a single level scatter/gather process between data nodes (historical, peons) and the broker. Operators are implicit within the implementation of each native query. Here, we generalize Druid's structure into a DAG form.
The first level of structure for query metrics is the slice. For Druid a slice is either the "Jetty thread" in the broker, or a native query on a data node. For data nodes, a slice is implemented by a set of fragments: actually worker threads that do work.
A slice is called that because it contains a "slice" of the overall DAG. Again, in a fully distributed system, multiple slices occur: data can flow across many exchanges during a single query. In Druid, there are two slices: the broker and a native query. Each slice contains a "sub-DAG" with its own slice-local root and leaf nodes. Every slice has a Sender as its root. Broker slices have Receivers as their leaves, while data node slices have Scans as their leaves.
Slice fields include:
- Kind: root, native query kind
- Role: broker, data node
- Fragments: instances running on a node.
Fragment fields include:
- Host name: The host (and port) on which the fragment runs
- ID: the query ID assigned to the sub-query
- Status: succeeded or failed
- Time
- Rows
- Sender: the sender operator at the root of the fragment sub-DAG
The execution metrics present the query as a set of operators, defined below. Each operator includes:
- Kind: the kind of operator
- Rows: the number of rows produced by the operator
- Total Time: the wall-clock time for the operator and all its children
- Time: the wall-clock time for the one operator (total time minus the total time of any children)
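The relationship between Total Time and Time can be sketched in Java as follows. This is an illustration only: OperatorNode and its fields are hypothetical names that follow the list above, and times are assumed to be in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical operator node in the profile tree; not a Druid class. */
class OperatorNode {
    final String kind;
    final long rows;         // rows produced by the operator
    final long totalTimeMs;  // wall-clock time for this operator and all its children
    final List<OperatorNode> children = new ArrayList<>();

    OperatorNode(String kind, long rows, long totalTimeMs) {
        this.kind = kind;
        this.rows = rows;
        this.totalTimeMs = totalTimeMs;
    }

    OperatorNode addChild(OperatorNode child) {
        children.add(child);
        return this;
    }

    /** Time for this operator alone: total time minus the total time of its direct children. */
    long selfTimeMs() {
        long childTotal = 0;
        for (OperatorNode child : children) {
            childTotal += child.totalTimeMs;
        }
        return totalTimeMs - childTotal;
    }
}
```

For example, a merge with a total time of 80 ms over two scans of 30 ms and 40 ms has a self time of 10 ms. The subtraction assumes children run one after another; if children run concurrently, the sum of their wall-clock times can exceed the parent's, so a real implementation would need a different rule for that case.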
The set of operators includes:
- Sender: the operator which sends results to the client (top-level query) or broker (data node queries)
- Child: the operator which sends data to the sender
- Merge: the operator which receives data from a subquery and merges it
- Children: the list of subqueries which send data to the merge node
- Segment metadata scan
- Segment read time
- Segment scan
- Open time: time to read the segment header, etc. (Details needed)
- Open bytes
- Blocks read
- Total scan time
- Total scan bytes
- Columns
- Name
- Bytes read
- I/O time (or total time to read, including decode)
(More TBD)
A "scan" operator is one which reads data from one of Druid's several data sources. Each kind of scan has its own unique characteristics.
Data nodes send results to the broker using the "response trailer" mechanism which Gian designed. That is, the data nodes gather metrics for a native query, and return them to the broker after the data portion of the response. The structure used within each data node is different than the above: it is an intermediate form more closely tied to the structure of the native query.
As per Gian's design, the data node returns the trailer only when requested. The broker (when profiles are enabled) will request the trailer. Druid clients can also optionally request a trailer. But, by default, no trailer is sent so that behavior is backward-compatible with existing clients who work with data nodes directly.
The broker manages each query and is extended to gather the response trailers from each data node. The information is merged with broker-level information to create the user form of the profile.
The profile is not returned with the query: doing so would add unwanted complexity to the many existing Druid clients. Further, most clients would not know what to do with the profile even if they received it.
Instead, the broker maintains a disk-based cache of profiles. At a high level:
- The broker plans the query, scatters subqueries, gathers results and trailers
- The broker sends data to the client, and gathers trailers into a "pre-profile" object
- After the last data byte is sent, before closing the connection, the broker places the "pre-profile" onto an internal queue
- The broker sends the final response to the client
- A worker thread picks up the pre-profile object, generates the user-level profile, and writes it to disk
- The worker may also prune old profiles, if configured to do so.
A new API, /v2/profile/<id>, requests a profile, which is returned as JSON. The flow here is designed to avoid race conditions, especially if a client runs a query and immediately asks for its profile.
- First check the profile queue. If found in the queue, force (or wait for the completion of) generation of the user profile, and remove the entry from the queue.
- Check the disk cache for the profile.
- Copy the disk file contents to the client.
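The lookup steps above can be sketched in Java as follows. This is a minimal illustration under stated assumptions: ProfileStore is a hypothetical class, an in-memory map stands in for the disk cache, and pre-profiles and profiles are plain strings.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical broker-side profile store; a map stands in for the disk cache. */
class ProfileStore {
    private final Map<String, String> pending = new ConcurrentHashMap<>(); // query ID -> pre-profile
    private final Map<String, String> disk = new ConcurrentHashMap<>();    // query ID -> profile JSON
    private final Function<String, String> generator;                      // pre-profile -> user profile

    ProfileStore(Function<String, String> generator) {
        this.generator = generator;
    }

    /** Called after the last data byte is sent to the client. */
    void enqueue(String queryId, String preProfile) {
        pending.put(queryId, preProfile);
    }

    /** Worker step: generate the user profile and store it, then drop the queue entry. */
    void writePending(String queryId) {
        String preProfile = pending.get(queryId);
        if (preProfile != null) {
            // Store before removing, so a concurrent lookup never finds the profile in neither place.
            disk.put(queryId, generator.apply(preProfile));
            pending.remove(queryId);
        }
    }

    /** Profile request: check the queue first, then the (simulated) disk cache. */
    Optional<String> lookup(String queryId) {
        writePending(queryId);
        return Optional.ofNullable(disk.get(queryId));
    }
}
```

Writing to the cache before removing the queue entry is one way to close the race in which a client asks for a profile while the worker is still writing it.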
Note that this API is meant for short-term, immediate access to a query profile. In a production system, the broker should not store profiles for more than a brief time: minutes to hours. After that, to conserve resources, profiles are either deleted (because the probability of any single profile being requested drops exponentially with time), or are gathered and sent to some other system for aggregation or storage. Once the profile is removed, it is no longer available from the broker. It may be available from some other, third-party system.
This does create an awkward gap in coverage: during the time that the profile is removed from the broker, before it becomes available in that third-party system. The thought is that if a user wants to profile a single query, they will run the query, then immediately (within a few minutes) request the profile from the broker. Longer term, profiles are used in the aggregate, or to diagnose issues, and so the "gap" is less of an issue.
The profile queue is flushed asynchronously to ensure that queries don't wait for the profile to be written to disk. As a result, profiles may be pending writes when the server exits. To ensure that no profiles are lost, the queue must flush all remaining profiles to disk on shutdown.
BrokerQueryResource is the broker-specific handler for query requests. Extends QueryResource. Use this as the lever to create broker-specific query profile handling. Have a profile handler in the base that does nothing. Replace it in broker with one that does the fancy stuff.
BrokerServerView appears to be the place to hold the implementation of the query profile manager.
The Graceful interface adds a shutdown feature to handlers. Implement this on BrokerQueryResource to return a Future which will flush the profile queue.
A QueryMetrics mechanism already exists, tailored to writing data into a metrics system. The profile system should at least be aware of this mechanism, and might perhaps want to leverage it somehow. Perhaps the native query specific pre-profile can optionally hold the query metrics if no other information is available. This would be needed for custom extension queries.
The pre-profile should sit above query metrics to avoid changing the QueryMetrics class. It can pull needed values from QueryMetrics to avoid duplication. New values would be accumulated in a custom pre-profile. A default pre-profile would serve queries that don't yet have a custom version.
A typical QueryMetrics implementation does not retain values: it just forwards them to be emitted. So, either we need some kind of shim to capture the values, or we replace writes to QueryMetrics with writes to the pre-profile, and have the pre-profile do the pass-through.
The place to create the wrapper would be DruidMetrics.makeRequestMetrics().
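The "capture and pass through" idea can be sketched in Java with a simple decorator. This is an assumption-heavy illustration: MetricSink is a hypothetical one-method interface standing in for the much larger QueryMetrics API, and CapturingSink is a hypothetical shim, not existing Druid code.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the metrics-emitting side of QueryMetrics. */
interface MetricSink {
    void report(String name, long value);
}

/** Shim that remembers each reported value for the pre-profile, then forwards it unchanged. */
class CapturingSink implements MetricSink {
    private final MetricSink delegate;
    private final Map<String, Long> captured = new LinkedHashMap<>();

    CapturingSink(MetricSink delegate) {
        this.delegate = delegate;
    }

    @Override
    public void report(String name, long value) {
        captured.put(name, value);     // retained for the profile
        delegate.report(name, value);  // pass-through to the existing emitter
    }

    /** Read-only view of the values seen so far. */
    Map<String, Long> captured() {
        return Collections.unmodifiableMap(captured);
    }
}
```

Existing callers keep writing to the sink as before; only the code that builds the pre-profile needs to know about captured().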
Maybe:
- QueryMetricsProxy - Wraps the metrics collector
- MetricsCollector - top-level metrics interface. Replace uses of QueryMetrics with this one.
- QueryMetrics - Query-specific instance as returned from the QueryToolchest.
New code uses the collector, older code uses the proxy.
The above is only a partial solution. QueryMetrics is a downward-facing class: it parses some query information itself, then calls its own methods. There is no good way to add a shim layer.
At the top, we create a new MetricsCollector to gather the outside-focused metrics. QueryMetrics then implements this new interface. A shim version gathers values for use in the profile.
Instead, we can create a MetricsTranslator which pulls information from the default implementation.
- Get dimension values from ServiceMetricEvent.Builder.
- Cache metric values.
More gotchas: the metrics object appears to be created late in query execution: QueryLifecycle.emitLogsAndMetrics(), past the point of emitting the response trailer and past the point where most metrics could have been gathered.
Yet another option: refactor the lifecycle to collect the (fixed set of) metrics prior to footer return, then emit them using the existing mechanism in a revised emitLogsAndMetrics(). The gathered stats go into the pre-profile before returning the footer.
The ResponseContext is used to gather profile data. The context is merged automatically. To retain specific values, we use a map in which each server and segment has an entry, and we merge maps to preserve individual keys. Thus we have:
ResponseContext["Fragment"] -> "Broker" | "DataNode"
ResponseContext["HostName"] -> <host name>
ResponseContext["PreProfile"] -> {<fragment>: {<host name>: {<segment>: <pre-profile>}}}
ResponseContext["NativeProfile"] -> <pre-profile>
The "Fragment" and "HostName" values are local-only, and are filled in as they become known. To speed up access within a single native query, we cache the native query pre-profile in "NativeProfile".
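The key-preserving merge of the "PreProfile" entry can be sketched in Java as below. This is only an illustration of the nested-map shape shown above: PreProfileMerger is a hypothetical helper, and the pre-profile is represented as a plain string.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for the {fragment: {host: {segment: pre-profile}}} map. */
class PreProfileMerger {

    /** Merges `from` into `into`, keeping every fragment, host and segment key from both sides. */
    static void merge(
            Map<String, Map<String, Map<String, String>>> into,
            Map<String, Map<String, Map<String, String>>> from) {
        from.forEach((fragment, hosts) ->
            hosts.forEach((host, segments) ->
                into.computeIfAbsent(fragment, f -> new HashMap<>())
                    .computeIfAbsent(host, h -> new HashMap<>())
                    .putAll(segments)));
    }

    /** Builds a one-entry map, as a single segment scan on one host might report. */
    static Map<String, Map<String, Map<String, String>>> single(
            String fragment, String host, String segment, String preProfile) {
        Map<String, String> segments = new HashMap<>();
        segments.put(segment, preProfile);
        Map<String, Map<String, String>> hosts = new HashMap<>();
        hosts.put(host, segments);
        Map<String, Map<String, Map<String, String>>> result = new HashMap<>();
        result.put(fragment, hosts);
        return result;
    }
}
```

Because the merge only ever adds keys at the innermost level, contexts from different hosts or segments never overwrite each other; two reports for the same segment on the same host would, and the sketch simply keeps the later one.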
Each native query has an associated pre-profile object which gathers stats.
The response trailer is currently emitted in the QueryResource class: the inlined ResponseBuilder emits the trailer as a subset of fields from the ResponseContext. A special field, metrics, is added to hold query-specific values.
It uses two classes, MultiQueryMetricsCollector and SingleQueryMetricsCollector, with a number of fixed fields. These two classes were added by Gian to collect metrics. The classes are Jackson-serialized to JSON. They are not yet widely used, however. Still, we can build on the idea.
The response context appears to have evolved over time. It probably started as a set of fields to return in the header. Later, additional internal-only fields were added. Now, we add trailer-only fields.
The response context operates in a scatter/gather pattern. Individual sub-queries (Druid has no good name for these) populate the context at the level of either multiple fragments or individual fragments. Then, higher levels gather and merge the individual contexts. Some of the merging is done within a process, some is done across processes.
When a server exchange occurs, the context can be truncated to fit a header size limit. When that is done, information is discarded. For the profile, we want to retain all information, so we must be careful when we pull information from the response context to avoid pulling truncated information.
As a result, the response context has become a bit of a kitchen sink and deserves another good refactoring. For now, we'll take a half-way step:
- Add a set method for each of the known values. The set methods do the mechanics of setting the value.
- Change client code to use the set methods rather than mucking with keys.
- Extend the set methods to also set the query profile values where needed.
- Allow direct access to keys for extended values. Capture these values in the profile also as a map.
The above tried to piggy-back on QueryMetrics or ResponseContext, primarily because they exist. There are issues.
- QueryMetrics is discussed above: it is too closely tied with the idea of emitting metrics to a metrics system and is not well suited to gathering metrics to build up a profile: the API is just too wrong for this task.
- ResponseContext seemed a good choice. However, it is created outside of the query and so can't be query-specific. Also, it has no nuance: we can't tell which part of a query a metric is for.
Part of the problem is that queries are implemented in QueryRunner to have no side effects: a query returns a Sequence, but offers no additional information. It seems that the ResponseContext was added to allow some additional information to leak out. That is:
Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext);
So, another solution is to change the QueryRunner API to return additional information. One thought would be to return that information from the query runner itself:
Pair<Sequence<T>, Stats> run(QueryPlus<T> queryPlus);
For some class Stats that gives us the stats we want.
One issue is that existing code expects to create the ResponseContext. Another is that the query profile information will be available only on query completion. So, another idea is to create an indirection:
class StatsHolder {
ResponseContext context;
Stats stats;
}
Sequence<T> run(QueryPlus<T> queryPlus, StatsHolder stats);
That is, the caller creates the stats holder, fills in the context, and passes that into the query. This works, but suggests an issue: the stats really are not valid until the end of the query: we'd have to promise not to look at the stats field until the Sequence reports EOF.
So, yet another approach is to explicitly get the stats at query completion:
Sequence<T> run(QueryPlus<T> queryPlus);
Stats gatherStats(ResponseContext context);
Or even:
Sequence<T> run(QueryPlus<T> queryPlus);
Stats stats();
interface Stats {
...
void populateResponse(ResponseContext context);
Profile profile();
}
In this case, the call to stats() would not be valid except when the query is completed. Or, since it may be that a query can simply be abandoned, stats() returns information as of that moment.
In this model, the internal query code writes to a stats collector which gathers information that will eventually be put into the profile and the response context.
To ease the transition, we create some shims:
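public interface QueryRunner<T>
{
// Existing entry point: run the query, then copy the stats into the response context.
default Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext) {
Sequence<T> result = run(queryPlus);
stats().populateResponse(responseContext);
return result;
}
// New entry point. Each default calls the other, so an implementation must override one of them.
default Sequence<T> run(QueryPlus<T> queryPlus)
{
return this.run(queryPlus, ResponseContext.createEmpty());
}
default Stats stats() ... ? Kinda stuck
}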
Here we need more details. The query runner runs a query, but not really. It is the outer-most wrapper, and is nested. The actual query is run within QueryLifecycle.execute(), which calls QueryPlus.run(), which calls QueryRunner.run().
Clearly, one or the other of the methods must be overridden.
Druid has somewhat odd semantics: there seems to be no real implementation of a query as an operator that reads data, does work, and returns results. It seems that, instead, data is provided (somehow) to a toolchest, which creates a runner on the fly to do things like merge. Either I still don't understand, or things kind of shift roles: nothing is the true "run a query" step, or the true operator which knows what metrics it can meaningfully produce, nor is there a reified object that represents an operator. Operators are functions or lambdas that don't really have an identity and so have no place to hold state such as stats.
The QueryPlus might represent the whole query, but it might not know how that query is used, if there is more than one path to execute the query. Yet, we want to gather profile stats based on the execution path. Also, the QueryPlus is rewritten during various operations, so is not sufficiently stable to act as a stats collection point.
So, it does seem that the QueryRunner is the right place. Perhaps we treat a runner as an operator, and the QueryLifecycle as the fragment.
Then we have a tree:
fragment: represents a QueryLifecycle
operator: represents a Runnable
operator: input. Can be a list for merge.
Operators can be leaves (scan), have multiple children (merge), or can host another fragment (subquery).
Trivial runners return their child's stats.
public interface QueryRunner<T>
{
Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext);
Stats stats();
}
A problem with the above is that the QueryRunner seems to have been designed to be a functional concept with only one method. So, lots of code would have to change to add the stats() method.
In the first pass, we keep the response context separate. It might be held on the stats object, but we don't know all the complex interactions at the moment, so leave it be temporarily.
QueryMetrics at the moment are on the QueryPlus:
final QueryPlus<T> queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest);
final QueryMetrics<?> queryMetrics = queryWithMetrics.getQueryMetrics();
In this case, we pass the query metrics down into the query by using the QueryPlus. For the profile, we want to bubble metrics up: we want the runner which does the work to create the metrics, and to join it with any created by its children. For now, we'll leave the QueryMetrics in place and add a new line that gathers profile metrics.
We can now separate two ideas. One is the class that holds metrics, which is specific to a runner (only? or are there lower level things?). The other is the idea of a collector to which code writes metrics. Some code may be used in multiple places so it needs an interface independent of the storage mechanism.
So, eventually we'd have:
- MetricsCollector - used by the code which produces metrics.
- MetricsCollectorImpl - concrete representation, perhaps chained, which does something with the metrics: writes to metrics, to the profile, to the response context.
- Profile - a concrete class that gathers metrics for a specific operation.
Since the QueryPlus kind of maintains state now, perhaps we hold part of the profile state there. The (awkward) flow might be:
QueryPlus<?> queryPlus = ...
QueryRunner<?> runner = ...
runner.run(queryPlus, responseContext);
Stats childStats = queryPlus.popStats();
Stats newStats = ...
newStats.child = childStats;
queryPlus.pushStats(newStats);
If no stats were pushed, we'd return a default OpaqueOperator object that says nothing is visible. Runners that do nothing "statsworthy" would leave the child node unchanged.
The top level QueryLifecycle would pop the top-most operator to place as the root for that fragment.
Ugly. But, it might work... except that the comment for QueryPlus says, "An immutable composite object". The stats would be mutable and would have to be put onto the correct version of the query. The rewrites happen before calling a QueryRunner.
OK, so maybe go back to the earlier idea:
class QueryState {
ResponseContext context;
Stats stats;
}
Sequence<T> run(QueryPlus<T> queryPlus, QueryState state);
To ease the transition, we create some shims:
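public interface QueryRunner<T>
{
  // Marker for the profile of one (simulated) operator.
  public interface OperatorStats {
  }
  // Default profile: says that nothing about this runner is visible.
  public class OpaqueOperator implements OperatorStats {
  }
  // Bundles the existing response context with the new profile slot.
  public class QueryState {
    public final ResponseContext context;
    public OperatorStats profile;
    public QueryState(ResponseContext context) {
      this.context = context;
    }
  }
  // Legacy entry point: wraps the context and delegates to the new form.
  default Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext) {
    return run(queryPlus, new QueryState(responseContext));
  }
  // New entry point: runners not yet converted report an opaque operator.
  // Every implementation must override at least one run() method; otherwise
  // the two defaults call each other forever.
  default Sequence<T> run(QueryPlus<T> queryPlus, QueryState state) {
    state.profile = new OpaqueOperator();
    return run(queryPlus, state.context);
  }
}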
While the above is clean, it will result in the change of many, many implementations. Further, QueryRunner is marked as @ExtensionPoint, so changing the API will likely break third-party code, which is a show-stopper.
Perhaps we can revive the earlier idea of placing the profile information on one of the objects passed into the runner. The QueryPlus didn't work out, so let's consider the ResponseContext instead.
The (awkward) flow might be:
QueryPlus<?> queryPlus = ...
QueryRunner<?> runner = ...
ResponseContext responseContext = ...
runner.run(queryPlus, responseContext);
Stats childStats = responseContext.popStats();
Stats newStats = ...
newStats.child = childStats;
responseContext.pushStats(newStats);
If no stats were pushed, we'd return a default OpaqueOperator object that says nothing is visible. Runners that do nothing "statsworthy" would leave the child node unchanged.
The top level QueryLifecycle would pop the top-most operator to place as the root for that fragment.
Still ugly. But, since we've already greatly mucked about with the ResponseContext with no harm done, this one might actually work.
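To check that the push/pop flow hangs together, here is a small self-contained sketch. It is only a model: StatsContext stands in for the proposed pushStats()/popStats() additions to ResponseContext, Runner is a simplified stand-in for QueryRunner (no QueryPlus, no Sequence), and all of the names other than pushStats and popStats are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical node in the tree of (simulated) operator profiles.
class Stats {
  final String name;
  Stats child;

  Stats(String name) {
    this.name = name;
  }
}

// Hypothetical stand-in for the stats methods proposed for ResponseContext.
class StatsContext {
  private final Deque<Stats> stack = new ArrayDeque<>();

  void pushStats(Stats stats) {
    stack.push(stats);
  }

  // If nothing was pushed, return an opaque node that says nothing is visible.
  Stats popStats() {
    return stack.isEmpty() ? new Stats("opaque") : stack.pop();
  }
}

// Simplified runner: a real QueryRunner takes a QueryPlus and returns a Sequence.
interface Runner {
  void run(StatsContext context);
}

// A runner that does "statsworthy" work: it runs its child, pops whatever the
// child left behind, and pushes its own node with that child attached.
class ProfiledRunner implements Runner {
  private final String name;
  private final Runner child;

  ProfiledRunner(String name, Runner child) {
    this.name = name;
    this.child = child;
  }

  @Override
  public void run(StatsContext context) {
    child.run(context);
    Stats stats = new Stats(name);
    stats.child = context.popStats();
    context.pushStats(stats);
  }
}
```

A runner that does nothing statsworthy simply calls its child and touches neither method, so the child's node stays on top for the next runner up. The top level then calls popStats() once to obtain the root for the fragment.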
Druid's query engine uses functional programming techniques heavily. At its core, a Druid query is a stack of stateless QueryRunner functional interfaces. Each exists only long enough to perform its action, then disappears from the stack. In many places, the QueryRunner is not even instantiated: it is simply used to create a lambda function which is immediately called.
That is, Druid's design is to be stateless in the query execution path. Our goal here is to gather state (per-operator metrics). Hence, at the core we're going against the grain of Druid's stateless, functional design. (We will set aside the discussion of whether the functional design is better than the traditional, stateful operator design.)
So, how do we capture state in a system that says that state is a bug, not a feature? The answer is that the code has already created chinks in the stateless armor:
- The ResponseContext is passed into each QueryRunner to provide query-wide state. We noted above how we can exploit this fact to build up a tree of (simulated) operator profiles.
- In many other cases where Druid uses functions other than a QueryRunner, the function is defined as a lambda, and uses closures to pass state into the function from the surrounding QueryRunner.run() function. An FP purist would frown on this design, but here we exploit it to pass a stateful operator profile object into a function through the closure "back door."
- In other cases, functions are not within closures and are simply designed to be of the form f(sequence) -> sequence with no state. The answer here would seem to be to pass in a "holder" object that the function can use to set its internal profile object. This approach is necessary because the actual function is often passed along or looked up: multiple functions could be called. The function itself has to decide the form of the profile state it should capture. Thus, the function has to create the profile object. Since we can't change the return signature, nor use state or closures to capture the profile, we use the "holder" concept instead.
The result is admittedly ugly: it is a hack on top of a design that frowns on doing what we're trying to do. But, ugly or not, it should work.
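The "holder" idea is perhaps easiest to see in code. The sketch below is illustrative only: a List stands in for Druid's Sequence, and ProfileHolder, ProfiledFunction, LimitFunction and LimitProfile are hypothetical names invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical holder: the caller creates it, the function fills it in.
class ProfileHolder {
  Object profile;
}

// Hypothetical profile whose shape only the limit function knows about.
class LimitProfile {
  int rowsIn;
  int rowsOut;
}

// Hypothetical function form: f(sequence) -> sequence, plus a holder argument.
interface ProfiledFunction<T> {
  List<T> apply(List<T> input, ProfileHolder holder);
}

class LimitFunction<T> implements ProfiledFunction<T> {
  private final int limit;

  LimitFunction(int limit) {
    this.limit = limit;
  }

  @Override
  public List<T> apply(List<T> input, ProfileHolder holder) {
    // The function decides the form of its profile, so it creates the object.
    LimitProfile profile = new LimitProfile();
    profile.rowsIn = input.size();
    List<T> output = new ArrayList<>(input.subList(0, Math.min(limit, input.size())));
    profile.rowsOut = output.size();
    // The return type is taken by the sequence; the profile leaves via the holder.
    holder.profile = profile;
    return output;
  }
}
```

The caller does not need to know which function was looked up: it passes an empty holder, then attaches whatever profile object comes back to its own node in the profile tree.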
- The client view of query execution cannot change. There may be many Druid clients in the wild; we can't break them. Thus, the response trailer must be omitted by default so that the return value continues to be an array of rows.
- The pydruid client, which is managed by the Druid project, should be extended to capture the profile, to allow easier debugging and to give users a way to exploit the profile.
- Since most clients won't capture the profile, we need some other way to capture the information. We propose a pluggable interface with two options by default: 1) don't store (or create) profiles (the default), and 2) write the profiles to a designated local directory.
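The pluggable interface with its two default options might look roughly like the sketch below. The interface name, its methods, and the one-file-per-query layout are all assumptions for illustration; the real extension point would be wired up through Druid's usual configuration mechanisms, which are omitted here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical extension interface for whatever consumes finished profiles.
interface ProfileConsumer {
  // When false, the engine can skip creating the profile altogether.
  boolean isEnabled();

  void emit(String queryId, String profileJson);
}

// Option 1 (the default): don't store, or even create, profiles.
class NullProfileConsumer implements ProfileConsumer {
  @Override
  public boolean isEnabled() {
    return false;
  }

  @Override
  public void emit(String queryId, String profileJson) {
    // Intentionally does nothing.
  }
}

// Option 2: write each profile to a designated local directory.
class DirectoryProfileConsumer implements ProfileConsumer {
  private final Path dir;

  DirectoryProfileConsumer(Path dir) {
    this.dir = dir;
  }

  @Override
  public boolean isEnabled() {
    return true;
  }

  @Override
  public void emit(String queryId, String profileJson) {
    try {
      Files.createDirectories(dir);
      // Assumption: the query ID is safe to use as a file name.
      Files.write(dir.resolve(queryId + ".json"), profileJson.getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Exposing isEnabled() lets the default option avoid the cost of building profiles at all, rather than building them and throwing them away.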
The query profile builds on earlier work to add a response trailer to a query. The response trailer changes the JSON result from an array of rows to an object which contains fields for the rows and the trailer. In a Druid cluster, any node can be upgraded in any order. (Verify.) So, we must handle the potential protocol issues.
One way would be to know the version of the node to which a message is sent, and only request a trailer if the node is of a version that understands trailers. However, since Druid is based on the HTTP protocol, there is no good way to learn the version ahead of time. (Is the version written into ZK when each node is registered?) Here, the software version is not helpful because the version string may differ for different vendors. Instead, we need the API version, something that Druid does not seem to support.
Another approach is for the trailer to be an option, specified using a new attribute, such as the proposed HTTP header. Old servers will never send this header, so a new server that receives a request from an old one will not include the trailer. New servers will send the header, but old servers will ignore it. Thus, even if a new server requests a trailer, an old server will return an array (without a trailer). The new server must therefore be prepared to get an array response even if it requests a trailer.
This pattern should allow for rolling upgrades with any combination of (new|old) → (new|old) versions.
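The four combinations can be walked through with a toy model. Everything here is hypothetical: the header name, the "results" and "trailer" field names, and the trick of checking the first character of the body are stand-ins for the real HTTP and JSON handling.

```java
import java.util.Map;

class TrailerNegotiation {
  // Hypothetical header name; the actual proposed header is not restated here.
  static final String TRAILER_HEADER = "X-Example-Include-Trailer";

  // New-server rule: include the trailer only if the request asked for it.
  // (Real HTTP header names are case-insensitive; a plain Map lookup is not.)
  static boolean shouldIncludeTrailer(Map<String, String> requestHeaders) {
    return "true".equalsIgnoreCase(requestHeaders.get(TRAILER_HEADER));
  }

  // Simulated server: an old server ignores the header and always returns an array.
  static String respond(boolean serverIsNew, Map<String, String> requestHeaders) {
    if (serverIsNew && shouldIncludeTrailer(requestHeaders)) {
      return "{\"results\":[],\"trailer\":{}}";
    }
    return "[]";
  }

  // Requester rule: accept either shape, whatever was requested.
  static boolean hasTrailer(String responseBody) {
    return responseBody.trim().startsWith("{");
  }
}
```

Only the new-to-new case produces the object form; in the other three the requester sees a plain array, which is why a new requester must check the shape of the response rather than assume its request was honored.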
We can expect subsequent releases to add detail to the profile as we gain experience. We may, for example, add a new foo field, or introduce a new operator type, or perhaps remove a no-longer-needed bar field. In all such cases, deserializing the JSON into specific Java classes is problematic: the receiver of a query cannot return a changed structure unless it knows that the sender can deserialize that structure. In the extreme, this requires both the sender and receiver to be of the same version, or that the profile structure never changes.
To avoid this problem, we can specify that the sender of the query (the node that receives the trailer) should make no assumptions about the structure of the profile: only that it is some form of JSON. This can be done if the JSON is deserialized as a generic map rather than as a specific Java object.
Such an approach is fine if we do not require any Druid server software to interpret the profile, only to create it. We leave interpretation to some other system: one which can handle changes in the profile details.