Skip to content

Commit

Permalink
Merge pull request #37 from talIguaz/dev-to-master
Browse files Browse the repository at this point in the history
Dev --> master
  • Loading branch information
gshatz authored Apr 11, 2019
2 parents 02ac9bc + 7458f3d commit f3568a4
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 44 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,4 @@ replace launchpad.net/gocheck => github.com/go-check/check v0.0.0-20180628173108

replace github.com/v3io/frames => github.com/v3io/frames v0.0.0-20190328123118-1dad1ff610509e7b087d9cd390ed1b452caecf15

replace github.com/v3io/v3io-tsdb => github.com/v3io/v3io-tsdb v0.9.0
replace github.com/v3io/v3io-tsdb => github.com/v3io/v3io-tsdb v0.0.0-20190410160018-de081cf7a8519d2c67e31b6202046b1defa87559
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,8 @@ github.com/v3io/frames v0.0.0-20190328123118-1dad1ff610509e7b087d9cd390ed1b452ca
github.com/v3io/frames v0.0.0-20190328123118-1dad1ff610509e7b087d9cd390ed1b452caecf15/go.mod h1:6aKW4Wl4A+gQhXH0JRCVOLgwvcrLyk+fqEpemuie094=
github.com/v3io/v3io-go-http v0.0.0-20190221115935-53e2b487c9a2 h1:NJc63wM25iS+ci5z7LVwjWD4QM0QpTQw/fovKzatss0=
github.com/v3io/v3io-go-http v0.0.0-20190221115935-53e2b487c9a2/go.mod h1:GXYcR9MxgfbE3BJdkXki5EclvtS8Nxu2RQNLA8hMMog=
github.com/v3io/v3io-tsdb v0.0.0-20190408164746-6763082a23aac832739fd3dc9afba20445592ee9 h1:NAQhv+V0OhisAfNv2h+/Z6WWMk8uw+tTdfJR80JJEZU=
github.com/v3io/v3io-tsdb v0.0.0-20190408164746-6763082a23aac832739fd3dc9afba20445592ee9/go.mod h1:5GOd2S8a0EtKkQAjp7Ke3o+oxZKdUMJFVEU+mk9ltdE=
github.com/v3io/v3io-tsdb v0.9.0 h1:HcopU5LpwAipvQ7D7jBZ2nrMNKycygttKBjnRwNzwZA=
github.com/v3io/v3io-tsdb v0.9.0/go.mod h1:5GOd2S8a0EtKkQAjp7Ke3o+oxZKdUMJFVEU+mk9ltdE=
github.com/v3io/v3io-tsdb v0.0.0-20190410160018-de081cf7a8519d2c67e31b6202046b1defa87559 h1:FWaY/tEnAVMdD51p+iDxbWw54l+5nAZ5dTbKclOsQLo=
github.com/v3io/v3io-tsdb v0.0.0-20190410160018-de081cf7a8519d2c67e31b6202046b1defa87559/go.mod h1:5GOd2S8a0EtKkQAjp7Ke3o+oxZKdUMJFVEU+mk9ltdE=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.0.0 h1:BwIoZQbBsTo3v2F5lz5Oy3TlTq4wLKTLV260EVTEWco=
Expand Down
74 changes: 48 additions & 26 deletions promql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,21 @@ var (
// DefaultEvaluationInterval is the default evaluation interval of
// a subquery in milliseconds.
DefaultEvaluationInterval int64

supportedV3ioFunctions = map[string]bool{"max_over_time": true,
"min_over_time": true,
"avg_over_time": true,
"sum_over_time": true,
"count_over_time": true,
"stddev_over_time": true,
"stdvar_over_time": true}
supportedV3ioAggregations = map[ItemType]bool{itemAvg: true,
itemCount: true,
itemSum: true,
itemMin: true,
itemMax: true,
itemStddev: true,
itemStdvar: true}
)

// SetDefaultEvaluationInterval sets DefaultEvaluationInterval.
Expand Down Expand Up @@ -518,6 +533,8 @@ func (ng *Engine) cumulativeSubqueryOffset(path []Node) time.Duration {

func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, storage.Warnings, error) {
var maxOffset time.Duration
var aggregationWindow int64

Inspect(s.Expr, func(node Node, path []Node) error {
subqOffset := ng.cumulativeSubqueryOffset(path)
switch n := node.(type) {
Expand All @@ -529,6 +546,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
maxOffset = n.Offset + LookbackDelta + subqOffset
}
case *MatrixSelector:
aggregationWindow = n.Range.Nanoseconds() / 1000000
if maxOffset < n.Range+subqOffset {
maxOffset = n.Range + subqOffset
}
Expand Down Expand Up @@ -556,21 +574,19 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
Step: durationToInt64Millis(s.Interval),
}

querier.(*tsdb.V3ioPromQuerier).UseAggregates = isV3ioEligibleQueryExpr(s.Expr)

switch n := node.(type) {
case *VectorSelector:
params.Start = params.Start - durationMilliseconds(LookbackDelta)
params.Func = extractFuncFromPath(path)
params.AggregationWindow = aggregationWindow
if n.Offset > 0 {
offsetMilliseconds := durationMilliseconds(n.Offset)
params.Start = params.Start - offsetMilliseconds
params.End = params.End - offsetMilliseconds
}

switch e := s.Expr.(type) {
case *AggregateExpr:
querier.(*tsdb.V3ioPromQuerier).UseAggregates = isV3ioEligibleQueryExpr(e)
}

level.Debug(ng.logger).Log("msg", "Querying v3io vector selector",
"useV3ioAggregations", querier.(*tsdb.V3ioPromQuerier).UseAggregates,
"use3ioAggregationConfig", querier.(*tsdb.V3ioPromQuerier).UseAggregatesConfig)
Expand All @@ -587,6 +603,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
// For all matrix queries we want to ensure that we have (end-start) + range selected
// this way we have `range` data before the start time
params.Start = params.Start - durationMilliseconds(n.Range)
params.AggregationWindow = aggregationWindow
if n.Offset > 0 {
offsetMilliseconds := durationMilliseconds(n.Offset)
params.Start = params.Start - offsetMilliseconds
Expand Down Expand Up @@ -977,6 +994,10 @@ func (ev *evaluator) eval(expr Expr) Value {
otherInArgs := make([]Vector, len(e.Args))
for i, e := range e.Args {
if i != matrixArgIndex {
if ev.useV3ioAggregations {
return ev.emptyAggregation(e)
}

otherArgs[i] = ev.eval(e).(Matrix)
otherInArgs[i] = Vector{Sample{}}
inArgs[i] = otherInArgs[i]
Expand Down Expand Up @@ -1906,33 +1927,34 @@ func (ev *evaluator) emptyAggregation(e Expr) Matrix {
}

func isV3ioEligibleAggregation(op ItemType) bool {
supportedV3ioAggregations := []ItemType{itemAvg, itemCount, itemSum, itemMin, itemMax, itemStddev, itemStdvar}
return containsItemType(op, supportedV3ioAggregations)
return supportedV3ioAggregations[op]
}

func isV3ioEligibleQueryExpr(e *AggregateExpr) bool {
if !isV3ioEligibleAggregation(e.Op) {
return false
}
if e.Without {
return false
}
// Currently only supports non-nested functions.
// Not supported - avg(max_over_time(cpu[10m])), Supported - avg(cpu)
if e, ok := e.Expr.(*Call); ok {
if e.Func != nil {
return false
}
}
return true
func isV3ioEligibleFunction(function string) bool {
return supportedV3ioFunctions[function]
}

func containsItemType(item ItemType, slice []ItemType) bool {
for _, curr := range slice {
if curr == item {
return true
func isV3ioEligibleQueryExpr(e Expr) bool {
switch expr := e.(type) {
case *AggregateExpr:
if !isV3ioEligibleAggregation(expr.Op) {
return false
}
if expr.Without {
return false
}
// Currently only supports non-nested functions.
// Not supported - avg(max_over_time(cpu[10m])), Supported - avg(cpu)
if e, ok := expr.Expr.(*Call); ok {
if e.Func != nil {
return false
}
}
return true
case *Call:
return isV3ioEligibleFunction(expr.Func.Name)
}

return false
}

Expand Down
30 changes: 18 additions & 12 deletions storage/tsdb/promtsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (promQuery *V3ioPromQuerier) UseV3ioAggregations() bool {

// Select returns a set of series that matches the given label matchers.
func (promQuery *V3ioPromQuerier) Select(params *storage.SelectParams, oms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
name, filter, functions := match2filter(oms, promQuery.logger)
name, filter, function := match2filter(oms, promQuery.logger)
noAggr := false

// if a nil params is passed we assume it's a metadata query, so we fetch only the different labelsets withtout data.
Expand All @@ -100,28 +100,34 @@ func (promQuery *V3ioPromQuerier) Select(params *storage.SelectParams, oms ...*l
}

promQuery.logger.Debug("SelectParams: %+v", params)
overTimeSuffix := "_over_time"

if params.Func != "" {
// only pass xx_over_time functions (just the xx part)
// TODO: support count/stdxx, require changes in Prometheus: promql/functions.go, not calc aggregate twice
if strings.HasSuffix(params.Func, "_over_time") {
f := params.Func[0:3]
if params.Step == 0 && (f == "min" || f == "max" || f == "sum" || f == "avg") {
functions = f
if strings.HasSuffix(params.Func, overTimeSuffix) {
if promQuery.UseAggregates && promQuery.UseAggregatesConfig {
function = strings.TrimSuffix(params.Func, overTimeSuffix)
} else {
noAggr = true
f := params.Func[0:3]
if params.Step == 0 && (f == "min" || f == "max" || f == "sum" || f == "avg") {
function = f
} else {
noAggr = true
}
}
} else if promQuery.UseV3ioAggregations() {
functions = fmt.Sprintf("%v_all", params.Func)
function = fmt.Sprintf("%v_all", params.Func)
}
}

selectParams := &pquerier.SelectParams{Name: name,
Functions: functions,
Step: params.Step,
Filter: filter,
From: promQuery.mint,
To: promQuery.maxt}
Functions: function,
Step: params.Step,
Filter: filter,
From: promQuery.mint,
To: promQuery.maxt,
AggregationWindow: params.AggregationWindow}

set, err := promQuery.v3ioQuerier.SelectProm(selectParams, noAggr)
return &V3ioPromSeriesSet{s: set}, nil, err
Expand Down
2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ github.com/v3io/frames
github.com/v3io/frames/pb
# github.com/v3io/v3io-go-http v0.0.0-20190221115935-53e2b487c9a2
github.com/v3io/v3io-go-http
# github.com/v3io/v3io-tsdb v0.9.0 => github.com/v3io/v3io-tsdb v0.9.0
# github.com/v3io/v3io-tsdb v0.9.0 => github.com/v3io/v3io-tsdb v0.0.0-20190410160018-de081cf7a8519d2c67e31b6202046b1defa87559
github.com/v3io/v3io-tsdb/pkg/aggregate
github.com/v3io/v3io-tsdb/pkg/appender
github.com/v3io/v3io-tsdb/pkg/config
Expand Down

0 comments on commit f3568a4

Please sign in to comment.