Operators Component Details
See TimeseriesQueryEngine.
- One Guice-created engine per server.
- Uses a buffer pool.
- Operates on a single segment.
- process() is really "plan" and should appear in the planner.
- Handles vectorized and non-vectorized cases. Create these as separate operators.
Non-vectorized:
- Uses QueryRunnerHelper to make the cursors, then maps cursors to functions. (Alter the scan query operators to follow.)
- Sequences.filter acts as a sequence filter. The equivalent is the FilteringOperator. But here, it just ignores null values.
- Each cursor:
  - Create aggregators
  - Aggregate rows using aggregators
  - Convert aggregators to a time series result
  - Close the aggregators
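The per-cursor steps above can be sketched roughly as follows. This is only an illustration under simplifying assumptions, not Druid's code: Agg, LongSumAgg and CursorAggregation are hypothetical names, the cursor is modeled as a plain iterator of row maps, and the result is a plain map rather than a Result over a TimeseriesResultValue.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for an aggregator; not Druid's Aggregator interface.
interface Agg extends AutoCloseable {
    void aggregate(Map<String, Object> row);
    Object get();
    @Override
    void close();
}

// Sums one numeric column. Records close() so the lifecycle can be observed.
class LongSumAgg implements Agg {
    private final String column;
    private long sum;
    boolean closed;

    LongSumAgg(String column) {
        this.column = column;
    }

    @Override
    public void aggregate(Map<String, Object> row) {
        Object v = row.get(column);
        if (v instanceof Number) {
            sum += ((Number) v).longValue();
        }
    }

    @Override
    public Object get() {
        return sum;
    }

    @Override
    public void close() {
        closed = true;
    }
}

class CursorAggregation {
    // One pass over a "cursor" (here just an iterator of rows): aggregate every
    // row, convert the aggregators to a result map, then close the aggregators.
    static Map<String, Object> aggregateCursor(
            long timestamp, Iterator<Map<String, Object>> cursor, Map<String, Agg> aggs) {
        try {
            while (cursor.hasNext()) {
                Map<String, Object> row = cursor.next();
                for (Agg agg : aggs.values()) {
                    agg.aggregate(row);
                }
            }
            Map<String, Object> result = new LinkedHashMap<>();
            result.put("timestamp", timestamp);
            for (Map.Entry<String, Agg> e : aggs.entrySet()) {
                result.put(e.getKey(), e.getValue().get());
            }
            return result;
        } finally {
            // Close even if aggregation fails part-way through.
            for (Agg agg : aggs.values()) {
                agg.close();
            }
        }
    }
}
```

Closing in a finally block mirrors the "close the aggregators" step even when a row causes an exception.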
For vectorized:
- Makes one vectorized cursor
- Uses VectorCursorGranularizer to ???
Cursors
- The cursor does not deliver rows; instead, it uses selectors to write values to some target location.
- Cursors handle filtering, interval filtering, virtual (computed) columns, granularity and asc/desc order (ordered by what?)
- Regular cursors can produce multiple (in theory, but not in practice?); vector cursors produce only one.
CombiningAccumulator handles values for a single group.
- Comparator<T> ordering determines when two values are the same.
- BinaryOperator<T> mergeFn merges values:
  - f(x, null) accepts the first value
  - f(x, y) combines values, where x is the result of a previous accumulation and y is the new value.
It seems that an Accumulator is, essentially, a collection builder: it gathers items into an output (which may be a list, etc.). When used as a CombiningAccumulator, it creates a list of like values, with the values combined.
- "accumulate" - gather values
- "combine" - function to combine two values into one value
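The accumulate/combine split can be sketched like this. It is a simplified illustration, not Druid's CombiningAccumulator: CombiningSketch and combine are hypothetical names, the input is a plain list instead of a sequence, and the input is assumed to be already sorted by the ordering so that like values are adjacent.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical simplification of the combining idea described above.
class CombiningSketch {
    // Assumes `sorted` is ordered by `ordering`, so equal values are adjacent.
    static <T> List<T> combine(List<T> sorted, Comparator<T> ordering, BinaryOperator<T> mergeFn) {
        List<T> out = new ArrayList<>();
        T current = null;
        boolean hasCurrent = false;
        for (T value : sorted) {
            if (!hasCurrent) {
                // f(x, null): accept the first value of the first group.
                current = mergeFn.apply(value, null);
                hasCurrent = true;
            } else if (ordering.compare(current, value) == 0) {
                // f(x, y): fold the new value into the running result.
                current = mergeFn.apply(current, value);
            } else {
                // The group changed: emit the finished group, start a new one.
                out.add(current);
                current = mergeFn.apply(value, null);
            }
        }
        if (hasCurrent) {
            out.add(current);
        }
        return out;
    }
}
```

With an ordering on a key and a mergeFn that adds values, the input (a, 1), (a, 2), (b, 5) collapses to one entry per key: "accumulate" builds the output list, "combine" is the mergeFn applied within a group.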
Is data coming into an aggregator grouped by segment, so that "by segment" simply leaves the grouping in place?
Row is a representation of a row of data.
RowAdapters is a wrapper around a row with limited functionality: returns functions to get the timestamp and a column value.
MapBasedRow is a row comprised of a timestamp and a map.
TimeseriesResultBuilder is a builder for a (timestamp, event-map) row, represented as a Result over a TimeseriesResultValue.
Result is another form of (timestamp, value) object.
ColumnSelectorFactory is a factory for column selectors.
ColumnValueSelector provides access methods (and conversions) for all Druid data types.
RowBasedColumnSelectorFactory creates column selectors for a Row given a column name.
Results appear to contain only a timestamp and aggregator values. (No dimensions per group?)
TimeseriesQueryQueryToolChest.mergeResults:
Base sequence values: TimeseriesResultValue, which is a wrapper over a map. (Note: would be more efficient to store data as a tuple.)
Case 1: Overall total (with nuances for skip empty buckets, by-segment): get single value or create a default value.
AggregatorFactory is an opinionated factory for aggregations. This probably wants to be refactored, but that will be hard because of the sheer number of items and because it is an extension point. The factory seems to know its column name; thus it is a (type, column) pair. Should be split.
- accumulate(prev-total, cur-row) -> new-total
- mergeFn(x, null) initializes the accumulator
- mergeFn(tot, row) -> new-tot
Random requirements:
- By-segment
To convert merge:
- Retain the nested query runner for now
- Top flow
- Toolchest returns a query runner that will do the work
- That query runner implements the runtime flow
- Runtime flow
- Base runner of type QueryRunner<Result<TimeseriesResultValue>>
- Aggregator
- Limit (if needed)
- Top aggregator
- Handle empty input (with conditions, see above)
- Add grand total, if requested
- Base runner of type
A QueryRunner that combines a list of other QueryRunners and executes them in parallel on an executor.
- Returns a sequence of ...?
- Based on an iterator of futures obtained from
- Submitting a runner task to a processing pool
- The runner itself converts the results of the child runner to a list and returns that as an iterable.
- Merges all the iterables (i.e., lists) using MergeIterable, which waits on the futures
Also handles cancellation of sub-tasks if any task fails.
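The submit, wait and cancel pattern described above might look roughly like the sketch below. It is not Druid's implementation: ParallelGather and runAll are hypothetical names, each child runner is modeled as a Callable that materializes its results to a list, and a plain concatenation in submission order stands in for the ordered merge that MergeIterable performs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical sketch of running child runners in parallel on an executor.
class ParallelGather {
    static <T> List<T> runAll(ExecutorService pool, List<Callable<List<T>>> runners) {
        // Submit every runner task to the processing pool up front.
        List<Future<List<T>>> futures = new ArrayList<>();
        for (Callable<List<T>> runner : runners) {
            futures.add(pool.submit(runner));
        }
        List<T> gathered = new ArrayList<>();
        try {
            for (Future<List<T>> future : futures) {
                // Blocks until that runner has materialized its list.
                gathered.addAll(future.get());
            }
            return gathered;
        } catch (InterruptedException | ExecutionException e) {
            // One task failed, or the caller was interrupted: cancel the rest.
            for (Future<List<T>> future : futures) {
                future.cancel(true);
            }
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException("child runner failed", e);
        }
    }
}
```

Calling cancel on a future that has already completed is a no-op, so the cleanup loop does not need to track which tasks are still running.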
- Each item essentially becomes a new fragment, and must be managed as such.
- We then need another layer to represent a DAG of fragments. Must change the top-level classes to match.
- Processing pools accept PrioritizedQueryRunnerCallable, but we probably only want PrioritizedCallable. Add new methods? Seems benign. Change method? QueryProcessingPool is an extension point. Maybe create a default that uses the old form somehow.
- To start, materialize the nested fragment to a list. Later, change this to a producer/consumer queue (which needs an entirely different execution framework.)
Sketch:
- Use Operators.toList() to create the materialized list.
- Create a new ListReader to read from the list. (Done)
- Create a SimpleCallable that extends AbstractPrioritizedQueryRunnerCallable. (Done)
- Generalize FragmentContext into a FragmentManager and QueryManager
- Query tracks the fragments and their relations (Done)
- Operators are added to fragments (Done)
- Query gets its status from fragments
- On close, query delegates... how?
- Holds a collection of fragments (Done)
- Available from each fragment context (for now) (Done)
- Fragment contexts work per fragment (thread) as today.
- Query closes root fragment, ensures others are closed
- Fragment failures bubble up to the query
- Fragment, operator numbering
- Fragment numbers for logical layers
- Instance numbers within logical layers
- Operator numbers within fragments
- Profile creation
- Split operator tree from profile data
- Simple tree, reporting for root
- For children, first group by logical fragment
- Then, try to combine physical fragments by operators
- This means creating the operator tree for each fragment, then comparing the trees to create groups
- If one group, include child fragment inline with parent. Else, include stub and list children by groups
- Create a "materialized list scatter/gather"
- Holds the set of child fragments
- Holds the set of leaves that read from the child fragments
- Starts the workers on open(), and waits for completion.
- Use futures to gather lists and/or failures.
- Captures any exception into the fragment context.
- Closes the sub-fragment upon completion of the thread (which updates profile stats)
- Passes the exception (or a generalized one) to the caller to be given to the recipient of the future
- Buffers list of lists (yuck!)
- next() returns the one & only list of lists
- List-of-list-based operators
- Concat
- Merge
- Flatten
- Use a merge or concat operator to combine the results.
- Planner chooses concat, merge or nothing depending on segment count & query details
- Operator tree looks like???
01-01 Upstream
|
01-02 Concat|Merge|Null
|
01-03 List Scatter/Gather
|
02-01 List Materializer
|
02-02 Child
...
01-02 List Concat
Input count: x
Row count: x
01-03 List Scatter/Gather
Segment 1
Segment: xxxx
Fragment: 02-01
Row Count: xxx
Wait time: xxx
Total time: xxx
Segment 2
...
02-01 Child
01:
Row Count: x
If the child fragments are not all the same:
|
02-01 List Materializer
|
02-02 (Various)
Fragment 02 (instances 01, 04, 05)
02-01 List Materializer
|
02-02 Scan Query
Fragment 02 (Instances 02)
02-01 List Materializer
|
02-02 Null
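
The grouping of fragment instances shown above (instances 01, 04, 05 in one group, instance 02 in another) could be computed along these lines. This is a sketch under a strong simplifying assumption: each fragment instance is a linear chain of operators, so its tree reduces to a list of operator names. FragmentGrouping and groupByOperators are hypothetical names; real operator trees with branches would need a structural signature instead.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: instances with identical operator lists share a group.
class FragmentGrouping {
    // Key: operator names from top to bottom. Value: instance numbers in that group.
    static Map<List<String>, List<Integer>> groupByOperators(Map<Integer, List<String>> instances) {
        Map<List<String>, List<Integer>> groups = new LinkedHashMap<>();
        for (Map.Entry<Integer, List<String>> e : instances.entrySet()) {
            groups.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        }
        return groups;
    }
}
```

If the result has a single group, the child fragment can be shown inline with its parent; otherwise the profile shows a stub and lists the children by group.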