From 32fe18dbcdc6ea4a30cf467231c84cdbf8d1bd54 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Mon, 22 Aug 2022 09:33:02 -0500 Subject: [PATCH] refactor: move spec details into an internal package (#5116) The spec was something that originally existed to accomodate transpilers. We later decided transpilers should just use the AST rather than something internal like the spec and the repl became the only thing that uses the spec. This moves the spec to an internal package so it stays as an internal detail and does not exist as part of the external Go API. This also removes the ider interface. It didn't appear to be used for anything other than an implementation inside of the spec, so this just removes it. The transformations that needed to know the ids of their parents didn't use the operation ids anyway so it didn't matter. --- compile.go | 47 +++-------- execute/concurrency_quota_test.go | 4 +- spec.go => internal/operation/spec.go | 66 ++++++++++------ .../operation/spec_test.go | 78 +++++++++---------- internal/spec/build.go | 36 +++++---- internal/spec/build_test.go | 70 ++++++++--------- lang/compiler.go | 5 +- operation.go | 20 ----- plan/builder.go | 4 +- plan/logical.go | 16 ++-- plan/logical_test.go | 3 +- plan/plantest/cmp.go | 6 +- plan/types.go | 3 +- querytest/compile.go | 9 ++- repl/compiler.go | 3 +- repl/repl.go | 3 +- .../tomhollingworth/events/duration_test.go | 19 ++--- stdlib/csv/from_test.go | 7 +- stdlib/experimental/group_test.go | 7 +- stdlib/experimental/join.go | 9 +-- stdlib/experimental/mqtt/to_test.go | 13 ++-- stdlib/generate/from_test.go | 9 ++- stdlib/influxdata/influxdb/buckets_test.go | 17 ++-- .../influxdata/influxdb/cardinality_test.go | 19 ++--- stdlib/influxdata/influxdb/from_test.go | 19 ++--- .../influxdata/influxdb/v1/databases_test.go | 17 ++-- .../influxdb/v1/from_influx_json_test.go | 6 +- stdlib/influxdata/influxdb/wide_to_test.go | 7 +- stdlib/join/equijoin_test.go | 4 +- stdlib/kafka/to_test.go | 7 +- stdlib/socket/from_test.go | 7 +- stdlib/sql/to_test.go | 13 ++-- stdlib/universe/columns_test.go | 13 ++-- stdlib/universe/count_test.go | 7 +- stdlib/universe/covariance_test.go | 19 ++--- stdlib/universe/fill_test.go | 7 +- stdlib/universe/filter_test.go | 55 ++++++------- stdlib/universe/group_test.go | 31 ++++---- stdlib/universe/holt_winters_test.go | 13 ++-- stdlib/universe/join_test.go | 13 ++-- stdlib/universe/keys_test.go | 13 ++-- stdlib/universe/map_test.go | 19 ++--- stdlib/universe/pivot_test.go | 7 +- stdlib/universe/quantile_test.go | 37 ++++----- stdlib/universe/range_test.go | 13 ++-- stdlib/universe/schema_functions_test.go | 43 +++++----- stdlib/universe/state_tracking_test.go | 13 ++-- stdlib/universe/union_test.go | 9 ++- stdlib/universe/window_test.go | 7 +- stdlib/universe/yield_test.go | 59 +++++++------- 50 files changed, 472 insertions(+), 459 deletions(-) rename spec.go => internal/operation/spec.go (70%) rename spec_test.go => internal/operation/spec_test.go (67%) diff --git a/compile.go b/compile.go index d5f07a4244..960ed2a991 100644 --- a/compile.go +++ b/compile.go @@ -66,17 +66,6 @@ func functionValue(name string, c CreateOperationSpec, mt semantic.MonoType, sid var _ = tableSpecKey // So that linter doesn't think tableSpecKey is unused, considering above TODO. -// IDer produces the mapping of table Objects to OperationIDs -type IDer interface { - ID(*TableObject) OperationID -} - -// IDerOpSpec is the interface any operation spec that needs -// access to OperationIDs in the query spec must implement. -type IDerOpSpec interface { - IDer(ider IDer) -} - // TableObject represents the value returned by a transformation. // As such, it holds the OperationSpec of the transformation it is associated with, // and it is a values.Value (and, also, a values.Object). @@ -84,25 +73,16 @@ type IDerOpSpec interface { type TableObject struct { // TODO(Josh): Remove args once the // OperationSpec interface has an Equal method. - t semantic.MonoType - args Arguments - Kind OperationKind - Spec OperationSpec - Source OperationSource + t semantic.MonoType + args Arguments + Kind OperationKind + Spec OperationSpec + Source struct { + Stack []interpreter.StackEntry + } Parents []*TableObject } -func (t *TableObject) Operation(ider IDer) *Operation { - if iderOpSpec, ok := t.Spec.(IDerOpSpec); ok { - iderOpSpec.IDer(ider) - } - - return &Operation{ - ID: ider.ID(t), - Spec: t.Spec, - Source: t.Source, - } -} func (t *TableObject) IsNull() bool { return false } @@ -326,17 +306,14 @@ func (f *function) call(ctx context.Context, args interpreter.Arguments) (values return nil, err } - stack := interpreter.Stack(ctx) t := &TableObject{ - t: returnType, - args: arguments, - Kind: spec.Kind(), - Spec: spec, - Source: OperationSource{ - Stack: stack, - }, + t: returnType, + args: arguments, + Kind: spec.Kind(), + Spec: spec, Parents: a.parents, } + t.Source.Stack = interpreter.Stack(ctx) return t, nil } func (f *function) String() string { diff --git a/execute/concurrency_quota_test.go b/execute/concurrency_quota_test.go index dddaea8db1..59332f5a38 100644 --- a/execute/concurrency_quota_test.go +++ b/execute/concurrency_quota_test.go @@ -5,10 +5,10 @@ import ( "testing" "time" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/plan" "go.uber.org/zap/zaptest" - "github.com/influxdata/flux" "github.com/influxdata/flux/dependencies/dependenciestest" "github.com/influxdata/flux/dependency" "github.com/influxdata/flux/execute" @@ -108,7 +108,7 @@ func (rule parallelizeFromTo) Rewrite(ctx context.Context, pn plan.Node) (plan.N type flagger map[string]interface{} -func compile(fluxText string, now time.Time) (context.Context, *flux.Spec, error) { +func compile(fluxText string, now time.Time) (context.Context, *operation.Spec, error) { ctx, deps := dependency.Inject(context.Background(), dependenciestest.Default()) defer deps.Finish() spec, err := spec.FromScript(ctx, runtime.Default, now, fluxText) diff --git a/spec.go b/internal/operation/spec.go similarity index 70% rename from spec.go rename to internal/operation/spec.go index 778cabd154..0ae0ffdb60 100644 --- a/spec.go +++ b/internal/operation/spec.go @@ -1,34 +1,52 @@ -package flux +package operation import ( "time" + "github.com/influxdata/flux" "github.com/influxdata/flux/codes" "github.com/influxdata/flux/internal/errors" + "github.com/influxdata/flux/interpreter" ) +// Node denotes a single operation in a query. +type Node struct { + ID NodeID `json:"id"` + Spec flux.OperationSpec `json:"spec"` + Source NodeSource `json:"source"` +} + +// NodeSource specifies the source location that created +// an operation. +type NodeSource struct { + Stack []interpreter.StackEntry `json:"stack"` +} + +// NodeID is a unique ID within a query for the operation. +type NodeID string + // Spec specifies a query. type Spec struct { - Operations []*Operation `json:"operations"` - Edges []Edge `json:"edges"` - Resources ResourceManagement `json:"resources"` - Now time.Time `json:"now"` - - sorted []*Operation - children map[OperationID][]*Operation - parents map[OperationID][]*Operation + Operations []*Node `json:"operations"` + Edges []Edge `json:"edges"` + Resources flux.ResourceManagement `json:"resources"` + Now time.Time `json:"now"` + + sorted []*Node + children map[NodeID][]*Node + parents map[NodeID][]*Node } // Edge is a data flow relationship between a parent and a child type Edge struct { - Parent OperationID `json:"parent"` - Child OperationID `json:"child"` + Parent NodeID `json:"parent"` + Child NodeID `json:"child"` } // Walk calls f on each operation exactly once. // The function f will be called on an operation only after // all of its parents have already been passed to f. -func (q *Spec) Walk(f func(o *Operation) error) error { +func (q *Spec) Walk(f func(o *Node) error) error { if len(q.sorted) == 0 { if err := q.prepare(); err != nil { return err @@ -53,7 +71,7 @@ func (q *Spec) Validate() error { // Children returns a list of children for a given operation. // If the query is invalid no children will be returned. -func (q *Spec) Children(id OperationID) []*Operation { +func (q *Spec) Children(id NodeID) []*Node { if q.children == nil { err := q.prepare() if err != nil { @@ -65,7 +83,7 @@ func (q *Spec) Children(id OperationID) []*Operation { // Parents returns a list of parents for a given operation. // If the query is invalid no parents will be returned. -func (q *Spec) Parents(id OperationID) []*Operation { +func (q *Spec) Parents(id NodeID) []*Node { if q.parents == nil { err := q.prepare() if err != nil { @@ -91,23 +109,23 @@ func (q *Spec) prepare() error { q.parents = parents q.children = children - tMarks := make(map[OperationID]bool) - pMarks := make(map[OperationID]bool) + tMarks := make(map[NodeID]bool) + pMarks := make(map[NodeID]bool) for _, r := range roots { if err := q.visit(tMarks, pMarks, r); err != nil { return err } } - //reverse q.sorted + // reverse q.sorted for i, j := 0, len(q.sorted)-1; i < j; i, j = i+1, j-1 { q.sorted[i], q.sorted[j] = q.sorted[j], q.sorted[i] } return nil } -func (q *Spec) computeLookup() (map[OperationID]*Operation, error) { - lookup := make(map[OperationID]*Operation, len(q.Operations)) +func (q *Spec) computeLookup() (map[NodeID]*Node, error) { + lookup := make(map[NodeID]*Node, len(q.Operations)) for _, o := range q.Operations { if _, ok := lookup[o.ID]; ok { return nil, errors.Newf(codes.Internal, "found duplicate operation ID %q", o.ID) @@ -117,13 +135,13 @@ func (q *Spec) computeLookup() (map[OperationID]*Operation, error) { return lookup, nil } -func (q *Spec) determineParentsChildrenAndRoots() (parents, children map[OperationID][]*Operation, roots []*Operation, _ error) { +func (q *Spec) determineParentsChildrenAndRoots() (parents, children map[NodeID][]*Node, roots []*Node, _ error) { lookup, err := q.computeLookup() if err != nil { return nil, nil, nil, err } - children = make(map[OperationID][]*Operation, len(q.Operations)) - parents = make(map[OperationID][]*Operation, len(q.Operations)) + children = make(map[NodeID][]*Node, len(q.Operations)) + parents = make(map[NodeID][]*Node, len(q.Operations)) for _, e := range q.Edges { // Build children map c, ok := lookup[e.Child] @@ -150,7 +168,7 @@ func (q *Spec) determineParentsChildrenAndRoots() (parents, children map[Operati // Depth first search topological sorting of a DAG. // https://en.wikipedia.org/wiki/Topological_sorting#Algorithms -func (q *Spec) visit(tMarks, pMarks map[OperationID]bool, o *Operation) error { +func (q *Spec) visit(tMarks, pMarks map[NodeID]bool, o *Node) error { id := o.ID if tMarks[id] { return errors.New(codes.Invalid, "found cycle in query") @@ -173,7 +191,7 @@ func (q *Spec) visit(tMarks, pMarks map[OperationID]bool, o *Operation) error { // Functions return the names of all functions used in the plan func (q *Spec) Functions() ([]string, error) { funcs := []string{} - err := q.Walk(func(o *Operation) error { + err := q.Walk(func(o *Node) error { funcs = append(funcs, string(o.Spec.Kind())) return nil }) diff --git a/spec_test.go b/internal/operation/spec_test.go similarity index 67% rename from spec_test.go rename to internal/operation/spec_test.go index c1a622755b..e9231b7d43 100644 --- a/spec_test.go +++ b/internal/operation/spec_test.go @@ -1,4 +1,4 @@ -package flux_test +package operation_test import ( "errors" @@ -6,26 +6,26 @@ import ( "testing" "github.com/google/go-cmp/cmp" - "github.com/influxdata/flux" + "github.com/influxdata/flux/internal/operation" ) func TestSpec_Walk(t *testing.T) { testCases := []struct { - query *flux.Spec - walkOrder []flux.OperationID + query *operation.Spec + walkOrder []operation.NodeID err error }{ { - query: &flux.Spec{}, + query: &operation.Spec{}, err: errors.New("query has no root nodes"), }, { - query: &flux.Spec{ - Operations: []*flux.Operation{ + query: &operation.Spec{ + Operations: []*operation.Node{ {ID: "a"}, {ID: "b"}, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "a", Child: "b"}, {Parent: "a", Child: "c"}, }, @@ -33,13 +33,13 @@ func TestSpec_Walk(t *testing.T) { err: errors.New("edge references unknown child operation \"c\""), }, { - query: &flux.Spec{ - Operations: []*flux.Operation{ + query: &operation.Spec{ + Operations: []*operation.Node{ {ID: "a"}, {ID: "b"}, {ID: "b"}, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "a", Child: "b"}, {Parent: "a", Child: "b"}, }, @@ -47,13 +47,13 @@ func TestSpec_Walk(t *testing.T) { err: errors.New("found duplicate operation ID \"b\""), }, { - query: &flux.Spec{ - Operations: []*flux.Operation{ + query: &operation.Spec{ + Operations: []*operation.Node{ {ID: "a"}, {ID: "b"}, {ID: "c"}, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "a", Child: "b"}, {Parent: "b", Child: "c"}, {Parent: "c", Child: "b"}, @@ -62,14 +62,14 @@ func TestSpec_Walk(t *testing.T) { err: errors.New("found cycle in query"), }, { - query: &flux.Spec{ - Operations: []*flux.Operation{ + query: &operation.Spec{ + Operations: []*operation.Node{ {ID: "a"}, {ID: "b"}, {ID: "c"}, {ID: "d"}, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "a", Child: "b"}, {Parent: "b", Child: "c"}, {Parent: "c", Child: "d"}, @@ -79,85 +79,85 @@ func TestSpec_Walk(t *testing.T) { err: errors.New("found cycle in query"), }, { - query: &flux.Spec{ - Operations: []*flux.Operation{ + query: &operation.Spec{ + Operations: []*operation.Node{ {ID: "a"}, {ID: "b"}, {ID: "c"}, {ID: "d"}, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "a", Child: "b"}, {Parent: "b", Child: "c"}, {Parent: "c", Child: "d"}, }, }, - walkOrder: []flux.OperationID{ + walkOrder: []operation.NodeID{ "a", "b", "c", "d", }, }, { - query: &flux.Spec{ - Operations: []*flux.Operation{ + query: &operation.Spec{ + Operations: []*operation.Node{ {ID: "a"}, }, - Edges: []flux.Edge{}, + Edges: []operation.Edge{}, }, - walkOrder: []flux.OperationID{ + walkOrder: []operation.NodeID{ "a", }, }, { - query: &flux.Spec{ - Operations: []*flux.Operation{ + query: &operation.Spec{ + Operations: []*operation.Node{ {ID: "a"}, {ID: "b"}, {ID: "c"}, {ID: "d"}, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "a", Child: "b"}, {Parent: "a", Child: "c"}, {Parent: "b", Child: "d"}, {Parent: "c", Child: "d"}, }, }, - walkOrder: []flux.OperationID{ + walkOrder: []operation.NodeID{ "a", "c", "b", "d", }, }, { - query: &flux.Spec{ - Operations: []*flux.Operation{ + query: &operation.Spec{ + Operations: []*operation.Node{ {ID: "a"}, {ID: "b"}, {ID: "c"}, {ID: "d"}, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "a", Child: "c"}, {Parent: "b", Child: "c"}, {Parent: "c", Child: "d"}, }, }, - walkOrder: []flux.OperationID{ + walkOrder: []operation.NodeID{ "b", "a", "c", "d", }, }, { - query: &flux.Spec{ - Operations: []*flux.Operation{ + query: &operation.Spec{ + Operations: []*operation.Node{ {ID: "a"}, {ID: "b"}, {ID: "c"}, {ID: "d"}, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "a", Child: "c"}, {Parent: "b", Child: "d"}, }, }, - walkOrder: []flux.OperationID{ + walkOrder: []operation.NodeID{ "b", "d", "a", "c", }, }, @@ -165,8 +165,8 @@ func TestSpec_Walk(t *testing.T) { for i, tc := range testCases { tc := tc t.Run(strconv.Itoa(i), func(t *testing.T) { - var gotOrder []flux.OperationID - err := tc.query.Walk(func(o *flux.Operation) error { + var gotOrder []operation.NodeID + err := tc.query.Walk(func(o *operation.Node) error { gotOrder = append(gotOrder, o.ID) return nil }) diff --git a/internal/spec/build.go b/internal/spec/build.go index 7e5baa5a2c..af44ce0302 100644 --- a/internal/spec/build.go +++ b/internal/spec/build.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/flux/codes" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/internal/errors" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/interpreter" "github.com/influxdata/flux/plan" "github.com/opentracing/opentracing-go" @@ -16,7 +17,7 @@ import ( type ider struct { id *int - lookup map[*flux.TableObject]flux.OperationID + lookup map[*flux.TableObject]operation.NodeID } func (i *ider) nextID() int { @@ -25,18 +26,18 @@ func (i *ider) nextID() int { return next } -func (i *ider) get(t *flux.TableObject) (flux.OperationID, bool) { +func (i *ider) get(t *flux.TableObject) (operation.NodeID, bool) { tableID, ok := i.lookup[t] return tableID, ok } -func (i *ider) set(t *flux.TableObject, id int) flux.OperationID { - opID := flux.OperationID(fmt.Sprintf("%s%d", t.Kind, id)) +func (i *ider) set(t *flux.TableObject, id int) operation.NodeID { + opID := operation.NodeID(fmt.Sprintf("%s%d", t.Kind, id)) i.lookup[t] = opID return opID } -func (i *ider) ID(t *flux.TableObject) flux.OperationID { +func (i *ider) ID(t *flux.TableObject) operation.NodeID { tableID, ok := i.get(t) if !ok { tableID = i.set(t, i.nextID()) @@ -52,7 +53,7 @@ func (i *ider) ID(t *flux.TableObject) flux.OperationID { // the terminal node in the plan. // In keeping with the "one result" requirement, when `skipYields` is true // FromEvaluation will produce an error for inputs producing > 1 result. -func FromEvaluation(ctx context.Context, ses []interpreter.SideEffect, now time.Time, skipYields bool) (*flux.Spec, error) { +func FromEvaluation(ctx context.Context, ses []interpreter.SideEffect, now time.Time, skipYields bool) (*operation.Spec, error) { var nextNodeID *int if value := ctx.Value(plan.NextPlanNodeIDKey); value != nil { nextNodeID = value.(*int) @@ -61,10 +62,10 @@ func FromEvaluation(ctx context.Context, ses []interpreter.SideEffect, now time. } ider := &ider{ id: nextNodeID, - lookup: make(map[*flux.TableObject]flux.OperationID), + lookup: make(map[*flux.TableObject]operation.NodeID), } - spec := &flux.Spec{Now: now} + spec := &operation.Spec{Now: now} seen := make(map[*flux.TableObject]bool) objs := make([]*flux.TableObject, 0, len(ses)) resultCount := 0 @@ -123,14 +124,14 @@ func isDuplicateTableObject(ctx context.Context, op *flux.TableObject, objs []*f return false } -func buildSpecWithTrace(ctx context.Context, t *flux.TableObject, ider flux.IDer, spec *flux.Spec, visited map[*flux.TableObject]bool, skipYields bool) { +func buildSpecWithTrace(ctx context.Context, t *flux.TableObject, ider *ider, spec *operation.Spec, visited map[*flux.TableObject]bool, skipYields bool) { s, _ := opentracing.StartSpanFromContext(ctx, "buildSpec") s.SetTag("opKind", t.Kind) buildSpec(t, ider, spec, visited, skipYields) s.Finish() } -func buildSpec(t *flux.TableObject, ider flux.IDer, spec *flux.Spec, visited map[*flux.TableObject]bool, skipYields bool) { +func buildSpec(t *flux.TableObject, ider *ider, spec *operation.Spec, visited map[*flux.TableObject]bool, skipYields bool) { // Traverse graph upwards to first unvisited node. // Note: parents are sorted based on parameter name, so the visit order is consistent. @@ -154,12 +155,19 @@ func buildSpec(t *flux.TableObject, ider flux.IDer, spec *flux.Spec, visited map if !(skipYields && t.Kind == "yield") { // Link table object to all parents after assigning ID. for _, p := range parents { - spec.Edges = append(spec.Edges, flux.Edge{ + spec.Edges = append(spec.Edges, operation.Edge{ Parent: ider.ID(p), Child: tableID, }) } - spec.Operations = append(spec.Operations, t.Operation(ider)) + op := &operation.Node{ + ID: ider.ID(t), + Spec: t.Spec, + Source: operation.NodeSource{ + Stack: t.Source.Stack, + }, + } + spec.Operations = append(spec.Operations, op) } visited[t] = true @@ -178,14 +186,14 @@ func getNonYieldParents(acc []*flux.TableObject, to *flux.TableObject) []*flux.T } // FromTableObject returns a spec from a TableObject. -func FromTableObject(ctx context.Context, to *flux.TableObject, now time.Time) (*flux.Spec, error) { +func FromTableObject(ctx context.Context, to *flux.TableObject, now time.Time) (*operation.Spec, error) { return FromEvaluation(ctx, []interpreter.SideEffect{{Value: to}}, now, true) } // FromScript returns a spec from a script expressed as a raw string. // This is duplicate logic for what happens when a flux.Program runs. // This function is used in tests that compare flux.Specs (e.g. in planner tests). -func FromScript(ctx context.Context, runtime flux.Runtime, now time.Time, script string) (*flux.Spec, error) { +func FromScript(ctx context.Context, runtime flux.Runtime, now time.Time, script string) (*operation.Spec, error) { s, _ := opentracing.StartSpanFromContext(ctx, "parse") astPkg, err := runtime.Parse(script) if err != nil { diff --git a/internal/spec/build_test.go b/internal/spec/build_test.go index 5c5d971a47..362d55a480 100644 --- a/internal/spec/build_test.go +++ b/internal/spec/build_test.go @@ -6,11 +6,11 @@ import ( "testing" "time" - "github.com/influxdata/flux" "github.com/influxdata/flux/dependencies/dependenciestest" "github.com/influxdata/flux/dependency" "github.com/influxdata/flux/execute" _ "github.com/influxdata/flux/fluxinit/static" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/internal/spec" "github.com/influxdata/flux/runtime" ) @@ -65,7 +65,7 @@ func TestFromEvaluation(t *testing.T) { tests := []struct { name string args args - want *flux.Spec + want *operation.Spec wantErr bool }{ { @@ -79,12 +79,12 @@ func TestFromEvaluation(t *testing.T) { now: nowDefault, skipYields: false, }, - want: &flux.Spec{ - Operations: []*flux.Operation{ + want: &operation.Spec{ + Operations: []*operation.Node{ {ID: "array.from0"}, {ID: "yield1"}, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "array.from0", Child: "yield1"}, }, }, @@ -100,8 +100,8 @@ func TestFromEvaluation(t *testing.T) { now: nowDefault, skipYields: true, }, - want: &flux.Spec{ - Operations: []*flux.Operation{ + want: &operation.Spec{ + Operations: []*operation.Node{ {ID: "array.from0"}, }, // No edges since there's only a single node left @@ -120,14 +120,14 @@ func TestFromEvaluation(t *testing.T) { now: nowDefault, skipYields: false, }, - want: &flux.Spec{ - Operations: []*flux.Operation{ + want: &operation.Spec{ + Operations: []*operation.Node{ {ID: "array.from0"}, {ID: "yield1"}, {ID: "yield2"}, {ID: "yield3"}, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "array.from0", Child: "yield1"}, {Parent: "yield1", Child: "yield2"}, {Parent: "yield2", Child: "yield3"}, @@ -146,8 +146,8 @@ func TestFromEvaluation(t *testing.T) { now: nowDefault, skipYields: true, }, - want: &flux.Spec{ - Operations: []*flux.Operation{ + want: &operation.Spec{ + Operations: []*operation.Node{ {ID: "array.from0"}, }, // No edges since there's only a single node left @@ -165,13 +165,13 @@ func TestFromEvaluation(t *testing.T) { now: nowDefault, skipYields: false, }, - want: &flux.Spec{ - Operations: []*flux.Operation{ + want: &operation.Spec{ + Operations: []*operation.Node{ {ID: "array.from0"}, {ID: "yield1"}, {ID: "map2"}, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "array.from0", Child: "yield1"}, {Parent: "yield1", Child: "map2"}, }, @@ -189,12 +189,12 @@ func TestFromEvaluation(t *testing.T) { now: nowDefault, skipYields: true, }, - want: &flux.Spec{ - Operations: []*flux.Operation{ + want: &operation.Spec{ + Operations: []*operation.Node{ {ID: "array.from0"}, {ID: "map2"}, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "array.from0", Child: "map2"}, }, }, @@ -216,12 +216,12 @@ func TestFromEvaluation(t *testing.T) { now: nowDefault, skipYields: true, }, - want: &flux.Spec{ - Operations: []*flux.Operation{ + want: &operation.Spec{ + Operations: []*operation.Node{ {ID: "array.from0"}, {ID: "map7"}, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "array.from0", Child: "map7"}, }, }, @@ -242,8 +242,8 @@ func TestFromEvaluation(t *testing.T) { now: nowDefault, skipYields: false, }, - want: &flux.Spec{ - Operations: []*flux.Operation{ + want: &operation.Spec{ + Operations: []*operation.Node{ {ID: "array.from0"}, {ID: "yield1"}, {ID: "yield2"}, @@ -252,7 +252,7 @@ func TestFromEvaluation(t *testing.T) { {ID: "yield5"}, {ID: "map6"}, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "array.from0", Child: "yield1"}, {Parent: "yield1", Child: "yield2"}, {Parent: "yield2", Child: "map3"}, @@ -278,14 +278,14 @@ func TestFromEvaluation(t *testing.T) { now: nowDefault, skipYields: true, }, - want: &flux.Spec{ - Operations: []*flux.Operation{ + want: &operation.Spec{ + Operations: []*operation.Node{ {ID: "array.from0"}, {ID: "map3"}, {ID: "map4"}, {ID: "map6"}, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "array.from0", Child: "map3"}, {Parent: "map3", Child: "map4"}, {Parent: "map4", Child: "map6"}, @@ -305,13 +305,13 @@ func TestFromEvaluation(t *testing.T) { now: nowDefault, skipYields: false, }, - want: &flux.Spec{ - Operations: []*flux.Operation{ + want: &operation.Spec{ + Operations: []*operation.Node{ {ID: "array.from0"}, {ID: "toSQL1"}, {ID: "toSQL2"}, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "array.from0", Child: "toSQL1"}, {Parent: "toSQL1", Child: "toSQL2"}, }, @@ -362,14 +362,14 @@ func TestFromEvaluation(t *testing.T) { now: nowDefault, skipYields: false, }, - want: &flux.Spec{ - Operations: []*flux.Operation{ + want: &operation.Spec{ + Operations: []*operation.Node{ {ID: "array.from0"}, {ID: "toSQL1"}, {ID: "toSQL2"}, {ID: "yield3"}, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "array.from0", Child: "toSQL1"}, {Parent: "toSQL1", Child: "toSQL2"}, {Parent: "toSQL2", Child: "yield3"}, @@ -394,11 +394,11 @@ func TestFromEvaluation(t *testing.T) { return } - gotOpIDs := make([]flux.OperationID, len(got.Operations)) + gotOpIDs := make([]operation.NodeID, len(got.Operations)) for _, o := range got.Operations { gotOpIDs = append(gotOpIDs, o.ID) } - wantOpIDs := make([]flux.OperationID, len(tt.want.Operations)) + wantOpIDs := make([]operation.NodeID, len(tt.want.Operations)) for _, o := range tt.want.Operations { wantOpIDs = append(wantOpIDs, o.ID) } diff --git a/lang/compiler.go b/lang/compiler.go index ce473ad004..8827102db5 100644 --- a/lang/compiler.go +++ b/lang/compiler.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/flux/execute" "github.com/influxdata/flux/internal/errors" "github.com/influxdata/flux/internal/jaeger" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/internal/spec" "github.com/influxdata/flux/interpreter" "github.com/influxdata/flux/memory" @@ -130,7 +131,7 @@ func CompileTableObject(ctx context.Context, to *flux.TableObject, now time.Time }, nil } -func buildPlan(ctx context.Context, spec *flux.Spec, opts *compileOptions) (*plan.Spec, error) { +func buildPlan(ctx context.Context, spec *operation.Spec, opts *compileOptions) (*plan.Spec, error) { s, _ := opentracing.StartSpanFromContext(ctx, "plan") defer s.Finish() pb := plan.PlannerBuilder{} @@ -425,7 +426,7 @@ func (eoc *ExecOptsConfig) ConfigureNow(ctx context.Context, now time.Time) { *deps.Now = now } -func (p *AstProgram) getSpec(ctx context.Context, alloc memory.Allocator) (*flux.Spec, values.Scope, error) { +func (p *AstProgram) getSpec(ctx context.Context, alloc memory.Allocator) (*operation.Spec, values.Scope, error) { ast, astErr := p.GetAst() if astErr != nil { return nil, nil, astErr diff --git a/operation.go b/operation.go index 6b1ea9589c..044b4a07db 100644 --- a/operation.go +++ b/operation.go @@ -1,30 +1,10 @@ package flux -import ( - "github.com/influxdata/flux/interpreter" -) - -// Operation denotes a single operation in a query. -type Operation struct { - ID OperationID `json:"id"` - Spec OperationSpec `json:"spec"` - Source OperationSource `json:"source"` -} - // OperationSpec specifies an operation as part of a query. type OperationSpec interface { // Kind returns the kind of the operation. Kind() OperationKind } -// OperationSource specifies the source location that created -// an operation. -type OperationSource struct { - Stack []interpreter.StackEntry `json:"stack"` -} - -// OperationID is a unique ID within a query for the operation. -type OperationID string - // OperationKind denotes the kind of operations. type OperationKind string diff --git a/plan/builder.go b/plan/builder.go index cf7d5b70ce..bbb0bfa760 100644 --- a/plan/builder.go +++ b/plan/builder.go @@ -3,7 +3,7 @@ package plan import ( "context" - "github.com/influxdata/flux" + "github.com/influxdata/flux/internal/operation" ) // PlannerBuilder provides clients with an easy way to create planners. @@ -37,7 +37,7 @@ type planner struct { pp PhysicalPlanner } -func (p *planner) Plan(ctx context.Context, fspec *flux.Spec) (*Spec, error) { +func (p *planner) Plan(ctx context.Context, fspec *operation.Spec) (*Spec, error) { ip, err := p.lp.CreateInitialPlan(fspec) if err != nil { return nil, err diff --git a/plan/logical.go b/plan/logical.go index 764dfc583d..dd7a25e134 100644 --- a/plan/logical.go +++ b/plan/logical.go @@ -5,9 +5,9 @@ import ( "fmt" "time" - "github.com/influxdata/flux" "github.com/influxdata/flux/codes" "github.com/influxdata/flux/internal/errors" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/interpreter" ) @@ -18,7 +18,7 @@ import ( // actual physical algorithms used to implement operations, and independent of // the actual data being processed. type LogicalPlanner interface { - CreateInitialPlan(spec *flux.Spec) (*Spec, error) + CreateInitialPlan(spec *operation.Spec) (*Spec, error) Plan(context.Context, *Spec) (*Spec, error) } @@ -93,7 +93,7 @@ func DisableIntegrityChecks() LogicalOption { } // CreateInitialPlan translates the flux.Spec into an unoptimized, naive plan. -func (l *logicalPlanner) CreateInitialPlan(spec *flux.Spec) (*Spec, error) { +func (l *logicalPlanner) CreateInitialPlan(spec *operation.Spec) (*Spec, error) { return createLogicalPlan(spec) } @@ -167,8 +167,8 @@ func (lpn *LogicalNode) ShallowCopy() Node { } // createLogicalPlan creates a logical query plan from a flux spec -func createLogicalPlan(spec *flux.Spec) (*Spec, error) { - nodes := make(map[flux.OperationID]Node, len(spec.Operations)) +func createLogicalPlan(spec *operation.Spec) (*Spec, error) { + nodes := make(map[operation.NodeID]Node, len(spec.Operations)) admin := administration{now: spec.Now} plan := NewPlanSpec() @@ -193,9 +193,9 @@ func createLogicalPlan(spec *flux.Spec) (*Spec, error) { // fluxSpecVisitor visits a flux spec and constructs from it a logical plan DAG type fluxSpecVisitor struct { a Administration - spec *flux.Spec + spec *operation.Spec plan *Spec - nodes map[flux.OperationID]Node + nodes map[operation.NodeID]Node yieldNames map[string]struct{} } @@ -213,7 +213,7 @@ func (v *fluxSpecVisitor) addYieldName(pn Node) error { // visitOperation takes a flux spec operation, converts it to its equivalent // logical procedure spec, and adds it to the current logical plan DAG. -func (v *fluxSpecVisitor) visitOperation(o *flux.Operation) error { +func (v *fluxSpecVisitor) visitOperation(o *operation.Node) error { // Retrieve the create function for this query operation createFns, ok := createProcedureFnsFromKind(o.Spec.Kind()) diff --git a/plan/logical_test.go b/plan/logical_test.go index e6c627070e..2043bcbe37 100644 --- a/plan/logical_test.go +++ b/plan/logical_test.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/flux/dependency" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/internal/spec" "github.com/influxdata/flux/interpreter" "github.com/influxdata/flux/parser" @@ -24,7 +25,7 @@ import ( "github.com/influxdata/flux/values/valuestest" ) -func compile(fluxText string, now time.Time) (*flux.Spec, error) { +func compile(fluxText string, now time.Time) (*operation.Spec, error) { ctx, deps := dependency.Inject(context.Background(), dependenciestest.Default()) defer deps.Finish() return spec.FromScript(ctx, runtime.Default, now, fluxText) diff --git a/plan/plantest/cmp.go b/plan/plantest/cmp.go index 0b0b464967..1af5f57b2c 100644 --- a/plan/plantest/cmp.go +++ b/plan/plantest/cmp.go @@ -6,7 +6,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - "github.com/influxdata/flux" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/plan" "github.com/influxdata/flux/semantic/semantictest" "github.com/influxdata/flux/stdlib/kafka" @@ -17,9 +17,9 @@ import ( // CmpOptions are the options needed to compare plan.ProcedureSpecs inside plan.Spec. var CmpOptions = append( semantictest.CmpOptions, - cmp.AllowUnexported(flux.Spec{}), + cmp.AllowUnexported(operation.Spec{}), cmp.AllowUnexported(universe.JoinOpSpec{}), - cmpopts.IgnoreUnexported(flux.Spec{}), + cmpopts.IgnoreUnexported(operation.Spec{}), cmpopts.IgnoreUnexported(universe.JoinOpSpec{}), cmp.AllowUnexported(kafka.ToKafkaProcedureSpec{}), cmpopts.IgnoreUnexported(kafka.ToKafkaProcedureSpec{}), diff --git a/plan/types.go b/plan/types.go index d1e5625cb8..50887605ff 100644 --- a/plan/types.go +++ b/plan/types.go @@ -7,12 +7,13 @@ import ( "time" "github.com/influxdata/flux" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/interpreter" "github.com/influxdata/flux/interval" ) type Planner interface { - Plan(context.Context, *flux.Spec) (*Spec, error) + Plan(context.Context, *operation.Spec) (*Spec, error) } // Node defines the common interface for interacting with diff --git a/querytest/compile.go b/querytest/compile.go index 737ccb39bb..aa657a5d3c 100644 --- a/querytest/compile.go +++ b/querytest/compile.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/dependencies/dependenciestest" "github.com/influxdata/flux/dependency" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/internal/spec" "github.com/influxdata/flux/runtime" "github.com/influxdata/flux/semantic/semantictest" @@ -23,17 +24,17 @@ import ( type NewQueryTestCase struct { Name string Raw string - Want *flux.Spec + Want *operation.Spec WantErr bool } var opts = append( semantictest.CmpOptions, - cmp.AllowUnexported(flux.Spec{}), + cmp.AllowUnexported(operation.Spec{}), cmp.AllowUnexported(universe.JoinOpSpec{}), - cmpopts.IgnoreUnexported(flux.Spec{}), + cmpopts.IgnoreUnexported(operation.Spec{}), cmpopts.IgnoreUnexported(universe.JoinOpSpec{}), - cmpopts.IgnoreFields(flux.Operation{}, "Source"), + cmpopts.IgnoreFields(operation.Node{}, "Source"), valuestest.ScopeTransformer, ) diff --git a/repl/compiler.go b/repl/compiler.go index 1c5264999d..52dbe54964 100644 --- a/repl/compiler.go +++ b/repl/compiler.go @@ -4,6 +4,7 @@ import ( "context" "github.com/influxdata/flux" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/lang" "github.com/influxdata/flux/plan" ) @@ -13,7 +14,7 @@ const CompilerType = "REPL" // Compiler specific to the Flux REPL type Compiler struct { - Spec *flux.Spec `json:"spec"` + Spec *operation.Spec `json:"spec"` } func (c Compiler) Compile(ctx context.Context, runtime flux.Runtime) (flux.Program, error) { diff --git a/repl/repl.go b/repl/repl.go index bb14a4ea26..3c6503fead 100644 --- a/repl/repl.go +++ b/repl/repl.go @@ -17,6 +17,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/dependency" "github.com/influxdata/flux/execute" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/internal/spec" "github.com/influxdata/flux/interpreter" "github.com/influxdata/flux/lang" @@ -240,7 +241,7 @@ func (r *REPL) analyzeLine(t string) (*semantic.Package, *libflux.FluxError, err return x, nil, err } -func (r *REPL) doQuery(ctx context.Context, spec *flux.Spec) error { +func (r *REPL) doQuery(ctx context.Context, spec *operation.Spec) error { // Setup cancel context nextPlanNodeID := new(int) ctx, cancelFunc := context.WithCancel(context.WithValue( diff --git a/stdlib/contrib/tomhollingworth/events/duration_test.go b/stdlib/contrib/tomhollingworth/events/duration_test.go index a5cf0dda65..6a3fb731f7 100644 --- a/stdlib/contrib/tomhollingworth/events/duration_test.go +++ b/stdlib/contrib/tomhollingworth/events/duration_test.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" _ "github.com/influxdata/flux/fluxinit/static" // We need to init flux for the tests to work. + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/stdlib/contrib/tomhollingworth/events" "github.com/influxdata/flux/stdlib/influxdata/influxdb" @@ -30,8 +31,8 @@ func TestDuration_NewQuery(t *testing.T) { Name: "duration default", Raw: `import "contrib/tomhollingworth/events" from(bucket:"mydb") |> range(start:-1h) |> events.duration()`, WantErr: false, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -63,7 +64,7 @@ func TestDuration_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "duration2"}, }, @@ -73,8 +74,8 @@ func TestDuration_NewQuery(t *testing.T) { Name: "duration different unit and columns", Raw: `import "contrib/tomhollingworth/events" from(bucket:"mydb") |> range(start:-1h) |> events.duration(unit: 1ms, timeColumn: "start", stopColumn: "end", columnName: "result")`, WantErr: false, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -106,7 +107,7 @@ func TestDuration_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "duration2"}, }, @@ -116,8 +117,8 @@ func TestDuration_NewQuery(t *testing.T) { Name: "duration with stop", Raw: `import "contrib/tomhollingworth/events" from(bucket:"mydb") |> range(start:-1h) |> events.duration(stop: 2020-10-20T08:30:00Z)`, WantErr: false, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -151,7 +152,7 @@ func TestDuration_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "duration2"}, }, diff --git a/stdlib/csv/from_test.go b/stdlib/csv/from_test.go index af8f1d5e10..af94c2da25 100644 --- a/stdlib/csv/from_test.go +++ b/stdlib/csv/from_test.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/flux/execute/executetest" _ "github.com/influxdata/flux/fluxinit/static" // We need to init flux for the tests to work. "github.com/influxdata/flux/internal/errors" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/mock" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/stdlib/csv" @@ -207,8 +208,8 @@ func TestFromCSV_NewQuery(t *testing.T) { { Name: "fromCSV text", Raw: `import "csv" csv.from(csv: "1,2") |> range(start:-4h, stop:-2h) |> sum()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "fromCSV0", Spec: &csv.FromCSVOpSpec{ @@ -239,7 +240,7 @@ func TestFromCSV_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "fromCSV0", Child: "range1"}, {Parent: "range1", Child: "sum2"}, }, diff --git a/stdlib/experimental/group_test.go b/stdlib/experimental/group_test.go index 9f94833558..43429493a3 100644 --- a/stdlib/experimental/group_test.go +++ b/stdlib/experimental/group_test.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" _ "github.com/influxdata/flux/fluxinit/static" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/stdlib/experimental" "github.com/influxdata/flux/stdlib/influxdata/influxdb" @@ -21,8 +22,8 @@ func TestGroup_NewQuery(t *testing.T) { Name: "experimental group extend", Raw: `import "experimental" from(bucket: "telegraf") |> range(start: -1m) |> experimental.group(mode: "extend", columns: ["a"])`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -47,7 +48,7 @@ from(bucket: "telegraf") |> range(start: -1m) |> experimental.group(mode: "exten Spec: &experimental.GroupOpSpec{Mode: "extend", Columns: []string{"a"}}, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "experimental-group2"}, }, diff --git a/stdlib/experimental/join.go b/stdlib/experimental/join.go index eef1a55837..7c6befb1b0 100644 --- a/stdlib/experimental/join.go +++ b/stdlib/experimental/join.go @@ -28,18 +28,11 @@ func init() { } type JoinOpSpec struct { - Left flux.OperationID `json:"left"` - Right flux.OperationID `json:"right"` - Fn interpreter.ResolvedFunction `json:"fn"` + Fn interpreter.ResolvedFunction l, r *flux.TableObject } -func (s *JoinOpSpec) IDer(ider flux.IDer) { - s.Left = ider.ID(s.l) - s.Right = ider.ID(s.r) -} - func createJoinOpSpec(args flux.Arguments, p *flux.Administration) (flux.OperationSpec, error) { l, ok := args.Get("left") if !ok { diff --git a/stdlib/experimental/mqtt/to_test.go b/stdlib/experimental/mqtt/to_test.go index eec60b634b..658325be7b 100644 --- a/stdlib/experimental/mqtt/to_test.go +++ b/stdlib/experimental/mqtt/to_test.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" _ "github.com/influxdata/flux/fluxinit/static" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/lang" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/querytest" @@ -27,8 +28,8 @@ func TestToMQTT_NewQuery(t *testing.T) { Raw: ` import "experimental/mqtt" from(bucket:"mybucket") |> mqtt.to(broker: "tcp://iot.eclipse.org:1883", timeout: 0s)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -48,7 +49,7 @@ from(bucket:"mybucket") |> mqtt.to(broker: "tcp://iot.eclipse.org:1883", timeout }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "toMQTT1"}, }, }, @@ -58,8 +59,8 @@ from(bucket:"mybucket") |> mqtt.to(broker: "tcp://iot.eclipse.org:1883", timeout Raw: ` import "experimental/mqtt" from(bucket:"mybucket") |> mqtt.to(broker: "tcp://iot.eclipse.org:1883", retain: true)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -81,7 +82,7 @@ from(bucket:"mybucket") |> mqtt.to(broker: "tcp://iot.eclipse.org:1883", retain: }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "toMQTT1"}, }, }, diff --git a/stdlib/generate/from_test.go b/stdlib/generate/from_test.go index c7be7a95c6..95c2d3499c 100644 --- a/stdlib/generate/from_test.go +++ b/stdlib/generate/from_test.go @@ -7,6 +7,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/execute/executetest" _ "github.com/influxdata/flux/fluxinit/static" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/interpreter" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/runtime" @@ -31,8 +32,8 @@ func TestFrom_NewQuery(t *testing.T) { Raw: ` import "generate" generate.from(start: 0h, stop: 1h, count: 10, fn: (n) => n)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "fromGenerator0", Spec: &generate.FromGeneratorOpSpec{ @@ -58,8 +59,8 @@ func TestFrom_NewQuery(t *testing.T) { Raw: ` import "generate" generate.from(start: 2030-01-01T00:00:00Z, stop: 2030-01-01T00:00:01Z, count: 10, fn: (n) => n)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "fromGenerator0", Spec: &generate.FromGeneratorOpSpec{ diff --git a/stdlib/influxdata/influxdb/buckets_test.go b/stdlib/influxdata/influxdb/buckets_test.go index b5004fbd41..7e38a755b7 100644 --- a/stdlib/influxdata/influxdb/buckets_test.go +++ b/stdlib/influxdata/influxdb/buckets_test.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/stdlib/influxdata/influxdb" "github.com/influxdata/flux/stdlib/influxdata/influxdb/internal/testutil" @@ -16,8 +17,8 @@ func TestBuckets_NewQuery(t *testing.T) { { Name: "buckets no args", Raw: `buckets()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "buckets0", Spec: &influxdb.BucketsOpSpec{}, @@ -33,8 +34,8 @@ func TestBuckets_NewQuery(t *testing.T) { { Name: "buckets with host and token", Raw: `buckets(host: "http://localhost:8086", token: "mytoken")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "buckets0", Spec: &influxdb.BucketsOpSpec{ @@ -48,8 +49,8 @@ func TestBuckets_NewQuery(t *testing.T) { { Name: "buckets with org", Raw: `buckets(org: "influxdata")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "buckets0", Spec: &influxdb.BucketsOpSpec{ @@ -62,8 +63,8 @@ func TestBuckets_NewQuery(t *testing.T) { { Name: "buckets with org id", Raw: `buckets(orgID: "97aa81cc0e247dc4")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "buckets0", Spec: &influxdb.BucketsOpSpec{ diff --git a/stdlib/influxdata/influxdb/cardinality_test.go b/stdlib/influxdata/influxdb/cardinality_test.go index 1e90464692..1ec525b9fc 100644 --- a/stdlib/influxdata/influxdb/cardinality_test.go +++ b/stdlib/influxdata/influxdb/cardinality_test.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/interpreter" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/runtime" @@ -33,8 +34,8 @@ func TestCardinality_NewQuery(t *testing.T) { { Name: "cardinality with bucket and range", Raw: `influxdb.cardinality(bucket:"mybucket",start:-4h,stop:-2h) |> sum()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "influxdata/influxdb.cardinality0", Spec: &influxdb.CardinalityOpSpec{ @@ -58,7 +59,7 @@ func TestCardinality_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "influxdata/influxdb.cardinality0", Child: "sum1"}, }, }, @@ -66,8 +67,8 @@ func TestCardinality_NewQuery(t *testing.T) { { Name: "cardinality with host and token", Raw: `influxdb.cardinality(bucket:"mybucket", host: "http://localhost:8086", token: "mytoken", start: -2h)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "influxdata/influxdb.cardinality0", Spec: &influxdb.CardinalityOpSpec{ @@ -89,8 +90,8 @@ func TestCardinality_NewQuery(t *testing.T) { { Name: "cardinality with org", Raw: `influxdb.cardinality(org: "influxdata", bucket:"mybucket", start: -2h)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "influxdata/influxdb.cardinality0", Spec: &influxdb.CardinalityOpSpec{ @@ -111,8 +112,8 @@ func TestCardinality_NewQuery(t *testing.T) { { Name: "cardinality with org id and bucket id", Raw: `influxdb.cardinality(orgID: "97aa81cc0e247dc4", bucketID: "1e01ac57da723035", start: -2h)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "influxdata/influxdb.cardinality0", Spec: &influxdb.CardinalityOpSpec{ diff --git a/stdlib/influxdata/influxdb/from_test.go b/stdlib/influxdata/influxdb/from_test.go index 7d01b81b20..6baeb669f3 100644 --- a/stdlib/influxdata/influxdb/from_test.go +++ b/stdlib/influxdata/influxdb/from_test.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/interpreter" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/runtime" @@ -33,8 +34,8 @@ func TestFrom_NewQuery(t *testing.T) { { Name: "from with database", Raw: `from(bucket:"mybucket") |> range(start:-4h, stop:-2h) |> sum()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -64,7 +65,7 @@ func TestFrom_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "sum2"}, }, @@ -73,8 +74,8 @@ func TestFrom_NewQuery(t *testing.T) { { Name: "from with host and token", Raw: `from(bucket:"mybucket", host: "http://localhost:8086", token: "mytoken")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -89,8 +90,8 @@ func TestFrom_NewQuery(t *testing.T) { { Name: "from with org", Raw: `from(org: "influxdata", bucket:"mybucket")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -104,8 +105,8 @@ func TestFrom_NewQuery(t *testing.T) { { Name: "from with org id and bucket id", Raw: `from(orgID: "97aa81cc0e247dc4", bucketID: "1e01ac57da723035")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ diff --git a/stdlib/influxdata/influxdb/v1/databases_test.go b/stdlib/influxdata/influxdb/v1/databases_test.go index 36cecb3e18..cf8ddc20db 100644 --- a/stdlib/influxdata/influxdb/v1/databases_test.go +++ b/stdlib/influxdata/influxdb/v1/databases_test.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/stdlib/influxdata/influxdb" "github.com/influxdata/flux/stdlib/influxdata/influxdb/internal/testutil" @@ -18,8 +19,8 @@ func TestDatabases_NewQuery(t *testing.T) { Name: "databases no args", Raw: `import "influxdata/influxdb/v1" v1.databases()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "databases0", Spec: &v1.DatabasesOpSpec{}, @@ -37,8 +38,8 @@ v1.databases(chicken:"what is this?")`, Name: "databases with host and token", Raw: `import "influxdata/influxdb/v1" v1.databases(host: "http://localhost:8086", token: "mytoken")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "databases0", Spec: &v1.DatabasesOpSpec{ @@ -53,8 +54,8 @@ v1.databases(host: "http://localhost:8086", token: "mytoken")`, Name: "databases with org", Raw: `import "influxdata/influxdb/v1" v1.databases(org: "influxdata")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "databases0", Spec: &v1.DatabasesOpSpec{ @@ -68,8 +69,8 @@ v1.databases(org: "influxdata")`, Name: "databases with org id", Raw: `import "influxdata/influxdb/v1" v1.databases(orgID: "97aa81cc0e247dc4")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "databases0", Spec: &v1.DatabasesOpSpec{ diff --git a/stdlib/influxdata/influxdb/v1/from_influx_json_test.go b/stdlib/influxdata/influxdb/v1/from_influx_json_test.go index c3331db8c8..afe499a89d 100644 --- a/stdlib/influxdata/influxdb/v1/from_influx_json_test.go +++ b/stdlib/influxdata/influxdb/v1/from_influx_json_test.go @@ -3,8 +3,8 @@ package v1_test import ( "testing" - "github.com/influxdata/flux" _ "github.com/influxdata/flux/fluxinit/static" // We need to init flux for the tests to work. + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/stdlib/influxdata/influxdb/v1" ) @@ -29,8 +29,8 @@ func TestFromInfluxJSON_NewQuery(t *testing.T) { { Name: "text", Raw: `import "influxdata/influxdb/v1" v1.json(json: "{results: []}")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "fromInfluxJSON0", Spec: &v1.FromInfluxJSONOpSpec{ diff --git a/stdlib/influxdata/influxdb/wide_to_test.go b/stdlib/influxdata/influxdb/wide_to_test.go index 8e0c079b51..6d9792af55 100644 --- a/stdlib/influxdata/influxdb/wide_to_test.go +++ b/stdlib/influxdata/influxdb/wide_to_test.go @@ -19,6 +19,7 @@ import ( "github.com/influxdata/flux/execute/executetest" "github.com/influxdata/flux/execute/table/static" _ "github.com/influxdata/flux/fluxinit/static" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/plan" "github.com/influxdata/flux/querytest" @@ -36,8 +37,8 @@ from(bucket:"mydb") |> range(start: -1h) |> v1.fieldsAsCols() |> wideTo(bucket:"series1", org:"fred", host:"localhost", token:"auth-token")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -71,7 +72,7 @@ from(bucket:"mydb") }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "pivot2"}, {Parent: "pivot2", Child: "wide-to3"}, diff --git a/stdlib/join/equijoin_test.go b/stdlib/join/equijoin_test.go index baf0d8864f..58bc31897c 100644 --- a/stdlib/join/equijoin_test.go +++ b/stdlib/join/equijoin_test.go @@ -6,12 +6,12 @@ import ( "time" "github.com/google/go-cmp/cmp" - "github.com/influxdata/flux" "github.com/influxdata/flux/codes" "github.com/influxdata/flux/dependencies/dependenciestest" "github.com/influxdata/flux/dependency" _ "github.com/influxdata/flux/fluxinit/static" "github.com/influxdata/flux/internal/errors" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/internal/spec" "github.com/influxdata/flux/plan" "github.com/influxdata/flux/plan/plantest" @@ -21,7 +21,7 @@ import ( "github.com/influxdata/flux/stdlib/universe" ) -func compile(fluxText string, now time.Time) (*flux.Spec, error) { +func compile(fluxText string, now time.Time) (*operation.Spec, error) { ctx, deps := dependency.Inject(context.Background(), dependenciestest.Default()) defer deps.Finish() return spec.FromScript(ctx, runtime.Default, now, fluxText) diff --git a/stdlib/kafka/to_test.go b/stdlib/kafka/to_test.go index 4153876591..db97800ff8 100644 --- a/stdlib/kafka/to_test.go +++ b/stdlib/kafka/to_test.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" _ "github.com/influxdata/flux/fluxinit/static" // We need to init flux for the tests to work. + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/plan" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/stdlib/influxdata/influxdb" @@ -27,8 +28,8 @@ func TestToKafka_NewQuery(t *testing.T) { { Name: "from with database", Raw: `import "kafka" from(bucket:"mybucket") |> kafka.to(brokers:["brokerurl:8989"], name:"series1", topic:"totallynotfaketopic")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -46,7 +47,7 @@ func TestToKafka_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "toKafka1"}, }, }, diff --git a/stdlib/socket/from_test.go b/stdlib/socket/from_test.go index 0e294fc4fd..c883751629 100644 --- a/stdlib/socket/from_test.go +++ b/stdlib/socket/from_test.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/mock" "github.com/influxdata/flux/plan" "github.com/influxdata/flux/querytest" @@ -39,8 +40,8 @@ socket.from(url: "url", decoder: "wrong")`, Name: "from ok", Raw: `import "socket" socket.from(url: "url", decoder: "line") |> range(start:-4h, stop:-2h) |> sum()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "fromSocket0", Spec: &socket.FromSocketOpSpec{ @@ -71,7 +72,7 @@ socket.from(url: "url", decoder: "line") |> range(start:-4h, stop:-2h) |> sum()` }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "fromSocket0", Child: "range1"}, {Parent: "range1", Child: "sum2"}, }, diff --git a/stdlib/sql/to_test.go b/stdlib/sql/to_test.go index 069f3c79f1..6818c52549 100644 --- a/stdlib/sql/to_test.go +++ b/stdlib/sql/to_test.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" _ "github.com/influxdata/flux/fluxinit/static" // We need to init flux for the tests to work. + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/plan" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/stdlib/influxdata/influxdb" @@ -25,8 +26,8 @@ func TestSqlTo(t *testing.T) { { Name: "from with database", Raw: `import "sql" from(bucket: "mybucket") |> sql.to(driverName:"sqlmock", dataSourceName:"root@/db", table:"TestTable")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -43,7 +44,7 @@ func TestSqlTo(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "toSQL1"}, }, }, @@ -390,8 +391,8 @@ func TestSqlite3To(t *testing.T) { { Name: "from with database", Raw: `import "sql" from(bucket: "mybucket") |> sql.to(driverName:"sqlite3", dataSourceName:"file::memory:", table:"TestTable", batchSize:10000)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -408,7 +409,7 @@ func TestSqlite3To(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "toSQL1"}, }, }, diff --git a/stdlib/universe/columns_test.go b/stdlib/universe/columns_test.go index 5a7c0b8c35..cc153333c1 100644 --- a/stdlib/universe/columns_test.go +++ b/stdlib/universe/columns_test.go @@ -7,6 +7,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/stdlib/influxdata/influxdb" "github.com/influxdata/flux/stdlib/universe" @@ -17,8 +18,8 @@ func TestColumns_NewQuery(t *testing.T) { { Name: "from range columns", Raw: `from(bucket:"mydb") |> range(start:-1h) |> columns()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -45,7 +46,7 @@ func TestColumns_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "columns2"}, }, @@ -54,8 +55,8 @@ func TestColumns_NewQuery(t *testing.T) { { Name: "from columns custom label", Raw: `from(bucket:"mydb") |> columns(column: "labels")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -69,7 +70,7 @@ func TestColumns_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "columns1"}, }, }, diff --git a/stdlib/universe/count_test.go b/stdlib/universe/count_test.go index 480b615137..3ff4d159b7 100644 --- a/stdlib/universe/count_test.go +++ b/stdlib/universe/count_test.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/flux/arrow" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/stdlib/influxdata/influxdb" @@ -20,8 +21,8 @@ func TestCount_NewQuery(t *testing.T) { { Name: "from with range and count", Raw: `from(bucket:"mydb") |> range(start:-4h, stop:-2h) |> count()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -51,7 +52,7 @@ func TestCount_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "count2"}, }, diff --git a/stdlib/universe/covariance_test.go b/stdlib/universe/covariance_test.go index 71a60180e6..4b6a612c02 100644 --- a/stdlib/universe/covariance_test.go +++ b/stdlib/universe/covariance_test.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/runtime" _ "github.com/influxdata/flux/stdlib" @@ -22,8 +23,8 @@ func TestCovariance_NewQuery(t *testing.T) { { Name: "simple covariance", Raw: `from(bucket:"mybucket") |> covariance(columns:["a","b"],)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -38,7 +39,7 @@ func TestCovariance_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "covariance1"}, }, }, @@ -46,8 +47,8 @@ func TestCovariance_NewQuery(t *testing.T) { { Name: "pearsonr", Raw: `from(bucket:"mybucket")|>covariance(columns:["a","b"],pearsonr:true)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -63,7 +64,7 @@ func TestCovariance_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "covariance1"}, }, }, @@ -71,8 +72,8 @@ func TestCovariance_NewQuery(t *testing.T) { { Name: "global covariance", Raw: `cov(x: from(bucket:"mybucket"), y:from(bucket:"mybucket"), on:["host"], pearsonr:true)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -101,7 +102,7 @@ func TestCovariance_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "join2"}, {Parent: "from1", Child: "join2"}, {Parent: "join2", Child: "covariance3"}, diff --git a/stdlib/universe/fill_test.go b/stdlib/universe/fill_test.go index 7f83c7eb01..d331336011 100644 --- a/stdlib/universe/fill_test.go +++ b/stdlib/universe/fill_test.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/flux/dependencies/dependenciestest" "github.com/influxdata/flux/dependency" "github.com/influxdata/flux/internal/gen" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/semantic" "github.com/influxdata/flux/values" @@ -29,8 +30,8 @@ func TestFill_NewQuery(t *testing.T) { { Name: "from with range and fill", Raw: `from(bucket:"mydb") |> range(start:-4h, stop:-2h) |> fill(column: "c1", value: 1.0)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -62,7 +63,7 @@ func TestFill_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "fill2"}, }, diff --git a/stdlib/universe/filter_test.go b/stdlib/universe/filter_test.go index 9f3e68086b..3397cde803 100644 --- a/stdlib/universe/filter_test.go +++ b/stdlib/universe/filter_test.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" "github.com/influxdata/flux/internal/gen" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/interpreter" "github.com/influxdata/flux/lang" "github.com/influxdata/flux/memory" @@ -30,8 +31,8 @@ func TestFilter_NewQuery(t *testing.T) { { Name: "from with database filter and range", Raw: `from(bucket:"mybucket") |> filter(fn: (r) => r["t1"]=="val1" and r["t2"]=="val2") |> range(start:-4h, stop:-2h) |> count()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -70,7 +71,7 @@ func TestFilter_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "filter1"}, {Parent: "filter1", Child: "range2"}, {Parent: "range2", Child: "count3"}, @@ -91,8 +92,8 @@ func TestFilter_NewQuery(t *testing.T) { ) |> range(start:-4h, stop:-2h) |> count()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -131,7 +132,7 @@ func TestFilter_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "filter1"}, {Parent: "filter1", Child: "range2"}, {Parent: "range2", Child: "count3"}, @@ -148,8 +149,8 @@ func TestFilter_NewQuery(t *testing.T) { ) |> range(start:-4h, stop:-2h) |> count()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -188,7 +189,7 @@ func TestFilter_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "filter1"}, {Parent: "filter1", Child: "range2"}, {Parent: "range2", Child: "count3"}, @@ -205,8 +206,8 @@ func TestFilter_NewQuery(t *testing.T) { ) |> range(start:-4h, stop:-2h) |> count()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -245,7 +246,7 @@ func TestFilter_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "filter1"}, {Parent: "filter1", Child: "range2"}, {Parent: "range2", Child: "count3"}, @@ -262,8 +263,8 @@ func TestFilter_NewQuery(t *testing.T) { ) |> range(start:-4h, stop:-2h) |> count()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -302,7 +303,7 @@ func TestFilter_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "filter1"}, {Parent: "filter1", Child: "range2"}, {Parent: "range2", Child: "count3"}, @@ -315,8 +316,8 @@ func TestFilter_NewQuery(t *testing.T) { |> filter(fn: (r) => r["t1"]=~/^va\/l1/ )`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -333,7 +334,7 @@ func TestFilter_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "filter1"}, }, }, @@ -346,8 +347,8 @@ func TestFilter_NewQuery(t *testing.T) { and r["t2"] !~ /^val2/ )`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -364,7 +365,7 @@ func TestFilter_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "filter1"}, }, }, @@ -372,8 +373,8 @@ func TestFilter_NewQuery(t *testing.T) { { Name: "from with drop", Raw: `from(bucket:"mybucket") |> filter(fn: (r) => r._value > 0.0, onEmpty: "drop")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -391,7 +392,7 @@ func TestFilter_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "filter1"}, }, }, @@ -399,8 +400,8 @@ func TestFilter_NewQuery(t *testing.T) { { Name: "from with keep", Raw: `from(bucket:"mybucket") |> filter(fn: (r) => r._value > 0.0, onEmpty: "keep")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -418,7 +419,7 @@ func TestFilter_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "filter1"}, }, }, diff --git a/stdlib/universe/group_test.go b/stdlib/universe/group_test.go index 1ece6c2a1e..ae2639d046 100644 --- a/stdlib/universe/group_test.go +++ b/stdlib/universe/group_test.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" "github.com/influxdata/flux/internal/gen" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/plan" "github.com/influxdata/flux/plan/plantest" @@ -26,8 +27,8 @@ func TestGroup_NewQuery(t *testing.T) { Name: "group with no arguments", // group() defaults to group(columns: [], mode: "by") Raw: `from(bucket: "telegraf") |> range(start: -1m) |> group()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -52,7 +53,7 @@ func TestGroup_NewQuery(t *testing.T) { Spec: &universe.GroupOpSpec{Mode: "by", Columns: []string{}}, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "group2"}, }, @@ -61,8 +62,8 @@ func TestGroup_NewQuery(t *testing.T) { { Name: "group all", Raw: `from(bucket: "telegraf") |> range(start: -1m) |> group(columns:[], mode: "except")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -87,7 +88,7 @@ func TestGroup_NewQuery(t *testing.T) { Spec: &universe.GroupOpSpec{Mode: "except", Columns: []string{}}, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "group2"}, }, @@ -96,8 +97,8 @@ func TestGroup_NewQuery(t *testing.T) { { Name: "group none", Raw: `from(bucket: "telegraf") |> range(start: -1m) |> group(columns: [], mode: "by")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -122,7 +123,7 @@ func TestGroup_NewQuery(t *testing.T) { Spec: &universe.GroupOpSpec{Mode: "by", Columns: []string{}}, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "group2"}, }, @@ -131,8 +132,8 @@ func TestGroup_NewQuery(t *testing.T) { { Name: "group by", Raw: `from(bucket: "telegraf") |> range(start: -1m) |> group(columns: ["host"], mode: "by")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -160,7 +161,7 @@ func TestGroup_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "group2"}, }, @@ -169,8 +170,8 @@ func TestGroup_NewQuery(t *testing.T) { { Name: "group except", Raw: `from(bucket: "telegraf") |> range(start: -1m) |> group(columns: ["host"], mode: "except")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -198,7 +199,7 @@ func TestGroup_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "group2"}, }, diff --git a/stdlib/universe/holt_winters_test.go b/stdlib/universe/holt_winters_test.go index 86ca33cfef..af0aef3bc0 100644 --- a/stdlib/universe/holt_winters_test.go +++ b/stdlib/universe/holt_winters_test.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/flux/execute/executetest" "github.com/influxdata/flux/internal/errors" "github.com/influxdata/flux/internal/gen" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/stdlib/influxdata/influxdb" @@ -32,8 +33,8 @@ func TestHoltWinters_NewQuery(t *testing.T) { { Name: "holt winters defaults", Raw: `from(bucket:"mydb") |> range(start:-1h) |> holtWinters(n: 84, interval: 42d)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -65,7 +66,7 @@ func TestHoltWinters_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "holtWinters2"}, }, @@ -74,8 +75,8 @@ func TestHoltWinters_NewQuery(t *testing.T) { { Name: "holt winters no defaults", Raw: `from(bucket:"mydb") |> range(start:-1h) |> holtWinters(n: 84, seasonality: 4, interval: 42d, timeColumn: "t", column: "v", withFit: true)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -107,7 +108,7 @@ func TestHoltWinters_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "holtWinters2"}, }, diff --git a/stdlib/universe/join_test.go b/stdlib/universe/join_test.go index d272f43212..357d9d954e 100644 --- a/stdlib/universe/join_test.go +++ b/stdlib/universe/join_test.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/plan" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/stdlib/influxdata/influxdb" @@ -24,8 +25,8 @@ func TestJoin_NewQuery(t *testing.T) { a = from(bucket:"dbA") |> range(start:-1h) b = from(bucket:"dbB") |> range(start:-1h) join(tables:{a:a,b:b}, on:["host"])`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -76,7 +77,7 @@ func TestJoin_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "from2", Child: "range3"}, {Parent: "range1", Child: "join4"}, @@ -91,8 +92,8 @@ func TestJoin_NewQuery(t *testing.T) { b = from(bucket:"flux") |> range(start:-1h) join(tables:{a:a,b:b}, on:["t1"]) `, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -143,7 +144,7 @@ func TestJoin_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "from2", Child: "range3"}, {Parent: "range1", Child: "join4"}, diff --git a/stdlib/universe/keys_test.go b/stdlib/universe/keys_test.go index 6b29b0058a..b62c28b3f1 100644 --- a/stdlib/universe/keys_test.go +++ b/stdlib/universe/keys_test.go @@ -7,6 +7,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/stdlib/influxdata/influxdb" "github.com/influxdata/flux/stdlib/universe" @@ -17,8 +18,8 @@ func TestKeys_NewQuery(t *testing.T) { { Name: "from range keys", Raw: `from(bucket: "mydb") |> range(start:-1h) |> keys()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -45,7 +46,7 @@ func TestKeys_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "keys2"}, }, @@ -54,8 +55,8 @@ func TestKeys_NewQuery(t *testing.T) { { Name: "from keys custom label", Raw: `from(bucket: "mydb") |> keys(column: "keys")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -69,7 +70,7 @@ func TestKeys_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "keys1"}, }, }, diff --git a/stdlib/universe/map_test.go b/stdlib/universe/map_test.go index fb09ff85e0..88a0c62a55 100644 --- a/stdlib/universe/map_test.go +++ b/stdlib/universe/map_test.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" "github.com/influxdata/flux/internal/gen" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/interpreter" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/querytest" @@ -27,8 +28,8 @@ func TestMap_NewQuery(t *testing.T) { { Name: "simple static map", Raw: `from(bucket:"mybucket") |> map(fn: (r) => r._value + 1)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -45,7 +46,7 @@ func TestMap_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "map1"}, }, }, @@ -53,8 +54,8 @@ func TestMap_NewQuery(t *testing.T) { { Name: "simple static map mergeKey=true", Raw: `from(bucket:"mybucket") |> map(fn: (r) => r._value + 1, mergeKey:true)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -72,7 +73,7 @@ func TestMap_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "map1"}, }, }, @@ -80,8 +81,8 @@ func TestMap_NewQuery(t *testing.T) { { Name: "resolve map", Raw: `x = 2 from(bucket:"mybucket") |> map(fn: (r) => r._value + x)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -102,7 +103,7 @@ func TestMap_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "map1"}, }, }, diff --git a/stdlib/universe/pivot_test.go b/stdlib/universe/pivot_test.go index 07a6eb01da..cabca5d52b 100644 --- a/stdlib/universe/pivot_test.go +++ b/stdlib/universe/pivot_test.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/flux/execute/executetest" "github.com/influxdata/flux/internal/errors" "github.com/influxdata/flux/internal/gen" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/stdlib/influxdata/influxdb" @@ -22,8 +23,8 @@ func TestPivot_NewQuery(t *testing.T) { { Name: "pivot [_measurement, _field] around _time", Raw: `from(bucket:"testdb") |> range(start: -1h) |> pivot(rowKey: ["_time"], columnKey: ["_measurement", "_field"], valueColumn: "_value")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -54,7 +55,7 @@ func TestPivot_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "pivot2"}, }, diff --git a/stdlib/universe/quantile_test.go b/stdlib/universe/quantile_test.go index 591d211dda..9197fc81ff 100644 --- a/stdlib/universe/quantile_test.go +++ b/stdlib/universe/quantile_test.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/flux/arrow" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/stdlib/influxdata/influxdb" @@ -20,8 +21,8 @@ func TestQuantile_NewQuery(t *testing.T) { { Name: "tdigest", Raw: `from(bucket:"testdb") |> range(start: -1h) |> quantile(q: 0.99, method: "estimate_tdigest", compression: 1000.0)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -53,7 +54,7 @@ func TestQuantile_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "quantile2"}, }, @@ -62,8 +63,8 @@ func TestQuantile_NewQuery(t *testing.T) { { Name: "exact_mean", Raw: `from(bucket:"testdb") |> range(start: -1h) |> quantile(q: 0.99, method: "exact_mean")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -94,7 +95,7 @@ func TestQuantile_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "quantile2"}, }, @@ -103,8 +104,8 @@ func TestQuantile_NewQuery(t *testing.T) { { Name: "exact_selector", Raw: `from(bucket:"testdb") |> range(start: -1h) |> quantile(q: 0.99, method: "exact_selector")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -135,7 +136,7 @@ func TestQuantile_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "quantile2"}, }, @@ -144,8 +145,8 @@ func TestQuantile_NewQuery(t *testing.T) { { Name: "custom col", Raw: `from(bucket:"testdb") |> range(start: -1h) |> quantile(q: 0.99, method: "exact_selector", column: "foo")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -178,7 +179,7 @@ func TestQuantile_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "quantile2"}, }, @@ -187,8 +188,8 @@ func TestQuantile_NewQuery(t *testing.T) { { Name: "custom column", Raw: `from(bucket:"testdb") |> range(start: -1h) |> quantile(q: 0.99, method: "exact_mean", column: "foo")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -221,7 +222,7 @@ func TestQuantile_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "quantile2"}, }, @@ -230,8 +231,8 @@ func TestQuantile_NewQuery(t *testing.T) { { Name: "default", Raw: `from(bucket:"testdb") |> range(start: -1h) |> quantile(q: 0.99)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -263,7 +264,7 @@ func TestQuantile_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "quantile2"}, }, diff --git a/stdlib/universe/range_test.go b/stdlib/universe/range_test.go index 095b41f098..7df086129d 100644 --- a/stdlib/universe/range_test.go +++ b/stdlib/universe/range_test.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/stdlib/csv" "github.com/influxdata/flux/stdlib/influxdata/influxdb" @@ -20,8 +21,8 @@ func TestRange_NewQuery(t *testing.T) { { Name: "from with database with range", Raw: `from(bucket:"mybucket") |> range(start:-4h, stop:-2h) |> sum()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -51,7 +52,7 @@ func TestRange_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "sum2"}, }, @@ -60,8 +61,8 @@ func TestRange_NewQuery(t *testing.T) { { Name: "from csv with range", Raw: `import "csv" csv.from(csv: "1,2") |> range(start:-4h, stop:-2h) |> sum()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "fromCSV0", Spec: &csv.FromCSVOpSpec{ @@ -92,7 +93,7 @@ func TestRange_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "fromCSV0", Child: "range1"}, {Parent: "range1", Child: "sum2"}, }, diff --git a/stdlib/universe/schema_functions_test.go b/stdlib/universe/schema_functions_test.go index 02395116eb..a19ea39b90 100644 --- a/stdlib/universe/schema_functions_test.go +++ b/stdlib/universe/schema_functions_test.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" "github.com/influxdata/flux/internal/gen" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/interpreter" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/plan" @@ -23,8 +24,8 @@ func TestSchemaMutions_NewQueries(t *testing.T) { { Name: "test rename query", Raw: `from(bucket:"mybucket") |> rename(columns:{old:"new"}) |> sum()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -46,7 +47,7 @@ func TestSchemaMutions_NewQueries(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "rename1"}, {Parent: "rename1", Child: "sum2"}, }, @@ -55,8 +56,8 @@ func TestSchemaMutions_NewQueries(t *testing.T) { { Name: "test drop query", Raw: `from(bucket:"mybucket") |> drop(columns:["col1", "col2", "col3"]) |> sum()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -76,7 +77,7 @@ func TestSchemaMutions_NewQueries(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "drop1"}, {Parent: "drop1", Child: "sum2"}, }, @@ -85,8 +86,8 @@ func TestSchemaMutions_NewQueries(t *testing.T) { { Name: "test keep query", Raw: `from(bucket:"mybucket") |> keep(columns:["col1", "col2", "col3"]) |> sum()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -106,7 +107,7 @@ func TestSchemaMutions_NewQueries(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "keep1"}, {Parent: "keep1", Child: "sum2"}, }, @@ -115,8 +116,8 @@ func TestSchemaMutions_NewQueries(t *testing.T) { { Name: "test duplicate query", Raw: `from(bucket:"mybucket") |> duplicate(column: "col1", as: "col1_new") |> sum()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -137,7 +138,7 @@ func TestSchemaMutions_NewQueries(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "duplicate1"}, {Parent: "duplicate1", Child: "sum2"}, }, @@ -146,8 +147,8 @@ func TestSchemaMutions_NewQueries(t *testing.T) { { Name: "test drop query fn param", Raw: `from(bucket:"mybucket") |> drop(fn: (column) => column =~ /reg*/) |> sum()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -170,7 +171,7 @@ func TestSchemaMutions_NewQueries(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "drop1"}, {Parent: "drop1", Child: "sum2"}, }, @@ -179,8 +180,8 @@ func TestSchemaMutions_NewQueries(t *testing.T) { { Name: "test keep query fn param", Raw: `from(bucket:"mybucket") |> keep(fn: (column) => column =~ /reg*/) |> sum()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -203,7 +204,7 @@ func TestSchemaMutions_NewQueries(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "keep1"}, {Parent: "keep1", Child: "sum2"}, }, @@ -212,8 +213,8 @@ func TestSchemaMutions_NewQueries(t *testing.T) { { Name: "test rename query fn param", Raw: `from(bucket:"mybucket") |> rename(fn: (column) => "new_name") |> sum()`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -236,7 +237,7 @@ func TestSchemaMutions_NewQueries(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "rename1"}, {Parent: "rename1", Child: "sum2"}, }, diff --git a/stdlib/universe/state_tracking_test.go b/stdlib/universe/state_tracking_test.go index 2cc4885603..d282757569 100644 --- a/stdlib/universe/state_tracking_test.go +++ b/stdlib/universe/state_tracking_test.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/flux/dependency" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/interpreter" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/querytest" @@ -25,8 +26,8 @@ func TestStateTracking_NewQuery(t *testing.T) { { Name: "from range count", Raw: `from(bucket:"mydb") |> range(start:-1h) |> stateCount(fn: (r) => true)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -60,7 +61,7 @@ func TestStateTracking_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "range1", Child: "stateTracking2"}, }, @@ -74,8 +75,8 @@ func TestStateTracking_NewQuery(t *testing.T) { { Name: "from duration", Raw: `from(bucket:"mydb") |> stateDuration(fn: (r) => true, timeColumn: "ts")`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -96,7 +97,7 @@ func TestStateTracking_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "stateTracking1"}, }, }, diff --git a/stdlib/universe/union_test.go b/stdlib/universe/union_test.go index 5f3615180e..c082cf9571 100644 --- a/stdlib/universe/union_test.go +++ b/stdlib/universe/union_test.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/plan" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/stdlib/influxdata/influxdb" @@ -23,7 +24,7 @@ func TestUnion_NewQuery(t *testing.T) { a = from(bucket:"dbA") |> range(start:-1h) b = from(bucket:"dbB") |> range(start:-1h) union(tables: [a, b])`, - Want: &flux.Spec{Operations: []*flux.Operation{ + Want: &operation.Spec{Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -71,7 +72,7 @@ func TestUnion_NewQuery(t *testing.T) { Spec: &universe.UnionOpSpec{}, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "from2", Child: "range3"}, {Parent: "range1", Child: "union4"}, @@ -86,7 +87,7 @@ func TestUnion_NewQuery(t *testing.T) { b = from(bucket:"dbB") |> range(start:-1h) c = from(bucket:"dbC") |> range(start:-1h) union(tables: [a, b, c])`, - Want: &flux.Spec{Operations: []*flux.Operation{ + Want: &operation.Spec{Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -155,7 +156,7 @@ func TestUnion_NewQuery(t *testing.T) { Spec: &universe.UnionOpSpec{}, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "range1"}, {Parent: "from2", Child: "range3"}, {Parent: "from4", Child: "range5"}, diff --git a/stdlib/universe/window_test.go b/stdlib/universe/window_test.go index 0c50b60774..59e69936f6 100644 --- a/stdlib/universe/window_test.go +++ b/stdlib/universe/window_test.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/interval" "github.com/influxdata/flux/plan" "github.com/influxdata/flux/plan/plantest" @@ -25,8 +26,8 @@ func TestWindow_NewQuery(t *testing.T) { { Name: "from with window", Raw: `from(bucket:"mybucket") |> window(every:1h, offset: -5m)`, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -48,7 +49,7 @@ func TestWindow_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ {Parent: "from0", Child: "window1"}, }, }, diff --git a/stdlib/universe/yield_test.go b/stdlib/universe/yield_test.go index 5118b5f7ab..818cc85bc9 100644 --- a/stdlib/universe/yield_test.go +++ b/stdlib/universe/yield_test.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/execute" + "github.com/influxdata/flux/internal/operation" "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/stdlib/influxdata/influxdb" "github.com/influxdata/flux/stdlib/universe" @@ -19,8 +20,8 @@ func TestYield_NewQuery(t *testing.T) { from(bucket: "foo") |> range(start:-1h) |> yield(name: "1") from(bucket: "foo") |> range(start:-2h) |> yield(name: "2") `, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -76,22 +77,22 @@ func TestYield_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ { - Parent: flux.OperationID("from0"), - Child: flux.OperationID("range1"), + Parent: operation.NodeID("from0"), + Child: operation.NodeID("range1"), }, { - Parent: flux.OperationID("range1"), - Child: flux.OperationID("yield2"), + Parent: operation.NodeID("range1"), + Child: operation.NodeID("yield2"), }, { - Parent: flux.OperationID("from3"), - Child: flux.OperationID("range4"), + Parent: operation.NodeID("from3"), + Child: operation.NodeID("range4"), }, { - Parent: flux.OperationID("range4"), - Child: flux.OperationID("yield5"), + Parent: operation.NodeID("range4"), + Child: operation.NodeID("yield5"), }, }, }, @@ -105,8 +106,8 @@ func TestYield_NewQuery(t *testing.T) { } f()() `, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -135,14 +136,14 @@ func TestYield_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ { - Parent: flux.OperationID("from0"), - Child: flux.OperationID("range1"), + Parent: operation.NodeID("from0"), + Child: operation.NodeID("range1"), }, { - Parent: flux.OperationID("range1"), - Child: flux.OperationID("yield2"), + Parent: operation.NodeID("range1"), + Child: operation.NodeID("yield2"), }, }, }, @@ -152,8 +153,8 @@ func TestYield_NewQuery(t *testing.T) { Raw: ` from(bucket: "foo") |> range(start:-1h) |> yield(name: "1") |> sum() |> yield(name: "2") `, - Want: &flux.Spec{ - Operations: []*flux.Operation{ + Want: &operation.Spec{ + Operations: []*operation.Node{ { ID: "from0", Spec: &influxdb.FromOpSpec{ @@ -196,22 +197,22 @@ func TestYield_NewQuery(t *testing.T) { }, }, }, - Edges: []flux.Edge{ + Edges: []operation.Edge{ { - Parent: flux.OperationID("from0"), - Child: flux.OperationID("range1"), + Parent: operation.NodeID("from0"), + Child: operation.NodeID("range1"), }, { - Parent: flux.OperationID("range1"), - Child: flux.OperationID("yield2"), + Parent: operation.NodeID("range1"), + Child: operation.NodeID("yield2"), }, { - Parent: flux.OperationID("yield2"), - Child: flux.OperationID("sum3"), + Parent: operation.NodeID("yield2"), + Child: operation.NodeID("sum3"), }, { - Parent: flux.OperationID("sum3"), - Child: flux.OperationID("yield4"), + Parent: operation.NodeID("sum3"), + Child: operation.NodeID("yield4"), }, }, },