
Operators Timeseries Query

Paul Rogers edited this page Oct 14, 2022 · 2 revisions

CombiningAccumulator handles values for a single group.

  • Comparator<T> ordering determines when two values are the same.
  • BinaryOperator<T> mergeFn merges values:
    • f(x, null) accepts the first value
    • f(x, y) combines values, where x is the result of a previous accumulation and y is the new value.
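
The two roles above (an ordering that decides when values belong together, and a merge function that folds them) can be sketched as follows. This is a minimal illustration, not Druid's CombiningAccumulator: the class `CombineSketch` and its `combine` method are hypothetical names, the input is assumed to be already sorted by the ordering, and the values themselves are assumed to be non-null.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for illustration; not Druid's CombiningAccumulator.
final class CombineSketch {
    // Merges adjacent values that the ordering treats as equal.
    // Assumes `sorted` is already ordered by `ordering` and holds no nulls.
    static <T> List<T> combine(List<T> sorted, Comparator<T> ordering, BinaryOperator<T> mergeFn) {
        List<T> out = new ArrayList<>();
        T current = null;
        for (T next : sorted) {
            if (current == null) {
                // f(x, null): accept the first value of a group.
                current = mergeFn.apply(next, null);
            } else if (ordering.compare(current, next) == 0) {
                // f(x, y): fold the new value into the running result.
                current = mergeFn.apply(current, next);
            } else {
                // The ordering says a new group started: emit the finished one.
                out.add(current);
                current = mergeFn.apply(next, null);
            }
        }
        if (current != null) {
            out.add(current);
        }
        return out;
    }
}
```

With (timestamp, count) pairs ordered by timestamp and a merge function that adds counts, two pairs sharing a timestamp collapse into one output value, which matches the "list of like values, with the values combined" reading.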

It seems that an Accumulator is, essentially, a collection builder: it gathers items into an output (which may be a list, for example). When used as a CombiningAccumulator, it creates a list of like values, with the values combined.

  • "accumulate" - gather values
  • "combine" - function to combine two values into one value

Is data coming into an aggregator grouped by segment, so that "by segment" simply leaves the grouping in place?

Rows

Row is a representation of a row of data.

RowAdapters is a wrapper around a row with limited functionality: returns functions to get the timestamp and a column value.

MapBasedRow is a row comprised of a timestamp and a map.

TimeseriesResultBuilder is a builder for a (timestamp, event-map) row, represented as a Result over a TimeseriesResultValue.

Result is another form of (timestamp, value) object.

ColumnSelectorFactory is a factory for column selectors.

ColumnValueSelector provides access methods (and conversions) for all Druid data types.

RowBasedColumnSelectorFactory creates column selectors for a Row given a column name.
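
The relationship between a map-based row and an adapter that returns accessor functions can be sketched as below. The names `SketchRow` and `SketchRowAdapter` are hypothetical and exist only for this illustration; the sketch assumes the adapter's whole job is to hand out one function for the timestamp and one function per column name, as described above.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.function.ToLongFunction;

// Hypothetical row: a timestamp plus a map of column values (the MapBasedRow shape).
final class SketchRow {
    final long timestamp;
    final Map<String, Object> event;

    SketchRow(long timestamp, Map<String, Object> event) {
        this.timestamp = timestamp;
        this.event = event;
    }
}

// Hypothetical adapter: returns accessor functions rather than values,
// so a caller can resolve a column once and then apply it to many rows.
final class SketchRowAdapter {
    ToLongFunction<SketchRow> timestampFunction() {
        return row -> row.timestamp;
    }

    Function<SketchRow, Object> columnFunction(String column) {
        // A column missing from the row yields null.
        return row -> row.event.get(column);
    }
}
```

A column selector factory built over rows would presumably call `columnFunction` once per column name and reuse the returned function for every row.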

Timeseries Query

Results appear to contain only a timestamp and aggregator values. (No dimensions per group?)

TimeseriesQueryQueryToolChest.mergeResults:

Base sequence values: TimeseriesResultValue which is a wrapper over a map. (Note: would be more efficient to store data as a tuple.)

Case 1: Overall total (with nuances for skip empty buckets, by-segment): get single value or create a default value.

Aggregations

AggregatorFactory is an opinionated factory for aggregations. It probably wants to be refactored, but that will be hard because of the sheer number of items and because it is an extension point. The factory seems to know its column name; thus it is a (type, column) pair, and the two should be split.

Sequences & Yielders (Again)

  • accumulate(prev-total, cur-row) -> new-total
  • mergeFn(x, null) - initializes the accumulator
  • mergeFn(tot, row) -> new-tot

Notes

Random requirements:

  • By-segment

To convert merge:

  • Retain the nested query runner for now
  • Top flow
    • Toolchest returns a query runner that will do the work
    • That query runner implements the runtime flow
  • Runtime flow
    • Base runner of type QueryRunner<Result<TimeseriesResultValue>>
    • Aggregator
    • Limit (if needed)
    • Top aggregator
      • Handle empty input (with conditions, see above)
      • Add grand total, if requested

Top-Level Flow

Runner stack:

  • What?
  • TimeseriesQueryQueryToolChest.mergeResults() -> anonymous runner
  • Anonymous merge runner
    • Rewrite query to remove grand total flag
    • Run resultMergeQueryRunner
    • Result for zero rows
    • Grand total
  • Anonymous resultMergeQueryRunner
    • Remove post aggregators
    • Run merging runner
    • Apply limit
  • Merging runner (if not by segment)
  • Base runner
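
The layering in the runner stack can be sketched as nested wrappers, each adding one step around the runner below it. Everything here is a simplified assumption: `RunnerStackSketch`, `Query`, and `Runner` are hypothetical names, rows are plain `Long` values, and summing all rows stands in for the real merge. Only the merging and limit layers are shown.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the wrapper layering; not Druid's QueryRunner API.
final class RunnerStackSketch {
    // Hypothetical query carrying only the flags this sketch needs.
    static final class Query {
        final boolean bySegment;
        final int limit;

        Query(boolean bySegment, int limit) {
            this.bySegment = bySegment;
            this.limit = limit;
        }
    }

    interface Runner {
        List<Long> run(Query query);
    }

    // Merging layer: skipped for by-segment queries, which pass the base results through.
    static Runner merging(Runner base) {
        return query -> {
            List<Long> rows = base.run(query);
            if (query.bySegment) {
                return rows;
            }
            long sum = 0;
            for (long value : rows) {
                sum += value; // stand-in for the real combine step
            }
            return List.of(sum);
        };
    }

    // Limit layer: runs the inner runner, then truncates its output.
    static Runner limiting(Runner inner) {
        return query -> inner.run(query).stream()
            .limit(query.limit)
            .collect(Collectors.toList());
    }
}
```

Building the stack as `limiting(merging(base))` mirrors the order in the list: the outer layer runs first and delegates inward until the base runner produces rows.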

In more detail:

  • TimeseriesQueryQueryToolChest.mergeResults() returns a query runner
  • Creates an instance of ResultMergeQueryRunner which extends BySegmentSkippingQueryRunner
  • Provides the base runner and comparators
  • Creates an anonymous runner that takes the merge runner via a closure

When the anonymous runner is run:

  • Rewrites the query to remove the grand total flag
  • Invokes the merging runner and obtains the results
  • Special cases ALL
    • If the above results are empty, provide a zero result instead
  • If no grand total, return the results thus far.
  • Create a sequence which accumulates grand totals
    • Totals are over all rows; no grouping here
  • Return a sequence which first iterates over the results
    • Then inserts a grand-totals row.
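
The grand-total step can be sketched as "emit every result row, then emit one extra row of totals". This is an eager, list-based illustration under stated assumptions: `GrandTotalSketch` is a hypothetical name, each row is reduced to a map of aggregator name to `long` value, and summation stands in for each aggregator's own combine logic. The real flow works over sequences rather than lists.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; the real code accumulates totals while iterating a sequence.
final class GrandTotalSketch {
    // Returns the input rows followed by one extra row holding per-column totals.
    static List<Map<String, Long>> withGrandTotal(List<Map<String, Long>> rows) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (Map<String, Long> row : rows) {
            for (Map.Entry<String, Long> entry : row.entrySet()) {
                // Totals are over all rows; there is no grouping at this level.
                totals.merge(entry.getKey(), entry.getValue(), Long::sum);
            }
        }
        List<Map<String, Long>> out = new ArrayList<>(rows);
        out.add(totals);
        return out;
    }
}
```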

When the merging runner is run:

  • Calls BySegmentSkippingQueryRunner.run()
  • If by segment, just returns the base runner.
  • Else, calls doRun() with the base runner.
  • For Timeseries, doRun() is overridden in the mergeResults() method.
    • ResultMergeQueryRunner.doRun() creates a CombiningSequence to do the merge
  • Removes the post-aggs
  • Calls ResultMergeQueryRunner.doRun(), with the rewritten query
  • Applies a limit
    • Note: this means the limit is applied before grand totals are computed, so the grand total will be wrong: it reflects only the rows that remain after the limit.
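
The ordering concern in the note can be shown with a small, self-contained comparison. This sketch assumes the issue is simply that a total computed after truncation misses the dropped rows; `LimitOrderSketch` and its methods are hypothetical names, and rows are reduced to single `long` values.

```java
import java.util.List;

// Hypothetical sketch comparing the two possible orderings of limit and total.
final class LimitOrderSketch {
    // Total computed after the limit has already dropped rows.
    static long totalAfterLimit(List<Long> values, int limit) {
        return values.stream().limit(limit).mapToLong(Long::longValue).sum();
    }

    // Total computed over every row, independent of any limit.
    static long totalOverAllRows(List<Long> values) {
        return values.stream().mapToLong(Long::longValue).sum();
    }
}
```

For the values 10, 20, 30 with a limit of 2, the first method yields 30 while the second yields 60, so the two orderings only agree when the limit does not drop any rows.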