Skip to content

Commit

Permalink
lib/storage: time series search optimization according to production …
Browse files Browse the repository at this point in the history
…workload profiling

Do not pass filter metric ids to getMetricIDsForTagFilter, since it has been appeared that this slows down
the function by multiple times when it finds big number of metricIDs (tens of millions).
  • Loading branch information
valyala committed Mar 16, 2021
1 parent 264d343 commit fd86a7d
Showing 1 changed file with 10 additions and 15 deletions.
25 changes: 10 additions & 15 deletions lib/storage/index_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2058,7 +2058,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
continue
}

metricIDs, _, err := is.getMetricIDsForTagFilter(tf, nil, maxMetrics, int64Max)
metricIDs, _, err := is.getMetricIDsForTagFilter(tf, maxMetrics, int64Max)
if err != nil {
return nil, nil, fmt.Errorf("cannot find MetricIDs for tagFilter %s: %w", tf, err)
}
Expand Down Expand Up @@ -2361,28 +2361,24 @@ const (

var uselessTagFilterCacheValue = []byte("1")

func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set.Set, maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) {
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) {
if tf.isNegative {
logger.Panicf("BUG: isNegative must be false")
}
metricIDs := &uint64set.Set{}
if len(tf.orSuffixes) > 0 {
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
var loopsCount int64
var err error
if filter != nil {
loopsCount, err = is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter, maxLoopsCount)
} else {
loopsCount, err = is.updateMetricIDsForOrSuffixesNoFilter(tf, metricIDs, maxMetrics, maxLoopsCount)
}
loopsCount, err := is.updateMetricIDsForOrSuffixesNoFilter(tf, metricIDs, maxMetrics, maxLoopsCount)
if err != nil {
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
}
return metricIDs, loopsCount, nil
}

// Slow path - scan for all the rows with the given prefix.
loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, filter, metricIDs.Add, maxLoopsCount)
// Pass nil filter to getMetricIDsForTagFilterSlow, since it works faster on production workloads
// than non-nil filter with many entries.
loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, nil, metricIDs.Add, maxLoopsCount)
if err != nil {
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf)
}
Expand Down Expand Up @@ -2872,7 +2868,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
continue
}
maxLoopsCount := getFirstPositiveLoopsCount(tfws[i+1:])
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, nil, tfs.commonPrefix, maxDateMetrics, maxLoopsCount)
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics, maxLoopsCount)
if err != nil {
if errors.Is(err, errTooManyLoops) {
// The tf took too many loops compared to the next filter. Postpone applying this filter.
Expand Down Expand Up @@ -2966,7 +2962,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
break
}
maxLoopsCount := getFirstPositiveFilterLoopsCount(tfws[i+1:])
m, filterLoopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, metricIDs, tfs.commonPrefix, intMax, maxLoopsCount)
m, filterLoopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, intMax, maxLoopsCount)
if err != nil {
if errors.Is(err, errTooManyLoops) {
// Postpone tf, since it took more loops than the next filter may need.
Expand Down Expand Up @@ -3152,8 +3148,7 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
return true, nil
}

func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, filter *uint64set.Set, commonPrefix []byte,
maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) {
func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, commonPrefix []byte, maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) {
// Augument tag filter prefix for per-date search instead of global search.
if !bytes.HasPrefix(tf.prefix, commonPrefix) {
logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
Expand All @@ -3165,7 +3160,7 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64,
tfNew := *tf
tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
tfNew.prefix = kb.B
metricIDs, loopsCount, err := is.getMetricIDsForTagFilter(&tfNew, filter, maxMetrics, maxLoopsCount)
metricIDs, loopsCount, err := is.getMetricIDsForTagFilter(&tfNew, maxMetrics, maxLoopsCount)
kbPool.Put(kb)
return metricIDs, loopsCount, err
}
Expand Down

0 comments on commit fd86a7d

Please sign in to comment.