diff --git a/cmd/flux/cmd/repl.go b/cmd/flux/cmd/repl.go index ff4b2e2f10..716eb462a4 100644 --- a/cmd/flux/cmd/repl.go +++ b/cmd/flux/cmd/repl.go @@ -15,7 +15,7 @@ import ( var replCmd = &cobra.Command{ Use: "repl", Short: "Launch a Flux REPL", - Long: "Launch a Flux REPL (Run-Execute-Print-Loop)", + Long: "Launch a Flux REPL (Read-Eval-Print-Loop)", Run: func(cmd *cobra.Command, args []string) { q := NewQuerier() r := repl.New(q) diff --git a/compiler.go b/compiler.go index 4a1e03de83..29fc4ccd28 100644 --- a/compiler.go +++ b/compiler.go @@ -3,12 +3,14 @@ package flux import ( "context" "fmt" + + "github.com/influxdata/flux/memory" ) // Compiler produces a specification for the query. type Compiler interface { // Compile produces a specification for the query. - Compile(ctx context.Context) (*Spec, error) + Compile(ctx context.Context) (Program, error) CompilerType() CompilerType } @@ -24,3 +26,11 @@ func (m CompilerMappings) Add(t CompilerType, c CreateCompiler) error { m[t] = c return nil } + +// Program defines a Flux script which has been compiled. +type Program interface { + // Start begins execution of the program and returns immediately. + // As results are produced they arrive on the channel. + // The program is finished once the result channel is closed and all results have been consumed. + Start(context.Context, *memory.Allocator) (Query, error) +} diff --git a/control/controller.go b/control/controller.go index d2462708bb..9d10196af1 100644 --- a/control/controller.go +++ b/control/controller.go @@ -111,19 +111,13 @@ func New(c Config) *Controller { // Query submits a query for execution returning immediately. // Done must be called on any returned Query objects. func (c *Controller) Query(ctx context.Context, compiler flux.Compiler) (flux.Query, error) { - q := c.createQuery(ctx, compiler.CompilerType()) - if err := c.compileQuery(q, compiler); err != nil { - q.setErr(err) - c.countQueryRequest(q, labelCompileError) - return nil, q.Err() - } - if err := c.enqueueQuery(q); err != nil { - q.setErr(err) - c.countQueryRequest(q, labelQueueError) - return nil, q.Err() + prog, err := compiler.Compile(ctx) + if err != nil { + return nil, err } - c.countQueryRequest(q, labelSuccess) - return q, nil + + a := new(memory.Allocator) + return prog.Start(ctx, a) } type Stringer interface { @@ -171,38 +165,6 @@ func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) *Que } func (c *Controller) compileQuery(q *Query, compiler flux.Compiler) error { - if !q.tryCompile() { - return errors.New("failed to transition query to compiling state") - } - spec, err := compiler.Compile(q.currentCtx) - if err != nil { - return errors.Wrap(err, "failed to compile query") - } - - // Incoming query spec may have been produced by an entity other than the - // Flux interpreter, so we must set the default Now time if not already set. - if spec.Now.IsZero() { - spec.Now = q.now - } - - q.spec = *spec - - if q.tryPlan() { - // Plan query to determine needed resources - p, err := c.planner.Plan(&q.spec) - if err != nil { - return errors.Wrap(err, "failed to plan query") - } - q.plan = p - q.concurrency = p.Resources.ConcurrencyQuota - if q.concurrency > c.maxConcurrency { - q.concurrency = c.maxConcurrency - } - q.memory = p.Resources.MemoryBytesQuota - if entry := c.logger.Check(zapcore.DebugLevel, "physical plan"); entry != nil { - entry.Write(zap.String("plan", fmt.Sprint(plan.Formatted(q.plan)))) - } - } return nil } @@ -507,7 +469,7 @@ func (q *Query) Cancel() { q.cancel() } -// Ready returns a channel that will deliver the query results. +// Results returns a channel that will deliver the query results. // // It's possible that the channel is closed before any results arrive. // In particular, if a query's context or the query itself is canceled, @@ -522,7 +484,7 @@ func (q *Query) Ready() <-chan map[string]flux.Result { // Done signals to the Controller that this query is no longer // being used and resources related to the query may be freed. // -// The Ready method must have returned a result before calling +// The Results method must have returned a result before calling // this method either by the query executing, being canceled, or // an error occurring. func (q *Query) Done() { diff --git a/control/controller_test.go b/control/controller_test.go index 0359e1aa79..8d34f0772a 100644 --- a/control/controller_test.go +++ b/control/controller_test.go @@ -177,7 +177,7 @@ func TestController_ExecuteQuery_Failure(t *testing.T) { } // We do not care about the results, just that the query is ready. - <-q.Ready() + <-q.Results() if err := q.Err(); err == nil { t.Fatal("expected error") @@ -229,7 +229,7 @@ func TestController_CancelQuery_Ready(t *testing.T) { } // We do not care about the results, just that the query is ready. - <-q.Ready() + <-q.Results() // Cancel the query. This is after the executor has already run, // but before we finalize the query. This ensures that canceling @@ -294,7 +294,7 @@ func TestController_CancelQuery_Execute(t *testing.T) { // We should not receive any results as the cancellation should // have signaled to the executor to cancel the query. select { - case <-q.Ready(): + case <-q.Results(): // The execute function should have received the cancel signal and exited // with an error. case <-ctx.Done(): @@ -387,7 +387,7 @@ func TestController_CancelQuery_Concurrent(t *testing.T) { // change. query := q doneWaitGroup.Do(func() error { - <-query.Ready() + <-query.Results() query.Done() return nil }) @@ -439,7 +439,7 @@ func TestController_BlockedExecutor(t *testing.T) { } defer func() { close(done) - <-q.Ready() + <-q.Results() q.Done() }() @@ -494,8 +494,8 @@ func TestController_CancelledContextPropagatesToExecutor(t *testing.T) { if err != nil { t.Errorf("unexpected error: %s", err) } - // Ready will unblock when executor unblocks - <-q.Ready() + // Results will unblock when executor unblocks + <-q.Results() // TODO(jlapacik): query should expose error if cancelled during execution // if q.Err() == nil { // t.Errorf("expected error; cancelled query context before execution finished") @@ -536,7 +536,7 @@ func TestController_Shutdown(t *testing.T) { ctrl := New(Config{}) ctrl.executor = executor - // Create a bunch of queries and never call Ready which should leave them in the controller. + // Create a bunch of queries and never call Results which should leave them in the controller. queries := make([]flux.Query, 0, 15) for i := 0; i < 10; i++ { q, err := ctrl.Query(context.Background(), mockCompiler) @@ -593,7 +593,7 @@ func TestController_Shutdown(t *testing.T) { for _, q := range queries { q := q wg.Do(func() error { - <-q.Ready() + <-q.Results() q.Done() return nil }) @@ -645,7 +645,7 @@ func TestController_Statistics(t *testing.T) { t.Fatalf("unexpected error: %s", err) } - <-q.Ready() + <-q.Results() time.Sleep(time.Millisecond) q.Done() diff --git a/internal/cmd/refactortests/cmd/refactortests.go b/internal/cmd/refactortests/cmd/refactortests.go index 43267b8398..943781a96b 100644 --- a/internal/cmd/refactortests/cmd/refactortests.go +++ b/internal/cmd/refactortests/cmd/refactortests.go @@ -183,7 +183,7 @@ func executeScript(pkg *ast.Package) (string, string, error) { return "", "", errors.Wrap(err, "error during compilation, check your script and retry") } defer r.Done() - results, ok := <-r.Ready() + results, ok := <-r.Results() if !ok { return "", "", errors.Wrap(r.Err(), "error retrieving query result") } diff --git a/lang/compiler.go b/lang/compiler.go index f71e9bed8e..6336b7bbc7 100644 --- a/lang/compiler.go +++ b/lang/compiler.go @@ -6,6 +6,9 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/execute" + "github.com/influxdata/flux/memory" + "github.com/influxdata/flux/plan" ) const ( @@ -38,8 +41,21 @@ type FluxCompiler struct { Query string `json:"query"` } -func (c FluxCompiler) Compile(ctx context.Context) (*flux.Spec, error) { - return flux.Compile(ctx, c.Query, time.Now()) +func (c FluxCompiler) Compile(ctx context.Context) (flux.Program, error) { + spec, err := flux.Compile(ctx, c.Query, time.Now()) + if err != nil { + return nil, err + } + + planner := (&plan.PlannerBuilder{}).Build() + ps, err := planner.Plan(spec) + if err != nil { + return nil, err + } + + return Program{ + ps: ps, + }, err } func (c FluxCompiler) CompilerType() flux.CompilerType { @@ -51,9 +67,18 @@ type SpecCompiler struct { Spec *flux.Spec `json:"spec"` } -func (c SpecCompiler) Compile(ctx context.Context) (*flux.Spec, error) { - return c.Spec, nil +func (c SpecCompiler) Compile(ctx context.Context) (flux.Program, error) { + planner := (&plan.PlannerBuilder{}).Build() + ps, err := planner.Plan(c.Spec) + if err != nil { + return nil, err + } + + return Program{ + ps: ps, + }, err } + func (c SpecCompiler) CompilerType() flux.CompilerType { return SpecCompilerType } @@ -64,11 +89,24 @@ type ASTCompiler struct { Now time.Time } -func (c ASTCompiler) Compile(ctx context.Context) (*flux.Spec, error) { - if c.Now.IsZero() { - return flux.CompileAST(ctx, c.AST, time.Now()) +func (c ASTCompiler) Compile(ctx context.Context) (flux.Program, error) { + now := c.Now + if now.IsZero() { + now = time.Now() } - return flux.CompileAST(ctx, c.AST, c.Now) + + spec, err := flux.CompileAST(ctx, c.AST, now) + if err != nil { + return Program{}, err + } + + planner := (&plan.PlannerBuilder{}).Build() + ps, err := planner.Plan(spec) + if err != nil { + return Program{}, err + } + + return Program{ps: ps}, err } func (ASTCompiler) CompilerType() flux.CompilerType { @@ -79,3 +117,29 @@ func (ASTCompiler) CompilerType() flux.CompilerType { func (c *ASTCompiler) PrependFile(file *ast.File) { c.AST.Files = append([]*ast.File{file}, c.AST.Files...) } + +// Program implements the flux.Program interface +type Program struct { + deps execute.Dependencies + ps *plan.Spec +} + +func (p Program) Start(ctx context.Context, allocator *memory.Allocator) (flux.Query, error) { + e := execute.NewExecutor(p.deps, nil) + results, _, err := e.Execute(ctx, p.ps, allocator) + if err != nil { + return nil, err + } + + ch := make(chan flux.Result) + go func() { + for _, r := range results { + ch <- r + } + close(ch) + }() + + return &Query{ + ch: ch, + }, nil +} diff --git a/lang/query.go b/lang/query.go new file mode 100644 index 0000000000..6b7a979b4c --- /dev/null +++ b/lang/query.go @@ -0,0 +1,30 @@ +package lang + +import ( + "github.com/influxdata/flux" +) + +// Query implements the flux.Query interface. +type Query struct { + ch chan flux.Result +} + +func (q *Query) Results() <-chan flux.Result { + return q.ch +} + +func (q *Query) Done() { + // consume all remaining elements so channel can be closed + for ok := true; ok == true; _, ok = <-q.ch {} +} + +func (*Query) Cancel() { +} + +func (*Query) Err() error { + return nil +} + +func (*Query) Statistics() flux.Statistics { + return flux.Statistics{} +} diff --git a/query.go b/query.go index 3f9160bb6c..d4e89643eb 100644 --- a/query.go +++ b/query.go @@ -6,14 +6,10 @@ import ( // Query represents an active query. type Query interface { - // Spec returns the spec used to execute this query. - // Spec must not be modified. - Spec() *Spec - - // Ready returns a channel that will deliver the query results. + // Results returns a channel that will deliver the query results. // Its possible that the channel is closed before any results arrive, // in which case the query should be inspected for an error using Err(). - Ready() <-chan map[string]Result + Results() <-chan Result // Done must always be called to free resources. It is safe to call Done // multiple times. diff --git a/result_iterator.go b/result_iterator.go index a10b23bb59..51dbd4da0c 100644 --- a/result_iterator.go +++ b/result_iterator.go @@ -34,7 +34,7 @@ type ResultIterator interface { type queryResultIterator struct { query Query released bool - results ResultIterator + nextResult Result } func NewResultIteratorFromQuery(q Query) ResultIterator { @@ -43,31 +43,58 @@ func NewResultIteratorFromQuery(q Query) ResultIterator { } } +// More returns true iff there is more data to be produced by the iterator. +// More is idempotent---successive calls to More with no intervening call to Next should +// return the same value and leave the iterator in the same state. func (r *queryResultIterator) More() bool { + + // When the return value is true, r.nextResult should be non-nil, and nil otherwise. + if r.released { return false } - if r.results == nil { - results, ok := <-r.query.Ready() - if !ok { - return false - } - r.results = NewMapResultIterator(results) + if r.nextResult != nil { + return true } - return r.results.More() + + nr, ok := <-r.query.Results() + if !ok { + r.nextResult = nil + return false + } + + r.nextResult = nr + return true } +// Next produces the next result. +// If there is no more data, Next panics. +// It is possible to call Next without calling More first (although not recommended). func (r *queryResultIterator) Next() Result { - return r.results.Next() + if r.released { + panic("call to Next() on released iterator") + } + var nr Result + if r.nextResult == nil { + var ok bool + nr, ok = <-r.query.Results() + if !ok { + panic("call to Next() when More() is false") + } + } else { + nr = r.nextResult + } + + r.nextResult = nil + return nr } +// Release frees resources associated with this iterator. func (r *queryResultIterator) Release() { r.query.Done() r.released = true - if r.results != nil { - r.results.Release() - } + r.nextResult = nil // a panic will occur if caller attempts to call Next(). } func (r *queryResultIterator) Err() error { diff --git a/stdlib/flux_test.go b/stdlib/flux_test.go index e20445abbb..a1fd3338db 100644 --- a/stdlib/flux_test.go +++ b/stdlib/flux_test.go @@ -100,7 +100,7 @@ func doTestRun(t testing.TB, querier *querytest.Querier, c flux.Compiler) { t.Fatalf("unexpected error while executing testing.run: %v", err) } defer r.Done() - result, ok := <-r.Ready() + result, ok := <-r.Results() if !ok { t.Fatalf("unexpected error retrieving testing.run result: %s", r.Err()) } @@ -121,7 +121,7 @@ func doTestInspect(t testing.TB, querier *querytest.Querier, c flux.Compiler) { t.Fatalf("unexpected error while executing testing.inspect: %v", err) } defer r.Done() - result, ok := <-r.Ready() + result, ok := <-r.Results() if !ok { t.Fatalf("unexpected error retrieving testing.inspect result: %s", r.Err()) }