Skip to content

Commit

Permalink
refactor: move spec details into an internal package (#5116)
Browse files Browse the repository at this point in the history
The spec was something that originally existed to accomodate
transpilers. We later decided transpilers should just use the AST rather
than something internal like the spec and the repl became the only thing
that uses the spec.

This moves the spec to an internal package so it stays as an internal
detail and does not exist as part of the external Go API.

This also removes the ider interface. It didn't appear to be used for
anything other than an implementation inside of the spec, so this just
removes it. The transformations that needed to know the ids of their
parents didn't use the operation ids anyway so it didn't matter.
  • Loading branch information
jsternberg authored Aug 22, 2022
1 parent ac9b827 commit 32fe18d
Show file tree
Hide file tree
Showing 50 changed files with 472 additions and 459 deletions.
47 changes: 12 additions & 35 deletions compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,43 +66,23 @@ func functionValue(name string, c CreateOperationSpec, mt semantic.MonoType, sid

var _ = tableSpecKey // So that linter doesn't think tableSpecKey is unused, considering above TODO.

// IDer produces the mapping of table Objects to OperationIDs
type IDer interface {
ID(*TableObject) OperationID
}

// IDerOpSpec is the interface any operation spec that needs
// access to OperationIDs in the query spec must implement.
type IDerOpSpec interface {
IDer(ider IDer)
}

// TableObject represents the value returned by a transformation.
// As such, it holds the OperationSpec of the transformation it is associated with,
// and it is a values.Value (and, also, a values.Object).
// It can be compiled and executed as a flux.Program by using a lang.TableObjectCompiler.
type TableObject struct {
// TODO(Josh): Remove args once the
// OperationSpec interface has an Equal method.
t semantic.MonoType
args Arguments
Kind OperationKind
Spec OperationSpec
Source OperationSource
t semantic.MonoType
args Arguments
Kind OperationKind
Spec OperationSpec
Source struct {
Stack []interpreter.StackEntry
}
Parents []*TableObject
}

func (t *TableObject) Operation(ider IDer) *Operation {
if iderOpSpec, ok := t.Spec.(IDerOpSpec); ok {
iderOpSpec.IDer(ider)
}

return &Operation{
ID: ider.ID(t),
Spec: t.Spec,
Source: t.Source,
}
}
func (t *TableObject) IsNull() bool {
return false
}
Expand Down Expand Up @@ -326,17 +306,14 @@ func (f *function) call(ctx context.Context, args interpreter.Arguments) (values
return nil, err
}

stack := interpreter.Stack(ctx)
t := &TableObject{
t: returnType,
args: arguments,
Kind: spec.Kind(),
Spec: spec,
Source: OperationSource{
Stack: stack,
},
t: returnType,
args: arguments,
Kind: spec.Kind(),
Spec: spec,
Parents: a.parents,
}
t.Source.Stack = interpreter.Stack(ctx)
return t, nil
}
func (f *function) String() string {
Expand Down
4 changes: 2 additions & 2 deletions execute/concurrency_quota_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"testing"
"time"

"github.com/influxdata/flux/internal/operation"
"github.com/influxdata/flux/plan"
"go.uber.org/zap/zaptest"

"github.com/influxdata/flux"
"github.com/influxdata/flux/dependencies/dependenciestest"
"github.com/influxdata/flux/dependency"
"github.com/influxdata/flux/execute"
Expand Down Expand Up @@ -108,7 +108,7 @@ func (rule parallelizeFromTo) Rewrite(ctx context.Context, pn plan.Node) (plan.N

type flagger map[string]interface{}

func compile(fluxText string, now time.Time) (context.Context, *flux.Spec, error) {
func compile(fluxText string, now time.Time) (context.Context, *operation.Spec, error) {
ctx, deps := dependency.Inject(context.Background(), dependenciestest.Default())
defer deps.Finish()
spec, err := spec.FromScript(ctx, runtime.Default, now, fluxText)
Expand Down
66 changes: 42 additions & 24 deletions spec.go → internal/operation/spec.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,52 @@
package flux
package operation

import (
"time"

"github.com/influxdata/flux"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/interpreter"
)

// Node denotes a single operation in a query.
type Node struct {
ID NodeID `json:"id"`
Spec flux.OperationSpec `json:"spec"`
Source NodeSource `json:"source"`
}

// NodeSource specifies the source location that created
// an operation.
type NodeSource struct {
Stack []interpreter.StackEntry `json:"stack"`
}

// NodeID is a unique ID within a query for the operation.
type NodeID string

// Spec specifies a query.
type Spec struct {
Operations []*Operation `json:"operations"`
Edges []Edge `json:"edges"`
Resources ResourceManagement `json:"resources"`
Now time.Time `json:"now"`

sorted []*Operation
children map[OperationID][]*Operation
parents map[OperationID][]*Operation
Operations []*Node `json:"operations"`
Edges []Edge `json:"edges"`
Resources flux.ResourceManagement `json:"resources"`
Now time.Time `json:"now"`

sorted []*Node
children map[NodeID][]*Node
parents map[NodeID][]*Node
}

// Edge is a data flow relationship between a parent and a child
type Edge struct {
Parent OperationID `json:"parent"`
Child OperationID `json:"child"`
Parent NodeID `json:"parent"`
Child NodeID `json:"child"`
}

// Walk calls f on each operation exactly once.
// The function f will be called on an operation only after
// all of its parents have already been passed to f.
func (q *Spec) Walk(f func(o *Operation) error) error {
func (q *Spec) Walk(f func(o *Node) error) error {
if len(q.sorted) == 0 {
if err := q.prepare(); err != nil {
return err
Expand All @@ -53,7 +71,7 @@ func (q *Spec) Validate() error {

// Children returns a list of children for a given operation.
// If the query is invalid no children will be returned.
func (q *Spec) Children(id OperationID) []*Operation {
func (q *Spec) Children(id NodeID) []*Node {
if q.children == nil {
err := q.prepare()
if err != nil {
Expand All @@ -65,7 +83,7 @@ func (q *Spec) Children(id OperationID) []*Operation {

// Parents returns a list of parents for a given operation.
// If the query is invalid no parents will be returned.
func (q *Spec) Parents(id OperationID) []*Operation {
func (q *Spec) Parents(id NodeID) []*Node {
if q.parents == nil {
err := q.prepare()
if err != nil {
Expand All @@ -91,23 +109,23 @@ func (q *Spec) prepare() error {
q.parents = parents
q.children = children

tMarks := make(map[OperationID]bool)
pMarks := make(map[OperationID]bool)
tMarks := make(map[NodeID]bool)
pMarks := make(map[NodeID]bool)

for _, r := range roots {
if err := q.visit(tMarks, pMarks, r); err != nil {
return err
}
}
//reverse q.sorted
// reverse q.sorted
for i, j := 0, len(q.sorted)-1; i < j; i, j = i+1, j-1 {
q.sorted[i], q.sorted[j] = q.sorted[j], q.sorted[i]
}
return nil
}

func (q *Spec) computeLookup() (map[OperationID]*Operation, error) {
lookup := make(map[OperationID]*Operation, len(q.Operations))
func (q *Spec) computeLookup() (map[NodeID]*Node, error) {
lookup := make(map[NodeID]*Node, len(q.Operations))
for _, o := range q.Operations {
if _, ok := lookup[o.ID]; ok {
return nil, errors.Newf(codes.Internal, "found duplicate operation ID %q", o.ID)
Expand All @@ -117,13 +135,13 @@ func (q *Spec) computeLookup() (map[OperationID]*Operation, error) {
return lookup, nil
}

func (q *Spec) determineParentsChildrenAndRoots() (parents, children map[OperationID][]*Operation, roots []*Operation, _ error) {
func (q *Spec) determineParentsChildrenAndRoots() (parents, children map[NodeID][]*Node, roots []*Node, _ error) {
lookup, err := q.computeLookup()
if err != nil {
return nil, nil, nil, err
}
children = make(map[OperationID][]*Operation, len(q.Operations))
parents = make(map[OperationID][]*Operation, len(q.Operations))
children = make(map[NodeID][]*Node, len(q.Operations))
parents = make(map[NodeID][]*Node, len(q.Operations))
for _, e := range q.Edges {
// Build children map
c, ok := lookup[e.Child]
Expand All @@ -150,7 +168,7 @@ func (q *Spec) determineParentsChildrenAndRoots() (parents, children map[Operati

// Depth first search topological sorting of a DAG.
// https://en.wikipedia.org/wiki/Topological_sorting#Algorithms
func (q *Spec) visit(tMarks, pMarks map[OperationID]bool, o *Operation) error {
func (q *Spec) visit(tMarks, pMarks map[NodeID]bool, o *Node) error {
id := o.ID
if tMarks[id] {
return errors.New(codes.Invalid, "found cycle in query")
Expand All @@ -173,7 +191,7 @@ func (q *Spec) visit(tMarks, pMarks map[OperationID]bool, o *Operation) error {
// Functions return the names of all functions used in the plan
func (q *Spec) Functions() ([]string, error) {
funcs := []string{}
err := q.Walk(func(o *Operation) error {
err := q.Walk(func(o *Node) error {
funcs = append(funcs, string(o.Spec.Kind()))
return nil
})
Expand Down
Loading

0 comments on commit 32fe18d

Please sign in to comment.