
Sequences and Yielders

Paul Rogers edited this page Oct 16, 2021 · 2 revisions

Druid's query engine makes extensive use of functional programming (FP) ideas. The Sequence and Yielder classes are core to this pattern. (Interestingly, though these classes are in a guava package in Druid, there are no equivalent classes in Guava itself. Perhaps they existed in some previous version? Or are they Druid extensions?)

The basic Druid query structure is defined by the QueryRunner class:

public interface QueryRunner<T> {
  Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext);
}

At first glance, this appears to accept a query and return a sequence of results. This is true, for some definition of "sequence of results." The Sequence is not a List: it is not a materialized collection of row-like objects. Instead, it is a contract to obtain row-like objects on demand.

It is easiest to understand this in relation to a traditional query engine. A planner produces a DAG made up of operators. Then, at run time, each operator has a next() method that returns one or more rows. The tree of operators forms a data pipeline in which data flows from the leaves (scan operators which read data from some external source) to the root (which returns data to the client). Along the way, the pipeline includes filters, sorts, merges, grouping and other data transforms.
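For contrast with what follows, here is a minimal sketch of that traditional "pull" style, in which each operator is a stateful object with a next() method. The Operator, Scan and Filter names are hypothetical and unrelated to any Druid class. The sketch uses null to mean "no more rows", so rows themselves cannot be null.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of a traditional pull-based operator pipeline:
// each operator is a stateful object whose next() pulls from its child.
public class OperatorPipeline {
  interface Operator<T> {
    /** Returns the next row, or null when the input is exhausted. */
    T next();
  }

  /** Leaf: scans an in-memory list (a stand-in for external storage). */
  static class Scan<T> implements Operator<T> {
    private final Iterator<T> source; // mutable position lives here
    Scan(List<T> rows) { this.source = rows.iterator(); }
    public T next() { return source.hasNext() ? source.next() : null; }
  }

  /** Interior operator: passes through only rows matching the predicate. */
  static class Filter<T> implements Operator<T> {
    private final Operator<T> child;
    private final Predicate<T> predicate;
    Filter(Operator<T> child, Predicate<T> predicate) {
      this.child = child;
      this.predicate = predicate;
    }
    public T next() {
      T row;
      while ((row = child.next()) != null) {
        if (predicate.test(row)) { return row; }
      }
      return null;
    }
  }

  /** Root: drains the pipeline, as the client-facing end would. */
  static <T> List<T> drain(Operator<T> root) {
    List<T> out = new ArrayList<>();
    for (T row = root.next(); row != null; row = root.next()) { out.add(row); }
    return out;
  }

  public static void main(String[] args) {
    Operator<Integer> plan = new Filter<>(new Scan<>(List.of(1, 2, 3, 4, 5, 6)), x -> x % 2 == 0);
    System.out.println(drain(plan)); // prints [2, 4, 6]
  }
}
```

Here each operator is a reified object with its own state. The Druid style described next has no such objects.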

The Druid engine kinda-sorta follows this same pattern. The QueryRunner.run() method is really a QueryPlanner.plan() method: it takes a query description and returns a mechanism to implement the query. Druid dislikes the terms "operator" and "DAG". Instead, the execution pipeline is a set of Sequences, which perform the data transforms as side effects of fetching the next object.

The QueryRunner.run() functions form a call tree of plan-time operations: the runners create anonymous inner classes and lambdas to do the transforms, then wrap the transforms in a Sequence. Not every run() method creates a transform (some do other accounting-style tasks, or make decisions about which transforms to include). However, the result of each transform which does occur is a Sequence. Thus Druid's equivalent of an operator tree (DAG) is a tree of Sequences, along with anonymous bits of code to do the transforms.

This implementation follows the FP style of using stateless functions to perform operations instead of the stateful objects used in a traditional pipeline. This means that the operators are not reified (represented as objects). Instead, they are implicit in the set of lambdas which implement a sequence.

In short, think of the set of sequences as functionally equivalent to the traditional operator DAG, but without having an identity, mutable state or explicit operator classes. That is, given a Sequence we can't obtain any information about what it implements or what row it is on: that is all hidden in closures and lambdas.

Looking at the Sequence class, you'll see no methods to actually get the next object (row or batch) because such methods would require mutable state. Instead, Druid provides a Yielder: a specialized kind of iterator based on FP guidelines. Rather than having mutable state to identify the next row to return, each Yielder points to one row, and creates a new Yielder to point to the next row.
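The "one Yielder per row" idea can be illustrated with a toy class. This is not Druid's Yielder interface. SimpleYielder is a hypothetical name, and its fields never change after construction. Advancing creates a new instance rather than moving a cursor. This toy keeps old instances valid, which a real yielder backed by an iterator may not do.

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration of an immutable, yielder-style iterator (hypothetical; not Druid's API).
public class ImmutableYielderSketch {
  static final class SimpleYielder<T> {
    private final List<T> rows;   // hypothetical backing data
    private final int position;   // fixed at construction; never mutated

    SimpleYielder(List<T> rows, int position) {
      this.rows = rows;
      this.position = position;
    }

    boolean isDone() { return position >= rows.size(); }

    /** Valid only when !isDone(). */
    T get() { return rows.get(position); }

    /** Leaves this object untouched and returns a new yielder for the next row. */
    SimpleYielder<T> next() { return new SimpleYielder<>(rows, position + 1); }
  }

  static <T> List<T> collect(SimpleYielder<T> start) {
    List<T> out = new ArrayList<>();
    for (SimpleYielder<T> y = start; !y.isDone(); y = y.next()) { out.add(y.get()); }
    return out;
  }

  public static void main(String[] args) {
    SimpleYielder<String> first = new SimpleYielder<>(List.of("a", "b", "c"), 0);
    SimpleYielder<String> second = first.next();
    System.out.println(first.get() + " " + second.get()); // prints "a b": first is unchanged
    System.out.println(collect(first));                   // prints [a, b, c]
  }
}
```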

Thus:

  • QueryRunner.run() either builds a new sequence to represent a transform, or rewrites the query, calls another QueryRunner.run() method, and returns the result. Despite the name, run() does not actually run the query: it just creates the code that will do so when we request the first row.
  • Sequence is a holder of code that can produce rows (actually, objects that often hold sets of rows) when used with an associated Yielder.
  • Yielder does the work to get the current and next row. It is the Yielder which actually runs a query, as a byproduct of moving to the next value.

The key thing to remember is that there is no SortSequence or ScanSequence, there is only a Sequence. The code which performs operations is implemented in anonymous inner classes and lambdas: it is not accessible for inspection and should not maintain mutable state. Of course, there is mutable state associated with a query (the current row, accumulated totals, etc.), so that state is provided by closures passed in from the QueryRunner.run() method into the anonymous Sequence or Yielder classes.

The values managed by a Sequence or returned by a Yielder are given by type parameters to those classes. However, this type information is generally erased when the objects are passed up the data pipeline, and there is no way to recover it. Code just has to know the type of the object.

An interesting aspect of the "no side effects, no mutable state" goal of the FP approach is that many Druid classes avoid mutable variables. Instead, they use an AtomicInteger or some such so that the state variable can be final (private final AtomicInteger notReallyState), yet its value can still be mutated via the methods on the AtomicInteger. This is one of many "back door" state tricks seen throughout the code: it is stateless, but not really.
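One practical driver of this trick is a Java language rule: a lambda may only capture local variables that are effectively final. A plain int counter therefore cannot be incremented inside a lambda, but a final AtomicInteger can. The sketch below is illustrative only. The tagger() method is hypothetical and not taken from Druid.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Illustrative "final but mutable" state captured by a lambda (hypothetical example).
public class BackDoorStateSketch {
  /** Returns a function that prefixes each row with a running row number. */
  static Function<String, String> tagger() {
    final AtomicInteger notReallyState = new AtomicInteger();
    // int n = 0; return row -> n++ + ":" + row;  // would not compile: n is not effectively final
    return row -> notReallyState.getAndIncrement() + ":" + row;
  }

  public static void main(String[] args) {
    Function<String, String> tag = tagger();
    for (String row : List.of("a", "b", "c")) {
      System.out.println(tag.apply(row)); // prints 0:a, then 1:b, then 2:c
    }
  }
}
```

Each call to tagger() gets its own counter. The state is real, but it is hidden inside the closure instead of being a visible field of an operator object.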

Iterate using a Yielder

Another interesting pattern is that each call to next() on a Yielder returns a new Yielder. But, Yielder is Closeable. Does this mean that we need to close() each Yielder? Before or after the call to next()? As it turns out, we have to call close() only on the final Yielder: the one for which isDone() returns true, or whichever one we hold when we stop early. A try/finally block ensures the close happens even if processing fails:

  Yielder<T> yielder = ...;
  try {
    while (!yielder.isDone()) {
      T value = yielder.get();
      // Do something with the value
      yielder = yielder.next(null);   // each step returns a new Yielder
    }
  }
  finally {
    yielder.close();                  // close only the last Yielder we hold
  }

Iterate over a Sequence

The top level of a query wants to retrieve all the rows (or objects) from the query, for example to write the results to a stream returned from a REST request. Here's how that is done in SqlResource:

Run the query (via a mechanism we'll skip over here) to get a Sequence:

Sequence<Object[]> results = ...; // something that calls QueryRunner.run()

The code calls the output "results" though, as we saw above, the Sequence is not (necessarily) the actual results, just a mechanism to obtain them when asked. Next, we need to get the Yielder for the first row (row 0).

Yielder<Object[]> yielder0 = Yielders.each(results);

The Yielders.each() method sounds like it visits each row. It actually creates a Yielder for that first row:

  public static <T> Yielder<T> each(final Sequence<T> sequence)
  {
    return sequence.toYielder(
        null,
        new YieldingAccumulator<T, T>()
        {
          @Override
          public T accumulate(T accumulated, T in)
          {
            yield();
            return in;
          }
        }
    );
  }

We now have to ask, what is a YieldingAccumulator? It is a general way to accumulate a result. For example, we might want to sum the rows, or take their average. Here, we "accumulate" each row by returning the next row. The null value is our initial value (which we don't care about). That value enters accumulate as the accumulated variable (which we ignore here). We return a new, updated accumulation which is just the current value.

The yield() call is key: it says that this round of accumulation is done. (We're at the end of a group, say.) In this case, our "group" is every row, so we yield after every row. What does this mean? It means that the caller has to stop feeding rows to this accumulator and return a Yielder holding the accumulated value.
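To see yield() in a case where a "group" is larger than one row, here is a simplified model. MiniYieldingAccumulator and accumulateUntilYield are hypothetical stand-ins, not Druid classes. The yield method is named signalYield() because Java 14 and later reject unqualified calls to a method named yield(). The accumulator sums rows in pairs and yields at the end of each pair, and the driver stops feeding rows as soon as it sees the yield.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified, hypothetical model of a yielding accumulator (not Druid's class).
public class YieldingAccumulatorSketch {
  abstract static class MiniYieldingAccumulator<A, T> {
    private boolean yielded = false;
    protected void signalYield() { yielded = true; } // plays the role of yield()
    boolean yielded() { return yielded; }
    void reset() { yielded = false; }
    abstract A accumulate(A accumulated, T in);
  }

  /** Feeds rows to fn until it yields or the input runs out, then returns the result. */
  static <A, T> A accumulateUntilYield(Iterator<T> rows, A init, MiniYieldingAccumulator<A, T> fn) {
    fn.reset();
    A acc = init;
    while (!fn.yielded() && rows.hasNext()) {
      acc = fn.accumulate(acc, rows.next());
    }
    return acc;
  }

  /** Sums the input in groups of two: the accumulator yields at the end of each group. */
  static List<Integer> pairSums(List<Integer> input) {
    MiniYieldingAccumulator<Integer, Integer> pairSum = new MiniYieldingAccumulator<Integer, Integer>() {
      private int seen = 0;
      @Override Integer accumulate(Integer acc, Integer in) {
        if (++seen % 2 == 0) { signalYield(); }
        return acc + in;
      }
    };
    Iterator<Integer> rows = input.iterator();
    List<Integer> sums = new ArrayList<>();
    while (rows.hasNext()) {
      sums.add(accumulateUntilYield(rows, 0, pairSum));
    }
    return sums;
  }

  public static void main(String[] args) {
    System.out.println(pairSums(List.of(1, 2, 3, 4, 5))); // prints [3, 7, 5]
  }
}
```

The last group has only one row, so it ends because the input runs out rather than because of a yield.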

Now, remember, the Sequence is just a shell: it has no actual values. So, it needs a way to get some. This is done in BaseSequence. In abbreviated form:

  public <OutType> OutType accumulate(final OutType initValue, final Accumulator<OutType, T> fn)
  {
    IterType iterator = maker.make();
    OutType accumulated = initValue;

    while (iterator.hasNext()) {
      accumulated = fn.accumulate(accumulated, iterator.next());
    }
    return accumulated;
  }

The specific Sequence class provides an iterator "maker" (factory) that can provide an iterator over the rows. We iterate over the rows, accumulating each one. When we run out of rows, we return the accumulated result. Clearly, this works only if our goal is to accumulate all rows. Here, though, we're trying to iterate over them, which the above cannot do (there is no check to see if we "yielded").
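Iteration needs a loop that does check for a yield. The sketch below shows one way such a loop can work, under simplified assumptions: after each row it asks whether the accumulator yielded, and if so it pauses and returns a yielder that can resume from the same iterator. All class and method names here (MiniYielder, toYielder, each, drain) are hypothetical stand-ins. This is not Druid's actual BaseSequence code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: a yielder layered on an iterator plus a yielding accumulator.
public class ToYielderSketch {
  abstract static class MiniYieldingAccumulator<A, T> {
    private boolean yielded;
    protected void signalYield() { yielded = true; } // stands in for yield()
    boolean yielded() { return yielded; }
    void reset() { yielded = false; }
    abstract A accumulate(A accumulated, T in);
  }

  /** Holds one accumulated value plus what is needed to resume. */
  static final class MiniYielder<A, T> {
    private final A value;
    private final boolean done;
    private final Iterator<T> rows; // shared and stateful: advancing moves it for every yielder
    private final MiniYieldingAccumulator<A, T> fn;

    MiniYielder(A value, boolean done, Iterator<T> rows, MiniYieldingAccumulator<A, T> fn) {
      this.value = value;
      this.done = done;
      this.rows = rows;
      this.fn = fn;
    }

    A get() { return value; }
    boolean isDone() { return done; }
    /** Resumes accumulation where the previous yield stopped. */
    MiniYielder<A, T> next(A initValue) { return toYielder(rows, initValue, fn); }
  }

  /** Like the accumulate() loop, but checks after each row whether fn yielded. */
  static <A, T> MiniYielder<A, T> toYielder(Iterator<T> rows, A initValue, MiniYieldingAccumulator<A, T> fn) {
    fn.reset();
    A acc = initValue;
    while (rows.hasNext()) {
      acc = fn.accumulate(acc, rows.next());
      if (fn.yielded()) {
        return new MiniYielder<>(acc, false, rows, fn); // pause and hand back the value
      }
    }
    return new MiniYielder<>(acc, true, rows, fn);      // input exhausted
  }

  /** Mirrors the Yielders.each() pattern shown earlier: yield after every row. */
  static <T> MiniYielder<T, T> each(Iterator<T> rows) {
    return toYielder(rows, null, new MiniYieldingAccumulator<T, T>() {
      @Override T accumulate(T accumulated, T in) {
        signalYield();
        return in;
      }
    });
  }

  static <T> List<T> drain(Iterator<T> rows) {
    List<T> out = new ArrayList<>();
    for (MiniYielder<T, T> y = each(rows); !y.isDone(); y = y.next(null)) {
      out.add(y.get());
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(drain(List.of("x", "y", "z").iterator())); // prints [x, y, z]
  }
}
```

The yielders look immutable, but they share one mutable iterator. This is another place where state is hidden rather than absent.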

Kinds of Sequences

  • MappedSequence is a sequence obtained by applying a function to each item in some base sequence.
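The idea behind a mapped sequence can be sketched with a tiny, hypothetical MiniSequence interface (not Druid's Sequence). Building the mapped sequence stores only the base and the function. The function runs row by row, and only when someone iterates.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of a lazily mapped sequence (not Druid's MappedSequence).
public class MappedSequenceSketch {
  interface MiniSequence<T> { Iterator<T> iterator(); }

  /** Wraps base so that fn is applied to each item as it is fetched. */
  static <T, R> MiniSequence<R> map(MiniSequence<T> base, Function<? super T, ? extends R> fn) {
    return () -> {
      Iterator<T> in = base.iterator();
      return new Iterator<R>() {
        @Override public boolean hasNext() { return in.hasNext(); }
        @Override public R next() { return fn.apply(in.next()); }
      };
    };
  }

  static <T> List<T> toList(MiniSequence<T> seq) {
    List<T> out = new ArrayList<>();
    seq.iterator().forEachRemaining(out::add);
    return out;
  }

  public static void main(String[] args) {
    List<String> calls = new ArrayList<>();
    MiniSequence<String> words = () -> List.of("a", "bb", "ccc").iterator();
    MiniSequence<Integer> lengths = map(words, w -> { calls.add(w); return w.length(); });
    System.out.println(calls);           // prints []: building the sequence ran nothing
    System.out.println(toList(lengths)); // prints [1, 2, 3]
    System.out.println(calls);           // prints [a, bb, ccc]
  }
}
```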

Example Sequence Stack

Here is the stack of sequences obtained from running a fairly simple SQL query:

  • MappedSequence
  • ConcatSequence
  • MappedSequence
  • WrappingSequence
  • BaseSequence

Other Classes

A SequenceWrapper is a wrapper around a sequence. Rather than creating a new sequence to add functionality, evidently we can wrap a sequence instead, for example to replace the base sequence's yielder with a different one. The wrapper also allows injecting code to run before and after the child sequence. This allows us, for example, to measure the CPU time consumed by the base sequence. (Actually, because of the way the call stack works, this measures time up and down the sequence stack, including the CPU time needed to stream the output.)
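The caveat about what gets measured can be made concrete with a hypothetical wrapper. It is not Druid's SequenceWrapper API: the Hooks interface and wrap() method are invented for illustration, and it measures wall-clock time rather than CPU time. before() runs when iteration starts and after() runs when the base is exhausted. Any work the consumer does between fetches is counted too.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical before/after wrapper around a lazy sequence (not Druid's SequenceWrapper).
public class WrapperSketch {
  interface MiniSequence<T> { Iterator<T> iterator(); }

  interface Hooks {
    void before();
    void after();
  }

  static <T> MiniSequence<T> fromList(List<T> rows) { return rows::iterator; }

  /** Calls before() when iteration starts and after() once the base runs dry. */
  static <T> MiniSequence<T> wrap(MiniSequence<T> base, Hooks hooks) {
    return () -> {
      hooks.before();
      Iterator<T> in = base.iterator();
      return new Iterator<T>() {
        private boolean finished = false;
        @Override public boolean hasNext() {
          boolean more = in.hasNext();
          // Note: if the consumer stops early, after() never runs in this sketch.
          if (!more && !finished) { finished = true; hooks.after(); }
          return more;
        }
        @Override public T next() { return in.next(); }
      };
    };
  }

  /** Records elapsed wall-clock time between before() and after(). */
  static final class TimingHooks implements Hooks {
    private long startNanos;
    long elapsedNanos = -1;
    int afterCalls = 0;
    @Override public void before() { startNanos = System.nanoTime(); }
    @Override public void after() { elapsedNanos = System.nanoTime() - startNanos; afterCalls++; }
  }

  public static void main(String[] args) {
    TimingHooks hooks = new TimingHooks();
    List<Integer> out = new ArrayList<>();
    // The consumer's per-row work (out.add) happens between before() and after(),
    // so it is included in the measured time.
    wrap(fromList(List.of(1, 2, 3)), hooks).iterator().forEachRemaining(out::add);
    System.out.println(out + " after() called " + hooks.afterCalls + " time(s)");
  }
}
```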

The code often uses Iterator subclasses to step over records. These are often nested inside of sequences in a complicated way. The iterators themselves are often defined as anonymous inner classes. There are cases where a Yielder wraps an iterator or vice versa. Interestingly, iterators do not attempt to be stateless: they do contain mutable variables (as they must).

Patterns

The Sequence seems more of a placeholder than anything else. The action occurs in a Yielder. But, since a Yielder is stateless, it must be backed by an Iterator. The iterator iterates over something, and that something is provided by the IteratorMaker, which is often a lambda inside some other bit of code.
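The sketch below models the iterator-maker layering loosely on the BaseSequence accumulate() loop shown earlier. It assumes the maker has a make() method and a cleanup() method, and MiniIteratorMaker, MiniBaseSequence and CountingMaker are hypothetical names. Because the sequence holds only the maker, every traversal gets a fresh iterator. The try/finally (omitted from the abbreviated loop above) releases each iterator even on failure.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of a sequence backed by an iterator factory.
public class IteratorMakerSketch {
  interface MiniIteratorMaker<T, I extends Iterator<T>> {
    I make();                    // create a fresh iterator for one traversal
    void cleanup(I iterFromMake); // release whatever make() acquired
  }

  interface MiniAccumulator<A, T> { A accumulate(A accumulated, T in); }

  static final class MiniBaseSequence<T, I extends Iterator<T>> {
    private final MiniIteratorMaker<T, I> maker; // the only thing the sequence holds
    MiniBaseSequence(MiniIteratorMaker<T, I> maker) { this.maker = maker; }

    <A> A accumulate(A initValue, MiniAccumulator<A, T> fn) {
      I iterator = maker.make();
      try {
        A accumulated = initValue;
        while (iterator.hasNext()) {
          accumulated = fn.accumulate(accumulated, iterator.next());
        }
        return accumulated;
      } finally {
        maker.cleanup(iterator);
      }
    }
  }

  /** Example maker that counts how often it is asked to make and clean up. */
  static final class CountingMaker implements MiniIteratorMaker<Integer, Iterator<Integer>> {
    int made = 0;
    int cleaned = 0;
    private final List<Integer> rows;
    CountingMaker(List<Integer> rows) { this.rows = rows; }
    @Override public Iterator<Integer> make() { made++; return rows.iterator(); }
    @Override public void cleanup(Iterator<Integer> it) { cleaned++; }
  }

  public static void main(String[] args) {
    CountingMaker maker = new CountingMaker(List.of(1, 2, 3));
    MiniBaseSequence<Integer, Iterator<Integer>> seq = new MiniBaseSequence<>(maker);
    Integer sum = seq.accumulate(0, (acc, x) -> acc + x);
    Integer count = seq.accumulate(0, (acc, x) -> acc + 1);
    System.out.println(sum + " " + count + " " + maker.made + " " + maker.cleaned); // prints "6 3 2 2"
  }
}
```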

One explanation for the code patterns is that Druid plans the (native) query as it executes: each run() function has to first figure out what it needs to do to run the query it's been given. This requires a number of indirections to figure out what to do. One lookup is the QueryToolChest, which provides some of the operations the query needs.

While it is tempting to say that QueryRunner.run() creates the structure of the query, while fetching from the Sequence runs it, the actual code is not that simple. In some cases, the top-level call to QueryRunner.run() only makes it partway down the query and returns a stack of sequences, the leaf-most version of which will invoke more QueryRunner.run() calls when fetching the first result. Perhaps this is repeated, obtaining another sequence which will invoke yet another QueryRunner.run() to provide its first event.

So it may be more accurate to say that each call to run() creates some subtree of sequences, which, when fetched from, create additional levels of sequence tree, and so on.

When the above pattern holds, reading from a sequence will result in a wrapper creating an iterator maker which creates an iterator which runs a QueryRunner.run() to start the next layer of query. (Very confusing.)
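The deferred layering can be traced with a small, hypothetical example. MiniRunner, MiniSequence, leafRunner and deferringRunner are invented names, not Druid types. The outer run() returns at once. The inner run() happens only when the returned sequence is first iterated, and the log records the order.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: the next layer of the query is planned only on first fetch.
public class LazyNestedRunSketch {
  interface MiniSequence<T> { Iterator<T> iterator(); }
  interface MiniRunner<T> { MiniSequence<T> run(String query); }

  static MiniRunner<String> leafRunner(List<String> log) {
    return query -> {
      log.add("leaf run: " + query);
      return () -> List.of(query + "-row1", query + "-row2").iterator();
    };
  }

  /** run() does no real work up front; it defers calling 'inner' until iteration. */
  static MiniRunner<String> deferringRunner(MiniRunner<String> inner, List<String> log) {
    return query -> {
      log.add("outer run: " + query);
      return () -> {
        log.add("first fetch");
        return inner.run(query).iterator(); // the next layer is planned only now
      };
    };
  }

  static List<String> demo() {
    List<String> log = new ArrayList<>();
    MiniSequence<String> seq = deferringRunner(leafRunner(log), log).run("q");
    log.add("run() returned");
    seq.iterator().forEachRemaining(row -> log.add("row " + row));
    return log;
  }

  public static void main(String[] args) {
    // prints: outer run: q, run() returned, first fetch, leaf run: q, row q-row1, row q-row2
    demo().forEach(System.out::println);
  }
}
```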

For query retries, an iterator runs the queries. If a query fails (due to missing segments), the iterator returns another sequence with the missing segments, and so on. (Using an iterator to handle retries is rather unusual.)
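The sketch below illustrates retry driven by an iterator. All names are hypothetical: Attempt, RetryingIterator, and the runQuery function that stands in for "query these segments". This is not Druid's retry code. When the current results are exhausted and some segments were reported missing, the iterator issues another query for just those segments, up to a retry limit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

// Hypothetical sketch of retrying missing segments from inside an iterator.
public class RetryIteratorSketch {
  /** Outcome of one attempt: the rows produced plus the segments that could not be read. */
  static final class Attempt {
    final List<String> rows;
    final Set<String> missing;
    Attempt(List<String> rows, Set<String> missing) { this.rows = rows; this.missing = missing; }
  }

  static final class RetryingIterator implements Iterator<String> {
    private final Function<Set<String>, Attempt> runQuery; // hypothetical "query these segments"
    private Set<String> pending;                           // segments still to (re)query
    private Iterator<String> current = Collections.emptyIterator();
    private int retriesLeft;
    private boolean started = false;

    RetryingIterator(Set<String> segments, Function<Set<String>, Attempt> runQuery, int maxRetries) {
      this.pending = segments;
      this.runQuery = runQuery;
      this.retriesLeft = maxRetries;
    }

    @Override public boolean hasNext() {
      // Runs the first query lazily, then re-queries only what was missing.
      while (!current.hasNext() && !pending.isEmpty()) {
        if (started && retriesLeft-- <= 0) {
          throw new IllegalStateException("segments still missing: " + pending);
        }
        started = true;
        Attempt attempt = runQuery.apply(pending);
        current = attempt.rows.iterator();
        pending = attempt.missing;
      }
      return current.hasNext();
    }

    @Override public String next() {
      if (!hasNext()) { throw new NoSuchElementException(); }
      return current.next();
    }
  }

  static List<String> demo() {
    int[] calls = {0}; // a final array used as a mutable counter inside the lambda
    Function<Set<String>, Attempt> flaky = segments -> {
      calls[0]++;
      List<String> rows = new ArrayList<>();
      Set<String> missing = new TreeSet<>();
      for (String s : segments) {
        if (s.equals("s2") && calls[0] == 1) { missing.add(s); } // s2 unavailable on the first try
        else { rows.add("row from " + s); }
      }
      return new Attempt(rows, missing);
    };
    List<String> out = new ArrayList<>();
    new RetryingIterator(new TreeSet<>(List.of("s1", "s2", "s3")), flaky, 2).forEachRemaining(out::add);
    return out;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [row from s1, row from s3, row from s2]
  }
}
```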

And, of course, at each level, each run() method tends to rewrite the native query, adding some bit of context or other information. In this way, all QueryRunners run the "same" query, but that "same" query is modified as it works its way down the call stack.
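The "same query, modified on the way down" pattern amounts to a chain of decorators. In this hypothetical sketch, MiniQuery, MiniRunner and rewriting() are invented names, and the context keys are illustrative only. Each runner copies the query with one more context entry before delegating, so the leaf sees every addition while the caller's original query is unchanged. For brevity, runners here return a plain List rather than a Sequence.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of runners that rewrite the query before delegating.
public class QueryRewriteSketch {
  /** Immutable stand-in for a native query: a name plus a context map. */
  static final class MiniQuery {
    final String name;
    final Map<String, String> context;
    MiniQuery(String name, Map<String, String> context) {
      this.name = name;
      this.context = Map.copyOf(context);
    }
    /** A rewrite produces a new query; this one is left untouched. */
    MiniQuery withContext(String key, String value) {
      Map<String, String> copy = new HashMap<>(context);
      copy.put(key, value);
      return new MiniQuery(name, copy);
    }
  }

  interface MiniRunner { List<String> run(MiniQuery query); }

  /** Wraps a runner so the query gains one context entry before being passed down. */
  static MiniRunner rewriting(MiniRunner delegate, String key, String value) {
    return query -> delegate.run(query.withContext(key, value));
  }

  static List<String> demo() {
    // The leaf reports the query it finally received (context sorted for stable output).
    MiniRunner leaf = query -> List.of(query.name + " " + new TreeMap<>(query.context));
    MiniRunner top = rewriting(rewriting(leaf, "timeout", "30000"), "queryId", "abc");
    return top.run(new MiniQuery("scan", Map.of()));
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [scan {queryId=abc, timeout=30000}]
  }
}
```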
