
Query Profile

Paul Rogers edited this page Oct 5, 2021 · 21 revisions

A query profile is a collection of information about a single query used to diagnose performance issues for both developers and users. Query profiles are a standard feature of Drill, Impala (Cloudera version), Presto and other query tools. When aggregated, the information in the profile can show query access patterns needed to optimize performance of a cluster.

Each profile generally includes the query, the query plan and metrics about query execution. Metrics are unique to each product and reflect how the product operates internally. Our goal here is to design a query profile specifically for Druid.

Goals

Druid offers blindingly fast performance, but only if the system is properly tuned for the data and queries. Historically, tuning Druid was more of an art than a science: one had to observe performance, know what to look for in data and queries, know how Druid operates internally, and thus know which changes might be beneficial. Though many experts developed the necessary skills, newer users are often left with trial and error because they have not yet developed a feel for how Druid works.

One way to address this issue is to make the performance drivers explicit: to show the areas that take time. One could use code-level solutions such as flame graphs, but our goal here is to optimize the application, not the code. Knowing that function foo() takes x% of the time might tell an expert that something is amiss, but it is unlikely to provide useful knowledge to the average user.

Similarly, measuring query time is useful to tell us where we stand, but just knowing query run time does not tell us why the query takes that amount of time, or what we can do about it.

Our premise here is that if we know, at an application level, why some part of a query takes time, then we are in a better position to make application-level changes that improve performance. Such changes typically include:

  • Revisions to the data structure: roll-up "grain", number of dimensions, number of tables, etc.
  • Partitioning: a form of top-level indexing
  • Segment sizes, which influence the cost of full table scans
  • Distribution (the number of data nodes), which allows us to do more work in parallel.

Experience with query profiles in other products has shown that detailed query-level information is essential to visualizing, understanding, and improving each of the above factors.

The goal here is simply to gather the required information and make it available via a new REST call. We leave it to others to add analysis, visualization or other tools on top of the basic query profile mechanism.

Profile Design

Druid is a distributed system that offers queries as one of its services. As with any good REST API, each API call does one thing well. Thus, Druid has a collection of native queries, each of which does one kind of operation. Druid also offers a SQL layer that converts SQL queries into one of the native Druid queries. As a result, Druid does not use the classic "DAG"-based query execution model, but rather a series of RPC calls, each of which performs an entire query using a fixed set of query-related operations.

A typical query profile uses the DAG structure to organize data. The Drill page linked above shows a visualization of the DAG, and the metrics gathered for each node (operator) within the DAG. Since Druid has a different model, the profile will be structured differently. Though Druid does not make a query DAG explicit, the Druid code implicitly operates as a set of steps which form a kind of "hard-coded DAG." For this reason, we will gather metrics according to the native query structure, but present them in the query profile in the form of a DAG. One advantage of such a structure is that the DAG for a query is determined by the query, and is fairly stable, while the native query implementation is subject to constant revision to increase performance. Thus, a DAG structure allows the profile to change more slowly than the actual code.

User-Visible Profile

The profile presented to the user is a JSON object that can be deserialized into Java or other languages. The top-level object includes the following fields:

  • Query type: Native or SQL
  • Query ID of the top-level query
  • Status: succeeded or failed.
  • Tables: list of tables which the query accesses
    • Kind: Kind of data source
    • Name: Name of the data source
    • Dimensions: list of dimensions which the query accesses
    • Measures: list of measures which the query accesses
    • Intervals: list of intervals (rounded to segment boundaries) which the query accesses
  • Root slice (see below)
  • Attributes: open-ended map of key/value pairs to be used for any purpose and for information which does not fit above.

The other fields depend on whether the query is SQL or native. If SQL:

  • SQL query text (if SQL)
  • Has wildcard: true if the statement is of the form SELECT *, false if columns are listed explicitly.
  • Query plan

If native:

  • Native query text (if native)
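To make the field lists above concrete, here is one possible Java mirror of the top-level profile object. It is a sketch only: the type and field names (QueryProfile, TableInfo, QueryType, Status) are hypothetical, and the slice tree is left as a placeholder Object. The SQL-only fields are nullable so that one type can represent both query kinds.

```java
import java.util.List;
import java.util.Map;

// Hypothetical Java mirror of the user-visible profile; names are illustrative only.
enum QueryType { NATIVE, SQL }

enum Status { SUCCEEDED, FAILED }

record TableInfo(
    String kind,              // kind of data source
    String name,              // name of the data source
    List<String> dimensions,  // dimensions the query accesses
    List<String> measures,    // measures the query accesses
    List<String> intervals    // intervals, rounded to segment boundaries
) {}

record QueryProfile(
    QueryType type,
    String queryId,                 // ID of the top-level query
    Status status,
    List<TableInfo> tables,
    Object rootSlice,               // placeholder for the slice tree
    Map<String, Object> attributes, // open-ended key/value pairs
    String queryText,               // SQL or native query text
    Boolean hasWildcard,            // SQL only: SELECT *; null for native queries
    String queryPlan                // SQL only; null for native queries
) {}
```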

Fragments

A distributed query plan consists of a set of logical operators that describe what is to be done. Then, a set of physical operators is distributed across the network to do the work. One or more physical operators are grouped into a fragment, which executes on a node. Thus, there can be many concurrent physical operators for each logical operator. In a full DAG-based engine, multiple exchanges shift data from one fragment to another. In Druid, the native query is roughly the equivalent of a fragment, and exchanges are limited to a single-level scatter/gather process between data nodes (historicals, peons) and the broker. Operators are implicit within the implementation of each native query. Here, we generalize Druid's structure into a DAG form.

The first level of structure for query metrics is the slice. For Druid, a slice is either the "Jetty thread" in the broker, or a native query on a data node. For data nodes, a slice is implemented by a set of fragments, which are actually the worker threads that do the work.

A slice is called that because it contains a "slice" of the overall DAG. Again, in a fully distributed system, multiple slices occur: data can flow across many exchanges during a single query. In Druid, there are two slices: the broker and a native query. Each slice contains a "sub-DAG" with its own slice-local root and leaf nodes. All slices have a Sender as their root. Broker slices have Receivers as their leaves, while data nodes have Scans at their leaves.

Slice fields include:

  • Kind: root, native query kind
  • Role: broker, data node
  • Fragments: instances running on a node.

Fragment fields include:

  • Host name: The host (and port) on which the fragment runs
  • ID: the query ID assigned to the sub-query
  • Status: succeeded or failed
  • Time
  • Rows
  • Sender: the sender operator at the root of the fragment sub-DAG

Operators

The execution metrics present the query as a set of operators, defined below. Each operator includes the following fields:

  • Kind: the kind of operator
  • Rows: the number of rows produced by the operator
  • Total Time: the wall-clock time for the operator and all its children
  • Time: the wall-clock time for the one operator (total time minus the total time of any children)
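The relationship between the two time fields can be sketched as follows, assuming each operator records only its inclusive total time and that its children run one after another on the same thread. The OperatorTiming record is hypothetical.

```java
import java.util.List;

// Minimal sketch: each operator stores its total (inclusive) wall-clock time;
// its own "Time" is derived by subtracting its children's total times.
record OperatorTiming(String kind, long rows, long totalTimeNs, List<OperatorTiming> children) {

  // Time spent in this operator alone, excluding its children.
  long selfTimeNs() {
    long childTotal = 0;
    for (OperatorTiming child : children) {
      childTotal += child.totalTimeNs();
    }
    return totalTimeNs - childTotal;
  }
}
```

If children run concurrently (for example, several scans feeding a merge), their totals can add up to more than the parent's wall-clock time, so a real profile would need to account for parallelism rather than subtract blindly.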

The set of operators includes:

  • Sender: the operator which sends results to the client (top-level query) or broker (data node queries)
    • Child: the operator which sends data to the sender
  • Merge: the operator which receives data from a subquery and merges it
    • Children: the list of subqueries which send data to the merge node
  • Segment metadata scan
    • Segment read time
  • Segment scan
    • Open time: time to read the segment header, etc. (Details needed)
    • Open bytes
    • Blocks read
    • Total scan time
    • Total scan bytes
    • Columns
      • Name
      • Bytes read
      • I/O time (or total time to read, including decode)

(More TBD)

A "scan" operator is one which reads data from one of Druid's several data sources. Each kind of scan has its own unique characteristics.

Data Nodes

Data nodes send results to the broker using the "response trailer" mechanism which Gian designed. That is, the data nodes gather metrics for a native query, and return them to the broker after the data portion of the response. The structure used within each data node is different from the above: it is an intermediate form more closely tied to the structure of the native query.

As per Gian's design, the data node returns the trailer only when requested. The broker (when profiles are enabled) will request the trailer. Druid clients can also optionally request a trailer. But, by default, no trailer is sent so that behavior is backward-compatible with existing clients who work with data nodes directly.

Broker

The broker manages each query and is extended to gather the response trailers from each data node. The information is merged with broker-level information to create the user form of the profile.

The profile is not returned with the query: doing so would add unwanted complexity to the many existing Druid clients. Further, most clients would not know what to do with the profile even if they received it.

Instead, the broker maintains a disk-based cache of profiles. At a high level:

  • The broker plans the query, scatters subqueries, gathers results and trailers
  • The broker sends data to the client, and gathers trailers into a "pre-profile" object
  • After the last data byte is sent, before closing the connection, the broker places the "pre-profile" onto an internal queue
  • The broker sends the final response to the client
  • A worker thread picks up the pre-profile object, generates the user-level profile, and writes it to disk
  • The worker may also prune old profiles, if configured to do so.
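A minimal sketch of the hand-off between the query thread and the worker thread, using a standard BlockingQueue. PendingProfile, ProfileWriter, and the JSON produced by toUserProfile are hypothetical placeholders; real profile generation is stubbed out. The write goes to a temporary file that is then renamed, so a concurrent reader never sees a half-written profile.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical pre-profile: raw material gathered while the query runs.
record PendingProfile(String queryId, String trailersJson) {}

// The query thread only enqueues; one worker thread builds and writes profiles.
final class ProfileWriter implements Runnable {
  private final BlockingQueue<PendingProfile> queue = new LinkedBlockingQueue<>();
  private final Path dir;

  ProfileWriter(Path dir) {
    this.dir = dir;
  }

  // Called after the last data byte is sent; never blocks the query.
  void enqueue(PendingProfile pending) {
    queue.add(pending);
  }

  // Placeholder for real profile generation (merging trailers, building the DAG).
  static String toUserProfile(PendingProfile pending) {
    return "{\"queryId\":\"" + pending.queryId() + "\",\"fragments\":" + pending.trailersJson() + "}";
  }

  @Override
  public void run() {
    while (true) {
      PendingProfile pending;
      try {
        pending = queue.take(); // blocks until a profile is queued
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return; // interruption is the stop signal
      }
      try {
        // Write to a temp file, then rename, so readers never see a partial profile.
        Path tmp = dir.resolve(pending.queryId() + ".json.tmp");
        Files.writeString(tmp, toUserProfile(pending), StandardCharsets.UTF_8);
        Files.move(tmp, dir.resolve(pending.queryId() + ".json"), StandardCopyOption.ATOMIC_MOVE);
      } catch (IOException e) {
        // One failed write should not stop the worker.
        System.err.println("Failed to write profile " + pending.queryId() + ": " + e);
      }
    }
  }
}
```

Pruning old profiles could run on the same worker between writes; it is omitted here.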

Profile REST API

A new API, /v2/profile/<id>, requests a profile, which is returned as JSON. The flow here is designed to avoid race conditions, especially if a client runs a query and immediately asks for its profile.

  • First check the profile queue. If found in the queue, force (or wait for the completion of) generation of the user profile, and remove the entry from the queue.
  • Check the disk cache for the profile.
  • Copy the disk file contents to the client.
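One way to get this lookup order right, sketched under simplifying assumptions: pending pre-profiles are kept in a map keyed by query ID rather than a strict queue, and a single lock serializes generation and lookup so that a request arriving mid-write waits for the write to finish. ProfileStore and its methods are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical store for /v2/profile/<id>. One lock guards everything, so a
// lookup cannot observe a profile that is half generated.
final class ProfileStore {
  private final Map<String, String> pending = new HashMap<>();
  private final Path dir;

  ProfileStore(Path dir) {
    this.dir = dir;
  }

  synchronized void enqueue(String queryId, String preProfile) {
    pending.put(queryId, preProfile);
  }

  // Called by the background worker, or forced by a lookup.
  synchronized void finish(String queryId) throws IOException {
    String pre = pending.remove(queryId); // at most one caller generates it
    if (pre != null) {
      String profile = "{\"queryId\":\"" + queryId + "\",\"pre\":" + pre + "}";
      Files.writeString(dir.resolve(queryId + ".json"), profile, StandardCharsets.UTF_8);
    }
  }

  synchronized Optional<String> lookup(String queryId) throws IOException {
    finish(queryId);                            // 1. force generation if still queued
    Path file = dir.resolve(queryId + ".json"); // 2. check the disk cache
    if (!Files.exists(file)) {
      return Optional.empty();                  // unknown, or already pruned
    }
    return Optional.of(Files.readString(file, StandardCharsets.UTF_8)); // 3. copy to client
  }
}
```

A coarse lock is the simplest way to rule out the race; a production version would more likely use a per-query future so that unrelated lookups do not wait on each other.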

Operation at Scale

Note that this API is meant for short-term, immediate access to a query profile. In a production system, the broker should not store profiles for more than a brief time: minutes to hours. After that, to conserve resources, profiles are either deleted (because the probability of any single profile being requested drops exponentially with time), or are gathered and sent to some other system for aggregation or storage. Once the profile is removed, it is no longer available from the broker. It may be available from some other, third-party system.

This does create an awkward gap in coverage: the period after the profile is removed from the broker but before it becomes available in that third-party system. The thought is that if a user wants to profile a single query, they will run the query, then immediately (within a few minutes) request the profile from the broker. Longer term, profiles are used in the aggregate, or to diagnose issues, and so the "gap" is less of an issue.

Shutdown

The profile queue is flushed asynchronously to ensure that queries don't wait for the profile to be written to disk. As a result, profiles may still be pending writes when the server exits. To ensure that no profiles are lost, the queue must flush all remaining profiles to disk on shutdown.
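A sketch of the shutdown flush, assuming new queries have already stopped arriving when it runs. ProfileQueue and flushOnShutdown() are hypothetical names; the returned CompletableFuture lets a shutdown hook wait until every pending profile has been handed to the writer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical queue of pending profiles with a drain-on-shutdown operation.
final class ProfileQueue {
  private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();
  private final Consumer<String> writer; // writes one profile to disk

  ProfileQueue(Consumer<String> writer) {
    this.writer = writer;
  }

  void add(String preProfile) {
    pending.add(preProfile);
  }

  // Drains every remaining entry; completes once the queue is empty.
  // Assumes no new profiles are added after shutdown begins.
  CompletableFuture<Void> flushOnShutdown() {
    return CompletableFuture.runAsync(() -> {
      String next;
      while ((next = pending.poll()) != null) {
        writer.accept(next);
      }
    });
  }
}
```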

Implementation Notes

BrokerQueryResource is the broker-specific handler for query requests; it extends QueryResource. Use this as the lever to create broker-specific query profile handling: have a profile handler in the base class that does nothing, and replace it in the broker with one that does the fancy stuff.
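The "handler that does nothing" idea is the null-object pattern. A minimal sketch: ProfileHandler, NoOpProfileHandler, RecordingProfileHandler, BaseQueryResource, and BrokerQueryResourceSketch are all hypothetical stand-ins, not Druid classes, and the real QueryResource constructor is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

interface ProfileHandler {
  void queryCompleted(String queryId, String trailer);
}

// Default for non-broker services: profile data is simply ignored.
final class NoOpProfileHandler implements ProfileHandler {
  @Override
  public void queryCompleted(String queryId, String trailer) {
    // Intentionally empty.
  }
}

// Broker-side handler; a real one would enqueue a pre-profile.
final class RecordingProfileHandler implements ProfileHandler {
  final List<String> queryIds = new ArrayList<>();

  @Override
  public void queryCompleted(String queryId, String trailer) {
    queryIds.add(queryId);
  }
}

class BaseQueryResource {
  protected final ProfileHandler profileHandler;

  BaseQueryResource() {
    this(new NoOpProfileHandler());
  }

  protected BaseQueryResource(ProfileHandler profileHandler) {
    this.profileHandler = profileHandler;
  }

  // Shared query path: always calls the handler, so no null checks are needed.
  String handleQuery(String queryId) {
    String trailer = "{}"; // stand-in for the gathered response trailer
    profileHandler.queryCompleted(queryId, trailer);
    return "results:" + queryId;
  }
}

// The broker subclass only swaps in a different handler.
class BrokerQueryResourceSketch extends BaseQueryResource {
  BrokerQueryResourceSketch(RecordingProfileHandler handler) {
    super(handler);
  }
}
```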

BrokerServerView appears to be the place to hold the implementation of the query profile manager.

The Graceful interface adds a shutdown feature to handlers. Implement this on BrokerQueryResource to return a Future which will flush the profile queue.

QueryMetrics

Druid already has a QueryMetrics mechanism, tailored to writing data into a metrics system. The profile system should at least be aware of this mechanism, and might want to leverage it somehow. Perhaps the native-query-specific pre-profile can optionally hold the query metrics if no other information is available. This would be needed for custom extension queries.

The pre-profile should sit above query metrics to avoid changing the QueryMetrics class. It can pull needed values from QueryMetrics to avoid duplication. New values would be accumulated in a custom pre-profile. A default pre-profile would serve queries that don't yet have a custom version.

A typical QueryMetrics implementation does not retain values: it just forwards them to be emitted. So, either we need some kind of shim to capture the values, or we replace writes to QueryMetrics with writes to the pre-profile instead, and have the pre-profile do the pass-through.
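The shim option can be sketched as a decorator that records each value and then forwards it unchanged. MetricSink is a hypothetical one-method stand-in for the emitting side of QueryMetrics, not its actual API, and the sketch assumes a single reporting thread.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical emitting interface: values go to the metrics system and are not retained.
interface MetricSink {
  void report(String name, long value);
}

// Shim: keeps a copy of each value for the pre-profile, then passes it through.
// Not thread-safe; assumes one reporting thread.
final class CapturingMetricSink implements MetricSink {
  private final MetricSink delegate;
  private final Map<String, Long> captured = new LinkedHashMap<>();

  CapturingMetricSink(MetricSink delegate) {
    this.delegate = delegate;
  }

  @Override
  public void report(String name, long value) {
    captured.put(name, value);    // retained for the profile
    delegate.report(name, value); // normal emission continues unchanged
  }

  Map<String, Long> captured() {
    return Collections.unmodifiableMap(captured);
  }
}
```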

The place to create the wrapper would be DruidMetrics.makeRequestMetrics().

Maybe:

  • QueryMetricsProxy - Wraps the metrics collector
  • MetricsCollector - top-level metrics interface. Replace uses of QueryMetrics with this one.
  • QueryMetrics - Query-specific instance as returned from the QueryToolchest.

New code uses the collector, older code uses the proxy.

The above is only a partial solution. QueryMetrics is a downward-facing class: it parses some query information itself, then calls its own methods. There is no good way to add a shim layer.

At the top, we create a new MetricsCollector to gather the outside-focused metrics. QueryMetrics then implements this new interface. A shim version gathers values for use in the profile.

Instead, we can create a MetricsTranslator which pulls information from the default implementation.

  • Get dimension values from ServiceMetricEvent.Builder.
  • Cache metric values.

More gotchas: the metrics object appears to be created late in query execution, in QueryLifecycle.emitLogsAndMetrics(), past the point of emitting the response trailer and past the point where most metrics could have been gathered.

Yet another option: refactor the lifecycle to collect the (fixed set of) metrics before returning the trailer, then emit them using the existing mechanism in a revised emitLogsAndMetrics(). The gathered stats go into the pre-profile before the trailer is returned.

Pre-Profile

The ResponseContext is used to gather profile data. The context is merged automatically. To retain specific values, we use a map in which each server and segment has an entry, and we merge maps to preserve individual keys. Thus we have:

ResponseContext["Fragment"] -> "Broker" | "DataNode"
ResponseContext["HostName"] -> <host name>
ResponseContext["PreProfile"] -> {<fragment>: {<host name>: {<segment>: <pre-profile>}}}
ResponseContext["NativeProfile"] -> <pre-profile>

The "Fragment" and "HostName" values are local-only, and are filled in as they become known. To speed up access within a single native query, we cache the native query pre-profile in "NativeProfile".
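Merging the nested "PreProfile" map key by key, rather than replacing whole values, is what keeps per-host and per-segment entries from overwriting one another. A minimal sketch over plain maps; PreProfileMerge is hypothetical, and keeping the newer value when two leaves collide is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Deep merge for maps shaped like {fragment: {host: {segment: preProfile}}}.
final class PreProfileMerge {
  private PreProfileMerge() {}

  @SuppressWarnings("unchecked")
  static Map<String, Object> merge(Map<String, Object> left, Map<String, Object> right) {
    Map<String, Object> result = new HashMap<>(left); // never mutate the inputs
    for (Map.Entry<String, Object> e : right.entrySet()) {
      Object existing = result.get(e.getKey());
      if (existing instanceof Map && e.getValue() instanceof Map) {
        // Both sides have a nested level: merge it recursively.
        result.put(e.getKey(), merge((Map<String, Object>) existing, (Map<String, Object>) e.getValue()));
      } else {
        // New key, or a leaf collision: the right-hand value wins.
        result.put(e.getKey(), e.getValue());
      }
    }
    return result;
  }
}
```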

Each native query has an associated pre-profile object which gathers stats.

Response Trailer

The response trailer is currently emitted in the QueryResource class by the inlined ResponseBuilder, which emits a subset of fields from the ResponseContext as the trailer. A special field, metrics, is added to hold query-specific values.

The trailer uses two classes: MultiQueryMetricsCollector, and SingleQueryMetricsCollector with a number of fixed fields. Gian added these two classes to collect metrics. The classes are Jackson-serialized to JSON. They are not yet widely used, however. Still, we can build on the idea.

ResponseContext

The response context appears to have evolved over time. It probably started as a set of fields to return in the header. Later, additional internal-only fields were added. Now, we add trailer-only fields.

The response context operates in a scatter/gather pattern. Individual sub-queries (Druid has no good name for these) populate the context at the level of either multiple fragments or individual fragments. Then, higher levels gather and merge the individual contexts. Some of the merging is done within a process, some is done across processes.

When a server exchange occurs, the context can be truncated to fit a header size limit. When that is done, information is discarded. For the profile, we want to retain all information, so we must be careful when we pull information from the response context to avoid pulling truncated information.

As a result, the response context has become a bit of a kitchen sink and deserves another good refactoring. For now, we'll take a half-way step:

  • Add a set method for each of the known values. Each set method does the mechanics of setting the value.
  • Change client code to use the set methods rather than mucking with keys.
  • Extend the set methods to also set the query profile values where needed.
  • Allow direct access to keys for extended values. Also capture these values in the profile, as a map.
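A sketch of this half-way step, using a hypothetical ContextSketch class rather than Druid's ResponseContext. Key names such as rowCount and the profile layout are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical context: typed setters own the key names and also feed the profile.
final class ContextSketch {
  private final Map<String, Object> values = new HashMap<>();  // header/trailer fields
  private final Map<String, Object> profile = new HashMap<>(); // profile values

  // Known value: callers no longer touch the raw key.
  void setRowCount(long rows) {
    values.put("rowCount", rows);
    profile.put("rows", rows);
  }

  // Escape hatch for extension-defined keys; also captured in the profile as a map.
  @SuppressWarnings("unchecked")
  void putExtension(String key, Object value) {
    values.put(key, value);
    ((Map<String, Object>) profile.computeIfAbsent("extensions", k -> new HashMap<String, Object>()))
        .put(key, value);
  }

  Object get(String key) {
    return values.get(key);
  }

  Map<String, Object> profileValues() {
    return profile;
  }
}
```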

And Now For Something Completely Different...

The above tried to piggy-back on QueryMetrics or ResponseContext -- primarily because they exist. There are issues.

  • QueryMetrics is discussed above: it is too closely tied with the idea of emitting metrics to a metrics system and is not well suited to gathering metrics to build up a profile: the API is just too wrong for this task.
  • ResponseContext seemed a good choice. However, it is created outside of the query and so can't be query-specific. Also, it has no nuance: we can't tell which part of a query a metric is for.

Part of the problem is that queries are implemented in QueryRunner to have no side effects: a query returns a Sequence, but offers no additional information. It seems that the ResponseContext was added to allow some additional information to leak out. That is:

  Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext);

So, another solution is to change the QueryRunner API to return additional information. One thought would be to return that information from the query runner itself:

  Pair<Sequence<T>, Stats> run(QueryPlus<T> queryPlus);

For some class Stats that gives us the stats we want.

One issue is that existing code expects to create the ResponseContext. Another is that the query profile information will be available only on query completion. So, another idea is to create an indirection:

class StatsHolder {
  ResponseContext context;
  Stats stats;
}

  Sequence<T> run(QueryPlus<T> queryPlus, StatsHolder stats);

That is, the caller creates the stats holder, fills in the context, and passes that into the query. This works, but suggests an issue: the stats really are not valid until the end of the query: we'd have to promise not to look at the stats field until the Sequence reports EOF.

So, yet another approach is to explicitly get the stats at query completion:

  Sequence<T> run(QueryPlus<T> queryPlus);
  Stats gatherStats(ResponseContext context);

Or even:

  Sequence<T> run(QueryPlus<T> queryPlus);
  Stats stats();

interface Stats {
   ...
   void populateResponse(ResponseContext context);
   Profile profile();
}

In this case, the call to stats() would not be valid except when the query is completed. Or, since it may be that a query can simply be abandoned, stats() returns information as of that moment.

In this model, the internal query code writes to a stats collector which gathers information that will eventually be put into the profile and the response context.

To ease the transition, we create some shims:

public interface QueryRunner<T>
{
  default Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext) {
    Sequence<T> result = run(queryPlus);
    stats().populateResponse(responseContext);
    return result;
  }

  default Sequence<T> run(QueryPlus<T> queryPlus)
  {
    return this.run(queryPlus, ResponseContext.createEmpty());
  }

  Stats stats(); // default implementation? Kinda stuck
}

Here we need more details. The query runner runs a query, but not really. It is the outer-most wrapper, and is nested. The actual query is run within QueryLifecycle.execute() which calls QueryPlus.run() which calls QueryRunner.run().

Clearly, one or the other method must be overridden.

Take 2

Druid has somewhat odd semantics: there seems to be no real implementation of a query as an operator that reads data, does work, and returns results. Instead, data is provided (somehow) to a toolchest, which creates a runner on the fly to do things like merge. Either I still don't understand, or things shift roles: nothing is the true "run a query" step. Nor is there a true operator which knows what metrics it can meaningfully produce, or a reified object that represents an operator. Operators are functions or lambdas that have no real identity, and so no place to hold state such as stats.

The QueryPlus might represent the whole query, but it might not know how that query is used if there is more than one path to execute the query. Yet, we want to gather profile stats based on the execution path. Also, the QueryPlus is rewritten during various operations, so it is not sufficiently stable to act as a stats collection point.

So, it does seem that the QueryRunner is the right place. Perhaps we treat a runner as an operator, and the QueryLifecycle as the fragment.

Then we have a tree:

fragment: represents a QueryLifecycle
  operator: represents a QueryRunner
    operator: input. Can be a list for merge.

Operators can be leaves (scan), have multiple children (merge), or host another fragment (subquery).

Trivial runners return their child's stats.
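The tree above can be sketched with a few small types. All names here are hypothetical: leaves and merges carry their own stats, a subquery operator hosts a nested fragment, and a pass-through runner contributes no node of its own.

```java
import java.util.List;

interface OperatorStats {
  String kind();
  List<OperatorStats> children();
}

// Leaf operator, such as a scan.
record LeafStats(String kind, long rows) implements OperatorStats {
  public List<OperatorStats> children() {
    return List.of();
  }
}

// Operator with multiple inputs.
record MergeStats(List<OperatorStats> children) implements OperatorStats {
  public String kind() {
    return "merge";
  }
}

// Operator that hosts another fragment (a subquery).
record SubqueryStats(FragmentStats fragment) implements OperatorStats {
  public String kind() {
    return "subquery";
  }

  public List<OperatorStats> children() {
    return List.of(fragment.root());
  }
}

// One QueryLifecycle: where it ran, plus its root operator.
record FragmentStats(String host, OperatorStats root) {}

// A trivial runner reports its child's stats unchanged.
final class PassThroughRunner {
  private final OperatorStats childStats;

  PassThroughRunner(OperatorStats childStats) {
    this.childStats = childStats;
  }

  OperatorStats stats() {
    return childStats;
  }
}
```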

public interface QueryRunner<T>
{
  Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext);

  Stats stats();
}

A problem with the above is that the QueryRunner seems to have been designed to be a functional concept with only one method. So, lots of code would have to change to add the stats() method.

In the first pass, we keep the response context separate. It might be held on the stats object, but we don't know all the complex interactions at the moment, so leave it be temporarily.

QueryMetrics at the moment are on the QueryPlus:

    final QueryPlus<T> queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest);
    final QueryMetrics<?> queryMetrics = queryWithMetrics.getQueryMetrics();

In this case, we pass the query metrics down into the query by using the QueryPlus. For the profile, we want to bubble metrics up: we want the runner which does the work to create the metrics, and to join it with any created by its children. For now, we'll leave the QueryMetrics in place and add a new line that gathers profile metrics.

We can now separate two ideas. One is the class that holds metrics, which is specific to a runner (only? or are there lower level things?). The other is the idea of a collector to which code writes metrics. Some code may be used in multiple places so it needs an interface independent of the storage mechanism.

So, eventually we'd have:

  • MetricsCollector - used by the code which produces metrics.
  • MetricsCollectorImpl - concrete representation, perhaps chained, which does something with the metrics: writes to metrics, to the profile, to the response context.
  • Profile - a concrete class that gathers metrics for a specific operation.
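A sketch of this split, reusing the MetricsCollector name proposed above. ChainedMetricsCollector and OperationProfile are hypothetical; making the profile itself a collector that sums values per name is one design choice among several.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// What metric-producing code sees; it does not know where values go.
interface MetricsCollector {
  void add(String name, long value);
}

// Fans each value out to several destinations (metrics emitter, profile, context).
final class ChainedMetricsCollector implements MetricsCollector {
  private final List<MetricsCollector> targets;

  ChainedMetricsCollector(List<MetricsCollector> targets) {
    this.targets = List.copyOf(targets);
  }

  @Override
  public void add(String name, long value) {
    for (MetricsCollector target : targets) {
      target.add(name, value);
    }
  }
}

// Profile for one operation: sums values per metric name.
final class OperationProfile implements MetricsCollector {
  private final Map<String, Long> totals = new HashMap<>();

  @Override
  public void add(String name, long value) {
    totals.merge(name, value, Long::sum);
  }

  long get(String name) {
    return totals.getOrDefault(name, 0L);
  }
}
```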

Since the QueryPlus kind of maintains state now, perhaps we hold part of the profile state there. The (awkward) flow might be:

  QueryPlus<?> queryPlus = ...
  QueryRunner<?> runner = ...
  runner.run(queryPlus, responseContext);
  Stats childStats = queryPlus.popStats();
  Stats newStats = ...
  newStats.child = childStats;
  queryPlus.pushStats(newStats);

If no stats were pushed, we'd return a default OpaqueOperator object that says nothing is visible. Runners that do nothing "statsworthy" would leave the child node unchanged.

The top level QueryLifecycle would pop the top-most operator to place as the root for that fragment.

Ugly. But, it might work... except that the comment for QueryPlus says, "An immutable composite object". The stats would be mutable and would have to be put onto the correct version of the query. The rewrites happen before calling a QueryRunner.

Take 3

OK, so maybe go back to the earlier idea:

class QueryState {
  ResponseContext context;
  Stats stats;
}

  Sequence<T> run(QueryPlus<T> queryPlus, QueryState state);

To ease the transition, we create some shims:

public interface QueryRunner<T>
{
  public interface OperatorStats {
    
  }
  
  public class OpaqueOperator implements OperatorStats {
    
  }
  
  public class QueryState {
    public final ResponseContext context;
    public OperatorStats profile;
    
    public QueryState(ResponseContext context) {
      this.context = context;
    }
  }

  default Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext) {
    return run(queryPlus, new QueryState(responseContext));
  }

  default Sequence<T> run(QueryPlus<T> queryPlus, QueryState state) {
    state.profile = new OpaqueOperator();
    return run(queryPlus, state.context);
  }
}

While the above is clean, it will result in the change of many, many implementations. Further, QueryRunner is marked as @ExtensionPoint, so changing the API will likely break third-party code which is a show-stopper.

Take 4

Perhaps we can revive the earlier idea of placing the profile information on one of the objects passed into the runner. The QueryPlus didn't work out, so let's consider the ResponseContext instead.

The (awkward) flow might be:

  QueryPlus<?> queryPlus = ...
  QueryRunner<?> runner = ...
  ResponseContext responseContext = ...
  runner.run(queryPlus, responseContext);
  Stats childStats = responseContext.popStats();
  Stats newStats = ...
  newStats.child = childStats;
  responseContext.pushStats(newStats);

If no stats were pushed, we'd return a default OpaqueOperator object that says nothing is visible. Runners that do nothing "statsworthy" would leave the child node unchanged.

The top level QueryLifecycle would pop the top-most operator to place as the root for that fragment.

Still ugly. But, since we've already greatly mucked about with the ResponseContext with no harm done, this one might actually work.
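The push/pop flow can be sketched with a stack held on the context. StatsContext, Stats, OperatorNode, and wrapChild() are hypothetical names (OpaqueOperator follows the name used above); in the real design the stack would live on the ResponseContext.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

interface Stats {}

// Returned when no runner pushed anything: "nothing visible here".
record OpaqueOperator() implements Stats {}

record OperatorNode(String kind, List<Stats> children) implements Stats {}

// Hypothetical stand-in for the stats stack carried on the response context.
final class StatsContext {
  private final Deque<Stats> stack = new ArrayDeque<>();

  void pushStats(Stats stats) {
    stack.push(stats);
  }

  // Pops the most recent stats, or an OpaqueOperator if none were pushed.
  Stats popStats() {
    return stack.isEmpty() ? new OpaqueOperator() : stack.pop();
  }

  // What a "statsworthy" runner does after its child has run:
  // pop the child's stats and push a node that wraps them.
  void wrapChild(String kind) {
    Stats child = popStats();
    pushStats(new OperatorNode(kind, List.of(child)));
  }
}
```

A runner with nothing to report simply never touches the stack, which leaves its child's node in place, matching the pass-through rule above.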

Open Issues

Upgrade Considerations

Initial Profile Release

The query profile builds on earlier work to add a response trailer to a query. The response trailer changes the JSON result from an array of rows to an object which contains fields for the rows and the trailer. In a Druid cluster, any node can be upgraded in any order. (Verify.) So, we must handle the potential protocol issues.

One way would be to know the version of the node to which a message is sent, and only request a trailer if the node is of a version that understands trailers. However, since Druid is based on the HTTP protocol, there is no good way to learn the version ahead of time. (Is the version written into ZK when each node is registered?) Here, the software version is not helpful because the version string may differ for different vendors. Instead, we need the API version, something that Druid does not seem to support.

Another approach is for the trailer to be an option, specified using a new attribute, such as the proposed HTTP header. Old servers will never include this header, so new servers will not include the trailer. New servers will include the header, but old servers will ignore it. Thus, even if the new server requests a trailer, an old server will return an array (without a trailer). The new server must be prepared to get an array response even if it requests a trailer.
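On the receiving side, the new server must accept either shape. A minimal sketch, assuming trailer-bearing responses are JSON objects and legacy responses are JSON arrays, is to look at the first non-whitespace character before handing the body to a real parser. ResponseShape and ResponseShapes are hypothetical.

```java
// Which wire format a data node returned.
enum ResponseShape { LEGACY_ARRAY, ROWS_WITH_TRAILER }

final class ResponseShapes {
  private ResponseShapes() {}

  // Old servers send a bare array of rows; new servers send an object
  // holding the rows plus the trailer.
  static ResponseShape detect(CharSequence body) {
    for (int i = 0; i < body.length(); i++) {
      char c = body.charAt(i);
      if (Character.isWhitespace(c)) {
        continue;
      }
      if (c == '[') {
        return ResponseShape.LEGACY_ARRAY;
      }
      if (c == '{') {
        return ResponseShape.ROWS_WITH_TRAILER;
      }
      break;
    }
    throw new IllegalArgumentException("Response is neither a JSON array nor a JSON object");
  }
}
```

With Jackson, which Druid already uses for JSON, the equivalent check is whether the parser's first token is JsonToken.START_ARRAY or JsonToken.START_OBJECT.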

This pattern should allow for rolling upgrades with any combination of (new|old) → (new|old) versions.

Subsequent Releases

We can expect subsequent releases to add detail to the profile as we gain experience. We may, for example, add a new foo field, introduce a new operator type, or remove a no-longer-needed bar field. In all such cases, deserializing JSON into fixed Java classes is problematic: the receiver cannot return a changed structure unless it knows that the sender can deserialize that structure. In the extreme, this requires both the sender and receiver to be of the same version, or the profile structure to never change.

To avoid this problem, we can specify that the sender (the node that receives the trailer) should make no assumptions about the structure of the profile: only that it is some form of JSON. This can be done if the JSON is deserialized as a generic map rather than as a specific Java object.

Such an approach is fine if we do not require any Druid server software to interpret the profile, only to create it. We leave interpretation to some other system: one which can handle changes in the profile details.