Sequences and Yielders
Druid's query engine makes extensive use of functional programming (FP) ideas. The Sequence and Yielder classes are core to this pattern. (Interestingly, though these classes are in a guava package in Druid, there are no equivalent classes in Guava itself. Perhaps they existed in some previous version? Or are they Druid extensions?)
The basic Druid query structure is defined by the QueryRunner interface:
public interface QueryRunner<T> {
Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext);
}
At first glance, this appears to accept a query and return a sequence of results. This is true, for some definition of "sequence of results." The Sequence is not a List: it is not a materialized collection of row-like objects. Instead, it is a contract to obtain row-like objects on demand.
It is easiest to understand this in relation to a traditional query engine. A planner produces a DAG made of operators. Then, at run time, each operator has a next() method that returns one or more rows. The tree of operators forms a data pipeline in which data flows from the leaves (scan operators which read data from some external source) to the root (which returns data to the client). Along the way, the pipeline includes filters, sorts, merges, grouping and other data transforms.
The Druid engine kinda-sorta follows this same pattern. The QueryRunner.run() method is really a QueryPlanner.plan() method: it takes a query description and returns a mechanism to implement the query. Druid dislikes the terms "operator" and "DAG". Instead, the execution pipeline is a set of Sequences, which perform the data transforms as side effects of fetching the next object.
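To make the "run() is really plan()" point concrete, here is a minimal Java sketch. LazySeq and ScanRunner are hypothetical stand-ins, not Druid classes: run() returns immediately with a recipe, and rows are produced only when something iterates the result.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for a Sequence: it holds a recipe
// (a Supplier of iterators), never the rows themselves.
final class LazySeq<T> implements Iterable<T> {
  private final Supplier<Iterator<T>> maker;

  LazySeq(Supplier<Iterator<T>> maker) {
    this.maker = maker;
  }

  @Override
  public Iterator<T> iterator() {
    return maker.get(); // work starts only when someone iterates
  }
}

// Hypothetical "scan" runner: run() plans the work but does not do it.
final class ScanRunner {
  // Counts rows actually produced, so we can see when execution happens.
  final AtomicInteger rowsProduced = new AtomicInteger();

  LazySeq<Integer> run(int numRows) {
    return new LazySeq<>(() -> new Iterator<Integer>() {
      private int pos = 0;

      @Override
      public boolean hasNext() {
        return pos < numRows;
      }

      @Override
      public Integer next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        rowsProduced.incrementAndGet();
        return pos++;
      }
    });
  }
}
```

Calling run(3) leaves rowsProduced at 0; only the loop that consumes the result drives it to 3.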
The QueryRunner.run() functions form a call tree of plan-time operations: the runners create anonymous inner classes and lambdas to do the transforms, then wrap the transforms in a Sequence. Not every run() method creates a transform (some do other accounting-style tasks, or make decisions about which transforms to include). However, the result of each transform which does occur is a Sequence. Thus Druid's equivalent of an operator tree (DAG) is a tree of Sequences, along with anonymous bits of code to do the transforms.
This implementation follows the FP style of using stateless functions to perform operations instead of the stateful objects used in a traditional pipeline. This means that the operators are not reified (represented as objects). Instead, they are implicit in the set of lambdas which implement a sequence.
In short, think of the set of sequences as functionally equivalent to the traditional operator DAG, but without an identity, mutable state or explicit operator classes. That is, given a Sequence, we can't obtain any information about what it implements or what row it is on: that is all hidden in closures and lambdas.
Looking at the Sequence class, you'll see no methods to actually get the next object (row or batch) because such methods would require mutable state. Instead, Druid provides a Yielder: a specialized kind of iterator based on FP guidelines. Rather than having mutable state to identify the next row to return, each Yielder points to one row, and creates a new Yielder to point to the next row.
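A minimal sketch of the "each Yielder points to one row" idea, assuming an in-memory list so that the Yielder can be truly immutable. ListYielder is hypothetical and much simpler than Druid's Yielder interface:

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified immutable yielder over an in-memory list.
// Each instance points at exactly one position; next() never changes
// this object, it returns a new yielder for the following position.
final class ListYielder<T> {
  private final List<T> rows;
  private final int index;

  private ListYielder(List<T> rows, int index) {
    this.rows = rows;
    this.index = index;
  }

  static <T> ListYielder<T> first(List<T> rows) {
    return new ListYielder<>(Collections.unmodifiableList(rows), 0);
  }

  boolean isDone() {
    return index >= rows.size();
  }

  T get() {
    if (isDone()) {
      throw new IllegalStateException("past the last row");
    }
    return rows.get(index);
  }

  ListYielder<T> next() {
    if (isDone()) {
      throw new IllegalStateException("already at the end");
    }
    return new ListYielder<>(rows, index + 1);
  }
}
```

Here y0.get() still returns the first row after y0.next() has produced y1. Druid's real Yielders are backed by a shared, mutable iterator, so, as we read the Yielder contract, an old Yielder should not be reused once next() has been called on it.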
Thus:
- QueryRunner.run() either builds a new sequence to represent a transform, or rewrites the query, calls another QueryRunner.run() method, and returns the result. Despite the name, run() does not actually run the query; it just creates the code that will do so when we request the first row.
- Sequence is a holder of code that can produce rows (actually, objects that often hold sets of rows), when used with an associated Yielder.
- Yielder does the work to get the current and next row. It is the Yielder which actually runs a query as a byproduct of moving to the next value.
The key thing to remember is that there is no SortSequence or ScanSequence; there is only a Sequence. The code which performs operations is implemented in anonymous inner classes and lambdas: it is not accessible for inspection and should not maintain mutable state. Of course, there is mutable state associated with a query (the current row, accumulated totals, etc.), so that state is provided by closures passed in from the QueryRunner.run() method into the anonymous Sequence or Yielder classes.
The values managed by a Sequence or returned by a Yielder are given by type parameters to those classes. However, this type information is generally erased when the objects are passed up the data pipeline, and there is no way to recover it. Code just has to know the type of the object.
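A small illustration of "code just has to know the type", with a plain List<?> standing in for a Sequence<?> (ErasedTypes is hypothetical):

```java
import java.util.Arrays;
import java.util.List;

final class ErasedTypes {
  // Hypothetical planner output: the static type says nothing useful
  // about the elements, much like a Sequence<?> passed up the pipeline.
  static List<?> runQuery() {
    return Arrays.asList(new Object[] {"a", 1L}, new Object[] {"b", 2L});
  }

  // The consumer "just has to know" each element is an Object[] row whose
  // second column is a Long; the casts are checked only at run time.
  static long sumSecondColumn() {
    long total = 0;
    for (Object o : runQuery()) {
      Object[] row = (Object[]) o;
      total += (Long) row[1];
    }
    return total;
  }
}
```

If the producer ever changed its row shape, the compiler would not notice; the failure would show up as a ClassCastException at run time.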
An interesting aspect of the "no side effects, no mutable state" goal of the FP approach is that many Druid classes don't use mutable variables. Instead, they use an AtomicInteger (or some such) so that the state variable can be final (private final AtomicInteger notReallyState), while the object it refers to can still be mutated via the methods on the AtomicInteger. This is one of many "back door" state tricks seen throughout the code: it is stateless, but not really.
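One practical reason for the AtomicInteger trick, beyond FP style: Java lambdas and anonymous classes can only capture effectively final local variables. A hypothetical sketch (BackDoorState is not a Druid class):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class BackDoorState {
  // Returns a mapping function that numbers each row it sees. The captured
  // reference is final, but the AtomicInteger it points to is mutated on
  // every call: mutable state hiding inside a "stateless" lambda.
  static Function<String, String> rowNumberer() {
    final AtomicInteger rowCount = new AtomicInteger();
    // A plain `int rowCount` would not compile here: Java lambdas may only
    // capture effectively final locals, so `rowCount++` would be rejected.
    return row -> rowCount.incrementAndGet() + ": " + row;
  }
}
```

Successive calls return "1: x", then "2: y", and so on, even though nothing in the lambda looks mutable.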
Another interesting pattern is that each Yielder should return a new Yielder on each call to next(). But Yielder is Closeable. Does this mean that we need to close() each Yielder? Before or after the call to next()? As it turns out, we have to call close() only on the final Yielder in the chain: the one whose isDone() returns true (calling next() on it returns null). In outline:
Yielder<Object[]> yielder = Yielders.each(sequence);
try {
  while (!yielder.isDone()) {
    Object[] value = yielder.get();
    // Do something with the value
    yielder = yielder.next(null); // the argument seeds the next accumulation
  }
}
finally {
  // Normally this is the final (done) Yielder; after an exception it is
  // whichever Yielder we held, which still needs closing.
  yielder.close();
}
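Why is closing only the last Yielder enough? One plausible reading, sketched below with a hypothetical ChainYielder (not Druid's Yielder), is that every link in the chain shares the same underlying iterator and cleanup action, so closing the link you end up holding releases the resource once.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.List;

// Hypothetical mini-yielder: every link in a chain shares one iterator and
// one cleanup action, so closing whichever link we end up holding releases
// the underlying resource exactly once.
final class ChainYielder<T> implements Closeable {
  private final Iterator<T> iter;  // shared and mutable
  private final Runnable cleanup;  // shared cleanup action
  private final T value;
  private final boolean done;

  private ChainYielder(Iterator<T> iter, Runnable cleanup, T value, boolean done) {
    this.iter = iter;
    this.cleanup = cleanup;
    this.value = value;
    this.done = done;
  }

  static <T> ChainYielder<T> start(Iterator<T> iter, Runnable cleanup) {
    if (iter.hasNext()) {
      return new ChainYielder<>(iter, cleanup, iter.next(), false);
    }
    return new ChainYielder<>(iter, cleanup, null, true);
  }

  boolean isDone() {
    return done;
  }

  T get() {
    return value;
  }

  // A done link has no successor.
  ChainYielder<T> next() {
    return done ? null : start(iter, cleanup);
  }

  @Override
  public void close() {
    cleanup.run();
  }
}

final class DrainDemo {
  // Sums all rows, closing only the link held when the loop ends.
  static int sum(List<Integer> rows, int[] closeCount) {
    ChainYielder<Integer> y = ChainYielder.start(rows.iterator(), () -> closeCount[0]++);
    int total = 0;
    try {
      while (!y.isDone()) {
        total += y.get();
        y = y.next();
      }
    } finally {
      y.close();
    }
    return total;
  }
}
```

Draining [1, 2, 3] this way sums to 6 and runs the cleanup action exactly once.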
The top level of a query wants to retrieve all the rows (or objects) from the query. This is done, for example, to write the results to a stream returned from a REST request. Here's how that is done in, say, SqlResource.
Run the query (via a mechanism we'll skip over here) to get a Sequence:
Sequence<Object[]> results = ...; // something that calls QueryRunner.run()
The code calls the output "results" though, as we saw above, the Sequence is not (necessarily) the actual results, just a mechanism to obtain them when asked. Next, we need to get the Yielder for the first row (row 0).
Yielder<Object[]> yielder0 = Yielders.each(results);
The Yielders.each() method sounds like it visits each row. It actually creates a Yielder for that first row:
public static <T> Yielder<T> each(final Sequence<T> sequence)
{
return sequence.toYielder(
null,
new YieldingAccumulator<T, T>()
{
@Override
public T accumulate(T accumulated, T in)
{
yield();
return in;
}
}
);
}
We now have to ask: what is a YieldingAccumulator? It is a general way to accumulate a result. For example, we might want to sum the rows, or take their average. Here, we "accumulate" each row by simply returning it (the in argument). The null value is our initial value (which we don't care about). That value enters accumulate as the accumulated argument (which we ignore here). We return a new, updated accumulation, which is just the current value.
The yield() call is key: it says that the accumulation is done. (We're at the end of a group, say.) In this case, our "group" is every row, so we yield after every row. What does this mean? It means that the caller has to stop feeding rows to this accumulator and return a Yielder with the accumulated value.
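Yielders.each() is the degenerate case in which every row is its own group. To show what a larger "group" might look like, here is a hypothetical GroupingAccumulator (a simplified take on the YieldingAccumulator idea, not Druid's class) that sums rows in pairs and yields at the end of each pair:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified take on the YieldingAccumulator idea.
abstract class GroupingAccumulator<A, T> {
  private boolean yielded = false;

  // Subclasses call this to say "this accumulation (group) is complete".
  protected void yield() {
    yielded = true;
  }

  boolean yielded() {
    return yielded;
  }

  void reset() {
    yielded = false;
  }

  abstract A accumulate(A accumulated, T in);

  // Hypothetical example: sum rows in groups of `groupSize`.
  static GroupingAccumulator<Integer, Integer> sumEvery(int groupSize) {
    return new GroupingAccumulator<Integer, Integer>() {
      private int seen = 0;

      @Override
      Integer accumulate(Integer accumulated, Integer in) {
        seen++;
        if (seen % groupSize == 0) {
          this.yield(); // qualified: `yield` is a restricted identifier in Java 14+
        }
        return accumulated + in;
      }
    };
  }

  // Drives the accumulator: each yield hands back one finished value and
  // restarts from `init`; a leftover partial group is emitted at the end.
  static <R, E> List<R> drive(Iterator<E> rows, R init, GroupingAccumulator<R, E> acc) {
    List<R> out = new ArrayList<>();
    R current = init;
    boolean pending = false;
    while (rows.hasNext()) {
      current = acc.accumulate(current, rows.next());
      pending = true;
      if (acc.yielded()) {
        out.add(current);
        acc.reset();
        current = init;
        pending = false;
      }
    }
    if (pending) {
      out.add(current);
    }
    return out;
  }
}
```

Driving it over 1 through 5 with an initial value of 0 gives [3, 7, 5]: one value per yield, plus the leftover partial group.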
Now, remember, the Sequence is just a shell: it has no actual values. So, it needs a way to get some. This is done in BaseSequence. In abbreviated form:
public <OutType> OutType accumulate(final OutType initValue, final Accumulator<OutType, T> fn)
{
IterType iterator = maker.make();
OutType accumulated = initValue;
while (iterator.hasNext()) {
accumulated = fn.accumulate(accumulated, iterator.next());
}
return accumulated;
}
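Each BaseSequence is given an iterator "maker" (factory) that can provide an iterator over the rows. We iterate over the rows, accumulating each one. When we run out of rows, we return the accumulated result. Clearly, this works only if our goal is to accumulate all rows. Here, though, we're trying to iterate over them, which the above cannot do (there is no check to see if we "yielded"). As far as we can tell, the yielder path, toYielder(), uses a separate loop that does check whether the accumulator has yielded and, if so, stops and wraps the result in a Yielder that can resume from the same iterator.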
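The following sketch contrasts the two loops, assuming hypothetical TinyAccumulator, TinyYielder and TinyBaseSequence classes that only loosely imitate Druid's: accumulate() runs to exhaustion and ignores yields, while toYielder() stops at the first yield and returns a Yielder that resumes the same iterator.

```java
import java.util.Iterator;
import java.util.function.Supplier;

// Hypothetical, simplified accumulator with a yield flag.
abstract class TinyAccumulator<A, T> {
  private boolean yielded;

  protected void yield() { yielded = true; }

  boolean yielded() { return yielded; }

  void reset() { yielded = false; }

  abstract A accumulate(A accumulated, T in);

  // Same idea as the Yielders.each() accumulator: yield after every row.
  static <E> TinyAccumulator<E, E> each() {
    return new TinyAccumulator<E, E>() {
      @Override
      E accumulate(E accumulated, E in) {
        this.yield();
        return in;
      }
    };
  }
}

// Hypothetical yielder: one value, plus a way to resume.
interface TinyYielder<A> {
  A get();
  boolean isDone();
  TinyYielder<A> next(A initValue);
}

// Hypothetical, simplified BaseSequence-like class.
final class TinyBaseSequence<T> {
  private final Supplier<Iterator<T>> maker; // plays the "iterator maker" role

  TinyBaseSequence(Supplier<Iterator<T>> maker) {
    this.maker = maker;
  }

  // Like the abbreviated accumulate() above: runs to exhaustion, ignoring yields.
  <A> A accumulate(A init, TinyAccumulator<A, T> fn) {
    Iterator<T> iter = maker.get();
    A acc = init;
    while (iter.hasNext()) {
      acc = fn.accumulate(acc, iter.next());
    }
    return acc;
  }

  // Yield-aware path: stop as soon as fn yields, and return a yielder that
  // resumes the same iterator when next() is called.
  <A> TinyYielder<A> toYielder(A init, TinyAccumulator<A, T> fn) {
    return step(init, fn, maker.get());
  }

  private static <A, E> TinyYielder<A> step(A init, TinyAccumulator<A, E> fn, Iterator<E> iter) {
    A acc = init;
    while (!fn.yielded() && iter.hasNext()) {
      acc = fn.accumulate(acc, iter.next());
    }
    final boolean done = !fn.yielded(); // ran out of input without yielding
    final A value = acc;
    return new TinyYielder<A>() {
      @Override
      public A get() { return value; }

      @Override
      public boolean isDone() { return done; }

      @Override
      public TinyYielder<A> next(A nextInit) {
        if (done) {
          return null;
        }
        fn.reset();
        return step(nextInit, fn, iter);
      }
    };
  }
}
```

With the yield-after-every-row accumulator, accumulate() over "a", "b", "c" returns just "c", while walking the Yielder chain visits "a", "b" and "c" in turn.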
- MappedSequence is a sequence obtained by applying a function to each item in some base sequence.
Here is the stack of sequences obtained from running a fairly simple SQL query:
- MappedSequence
- ConcatSequence
- MappedSequence
- WrappingSequence
- BaseSequence
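A sketch of how such a stack composes, using plain Iterable as a hypothetical stand-in for Sequence. Seqs.map and Seqs.concat only mimic the MappedSequence and ConcatSequence ideas; they are not Druid's API:

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical, minimal lazy combinators (not Druid's API).
final class Seqs {
  // Roughly the MappedSequence idea: apply fn to each item of base, lazily.
  static <A, B> Iterable<B> map(Iterable<A> base, Function<A, B> fn) {
    return () -> {
      Iterator<A> it = base.iterator();
      return new Iterator<B>() {
        @Override
        public boolean hasNext() {
          return it.hasNext();
        }

        @Override
        public B next() {
          return fn.apply(it.next());
        }
      };
    };
  }

  // Roughly the ConcatSequence idea: flatten a sequence of sequences.
  static <T> Iterable<T> concat(Iterable<? extends Iterable<T>> parts) {
    return () -> new Iterator<T>() {
      private final Iterator<? extends Iterable<T>> outer = parts.iterator();
      private Iterator<T> inner = Collections.emptyIterator();

      @Override
      public boolean hasNext() {
        while (!inner.hasNext() && outer.hasNext()) {
          inner = outer.next().iterator();
        }
        return inner.hasNext();
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return inner.next();
      }
    };
  }

  // Loosely mirrors the top of the stack above: a map over a concat of
  // per-segment mapped sequences. Nothing runs until someone iterates.
  static Iterable<Integer> demoPipeline(List<List<Integer>> segments) {
    Iterable<Iterable<Integer>> perSegment = map(segments, seg -> map(seg, x -> x * 10));
    Iterable<Integer> flat = concat(perSegment);
    return map(flat, x -> x + 1);
  }
}
```

Wrapping one lazy Iterable in another builds the whole stack without touching any rows; the first hasNext() call on the outermost one pulls data up through every layer.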
A SequenceWrapper is some kind of wrapper around a sequence. Rather than creating a new sequence to add functionality, evidently we can wrap a sequence instead, such as replacing the base sequence's yielder with a different one. The wrapper also allows injecting code to run before and after the child sequence. This allows us, for example, to measure the CPU time consumed by the base sequence. (Actually, because of the way the call stack works, this measures time up and down the sequence stack, including the CPU time needed to stream the output.)
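The parenthetical about CPU time follows from the push style of accumulate(): the consumer's callback runs inside the base sequence's loop. A hypothetical sketch (PushSeq and Wrapping are not Druid classes) logs events instead of measuring time, to make the nesting visible:

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical push-style sequence: the consumer's callback runs inside the
// sequence's own loop, as with the accumulate() style shown earlier.
interface PushSeq<T> {
  void forEach(Consumer<T> consumer);
}

final class Wrapping {
  static <T> PushSeq<T> of(List<T> rows) {
    return consumer -> {
      for (T row : rows) {
        consumer.accept(row);
      }
    };
  }

  // Hypothetical wrapper: inject code before and after the base sequence
  // without defining a new kind of sequence.
  static <T> PushSeq<T> wrap(PushSeq<T> base, Runnable before, Runnable after) {
    return consumer -> {
      before.run();
      try {
        base.forEach(consumer);
      } finally {
        after.run();
      }
    };
  }
}
```

Wrapping a two-row sequence with logging hooks and consuming it yields the log before, consume 1, consume 2, after: the consumer's work lands between the wrapper's hooks, so a timer placed there would count it too.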
The code often uses Iterator subclasses to step over records. These are often nested inside of sequences in a complicated way. The iterators themselves are often defined as anonymous inner classes. There are cases where a Yielder wraps an iterator or vice versa. Interestingly, iterators do not attempt to be stateless: they do contain mutable variables (as they must).
The Sequence seems more of a placeholder than anything else. The action occurs in a Yielder. But, since a Yielder is stateless, it must be backed by an Iterator. The iterator iterates over something, and that something is provided by the IteratorMaker, which is often a lambda inside some other bit of code.
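A sketch of the maker/iterator split, loosely modeled on the iterator-maker idea (IterMaker, MakerSequence and CountingListMaker are hypothetical). The sequence keeps no row state; each traversal gets a fresh, mutable iterator and hands it back for cleanup. Because this maker needs two methods, it is a class rather than a lambda:

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical interface loosely modeled on the iterator-maker idea:
// make() opens a fresh, stateful iterator; cleanup() releases it.
interface IterMaker<T> {
  Iterator<T> make();

  void cleanup(Iterator<T> iter);
}

// Hypothetical sequence that owns no row state: each traversal asks the
// maker for a brand-new iterator and always hands it back for cleanup.
final class MakerSequence<T> {
  private final IterMaker<T> maker;

  MakerSequence(IterMaker<T> maker) {
    this.maker = maker;
  }

  int count() {
    Iterator<T> iter = maker.make();
    try {
      int n = 0;
      while (iter.hasNext()) {
        iter.next();
        n++;
      }
      return n;
    } finally {
      maker.cleanup(iter);
    }
  }
}

// Hypothetical maker over a list that records how often it was used.
final class CountingListMaker<T> implements IterMaker<T> {
  private final List<T> rows;
  int opened = 0;
  int cleaned = 0;

  CountingListMaker(List<T> rows) {
    this.rows = rows;
  }

  @Override
  public Iterator<T> make() {
    opened++;
    return rows.iterator();
  }

  @Override
  public void cleanup(Iterator<T> iter) {
    cleaned++;
  }
}
```

Calling count() twice on a three-row sequence returns 3 both times, and opens and cleans up two separate iterators.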
One explanation for the code patterns is that Druid plans the (native) query as it executes: each run() function has to first figure out what it needs to do to run the query it's been given. This requires a number of indirections. One such lookup is the QueryToolChest, which provides some of the operations the query needs.
While it is tempting to say that QueryRunner.run() creates the structure of the query, while fetching from the Sequence runs it, the actual code is not that simple. In some cases, the top-level call to QueryRunner.run() only makes it partway down the query and returns a stack of sequences, the leaf-most of which will invoke more QueryRunner.run() calls when fetching the first result. Perhaps this is repeated, obtaining another sequence which will invoke yet another QueryRunner.run() to provide its first event.
So it may be more accurate to say that each call to run() creates some subtree of sequences, which, when fetched from, create additional levels of the sequence tree, and so on.
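A hypothetical sketch of this deferred planning (MiniRunner and NestedPlanning are illustrative, not Druid types): the outer run() returns at once, and the child's run() is called only when iteration begins.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical runner interface: run() returns a lazy, re-iterable result.
interface MiniRunner<T> {
  Iterable<T> run(String query);
}

final class NestedPlanning {
  // Hypothetical leaf runner that "scans" a fixed list of rows.
  static MiniRunner<Integer> leaf(AtomicInteger runCalls, List<Integer> rows) {
    return query -> {
      runCalls.incrementAndGet();
      return rows;
    };
  }

  // Hypothetical outer runner: its run() returns at once, and the child's
  // run() is invoked only when someone starts iterating the result.
  static MiniRunner<Integer> deferring(AtomicInteger runCalls, MiniRunner<Integer> child) {
    return query -> {
      runCalls.incrementAndGet();
      return () -> child.run(query).iterator();
    };
  }
}
```

After calling run() on the outer runner, only one run() call has happened; iterating the result triggers the second. In this sketch, iterating the result a second time would call the child's run() again.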
When the above pattern holds, reading from a sequence will result in a wrapper creating an iterator maker, which creates an iterator, which calls QueryRunner.run() to start the next layer of the query. (Very confusing.)
For query retries, an iterator runs the queries. If a query fails (due to missing segments), the iterator returns another sequence with the missing segments, and so on. (The use of an iterator to handle retry is rather unusual.)
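A hypothetical sketch of retry-by-iterator (Attempt and RetryingIterator are illustrative; each attempt's rows stand in for the Sequence Druid would return). The query function is assumed to report which segments were missing:

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical result of one attempt: the rows we got, plus the segments
// that turned out to be missing.
final class Attempt {
  final List<String> rows;
  final List<String> missing;

  Attempt(List<String> rows, List<String> missing) {
    this.rows = rows;
    this.missing = missing;
  }
}

// Hypothetical retry iterator: each next() runs the query against the
// segments still outstanding and returns that attempt's rows. Segments
// reported missing are retried on the following next(), until none remain
// or the attempts run out.
final class RetryingIterator implements Iterator<List<String>> {
  private final Function<List<String>, Attempt> runQuery;
  private List<String> pending;
  private int attemptsLeft;

  RetryingIterator(Function<List<String>, Attempt> runQuery, List<String> segments, int maxAttempts) {
    this.runQuery = runQuery;
    this.pending = segments;
    this.attemptsLeft = maxAttempts;
  }

  @Override
  public boolean hasNext() {
    return !pending.isEmpty() && attemptsLeft > 0;
  }

  @Override
  public List<String> next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    attemptsLeft--;
    Attempt attempt = runQuery.apply(pending);
    pending = attempt.missing;
    return attempt.rows;
  }

  // Segments still missing when iteration stopped; a real implementation
  // would presumably report or fail on these rather than drop them.
  List<String> stillMissing() {
    return pending;
  }
}
```

With a fake query function that reports segment s2 missing on the first attempt only, the iterator makes two attempts and the combined rows cover both s1 and s2.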
And, of course, at each level, each run() method tends to rewrite the native query, adding some bit of context or other information. In this way, all QueryRunners run the "same" query, but that "same" query is modified as it works its way down the call stack.
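A hypothetical sketch of that rewriting pattern (MiniQuery, CtxRunner and ContextRunners are illustrative, and the context keys used in the usage note are examples only). Each decorating runner adds one context entry to an immutable copy of the query and delegates downward:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable query: "modifying" it means making a changed copy.
final class MiniQuery {
  final String text;
  final Map<String, Object> context;

  MiniQuery(String text, Map<String, Object> context) {
    this.text = text;
    this.context = Collections.unmodifiableMap(new HashMap<>(context));
  }

  MiniQuery withContext(String key, Object value) {
    Map<String, Object> copy = new HashMap<>(context);
    copy.put(key, value);
    return new MiniQuery(text, copy);
  }
}

// Hypothetical runner; the leaf just reports the context it finally saw.
interface CtxRunner {
  Map<String, Object> run(MiniQuery query);
}

final class ContextRunners {
  // Decorating runner: adds one context entry, then delegates downward.
  static CtxRunner adding(String key, Object value, CtxRunner child) {
    return query -> child.run(query.withContext(key, value));
  }

  static CtxRunner leaf() {
    return query -> query.context;
  }
}
```

Stacking adding("queryId", ...) over adding("timeout", ...) over leaf() means the leaf sees both entries, while the caller's original query keeps an empty context.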