Operators Component Details

TimeSeries Engine

See TimeseriesQueryEngine.

  • One Guice-created engine per server.
  • Uses a buffer pool.
  • Operates on a single segment.
  • process() is really "plan" and should appear in the planner.
  • Handles vectorized and non-vectorized cases. Create these as separate operators.

Non-vectorized:

  • Uses QueryRunnerHelper to create the cursors, then maps each cursor to a function (alter the scan query operators to follow the same pattern)
  • Sequences.filter acts as a sequence-level filter; the operator equivalent is FilteringOperator. Here it is used only to ignore null values.
  • Each cursor:
    • Create aggregators
    • Aggregate rows using aggregators
    • Convert aggregators to a time series result
    • Close the aggregators
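
A minimal sketch of that per-cursor loop, using the class names mentioned in these notes (variable names such as factories and cursor are assumed; the real engine also handles granularity buckets and other details):

  Aggregator[] aggs = new Aggregator[factories.size()];  // factories: the query's List<AggregatorFactory> (assumed)
  for (int i = 0; i < aggs.length; i++) {
    aggs[i] = factories.get(i).factorize(cursor.getColumnSelectorFactory());  // create aggregators
  }
  try {
    while (!cursor.isDone()) {
      for (Aggregator agg : aggs) {
        agg.aggregate();             // reads the current row via selectors
      }
      cursor.advance();
    }
    // Convert aggregator state into a time series result row.
    TimeseriesResultBuilder bob = new TimeseriesResultBuilder(cursor.getTime());
    for (int i = 0; i < aggs.length; i++) {
      bob.addMetric(factories.get(i).getName(), aggs[i].get());
    }
    Result<TimeseriesResultValue> row = bob.build();
  } finally {
    for (Aggregator agg : aggs) {
      agg.close();                   // close the aggregators
    }
  }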

For vectorized:

  • Makes one vectorized cursor
  • Uses VectorCursorGranularizer to split that single cursor's output into per-granularity time buckets (the vectorized counterpart of one cursor per bucket)

Cursors

  • The cursor does not deliver rows; instead, consumers use selectors to read the values at the cursor's current position (e.g., aggregators copy those values into their own state).
  • Cursors handle filtering, interval filtering, virtual (computed) columns, granularity, and ascending/descending order (ordered by what?)
  • A regular cursor factory can produce multiple cursors (in theory, but not in practice?); a vector cursor factory produces only one.
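
A tiny sketch of that selector pattern (the column name "x" is just an example):

  ColumnSelectorFactory csf = cursor.getColumnSelectorFactory();
  ColumnValueSelector<?> sel = csf.makeColumnValueSelector("x");
  while (!cursor.isDone()) {
    long v = sel.getLong();   // or getDouble()/getObject(); the selector converts as needed
    // ... consume v ...
    cursor.advance();
  }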

TimeSeries Something

CombiningAccumulator handles values for a single group.

  • Comparator<T> ordering determines when two values are the same.
  • BinaryOperator<T> mergeFn merges values:
    • f(x, null) accepts the first value
    • f(x, y) combines values, where x is the result of a previous accumulation and y is the new value.

It seems that an Accumulator is, essentially, a collection builder: it gathers items into an output (which may be a list, etc.). When used as a CombiningAccumulator, it builds a list of like values, with each run of like values combined into one.

  • "accumulate" - gather values
  • "combine" - function to combine two values into one value

Data coming into an aggregation is grouped by segment? So that "by segment" simply preserves that grouping?

Rows

Row is a representation of a row of data.

RowAdapters creates RowAdapter instances: thin wrappers around a row with limited functionality that return functions to get the timestamp and a column value.

MapBasedRow is a row comprised of a timestamp and a map.

TimeseriesResultBuilder is a builder for a (timestamp, event-map) row, represented as a Result over a TimeseriesResultValue.

Result is another form of (timestamp, value) object.

ColumnSelectorFactory is a factory for column selectors.

ColumnValueSelector provides access methods (and conversions) for all Druid data types.

RowBasedColumnSelectorFactory creates column selectors for a Row given a column name.
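
A small sketch tying these classes together (the column name "added" is just an example):

  Row row = new MapBasedRow(DateTimes.of("2022-01-01"), Map.of("added", 37L));
  RowAdapter<Row> adapter = RowAdapters.standardRow();
  long ts = adapter.timestampFunction().applyAsLong(row);    // the timestamp
  Object added = adapter.columnFunction("added").apply(row); // a column value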

Timeseries Query

Results appear to contain only a timestamp and aggregator values. (No dimensions per group?)

TimeseriesQueryQueryToolChest.mergeResults:

Base sequence values: TimeseriesResultValue, which is a wrapper over a map. (Note: it would be more efficient to store the data as a tuple.)

Case 1: Overall total (with nuances for skip-empty-buckets and by-segment): get the single value or create a default value.

Aggregations

AggregatorFactory is an opinionated factory for aggregations. It probably wants to be refactored, but that will be hard because of the sheer number of implementations, and because it is an extension point. The factory seems to know its column name; thus it is a (type, column) pair, which should be split.
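
For example, a single factory instance bundles the aggregation type, the output name, and the input column:

  // type = long sum, output name = "sumAdded", input column = "added"
  AggregatorFactory f = new LongSumAggregatorFactory("sumAdded", "added");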

Sequences & Yielders (Again)

  • accumulate(prev-total, cur-row) -> new-total
  • mergeFn(x, null) - initializes the accumulator (accepts the first value)
  • mergeFn(tot, row) -> new-tot
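
A minimal illustration of that convention, with toy Long values:

  BinaryOperator<Long> mergeFn = (tot, row) -> row == null ? tot : tot + row;
  Long total = mergeFn.apply(1L, null);  // initialize: accept the first value
  total = mergeFn.apply(total, 2L);      // fold in the next row
  total = mergeFn.apply(total, 3L);      // total == 6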

Notes

Random requirements:

  • By-segment

To convert merge:

  • Retain the nested query runner for now
  • Top flow
    • Toolchest returns a query runner that will do the work
    • That query runner implements the runtime flow
  • Runtime flow
    • Base runner of type QueryRunner<Result<TimeseriesResultValue>>
    • Aggregator
    • Limit (if needed)
    • Top aggregator
      • Handle empty input (with conditions, see above)
      • Add grand total, if requested

ChainedExecutionQueryRunner

A QueryRunner that combines a list of other QueryRunners and executes them in parallel on an executor.

  • Returns a sequence of ...?
  • Based on an iterator of futures, obtained by submitting one runner task per child to a processing pool
  • The runner itself converts the results of the child runner to a list and returns that as an iterable.
  • Merges all the iterables (i.e., lists) using MergeIterable which waits on the futures

Also handles cancellation of sub-tasks if any task fails.
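
A generic sketch of that scatter/gather pattern (Child, children, Row, and runToList() are stand-ins, not Druid APIs):

  ExecutorService exec = Executors.newFixedThreadPool(4);
  List<Future<List<Row>>> futures = new ArrayList<>();
  for (Child child : children) {
    futures.add(exec.submit(() -> child.runToList()));  // one task per child runner
  }
  List<Row> merged = new ArrayList<>();
  try {
    for (Future<List<Row>> f : futures) {
      merged.addAll(f.get());                 // wait on each future, like MergeIterable
    }
  } catch (Exception e) {
    futures.forEach(f -> f.cancel(true));     // cancel sub-tasks if any task fails
    throw new RuntimeException(e);
  }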

Conversion Issues

  • Each item essentially becomes a new fragment, and must be managed as such.
  • We then need another layer to represent a DAG of fragments. Must change the top-level classes to match.
  • Processing pools accept PrioritizedQueryRunnerCallable, but we probably only want PrioritizedCallable. Add new methods? Seems benign. Change method? QueryProcessingPool is an extension point. Maybe create a default that uses the old form somehow.
  • To start, materialize the nested fragment to a list. Later, change this to a producer/consumer queue (which needs an entirely different execution framework).
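
The later producer/consumer form might look like this (sketch only; Row is a stand-in):

  BlockingQueue<Row> queue = new ArrayBlockingQueue<>(1024);
  // producer (child fragment):  queue.put(row);
  // consumer (parent fragment): Row r = queue.take();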

Sketch:

  • Use Operators.toList() to create the materialized list.
  • Create a new ListReader to read from the list. (Done)
  • Create a SimpleCallable that extends AbstractPrioritizedQueryRunnerCallable. (Done)
  • Generalize FragmentContext into a FragmentManager and QueryManager
    • Query tracks the fragments and their relations (Done)
    • Operators are added to fragments (Done)
    • Query gets its status from fragments
    • On close, query delegates... how?
    • Holds a collection of fragments (Done)
    • Available from each fragment context (for now) (Done)
    • Fragment contexts work per fragment (thread) as today.
    • Query closes root fragment, ensures others are closed
    • Fragment failures bubble up to the query
  • Fragment, operator numbering
    • Fragment numbers for logical layers
    • Instance numbers within logical layers
    • Operator numbers within fragments
  • Profile creation
    • Split operator tree from profile data
    • Simple tree, reporting for root
    • For children, first group by logical fragment
    • Then, try to combine physical fragments by operators
    • This means creating the operator tree for each fragment, then compare them to create groups
    • If one group, include child fragment inline with parent. Else, include stub and list children by groups
  • Create a "materialized list scatter/gather"
    • Holds the set of child fragments
    • Holds the set of leaves that read from the child fragments
    • Starts the workers on open(), and waits for completion.
    • Use futures to gather lists and/or failures.
    • Captures any exception into the fragment context.
    • Closes the sub-fragment upon completion of the thread (which updates profile stats)
    • Passes the exception (or a generalized one) to the caller to be given to the recipient of the future
    • Buffers list of lists (yuck!)
    • next() returns the one & only list of lists
  • List-of-list-based operators
    • Concat
    • Merge
    • Flatten
  • Use a merge or concat operator to combine the results.
    • Planner chooses concat, merge or nothing depending on segment count & query details
    • Operator tree looks something like this:
01-01 Upstream
|
01-02 Concat|Merge|Null
|
01-03 List Scatter/Gather
|
02-01 List Materializer
|
02-02 Child
...

01-02 List Concat
  Input count: x
  Row count: x

01-03 List Scatter/Gather
  Segment 1
    Segment: xxxx
    Fragment: 02-01
    Row Count: xxx
    Wait time: xxx
    Total time: xxx
  Segment 2
    ...

02-01 Child
  01:
    Row Count: x

If the child fragments are not all the same:

|
02-01 List Materializer
|
02-02 (Various)

Fragment 02 (instances 01, 04, 05)

02-01 List Materializer
|
02-02 Scan Query

Fragment 02 (instance 02)

02-01 List Materializer
|
02-02 Null
