From 801a00b6dfd4d4ca113d610c8a8c61838cf76423 Mon Sep 17 00:00:00 2001 From: jlapacik Date: Wed, 11 Jul 2018 13:03:14 -0700 Subject: [PATCH] flux options interface default now option value --- builtin/builtin.go | 1 + compile.go | 88 ++++++++++++---- control/controller.go | 4 +- functions/count_test.go | 2 +- functions/join.go | 2 +- functions/mean_test.go | 2 +- functions/percentile_test.go | 2 +- functions/prepcsvtests/prepcsvtests.go | 3 +- functions/query_test.go | 3 +- functions/range_test.go | 2 +- functions/skew_test.go | 2 +- functions/spread_test.go | 2 +- functions/stddev_test.go | 2 +- functions/sum_test.go | 2 +- functions/system_time.go | 29 ++++++ interpreter/interpreter.go | 134 ++++++++++++++++++++----- interpreter/interpreter_test.go | 58 ++++++----- options/now.go | 10 ++ plan/logical.go | 3 + plan/logical_test.go | 6 +- plan/physical.go | 13 ++- plan/physical_test.go | 12 ++- query_test.go | 102 ++++++++++++++++++- querytest/compile.go | 9 +- querytest/compspecs/compspecs.go | 3 +- repl/repl.go | 25 ++--- spec.go | 2 + values/function.go | 96 ++++++++++++++++++ values/values.go | 7 -- 29 files changed, 513 insertions(+), 113 deletions(-) create mode 100644 functions/system_time.go create mode 100644 options/now.go create mode 100644 values/function.go diff --git a/builtin/builtin.go b/builtin/builtin.go index 79c3cbd423..e27933df4d 100644 --- a/builtin/builtin.go +++ b/builtin/builtin.go @@ -7,6 +7,7 @@ import ( "github.com/influxdata/platform/query/complete" _ "github.com/influxdata/platform/query/functions" // Import the built-in functions "github.com/influxdata/platform/query/interpreter" + _ "github.com/influxdata/platform/query/options" // Import the built-in options ) func init() { diff --git a/compile.go b/compile.go index f6cd166024..d04cd7c5bc 100644 --- a/compile.go +++ b/compile.go @@ -19,6 +19,7 @@ const ( TableParameter = "table" tableKindKey = "kind" tableParentsKey = "parents" + nowOption = "now" //tableSpecKey = "spec" ) @@ -35,7 +36,8 @@ type options struct { } // Compile evaluates a Flux script producing a query Spec. -func Compile(ctx context.Context, q string, opts ...Option) (*Spec, error) { +// now parameter must be non-zero, that is the default now time should be set before compiling. +func Compile(ctx context.Context, q string, now time.Time, opts ...Option) (*Spec, error) { o := new(options) for _, opt := range opts { opt(o) @@ -49,8 +51,10 @@ func Compile(ctx context.Context, q string, opts ...Option) (*Spec, error) { s, _ = opentracing.StartSpanFromContext(ctx, "compile") defer s.Finish() - scope, decls := builtIns() - interpScope := interpreter.NewScopeWithValues(scope) + itrp := NewInterpreter() + itrp.SetOption(nowOption, nowFunc(now)) + + _, decls := builtIns(itrp) // Convert AST program to a semantic program semProg, err := semantic.New(astProg, decls) @@ -58,11 +62,10 @@ func Compile(ctx context.Context, q string, opts ...Option) (*Spec, error) { return nil, err } - operations, err := interpreter.Eval(semProg, interpScope) - if err != nil { + if err := itrp.Eval(semProg); err != nil { return nil, err } - spec := toSpec(operations) + spec := toSpec(itrp) if o.verbose { log.Println("Query Spec: ", Formatted(spec, FmtJSON)) @@ -70,7 +73,38 @@ func Compile(ctx context.Context, q string, opts ...Option) (*Spec, error) { return spec, nil } -func toSpec(stmtVals []values.Value) *Spec { +// NewInterpreter returns an interpreter instance with +// pre-constructed options and global scopes. +func NewInterpreter() *interpreter.Interpreter { + options := make(map[string]values.Value, len(builtinOptions)) + globals := make(map[string]values.Value, len(builtinScope)) + + for k, v := range builtinScope { + globals[k] = v + } + + for k, v := range builtinOptions { + options[k] = v + } + + return interpreter.NewInterpreter(options, globals) +} + +func nowFunc(now time.Time) values.Function { + timeVal := values.NewTimeValue(values.ConvertTime(now)) + ftype := semantic.NewFunctionType(semantic.FunctionSignature{ + ReturnType: semantic.Time, + }) + call := func(args values.Object) (values.Value, error) { + return timeVal, nil + } + sideEffect := false + return values.NewFunction(nowOption, ftype, call, sideEffect) +} + +func toSpec(itrp *interpreter.Interpreter) *Spec { + operations := itrp.SideEffects() + ider := &ider{ id: 0, lookup: make(map[*TableObject]OperationID), @@ -78,9 +112,9 @@ func toSpec(stmtVals []values.Value) *Spec { spec := new(Spec) visited := make(map[*TableObject]bool) - nodes := make([]*TableObject, 0, len(stmtVals)) + nodes := make([]*TableObject, 0, len(operations)) - for _, val := range stmtVals { + for _, val := range operations { if op, ok := val.(*TableObject); ok { dup := false for _, node := range nodes { @@ -95,12 +129,23 @@ func toSpec(stmtVals []values.Value) *Spec { } } } + + // now option is Time value + nowValue, _ := itrp.Option(nowOption).Function().Call(nil) + spec.Now = nowValue.Time().Time() + return spec } type CreateOperationSpec func(args Arguments, a *Administration) (OperationSpec, error) var builtinScope = make(map[string]values.Value) + +// TODO(Josh): Default option values should be registered similarly to built-in +// functions. Default options should be registered in their own files +// (or in a single file) using the RegisterBuiltInOption function which will +// place the resolved option value in the following map. +var builtinOptions = make(map[string]values.Value) var builtinDeclarations = make(semantic.DeclarationScope) // list of builtin scripts @@ -150,6 +195,17 @@ func RegisterBuiltInValue(name string, v values.Value) { builtinScope[name] = v } +// RegisterBuiltInOption adds the value to the builtin scope. +func RegisterBuiltInOption(name string, v values.Value) { + if finalized { + panic(errors.New("already finalized, cannot register builtin option")) + } + if _, ok := builtinOptions[name]; ok { + panic(fmt.Errorf("duplicate registration for builtin option %q", name)) + } + builtinOptions[name] = v +} + // FinalizeBuiltIns must be called to complete registration. // Future calls to RegisterFunction, RegisterBuiltIn or RegisterBuiltInValue will panic. func FinalizeBuiltIns() { @@ -373,16 +429,12 @@ func BuiltIns() (map[string]values.Value, semantic.DeclarationScope) { if !finalized { panic("builtins not finalized") } - return builtIns() + return builtIns(NewInterpreter()) } -func builtIns() (map[string]values.Value, semantic.DeclarationScope) { +func builtIns(itrp *interpreter.Interpreter) (map[string]values.Value, semantic.DeclarationScope) { decls := builtinDeclarations.Copy() - scope := make(map[string]values.Value, len(builtinScope)) - for k, v := range builtinScope { - scope[k] = v - } - interpScope := interpreter.NewScopeWithValues(scope) + for name, script := range builtins { astProg, err := parser.NewAST(script) if err != nil { @@ -393,11 +445,11 @@ func builtIns() (map[string]values.Value, semantic.DeclarationScope) { panic(errors.Wrapf(err, "failed to create semantic graph for builtin %q", name)) } - if _, err := interpreter.Eval(semProg, interpScope); err != nil { + if err := itrp.Eval(semProg); err != nil { panic(errors.Wrapf(err, "failed to evaluate builtin %q", name)) } } - return scope, decls + return itrp.GlobalScope().Values(), decls } type Administration struct { diff --git a/control/controller.go b/control/controller.go index 8584f4df2c..12c4a99377 100644 --- a/control/controller.go +++ b/control/controller.go @@ -114,7 +114,7 @@ func (c *Controller) compileQuery(q *Query, queryStr string) error { if !q.tryCompile() { return errors.New("failed to transition query to compiling state") } - spec, err := query.Compile(q.compilingCtx, queryStr, query.Verbose(c.verbose)) + spec, err := query.Compile(q.compilingCtx, queryStr, q.now, query.Verbose(c.verbose)) if err != nil { return errors.Wrap(err, "failed to compile query") } @@ -226,7 +226,7 @@ func (c *Controller) processQuery(q *Query) (pop bool, err error) { log.Println("logical plan", plan.Formatted(lp)) } - p, err := c.pplanner.Plan(lp, nil, q.now) + p, err := c.pplanner.Plan(lp, nil) if err != nil { return true, errors.Wrap(err, "failed to create physical plan") } diff --git a/functions/count_test.go b/functions/count_test.go index 2eb2839577..10d6919886 100644 --- a/functions/count_test.go +++ b/functions/count_test.go @@ -4,10 +4,10 @@ import ( "testing" "time" - "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query" "github.com/influxdata/platform/query/execute" "github.com/influxdata/platform/query/execute/executetest" + "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query/plan" "github.com/influxdata/platform/query/plan/plantest" "github.com/influxdata/platform/query/querytest" diff --git a/functions/join.go b/functions/join.go index ec93c637ff..7495aa45a3 100644 --- a/functions/join.go +++ b/functions/join.go @@ -127,7 +127,7 @@ func createJoinOpSpec(args query.Arguments, a *query.Administration) (query.Oper return } p := t.(*query.TableObject) - joinParams.add(k /*parameter*/, p /*argument*/) + joinParams.add(k, p) spec.tableNames[p] = k }) if err != nil { diff --git a/functions/mean_test.go b/functions/mean_test.go index b9bfdee509..64efa033d8 100644 --- a/functions/mean_test.go +++ b/functions/mean_test.go @@ -4,9 +4,9 @@ import ( "math" "testing" - "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query" "github.com/influxdata/platform/query/execute/executetest" + "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query/querytest" ) diff --git a/functions/percentile_test.go b/functions/percentile_test.go index 38cd878b57..e77f9d1160 100644 --- a/functions/percentile_test.go +++ b/functions/percentile_test.go @@ -4,10 +4,10 @@ import ( "math" "testing" - "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query" "github.com/influxdata/platform/query/execute" "github.com/influxdata/platform/query/execute/executetest" + "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query/querytest" ) diff --git a/functions/prepcsvtests/prepcsvtests.go b/functions/prepcsvtests/prepcsvtests.go index 576031f1f2..6528131eb4 100644 --- a/functions/prepcsvtests/prepcsvtests.go +++ b/functions/prepcsvtests/prepcsvtests.go @@ -9,6 +9,7 @@ import ( "path/filepath" "regexp" "strings" + "time" "github.com/influxdata/platform/query" _ "github.com/influxdata/platform/query/builtin" @@ -69,7 +70,7 @@ func main() { } qs := querytest.GetQueryServiceBridge() - qspec, err := query.Compile(context.Background(), string(querytext)) + qspec, err := query.Compile(context.Background(), string(querytext), time.Now().UTC()) if err != nil { fmt.Printf("error compiling. \n query: \n %s \n err: %s", string(querytext), err) return diff --git a/functions/query_test.go b/functions/query_test.go index 4a03185da1..85f1f4e5e0 100644 --- a/functions/query_test.go +++ b/functions/query_test.go @@ -6,6 +6,7 @@ import ( "path/filepath" "strings" "testing" + "time" "github.com/influxdata/platform/query" _ "github.com/influxdata/platform/query/builtin" @@ -83,7 +84,7 @@ func queryTester(t *testing.T, qs query.QueryService, prefix, queryExt string) e t.Fatal(err) } - spec, err := query.Compile(context.Background(), q) + spec, err := query.Compile(context.Background(), q, time.Now().UTC()) if err != nil { t.Fatalf("failed to compile: %v", err) } diff --git a/functions/range_test.go b/functions/range_test.go index 20c52b4f0a..ef364c9b58 100644 --- a/functions/range_test.go +++ b/functions/range_test.go @@ -4,9 +4,9 @@ import ( "testing" "time" - "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query" "github.com/influxdata/platform/query/execute" + "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query/plan" "github.com/influxdata/platform/query/plan/plantest" "github.com/influxdata/platform/query/querytest" diff --git a/functions/skew_test.go b/functions/skew_test.go index 9906efb7a8..39b0e4b54f 100644 --- a/functions/skew_test.go +++ b/functions/skew_test.go @@ -4,9 +4,9 @@ import ( "math" "testing" - "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query" "github.com/influxdata/platform/query/execute/executetest" + "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query/querytest" ) diff --git a/functions/spread_test.go b/functions/spread_test.go index 86618acc25..41fc951063 100644 --- a/functions/spread_test.go +++ b/functions/spread_test.go @@ -3,9 +3,9 @@ package functions_test import ( "testing" - "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query" "github.com/influxdata/platform/query/execute/executetest" + "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query/querytest" ) diff --git a/functions/stddev_test.go b/functions/stddev_test.go index 6e3721c881..df15b4e1dc 100644 --- a/functions/stddev_test.go +++ b/functions/stddev_test.go @@ -4,9 +4,9 @@ import ( "math" "testing" - "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query" "github.com/influxdata/platform/query/execute/executetest" + "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query/querytest" ) diff --git a/functions/sum_test.go b/functions/sum_test.go index 1152eb4de9..10bb46cd23 100644 --- a/functions/sum_test.go +++ b/functions/sum_test.go @@ -3,9 +3,9 @@ package functions_test import ( "testing" - "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query" "github.com/influxdata/platform/query/execute/executetest" + "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query/plan" "github.com/influxdata/platform/query/plan/plantest" "github.com/influxdata/platform/query/querytest" diff --git a/functions/system_time.go b/functions/system_time.go new file mode 100644 index 0000000000..3340efa03a --- /dev/null +++ b/functions/system_time.go @@ -0,0 +1,29 @@ +package functions + +import ( + "time" + + "github.com/influxdata/platform/query" + "github.com/influxdata/platform/query/semantic" + "github.com/influxdata/platform/query/values" +) + +var systemTimeFuncName = "systemTime" + +func init() { + nowFunc := SystemTime() + query.RegisterBuiltInValue(systemTimeFuncName, nowFunc) +} + +// SystemTime return a function value that when called will give the current system time +func SystemTime() values.Value { + name := systemTimeFuncName + ftype := semantic.NewFunctionType(semantic.FunctionSignature{ + ReturnType: semantic.Time, + }) + call := func(args values.Object) (values.Value, error) { + return values.NewTimeValue(values.ConvertTime(time.Now().UTC())), nil + } + sideEffect := false + return values.NewFunction(name, ftype, call, sideEffect) +} diff --git a/interpreter/interpreter.go b/interpreter/interpreter.go index 0f2932f26a..0b53b07d6e 100644 --- a/interpreter/interpreter.go +++ b/interpreter/interpreter.go @@ -10,19 +10,63 @@ import ( "github.com/pkg/errors" ) -func Eval(program *semantic.Program, scope *Scope) ([]values.Value, error) { - itrp := interpreter{} - err := itrp.eval(program, scope) - return itrp.values, err +// Interpreter used to interpret a Flux program +type Interpreter struct { + values []values.Value + options *Scope + globals *Scope } -type interpreter struct { - values []values.Value +// NewInterpreter instantiates a new Flux Interpreter +func NewInterpreter(options, builtins map[string]values.Value) *Interpreter { + optionScope := NewScopeWithValues(options) + globalScope := optionScope.NestWithValues(builtins) + interpreter := new(Interpreter) + interpreter.options = optionScope + interpreter.globals = globalScope + return interpreter } -func (itrp *interpreter) eval(program *semantic.Program, scope *Scope) error { +// Return gives the return value from the block +func (itrp *Interpreter) Return() values.Value { + return itrp.globals.Return() +} + +// GlobalScope returns a pointer to the global scope of the program. +// That is the scope nested directly below the options scope. +func (itrp *Interpreter) GlobalScope() *Scope { + return itrp.globals +} + +// SetVar adds a variable binding to the global scope +func (itrp *Interpreter) SetVar(name string, val values.Value) { + itrp.globals.Set(name, val) +} + +// SideEffects returns the evaluated expressions of a Flux program +func (itrp *Interpreter) SideEffects() []values.Value { + return itrp.values +} + +// Option returns a Flux option by name +func (itrp *Interpreter) Option(name string) values.Value { + return itrp.options.Get(name) +} + +// SetOption sets a new option binding +func (itrp *Interpreter) SetOption(name string, val values.Value) { + itrp.options.Set(name, val) +} + +// Eval evaluates the expressions composing a Flux program. +func (itrp *Interpreter) Eval(program *semantic.Program) error { + return itrp.eval(program) +} + +func (itrp *Interpreter) eval(program *semantic.Program) error { + topLevelScope := itrp.globals for _, stmt := range program.Body { - val, err := itrp.doStatement(stmt, scope) + val, err := itrp.doStatement(stmt, topLevelScope) if err != nil { return err } @@ -34,11 +78,11 @@ func (itrp *interpreter) eval(program *semantic.Program, scope *Scope) error { } // doStatement returns the resolved value of a top-level statement -func (itrp *interpreter) doStatement(stmt semantic.Statement, scope *Scope) (values.Value, error) { +func (itrp *Interpreter) doStatement(stmt semantic.Statement, scope *Scope) (values.Value, error) { scope.SetReturn(values.InvalidValue) switch s := stmt.(type) { case *semantic.OptionStatement: - return itrp.doStatement(s.Declaration, scope) + return itrp.doOptionStatement(s.Declaration.(*semantic.NativeVariableDeclaration), scope) case *semantic.NativeVariableDeclaration: return itrp.doVariableDeclaration(s, scope) case *semantic.ExpressionStatement: @@ -62,8 +106,8 @@ func (itrp *interpreter) doStatement(stmt semantic.Statement, scope *Scope) (val } } } - // Propgate any return value from the nested scope out. - // Since a return statement is always last we do not have to worry about overriding an existing return value. + // Propgate any return value from the nested scope out. Since a return statement is + // always last we do not have to worry about overriding an existing return value. scope.SetReturn(nested.Return()) case *semantic.ReturnStatement: v, err := itrp.doExpression(s.Argument, scope) @@ -77,7 +121,16 @@ func (itrp *interpreter) doStatement(stmt semantic.Statement, scope *Scope) (val return nil, nil } -func (itrp *interpreter) doVariableDeclaration(declaration *semantic.NativeVariableDeclaration, scope *Scope) (values.Value, error) { +func (itrp *Interpreter) doOptionStatement(declaration *semantic.NativeVariableDeclaration, scope *Scope) (values.Value, error) { + value, err := itrp.doExpression(declaration.Init, scope) + if err != nil { + return nil, err + } + itrp.options.Set(declaration.Identifier.Name, value) + return value, nil +} + +func (itrp *Interpreter) doVariableDeclaration(declaration *semantic.NativeVariableDeclaration, scope *Scope) (values.Value, error) { value, err := itrp.doExpression(declaration.Init, scope) if err != nil { return nil, err @@ -86,7 +139,7 @@ func (itrp *interpreter) doVariableDeclaration(declaration *semantic.NativeVaria return value, nil } -func (itrp *interpreter) doExpression(expr semantic.Expression, scope *Scope) (values.Value, error) { +func (itrp *Interpreter) doExpression(expr semantic.Expression, scope *Scope) (values.Value, error) { switch e := expr.(type) { case semantic.Literal: return itrp.doLiteral(e) @@ -208,7 +261,7 @@ func (itrp *interpreter) doExpression(expr semantic.Expression, scope *Scope) (v } } -func (itrp *interpreter) doArray(a *semantic.ArrayExpression, scope *Scope) (values.Value, error) { +func (itrp *Interpreter) doArray(a *semantic.ArrayExpression, scope *Scope) (values.Value, error) { elements := make([]values.Value, len(a.Elements)) elementType := semantic.EmptyArrayType.ElementType() for i, el := range a.Elements { @@ -227,7 +280,7 @@ func (itrp *interpreter) doArray(a *semantic.ArrayExpression, scope *Scope) (val return values.NewArrayWithBacking(elementType, elements), nil } -func (itrp *interpreter) doObject(m *semantic.ObjectExpression, scope *Scope) (values.Value, error) { +func (itrp *Interpreter) doObject(m *semantic.ObjectExpression, scope *Scope) (values.Value, error) { obj := values.NewObject() for _, p := range m.Properties { v, err := itrp.doExpression(p.Value, scope) @@ -242,7 +295,7 @@ func (itrp *interpreter) doObject(m *semantic.ObjectExpression, scope *Scope) (v return obj, nil } -func (itrp *interpreter) doLiteral(lit semantic.Literal) (values.Value, error) { +func (itrp *Interpreter) doLiteral(lit semantic.Literal) (values.Value, error) { switch l := lit.(type) { case *semantic.DateTimeLiteral: return values.NewTimeValue(values.Time(l.Value.UnixNano())), nil @@ -288,7 +341,7 @@ func DoFunctionCall(f func(args Arguments) (values.Value, error), argsObj values return v, nil } -func (itrp *interpreter) doCall(call *semantic.CallExpression, scope *Scope) (values.Value, error) { +func (itrp *Interpreter) doCall(call *semantic.CallExpression, scope *Scope) (values.Value, error) { callee, err := itrp.doExpression(call.Callee, scope) if err != nil { return nil, err @@ -319,7 +372,7 @@ func (itrp *interpreter) doCall(call *semantic.CallExpression, scope *Scope) (va return value, nil } -func (itrp *interpreter) doArguments(args *semantic.ObjectExpression, scope *Scope) (values.Object, error) { +func (itrp *Interpreter) doArguments(args *semantic.ObjectExpression, scope *Scope) (values.Object, error) { obj := values.NewObject() if args == nil || len(args.Properties) == 0 { return obj, nil @@ -337,6 +390,7 @@ func (itrp *interpreter) doArguments(args *semantic.ObjectExpression, scope *Sco return obj, nil } +// TODO(Josh): Scope methods should be private type Scope struct { parent *Scope values map[string]values.Value @@ -348,9 +402,35 @@ func NewScope() *Scope { values: make(map[string]values.Value), } } -func NewScopeWithValues(values map[string]values.Value) *Scope { +func NewScopeWithValues(vals map[string]values.Value) *Scope { + cp := make(map[string]values.Value, len(vals)) + for k, v := range vals { + cp[k] = v + } return &Scope{ - values: values, + values: cp, + } +} + +func (s *Scope) Get(name string) values.Value { + return s.values[name] +} + +func (s *Scope) Set(name string, value values.Value) { + s.values[name] = value +} + +func (s *Scope) Values() map[string]values.Value { + cp := make(map[string]values.Value, len(s.values)) + for k, v := range s.values { + cp[k] = v + } + return cp +} + +func (s *Scope) SetValues(vals map[string]values.Value) { + for k, v := range vals { + s.values[k] = v } } @@ -365,10 +445,6 @@ func (s *Scope) Lookup(name string) (values.Value, bool) { return v, ok } -func (s *Scope) Set(name string, value values.Value) { - s.values[name] = value -} - // SetReturn sets the return value of this scope. func (s *Scope) SetReturn(value values.Value) { s.returnValue = value @@ -397,6 +473,12 @@ func (s *Scope) Nest() *Scope { return c } +func (s *Scope) NestWithValues(values map[string]values.Value) *Scope { + c := NewScopeWithValues(values) + c.parent = s + return c +} + // Copy returns a copy of the scope and its parents. func (s *Scope) Copy() *Scope { c := NewScope() @@ -455,7 +537,7 @@ type function struct { scope *Scope call func(Arguments) (values.Value, error) - itrp *interpreter + itrp *Interpreter } func (f *function) Type() semantic.Type { diff --git a/interpreter/interpreter_test.go b/interpreter/interpreter_test.go index 50e120ef00..d03fe89b9b 100644 --- a/interpreter/interpreter_test.go +++ b/interpreter/interpreter_test.go @@ -14,15 +14,20 @@ import ( "github.com/influxdata/platform/query/values" ) -var testScope = interpreter.NewScope() +var testScope = make(map[string]values.Value) +var optionScope = make(map[string]values.Value) var testDeclarations = make(semantic.DeclarationScope) var optionsObject = values.NewObject() func addFunc(f *function) { - testScope.Set(f.name, f) + testScope[f.name] = f testDeclarations[f.name] = semantic.NewExternalVariableDeclaration(f.name, f.t) } +func addOption(name string, opt values.Value) { + optionScope[name] = opt +} + func init() { addFunc(&function{ name: "fortyTwo", @@ -90,8 +95,11 @@ func init() { }, hasSideEffect: true, }) + optionsObject.Set("name", values.NewStringValue("foo")) optionsObject.Set("repeat", values.NewIntValue(100)) + + addOption("task", optionsObject) } // TestEval tests whether a program can run to completion or not @@ -337,14 +345,17 @@ func TestEval(t *testing.T) { t.Fatal(err) } - values, err := interpreter.Eval(graph, testScope.Nest()) + // Create new interpreter scope for each test case + itrp := interpreter.NewInterpreter(optionScope, testScope) + + err = itrp.Eval(graph) if !tc.wantErr && err != nil { t.Fatal(err) } else if tc.wantErr && err == nil { t.Fatal("expected error") } - if tc.want != nil && !cmp.Equal(tc.want, values, semantictest.CmpOptions...) { - t.Fatalf("unexpected side effect values -want/+got: \n%s", cmp.Diff(tc.want, values, semantictest.CmpOptions...)) + if tc.want != nil && !cmp.Equal(tc.want, itrp.SideEffects(), semantictest.CmpOptions...) { + t.Fatalf("unexpected side effect values -want/+got: \n%s", cmp.Diff(tc.want, itrp.SideEffects(), semantictest.CmpOptions...)) } }) } @@ -352,7 +363,6 @@ func TestEval(t *testing.T) { } func TestResolver(t *testing.T) { var got semantic.Expression - scope := interpreter.NewScope() declarations := make(semantic.DeclarationScope) f := &function{ name: "resolver", @@ -382,7 +392,7 @@ func TestResolver(t *testing.T) { }, hasSideEffect: false, } - scope.Set(f.name, f) + testScope[f.name] = f declarations[f.name] = semantic.NewExternalVariableDeclaration(f.name, f.t) program, err := parser.NewAST(` @@ -398,7 +408,9 @@ func TestResolver(t *testing.T) { t.Fatal(err) } - if _, err := interpreter.Eval(graph, scope); err != nil { + itrp := interpreter.NewInterpreter(optionScope, testScope) + + if err := itrp.Eval(graph); err != nil { t.Fatal(err) } @@ -425,42 +437,42 @@ type function struct { hasSideEffect bool } -func (f function) Type() semantic.Type { +func (f *function) Type() semantic.Type { return f.t } -func (f function) Str() string { +func (f *function) Str() string { panic(values.UnexpectedKind(semantic.Object, semantic.String)) } -func (f function) Int() int64 { +func (f *function) Int() int64 { panic(values.UnexpectedKind(semantic.Object, semantic.Int)) } -func (f function) UInt() uint64 { +func (f *function) UInt() uint64 { panic(values.UnexpectedKind(semantic.Object, semantic.UInt)) } -func (f function) Float() float64 { +func (f *function) Float() float64 { panic(values.UnexpectedKind(semantic.Object, semantic.Float)) } -func (f function) Bool() bool { +func (f *function) Bool() bool { panic(values.UnexpectedKind(semantic.Object, semantic.Bool)) } -func (f function) Time() values.Time { +func (f *function) Time() values.Time { panic(values.UnexpectedKind(semantic.Object, semantic.Time)) } -func (f function) Duration() values.Duration { +func (f *function) Duration() values.Duration { panic(values.UnexpectedKind(semantic.Object, semantic.Duration)) } -func (f function) Regexp() *regexp.Regexp { +func (f *function) Regexp() *regexp.Regexp { panic(values.UnexpectedKind(semantic.Object, semantic.Regexp)) } -func (f function) Array() values.Array { +func (f *function) Array() values.Array { panic(values.UnexpectedKind(semantic.Object, semantic.Function)) } -func (f function) Object() values.Object { +func (f *function) Object() values.Object { panic(values.UnexpectedKind(semantic.Object, semantic.Object)) } -func (f function) Function() values.Function { - return &f +func (f *function) Function() values.Function { + return f } func (f *function) Equal(rhs values.Value) bool { if f.Type() != rhs.Type() { @@ -469,10 +481,10 @@ func (f *function) Equal(rhs values.Value) bool { v, ok := rhs.(*function) return ok && (f == v) } -func (f function) HasSideEffect() bool { +func (f *function) HasSideEffect() bool { return f.hasSideEffect } -func (f function) Call(args values.Object) (values.Value, error) { +func (f *function) Call(args values.Object) (values.Value, error) { return f.call(args) } diff --git a/options/now.go b/options/now.go new file mode 100644 index 0000000000..2ac9f8124e --- /dev/null +++ b/options/now.go @@ -0,0 +1,10 @@ +package options + +import ( + "github.com/influxdata/platform/query" + "github.com/influxdata/platform/query/functions" +) + +func init() { + query.RegisterBuiltInOption("now", functions.SystemTime()) +} diff --git a/plan/logical.go b/plan/logical.go index e2cab430c6..7375876b7e 100644 --- a/plan/logical.go +++ b/plan/logical.go @@ -2,6 +2,7 @@ package plan import ( "fmt" + "time" "github.com/influxdata/platform/query" uuid "github.com/satori/go.uuid" @@ -14,6 +15,7 @@ type LogicalPlanSpec struct { Procedures map[ProcedureID]*Procedure Order []ProcedureID Resources query.ResourceManagement + Now time.Time } func (lp *LogicalPlanSpec) Do(f func(pr *Procedure)) { @@ -49,6 +51,7 @@ func (p *logicalPlanner) Plan(q *query.Spec) (*LogicalPlanSpec, error) { if err != nil { return nil, err } + p.plan.Now = q.Now return p.plan, nil } diff --git a/plan/logical_test.go b/plan/logical_test.go index d1dbc1c2dc..84de4e8707 100644 --- a/plan/logical_test.go +++ b/plan/logical_test.go @@ -6,8 +6,8 @@ import ( "time" "github.com/google/go-cmp/cmp" - "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query" + "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query/plan" "github.com/influxdata/platform/query/plan/plantest" ) @@ -171,6 +171,10 @@ func TestLogicalPlanner_Plan(t *testing.T) { for i, tc := range testCases { tc := tc t.Run(strconv.Itoa(i), func(t *testing.T) { + // Set Now time on query spec + tc.q.Now = time.Now().UTC() + tc.ap.Now = tc.q.Now + planner := plan.NewLogicalPlanner() got, err := planner.Plan(tc.q) if err != nil { diff --git a/plan/physical.go b/plan/physical.go index 37e0d5fe65..417b8ef536 100644 --- a/plan/physical.go +++ b/plan/physical.go @@ -14,15 +14,16 @@ import ( const DefaultYieldName = "_result" type PlanSpec struct { - // Now represents the relative currentl time of the plan. + // Now represents the relative current time of the plan. Now time.Time Bounds BoundsSpec + // Procedures is a set of all operations Procedures map[ProcedureID]*Procedure Order []ProcedureID - // Results is a list of datasets that are the result of the plan - Results map[string]YieldSpec + // Results is a list of datasets that are the result of the plan + Results map[string]YieldSpec Resources query.ResourceManagement } @@ -42,7 +43,7 @@ func (p *PlanSpec) lookup(id ProcedureID) *Procedure { type Planner interface { // Plan create a plan from the logical plan and available storage. - Plan(p *LogicalPlanSpec, s Storage, now time.Time) (*PlanSpec, error) + Plan(p *LogicalPlanSpec, s Storage) (*PlanSpec, error) } type PlanRewriter interface { @@ -61,7 +62,9 @@ func NewPlanner() Planner { return new(planner) } -func (p *planner) Plan(lp *LogicalPlanSpec, s Storage, now time.Time) (*PlanSpec, error) { +func (p *planner) Plan(lp *LogicalPlanSpec, s Storage) (*PlanSpec, error) { + now := lp.Now + p.plan = &PlanSpec{ Now: now, Procedures: make(map[ProcedureID]*Procedure, len(lp.Procedures)), diff --git a/plan/physical_test.go b/plan/physical_test.go index 0107d0ee39..db2d1e80b0 100644 --- a/plan/physical_test.go +++ b/plan/physical_test.go @@ -1050,12 +1050,14 @@ func TestPhysicalPlanner_Plan_PushDown_Mixed(t *testing.T) { func PhysicalPlanTestHelper(t *testing.T, lp *plan.LogicalPlanSpec, want *plan.PlanSpec) { t.Helper() + // Setup expected now time now := time.Now() + lp.Now = now want.Now = now planner := plan.NewPlanner() - got, err := planner.Plan(lp, nil, now) + got, err := planner.Plan(lp, nil) if err != nil { t.Fatal(err) } @@ -1076,10 +1078,11 @@ func BenchmarkPhysicalPlan(b *testing.B) { if err != nil { b.Fatal(err) } - planner := plan.NewPlanner() now := time.Date(2017, 8, 8, 0, 0, 0, 0, time.UTC) + lp.Now = now + planner := plan.NewPlanner() for n := 0; n < b.N; n++ { - benchmarkPhysicalPlan, err = planner.Plan(lp, nil, now) + benchmarkPhysicalPlan, err = planner.Plan(lp, nil) if err != nil { b.Fatal(err) } @@ -1097,7 +1100,8 @@ func BenchmarkQueryToPhysicalPlan(b *testing.B) { if err != nil { b.Fatal(err) } - benchmarkQueryToPhysicalPlan, err = pp.Plan(lp, nil, now) + lp.Now = now + benchmarkQueryToPhysicalPlan, err = pp.Plan(lp, nil) if err != nil { b.Fatal(err) } diff --git a/query_test.go b/query_test.go index 47191510e2..507a9148b3 100644 --- a/query_test.go +++ b/query_test.go @@ -3,16 +3,26 @@ package query_test import ( "encoding/json" "errors" + "fmt" + "os" "strconv" "testing" "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query" + "github.com/influxdata/platform/query/functions" + _ "github.com/influxdata/platform/query/options" + "github.com/influxdata/platform/query/parser" + "github.com/influxdata/platform/query/semantic" + "github.com/influxdata/platform/query/values" ) +func init() { + query.FinalizeBuiltIns() +} + var ignoreUnexportedQuerySpec = cmpopts.IgnoreUnexported(query.Spec{}) func TestQuery_JSON(t *testing.T) { @@ -266,3 +276,93 @@ func TestQuery_Walk(t *testing.T) { }) } } + +// Example_option demonstrates retrieving an option from the Flux interpreter +func Example_option() { + // Instantiate a new Flux interpreter with pre-populated option and global scopes + itrp := query.NewInterpreter() + + // Retrieve the default value for an option + nowFunc := itrp.Option("now") + + // The now option is a function value whose default behavior is to return + // the current system time when called. The function now() doesn't take + // any arguments so can be called with nil. + nowTime, _ := nowFunc.Function().Call(nil) + fmt.Fprintf(os.Stderr, "The current system time (UTC) is: %v\n", nowTime) + // Output: +} + +// Example_setOption demonstrates setting an option from the Flux interpreter +func Example_setOption() { + // Instantiate a new Flux interpreter with pre-populated option and global scopes + itrp := query.NewInterpreter() + + // Set a new option from the interpreter + itrp.SetOption("dummy_option", values.NewIntValue(3)) + + fmt.Printf("dummy_option = %d", itrp.Option("dummy_option").Int()) + // Output: dummy_option = 3 +} + +// Example_overrideDefaultOptionExternally demonstrates how declaring an option +// in a Flux script will change that option's binding in the options scope of the interpreter. +func Example_overrideDefaultOptionExternally() { + queryString := ` + now = () => 2018-07-13T00:00:00Z + what_time_is_it = now()` + + itrp := query.NewInterpreter() + _, declarations := query.BuiltIns() + + ast, _ := parser.NewAST(queryString) + semanticProgram, _ := semantic.New(ast, declarations) + + // Evaluate program + itrp.Eval(semanticProgram) + + // After evaluating the program, lookup the value of what_time_is_it + now, _ := itrp.GlobalScope().Lookup("what_time_is_it") + + // what_time_is_it? Why it's .... + fmt.Printf("The new current time (UTC) is: %v", now) + // Output: The new current time (UTC) is: 2018-07-13T00:00:00.000000000Z +} + +// Example_overrideDefaultOptionInternally demonstrates how one can override a default +// option that is used in a query before that query is evaluated by the interpreter. +func Example_overrideDefaultOptionInternally() { + queryString := `what_time_is_it = now()` + + itrp := query.NewInterpreter() + _, declarations := query.BuiltIns() + + ast, _ := parser.NewAST(queryString) + semanticProgram, _ := semantic.New(ast, declarations) + + // Define a new now function which returns a static time value of 2018-07-13T00:00:00.000000000Z + timeValue := time.Date(2018, 7, 13, 0, 0, 0, 0, time.UTC) + functionName := "newTime" + functionType := semantic.NewFunctionType(semantic.FunctionSignature{ + ReturnType: semantic.Time, + }) + functionCall := func(args values.Object) (values.Value, error) { + return values.NewTimeValue(values.ConvertTime(timeValue)), nil + } + sideEffect := false + + newNowFunc := values.NewFunction(functionName, functionType, functionCall, sideEffect) + + // Override the default now function with the new one + itrp.SetOption("now", newNowFunc) + + // Evaluate program + itrp.Eval(semanticProgram) + + // After evaluating the program, lookup the value of what_time_is_it + now, _ := itrp.GlobalScope().Lookup("what_time_is_it") + + // what_time_is_it? Why it's .... + fmt.Printf("The new current time (UTC) is: %v", now) + // Output: The new current time (UTC) is: 2018-07-13T00:00:00.000000000Z +} diff --git a/querytest/compile.go b/querytest/compile.go index cdea2dddc4..974e08a28b 100644 --- a/querytest/compile.go +++ b/querytest/compile.go @@ -1,13 +1,14 @@ package querytest import ( - "github.com/influxdata/platform/query/functions" "context" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/influxdata/platform/query" + "github.com/influxdata/platform/query/functions" "github.com/influxdata/platform/query/semantic/semantictest" ) @@ -29,7 +30,8 @@ var opts = append( func NewQueryTestHelper(t *testing.T, tc NewQueryTestCase) { t.Helper() - got, err := query.Compile(context.Background(), tc.Raw) + now := time.Now().UTC() + got, err := query.Compile(context.Background(), tc.Raw, now) if (err != nil) != tc.WantErr { t.Errorf("query.NewQuery() error = %v, wantErr %v", err, tc.WantErr) return @@ -37,6 +39,9 @@ func NewQueryTestHelper(t *testing.T, tc NewQueryTestCase) { if tc.WantErr { return } + if tc.Want != nil { + tc.Want.Now = now + } if !cmp.Equal(tc.Want, got, opts...) { t.Errorf("query.NewQuery() = -want/+got %s", cmp.Diff(tc.Want, got, opts...)) } diff --git a/querytest/compspecs/compspecs.go b/querytest/compspecs/compspecs.go index 02614b4b57..1c0e6aae09 100644 --- a/querytest/compspecs/compspecs.go +++ b/querytest/compspecs/compspecs.go @@ -8,6 +8,7 @@ import ( "path/filepath" "regexp" "strings" + "time" "github.com/google/go-cmp/cmp/cmpopts" "github.com/influxdata/platform/query" @@ -62,7 +63,7 @@ func main() { return } - fluxSpec, err := query.Compile(context.Background(), string(fluxText)) + fluxSpec, err := query.Compile(context.Background(), string(fluxText), time.Now().UTC()) if err != nil { fmt.Printf("error compiling. \n query: \n %s \n err: %s", string(fluxText), err) return diff --git a/repl/repl.go b/repl/repl.go index 77826edb76..e70b5df02a 100644 --- a/repl/repl.go +++ b/repl/repl.go @@ -30,7 +30,7 @@ import ( type REPL struct { orgID platform.ID - scope *interpreter.Scope + interpreter *interpreter.Interpreter declarations semantic.DeclarationScope c *control.Controller @@ -38,7 +38,7 @@ type REPL struct { cancelFunc context.CancelFunc } -func addBuiltIn(script string, scope *interpreter.Scope, declarations semantic.DeclarationScope) error { +func addBuiltIn(script string, itrp *interpreter.Interpreter, declarations semantic.DeclarationScope) error { astProg, err := parser.NewAST(script) if err != nil { return errors.Wrap(err, "failed to parse builtin") @@ -48,20 +48,21 @@ func addBuiltIn(script string, scope *interpreter.Scope, declarations semantic.D return errors.Wrap(err, "failed to create semantic graph for builtin") } - if _, err := interpreter.Eval(semProg, scope); err != nil { + if err := itrp.Eval(semProg); err != nil { return errors.Wrap(err, "failed to evaluate builtin") } return nil } func New(c *control.Controller, orgID platform.ID) *REPL { - scope, declarations := query.BuiltIns() - interpScope := interpreter.NewScopeWithValues(scope) - addBuiltIn("run = () => yield(table:_)", interpScope, declarations) + itrp := query.NewInterpreter() + _, decls := query.BuiltIns() + addBuiltIn("run = () => yield(table:_)", itrp, decls) + return &REPL{ orgID: orgID, - scope: interpScope, - declarations: declarations, + interpreter: itrp, + declarations: decls, c: c, } } @@ -102,7 +103,7 @@ func (r *REPL) clearCancel() { } func (r *REPL) completer(d prompt.Document) []prompt.Suggest { - names := r.scope.Names() + names := r.interpreter.GlobalScope().Names() sort.Strings(names) s := make([]prompt.Suggest, 0, len(names)) @@ -170,11 +171,11 @@ func (r *REPL) executeLine(t string, expectYield bool) (values.Value, error) { return nil, err } - if _, err := interpreter.Eval(semProg, r.scope); err != nil { + if err := r.interpreter.Eval(semProg); err != nil { return nil, err } - v := r.scope.Return() + v := r.interpreter.Return() // Check for yield and execute query if v.Type() == query.TableObjectType { @@ -186,7 +187,7 @@ func (r *REPL) executeLine(t string, expectYield bool) (values.Value, error) { } } - r.scope.Set("_", v) + r.interpreter.SetVar("_", v) // Print value if v.Type() != semantic.Invalid { diff --git a/spec.go b/spec.go index 5fed512ce0..a0b405ef7b 100644 --- a/spec.go +++ b/spec.go @@ -2,6 +2,7 @@ package query import ( "fmt" + "time" "github.com/pkg/errors" ) @@ -11,6 +12,7 @@ type Spec struct { Operations []*Operation `json:"operations"` Edges []Edge `json:"edges"` Resources ResourceManagement `json:"resources"` + Now time.Time `json:"now"` sorted []*Operation children map[OperationID][]*Operation diff --git a/values/function.go b/values/function.go new file mode 100644 index 0000000000..0168b92d86 --- /dev/null +++ b/values/function.go @@ -0,0 +1,96 @@ +package values + +import ( + "regexp" + + "github.com/influxdata/platform/query/semantic" +) + +// Function represents a callable type +type Function interface { + Value + HasSideEffect() bool + Call(args Object) (Value, error) +} + +// NewFunction returns a new function value +func NewFunction(name string, typ semantic.Type, call func(args Object) (Value, error), sideEffect bool) Function { + return &function{ + name: name, + t: typ, + call: call, + hasSideEffect: sideEffect, + } +} + +// function implements Value interface and more specifically the Function interface +type function struct { + name string + t semantic.Type + call func(args Object) (Value, error) + hasSideEffect bool +} + +func (f *function) Type() semantic.Type { + return f.t +} + +func (f *function) Str() string { + panic(UnexpectedKind(semantic.Object, semantic.String)) +} + +func (f *function) Int() int64 { + panic(UnexpectedKind(semantic.Object, semantic.Int)) +} + +func (f *function) UInt() uint64 { + panic(UnexpectedKind(semantic.Object, semantic.UInt)) +} + +func (f *function) Float() float64 { + panic(UnexpectedKind(semantic.Object, semantic.Float)) +} + +func (f *function) Bool() bool { + panic(UnexpectedKind(semantic.Object, semantic.Bool)) +} + +func (f *function) Time() Time { + panic(UnexpectedKind(semantic.Object, semantic.Time)) +} + +func (f *function) Duration() Duration { + panic(UnexpectedKind(semantic.Object, semantic.Duration)) +} + +func (f *function) Regexp() *regexp.Regexp { + panic(UnexpectedKind(semantic.Object, semantic.Regexp)) +} + +func (f *function) Array() Array { + panic(UnexpectedKind(semantic.Object, semantic.Function)) +} + +func (f *function) Object() Object { + panic(UnexpectedKind(semantic.Object, semantic.Object)) +} + +func (f *function) Function() Function { + return f +} + +func (f *function) Equal(rhs Value) bool { + if f.Type() != rhs.Type() { + return false + } + v, ok := rhs.(*function) + return ok && (f == v) +} + +func (f *function) HasSideEffect() bool { + return f.hasSideEffect +} + +func (f *function) Call(args Object) (Value, error) { + return f.call(args) +} diff --git a/values/values.go b/values/values.go index 00990812e0..a7872b76d8 100644 --- a/values/values.go +++ b/values/values.go @@ -27,13 +27,6 @@ type Value interface { Equal(Value) bool } -// Function represents a callable type -type Function interface { - Value - HasSideEffect() bool - Call(args Object) (Value, error) -} - type value struct { t semantic.Type v interface{}