From df448c654bc3262776916431aa3990e09227b223 Mon Sep 17 00:00:00 2001
From: Daniel Moran
Date: Fri, 27 Aug 2021 09:59:23 -0400
Subject: [PATCH] feat(tsi): optimize series iteration (#22316)

When using queries like `select count(_seriesKey) from bigmeasurement`, we
should iterate over the TSI structures to serve the query instead of loading
all the series into memory up front.

Co-authored-by: Sam Arnold
---
 CHANGELOG.md                    |  1 +
 tsdb/engine/tsm1/engine.go      | 56 ++++++++++++++++++++
 tsdb/engine/tsm1/engine_test.go | 75 ++++++++++++++++++++++++++
 tsdb/engine/tsm1/iterator.go    | 69 ++++++++++++++++++++++++
 tsdb/index.go                   | 93 +++++++++++++++++++++++++++++++++
 tsdb/series_file.go             |  2 +-
 6 files changed, 295 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a8dac2d0173..4172ffeab58 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -73,6 +73,7 @@ Because of the version bump to `go`, the macOS build for this release requires a
 1. [21910](https://github.com/influxdata/influxdb/pull/21910): Added `--ui-disabled` option to `influxd` to allow for running with the UI disabled.
 1. [21958](https://github.com/influxdata/influxdb/pull/21958): Telemetry improvements: Do not record telemetry data for non-existant paths; replace invalid static asset paths with a slug.
 1. [22023](https://github.com/influxdata/influxdb/pull/22023): Upgrade Flux to v0.124.0.
+1. [22316](https://github.com/influxdata/influxdb/pull/22316): Optimize series iteration for queries that can be answered without inspecting TSM data.
 
 ### Bug Fixes
 
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 7934110da1a..69cfb6cd15f 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -2390,6 +2390,40 @@ func (e *Engine) CreateIterator(ctx context.Context, measurement string, opt que
 	return newMergeFinalizerIterator(ctx, itrs, opt, e.logger)
 }
 
+// createSeriesIterator creates an optimized series iterator if possible.
+// We exclude less-common cases for now as not worth implementing.
+func (e *Engine) createSeriesIterator(measurement string, ref *influxql.VarRef, is tsdb.IndexSet, opt query.IteratorOptions) (query.Iterator, error) {
+	// Main check to see if we are trying to create a seriesKey iterator
+	if ref == nil || ref.Val != "_seriesKey" || len(opt.Aux) != 0 {
+		return nil, nil
+	}
+	// Check some other cases that we could maybe handle, but don't
+	if len(opt.Dimensions) > 0 {
+		return nil, nil
+	}
+	if opt.SLimit != 0 || opt.SOffset != 0 {
+		return nil, nil
+	}
+	if opt.StripName {
+		return nil, nil
+	}
+	if opt.Ordered {
+		return nil, nil
+	}
+	// Actual creation of the iterator
+	seriesCursor, err := is.MeasurementSeriesKeyByExprIterator([]byte(measurement), opt.Condition, opt.Authorizer)
+	if err != nil {
+		seriesCursor.Close()
+		return nil, err
+	}
+	var seriesIterator query.Iterator
+	seriesIterator = newSeriesIterator(measurement, seriesCursor)
+	if opt.InterruptCh != nil {
+		seriesIterator = query.NewInterruptIterator(seriesIterator, opt.InterruptCh)
+	}
+	return seriesIterator, nil
+}
+
 func (e *Engine) createCallIterator(ctx context.Context, measurement string, call *influxql.Call, opt query.IteratorOptions) ([]query.Iterator, error) {
 	ref, _ := call.Args[0].(*influxql.VarRef)
 
@@ -2399,6 +2433,28 @@ func (e *Engine) createCallIterator(ctx context.Context, measurement string, cal
 		return nil, nil
 	}
 
+	// Check for optimized series iteration for the TSI index
+	if e.index.Type() == tsdb.TSI1IndexName {
+		indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
+		seriesOpt := opt
+		if len(opt.Dimensions) == 0 && call.Name == "count" {
+			// no point ordering the series if we are just counting all of them
+			seriesOpt.Ordered = false
+		}
+		seriesIterator, err := e.createSeriesIterator(measurement, ref, indexSet, seriesOpt)
+		if err != nil {
+			return nil, err
+		}
+		if seriesIterator != nil {
+			callIterator, err := query.NewCallIterator(seriesIterator, opt)
+			if err != nil {
+				seriesIterator.Close()
+				return nil, err
+			}
+			return []query.Iterator{callIterator}, nil
+		}
+	}
+
 	// Determine tagsets for this measurement based on dimensions and filters.
 	var (
 		tagSets []*query.TagSet
diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go
index bb35677427e..9b75d8ae2ef 100644
--- a/tsdb/engine/tsm1/engine_test.go
+++ b/tsdb/engine/tsm1/engine_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
 	"github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
 	"github.com/influxdata/influxql"
+	tassert "github.com/stretchr/testify/assert"
 	"go.uber.org/zap/zaptest"
 )
 
@@ -2023,6 +2024,80 @@ func TestEngine_CreateCursor_Descending(t *testing.T) {
 	}
 }
 
+// Ensure engine can create an iterator over a measurement's series keys, and count them.
+func TestEngine_CreateIterator_SeriesKey(t *testing.T) {
+	t.Parallel()
+
+	for _, index := range tsdb.RegisteredIndexes() {
+		t.Run(index, func(t *testing.T) {
+			assert := tassert.New(t)
+			e := MustOpenEngine(t, index)
+			defer e.Close()
+
+			e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
+			e.CreateSeriesIfNotExists([]byte("cpu,host=A,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "A", "region": "east"}))
+			e.CreateSeriesIfNotExists([]byte("cpu,host=B,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "B", "region": "east"}))
+			e.CreateSeriesIfNotExists([]byte("cpu,host=C,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "C", "region": "east"}))
+			e.CreateSeriesIfNotExists([]byte("cpu,host=A,region=west"), []byte("cpu"), models.NewTags(map[string]string{"host": "A", "region": "west"}))
+
+			if err := e.WritePointsString(
+				`cpu,host=A,region=east value=1.1 1000000001`,
+				`cpu,host=B,region=east value=1.2 1000000002`,
+				`cpu,host=A,region=east value=1.3 1000000003`,
+				`cpu,host=C,region=east value=1.4 1000000004`,
+				`cpu,host=A,region=west value=1.5 1000000005`,
+			); err != nil {
+				t.Fatalf("failed to write points: %s", err.Error())
+			}
+
+			opts := query.IteratorOptions{
+				Expr:       influxql.MustParseExpr(`_seriesKey`),
+				Dimensions: []string{},
+				StartTime:  influxql.MinTime,
+				EndTime:    influxql.MaxTime,
+				Condition:  influxql.MustParseExpr(`host = 'A'`),
+			}
+
+			itr, err := e.CreateIterator(context.Background(), "cpu", opts)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			stringItr, ok := itr.(query.StringIterator)
+			assert.True(ok, "series iterator must be of type string")
+			expectedSeries := map[string]struct{}{
+				"cpu,host=A,region=west": struct{}{},
+				"cpu,host=A,region=east": struct{}{},
+			}
+			var str *query.StringPoint
+			for str, err = stringItr.Next(); err == nil && str != (*query.StringPoint)(nil); str, err = stringItr.Next() {
+				_, ok := expectedSeries[str.Value]
+				assert.True(ok, "Saw bad key "+str.Value)
+				delete(expectedSeries, str.Value)
+			}
+			assert.NoError(err)
+			assert.NoError(itr.Close())
+
+			countOpts := opts
+			countOpts.Expr = influxql.MustParseExpr(`count(_seriesKey)`)
+			itr, err = e.CreateIterator(context.Background(), "cpu", countOpts)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			integerIter, ok := itr.(query.IntegerIterator)
+			assert.True(ok, "series count iterator must be of type integer")
+			i, err := integerIter.Next()
+			assert.NoError(err)
+			assert.Equal(int64(2), i.Value, "must count 2 series with host=A")
+			i, err = integerIter.Next()
+			assert.NoError(err)
+			assert.Equal((*query.IntegerPoint)(nil), i, "count iterator has only one output")
+			assert.NoError(itr.Close())
+		})
+	}
+}
+
 func makeBlockTypeSlice(n int) []byte {
 	r := make([]byte, n)
 	b := tsm1.BlockFloat64
diff --git a/tsdb/engine/tsm1/iterator.go b/tsdb/engine/tsm1/iterator.go
index 972996139e1..fab159fb52a 100644
--- a/tsdb/engine/tsm1/iterator.go
+++ b/tsdb/engine/tsm1/iterator.go
@@ -3,6 +3,7 @@ package tsm1
 import (
 	"context"
 	"fmt"
+	"sync"
 
 	"github.com/influxdata/influxdb/v2/influxql/query"
 	"github.com/influxdata/influxdb/v2/pkg/metrics"
@@ -216,3 +217,71 @@ func newInstrumentedIterator(ctx context.Context, itr query.Iterator) query.Iter
 		panic(fmt.Sprintf("unsupported instrumented iterator type: %T", itr))
 	}
 }
+
+type seriesIterator struct {
+	cur   tsdb.SeriesKeyIterator
+	point query.StringPoint // reusable buffer
+
+	statsLock sync.Mutex
+	stats     query.IteratorStats
+	statsBuf  query.IteratorStats
+}
+
+func newSeriesIterator(name string, cur tsdb.SeriesKeyIterator) *seriesIterator {
+	itr := &seriesIterator{
+		cur: cur,
+		point: query.StringPoint{
+			Name: name,
+			Tags: query.NewTags(nil),
+		},
+	}
+	itr.stats = itr.statsBuf
+	return itr
+}
+
+// Next returns the next point from the iterator.
+func (itr *seriesIterator) Next() (*query.StringPoint, error) {
+	// Read from the main cursor
+	b, err := itr.cur.Next()
+	if err != nil {
+		itr.copyStats()
+		return nil, err
+	}
+	itr.point.Value = string(b)
+
+	// Exit if there are no more series keys.
+	if b == nil {
+		itr.copyStats()
+		return nil, nil
+	}
+	// Track points returned.
+	itr.statsBuf.PointN++
+	itr.statsBuf.SeriesN++
+
+	// Copy buffer to stats periodically.
+	if itr.statsBuf.PointN%statsBufferCopyIntervalN == 0 {
+		itr.copyStats()
+	}
+
+	return &itr.point, nil
+}
+
+// copyStats copies from the itr stats buffer to the stats under lock.
+func (itr *seriesIterator) copyStats() {
+	itr.statsLock.Lock()
+	itr.stats = itr.statsBuf
+	itr.statsLock.Unlock()
+}
+
+// Stats returns stats on the points processed.
+func (itr *seriesIterator) Stats() query.IteratorStats {
+	itr.statsLock.Lock()
+	stats := itr.stats
+	itr.statsLock.Unlock()
+	return stats
+}
+
+// Close closes the iterator.
+func (itr *seriesIterator) Close() error {
+	return itr.cur.Close()
+}
diff --git a/tsdb/index.go b/tsdb/index.go
index 85e2fe57134..e661a5d73f4 100644
--- a/tsdb/index.go
+++ b/tsdb/index.go
@@ -211,6 +211,12 @@ type SeriesIDIterator interface {
 	Close() error
 }
 
+// SeriesKeyIterator represents an iterator over a list of series keys.
+type SeriesKeyIterator interface {
+	Next() ([]byte, error)
+	Close() error
+}
+
 // SeriesIDSetIterator represents an iterator that can produce a SeriesIDSet.
 type SeriesIDSetIterator interface {
 	SeriesIDIterator
@@ -2295,6 +2301,93 @@ func (is IndexSet) measurementSeriesByExprIterator(name []byte, expr influxql.Ex
 	return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
 }
 
+type measurementSeriesKeyByExprIterator struct {
+	ids      SeriesIDIterator
+	is       IndexSet
+	auth     query.Authorizer
+	once     sync.Once
+	releaser func()
+}
+
+func (itr *measurementSeriesKeyByExprIterator) Next() ([]byte, error) {
+	if itr == nil {
+		return nil, nil
+	}
+	for {
+		e, err := itr.ids.Next()
+		if err != nil {
+			return nil, err
+		} else if e.SeriesID == 0 {
+			return nil, nil
+		}
+
+		seriesKey := itr.is.SeriesFile.SeriesKey(e.SeriesID)
+		if len(seriesKey) == 0 {
+			continue
+		}
+
+		name, tags := ParseSeriesKey(seriesKey)
+
+		// Check leftover filters. All fields that might be filtered default to zero values.
+		if e.Expr != nil {
+			if v, ok := e.Expr.(*influxql.BooleanLiteral); ok {
+				if !v.Val {
+					continue
+				}
+			} else {
+				values := make(map[string]interface{}, len(tags))
+				for _, t := range tags {
+					values[string(t.Key)] = string(t.Value)
+				}
+				if !influxql.EvalBool(e.Expr, values) {
+					continue
+				}
+			}
+		}
+
+		if itr.auth != nil && !itr.auth.AuthorizeSeriesRead(itr.is.Database(), name, tags) {
+			continue
+		}
+
+		out := models.MakeKey(name, tags)
+		// ensure nil is only returned when we are done (or for errors)
+		if out == nil {
+			out = []byte{}
+		}
+		return out, nil
+	}
+}
+
+func (itr *measurementSeriesKeyByExprIterator) Close() error {
+	if itr == nil {
+		return nil
+	}
+	itr.once.Do(itr.releaser)
+	return itr.ids.Close()
+}
+
+// MeasurementSeriesKeyByExprIterator iterates through series, filtered by an expression on the tags.
+// Any non-tag expressions will be filtered as if the field had the zero value.
+func (is IndexSet) MeasurementSeriesKeyByExprIterator(name []byte, expr influxql.Expr, auth query.Authorizer) (SeriesKeyIterator, error) {
+	release := is.SeriesFile.Retain()
+	// Create iterator for all matching series.
+	ids, err := is.measurementSeriesByExprIterator(name, expr)
+	if err != nil {
+		release()
+		return nil, err
+	}
+	if ids == nil {
+		release()
+		return nil, nil
+	}
+	return &measurementSeriesKeyByExprIterator{
+		ids:      ids,
+		releaser: release,
+		auth:     auth,
+		is:       is,
+	}, nil
+}
+
 // MeasurementSeriesKeysByExpr returns a list of series keys matching expr.
 func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) {
 	release := is.SeriesFile.Retain()
diff --git a/tsdb/series_file.go b/tsdb/series_file.go
index ed083db5691..4100dcfe4ab 100644
--- a/tsdb/series_file.go
+++ b/tsdb/series_file.go
@@ -266,7 +266,7 @@ func (f *SeriesFile) SeriesCount() uint64 {
 	return n
 }
 
-// SeriesIterator returns an iterator over all the series.
+// SeriesIDIterator returns an iterator over all the series.
 func (f *SeriesFile) SeriesIDIterator() SeriesIDIterator {
 	var ids []uint64
 	for _, p := range f.partitions {
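
Reviewer note (not part of the patch): a minimal sketch of how the new SeriesKeyIterator contract is intended to be consumed. The helper name printSeriesKeys, its arguments, and its placement are hypothetical; only MeasurementSeriesKeyByExprIterator, Next, and Close come from the patch. Next returns a nil key only once the iterator is exhausted (or alongside an error), which is why the implementation above converts a nil key for a valid series into an empty, non-nil slice.

package tsdb_test // hypothetical placement, for illustration only

import (
	"fmt"

	"github.com/influxdata/influxdb/v2/influxql/query"
	"github.com/influxdata/influxdb/v2/tsdb"
	"github.com/influxdata/influxql"
)

// printSeriesKeys walks every series key of a measurement matching cond,
// using the iterator added in this patch.
func printSeriesKeys(is tsdb.IndexSet, measurement string, cond influxql.Expr, auth query.Authorizer) error {
	itr, err := is.MeasurementSeriesKeyByExprIterator([]byte(measurement), cond, auth)
	if err != nil {
		return err
	}
	if itr == nil {
		return nil // no index data to consult
	}
	defer itr.Close()
	for {
		key, err := itr.Next()
		if err != nil {
			return err
		}
		if key == nil {
			return nil // a nil key signals exhaustion; empty non-nil keys are still valid series
		}
		fmt.Printf("series: %s\n", key)
	}
}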
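
Reviewer note (not part of the patch): a small illustration of the leftover-filter behaviour documented on MeasurementSeriesKeyByExprIterator. Only tag values are placed in the evaluation map, so a comparison against a field behaves as if the field were unset and the series is filtered out. The condition and tag values below are made up.

package main

import (
	"fmt"

	"github.com/influxdata/influxql"
)

func main() {
	// Only tag values are available when the iterator evaluates a leftover expression.
	cond := influxql.MustParseExpr(`host = 'A' AND value > 1`)
	tagsOnly := map[string]interface{}{"host": "A"} // the field "value" is never populated
	fmt.Println(influxql.EvalBool(cond, tagsOnly))  // prints "false": the field comparison cannot pass
}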