Skip to content

Commit

Permalink
flux options interface
Browse files Browse the repository at this point in the history
default now option value
  • Loading branch information
jlapacik committed Jul 16, 2018
1 parent cdb915b commit 801a00b
Show file tree
Hide file tree
Showing 29 changed files with 513 additions and 113 deletions.
1 change: 1 addition & 0 deletions builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/influxdata/platform/query/complete"
_ "github.com/influxdata/platform/query/functions" // Import the built-in functions
"github.com/influxdata/platform/query/interpreter"
_ "github.com/influxdata/platform/query/options" // Import the built-in options
)

func init() {
Expand Down
88 changes: 70 additions & 18 deletions compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
TableParameter = "table"
tableKindKey = "kind"
tableParentsKey = "parents"
nowOption = "now"
//tableSpecKey = "spec"
)

Expand All @@ -35,7 +36,8 @@ type options struct {
}

// Compile evaluates a Flux script producing a query Spec.
func Compile(ctx context.Context, q string, opts ...Option) (*Spec, error) {
// now parameter must be non-zero, that is the default now time should be set before compiling.
func Compile(ctx context.Context, q string, now time.Time, opts ...Option) (*Spec, error) {
o := new(options)
for _, opt := range opts {
opt(o)
Expand All @@ -49,38 +51,70 @@ func Compile(ctx context.Context, q string, opts ...Option) (*Spec, error) {
s, _ = opentracing.StartSpanFromContext(ctx, "compile")
defer s.Finish()

scope, decls := builtIns()
interpScope := interpreter.NewScopeWithValues(scope)
itrp := NewInterpreter()
itrp.SetOption(nowOption, nowFunc(now))

_, decls := builtIns(itrp)

// Convert AST program to a semantic program
semProg, err := semantic.New(astProg, decls)
if err != nil {
return nil, err
}

operations, err := interpreter.Eval(semProg, interpScope)
if err != nil {
if err := itrp.Eval(semProg); err != nil {
return nil, err
}
spec := toSpec(operations)
spec := toSpec(itrp)

if o.verbose {
log.Println("Query Spec: ", Formatted(spec, FmtJSON))
}
return spec, nil
}

func toSpec(stmtVals []values.Value) *Spec {
// NewInterpreter returns an interpreter instance with
// pre-constructed options and global scopes.
func NewInterpreter() *interpreter.Interpreter {
options := make(map[string]values.Value, len(builtinOptions))
globals := make(map[string]values.Value, len(builtinScope))

for k, v := range builtinScope {
globals[k] = v
}

for k, v := range builtinOptions {
options[k] = v
}

return interpreter.NewInterpreter(options, globals)
}

func nowFunc(now time.Time) values.Function {
timeVal := values.NewTimeValue(values.ConvertTime(now))
ftype := semantic.NewFunctionType(semantic.FunctionSignature{
ReturnType: semantic.Time,
})
call := func(args values.Object) (values.Value, error) {
return timeVal, nil
}
sideEffect := false
return values.NewFunction(nowOption, ftype, call, sideEffect)
}

func toSpec(itrp *interpreter.Interpreter) *Spec {
operations := itrp.SideEffects()

ider := &ider{
id: 0,
lookup: make(map[*TableObject]OperationID),
}

spec := new(Spec)
visited := make(map[*TableObject]bool)
nodes := make([]*TableObject, 0, len(stmtVals))
nodes := make([]*TableObject, 0, len(operations))

for _, val := range stmtVals {
for _, val := range operations {
if op, ok := val.(*TableObject); ok {
dup := false
for _, node := range nodes {
Expand All @@ -95,12 +129,23 @@ func toSpec(stmtVals []values.Value) *Spec {
}
}
}

// now option is Time value
nowValue, _ := itrp.Option(nowOption).Function().Call(nil)
spec.Now = nowValue.Time().Time()

return spec
}

type CreateOperationSpec func(args Arguments, a *Administration) (OperationSpec, error)

var builtinScope = make(map[string]values.Value)

// TODO(Josh): Default option values should be registered similarly to built-in
// functions. Default options should be registered in their own files
// (or in a single file) using the RegisterBuiltInOption function which will
// place the resolved option value in the following map.
var builtinOptions = make(map[string]values.Value)
var builtinDeclarations = make(semantic.DeclarationScope)

// list of builtin scripts
Expand Down Expand Up @@ -150,6 +195,17 @@ func RegisterBuiltInValue(name string, v values.Value) {
builtinScope[name] = v
}

// RegisterBuiltInOption adds the value to the builtin scope.
func RegisterBuiltInOption(name string, v values.Value) {
if finalized {
panic(errors.New("already finalized, cannot register builtin option"))
}
if _, ok := builtinOptions[name]; ok {
panic(fmt.Errorf("duplicate registration for builtin option %q", name))
}
builtinOptions[name] = v
}

// FinalizeBuiltIns must be called to complete registration.
// Future calls to RegisterFunction, RegisterBuiltIn or RegisterBuiltInValue will panic.
func FinalizeBuiltIns() {
Expand Down Expand Up @@ -373,16 +429,12 @@ func BuiltIns() (map[string]values.Value, semantic.DeclarationScope) {
if !finalized {
panic("builtins not finalized")
}
return builtIns()
return builtIns(NewInterpreter())
}

func builtIns() (map[string]values.Value, semantic.DeclarationScope) {
func builtIns(itrp *interpreter.Interpreter) (map[string]values.Value, semantic.DeclarationScope) {
decls := builtinDeclarations.Copy()
scope := make(map[string]values.Value, len(builtinScope))
for k, v := range builtinScope {
scope[k] = v
}
interpScope := interpreter.NewScopeWithValues(scope)

for name, script := range builtins {
astProg, err := parser.NewAST(script)
if err != nil {
Expand All @@ -393,11 +445,11 @@ func builtIns() (map[string]values.Value, semantic.DeclarationScope) {
panic(errors.Wrapf(err, "failed to create semantic graph for builtin %q", name))
}

if _, err := interpreter.Eval(semProg, interpScope); err != nil {
if err := itrp.Eval(semProg); err != nil {
panic(errors.Wrapf(err, "failed to evaluate builtin %q", name))
}
}
return scope, decls
return itrp.GlobalScope().Values(), decls
}

type Administration struct {
Expand Down
4 changes: 2 additions & 2 deletions control/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (c *Controller) compileQuery(q *Query, queryStr string) error {
if !q.tryCompile() {
return errors.New("failed to transition query to compiling state")
}
spec, err := query.Compile(q.compilingCtx, queryStr, query.Verbose(c.verbose))
spec, err := query.Compile(q.compilingCtx, queryStr, q.now, query.Verbose(c.verbose))
if err != nil {
return errors.Wrap(err, "failed to compile query")
}
Expand Down Expand Up @@ -226,7 +226,7 @@ func (c *Controller) processQuery(q *Query) (pop bool, err error) {
log.Println("logical plan", plan.Formatted(lp))
}

p, err := c.pplanner.Plan(lp, nil, q.now)
p, err := c.pplanner.Plan(lp, nil)
if err != nil {
return true, errors.Wrap(err, "failed to create physical plan")
}
Expand Down
2 changes: 1 addition & 1 deletion functions/count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"testing"
"time"

"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute"
"github.com/influxdata/platform/query/execute/executetest"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/plan"
"github.com/influxdata/platform/query/plan/plantest"
"github.com/influxdata/platform/query/querytest"
Expand Down
2 changes: 1 addition & 1 deletion functions/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func createJoinOpSpec(args query.Arguments, a *query.Administration) (query.Oper
return
}
p := t.(*query.TableObject)
joinParams.add(k /*parameter*/, p /*argument*/)
joinParams.add(k, p)
spec.tableNames[p] = k
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion functions/mean_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"math"
"testing"

"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute/executetest"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/querytest"
)

Expand Down
2 changes: 1 addition & 1 deletion functions/percentile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"math"
"testing"

"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute"
"github.com/influxdata/platform/query/execute/executetest"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/querytest"
)

Expand Down
3 changes: 2 additions & 1 deletion functions/prepcsvtests/prepcsvtests.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"regexp"
"strings"
"time"

"github.com/influxdata/platform/query"
_ "github.com/influxdata/platform/query/builtin"
Expand Down Expand Up @@ -69,7 +70,7 @@ func main() {
}

qs := querytest.GetQueryServiceBridge()
qspec, err := query.Compile(context.Background(), string(querytext))
qspec, err := query.Compile(context.Background(), string(querytext), time.Now().UTC())
if err != nil {
fmt.Printf("error compiling. \n query: \n %s \n err: %s", string(querytext), err)
return
Expand Down
3 changes: 2 additions & 1 deletion functions/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

"github.com/influxdata/platform/query"
_ "github.com/influxdata/platform/query/builtin"
Expand Down Expand Up @@ -83,7 +84,7 @@ func queryTester(t *testing.T, qs query.QueryService, prefix, queryExt string) e
t.Fatal(err)
}

spec, err := query.Compile(context.Background(), q)
spec, err := query.Compile(context.Background(), q, time.Now().UTC())
if err != nil {
t.Fatalf("failed to compile: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion functions/range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"testing"
"time"

"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/plan"
"github.com/influxdata/platform/query/plan/plantest"
"github.com/influxdata/platform/query/querytest"
Expand Down
2 changes: 1 addition & 1 deletion functions/skew_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"math"
"testing"

"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute/executetest"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/querytest"
)

Expand Down
2 changes: 1 addition & 1 deletion functions/spread_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package functions_test
import (
"testing"

"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute/executetest"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/querytest"
)

Expand Down
2 changes: 1 addition & 1 deletion functions/stddev_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"math"
"testing"

"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute/executetest"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/querytest"
)

Expand Down
2 changes: 1 addition & 1 deletion functions/sum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package functions_test
import (
"testing"

"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/execute/executetest"
"github.com/influxdata/platform/query/functions"
"github.com/influxdata/platform/query/plan"
"github.com/influxdata/platform/query/plan/plantest"
"github.com/influxdata/platform/query/querytest"
Expand Down
29 changes: 29 additions & 0 deletions functions/system_time.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package functions

import (
"time"

"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/semantic"
"github.com/influxdata/platform/query/values"
)

var systemTimeFuncName = "systemTime"

func init() {
nowFunc := SystemTime()
query.RegisterBuiltInValue(systemTimeFuncName, nowFunc)
}

// SystemTime return a function value that when called will give the current system time
func SystemTime() values.Value {
name := systemTimeFuncName
ftype := semantic.NewFunctionType(semantic.FunctionSignature{
ReturnType: semantic.Time,
})
call := func(args values.Object) (values.Value, error) {
return values.NewTimeValue(values.ConvertTime(time.Now().UTC())), nil
}
sideEffect := false
return values.NewFunction(name, ftype, call, sideEffect)
}
Loading

0 comments on commit 801a00b

Please sign in to comment.