Skip to content

Commit

Permalink
refactor(lang): Update Compiler interface to produce a Program (#1095)
Browse files Browse the repository at this point in the history
Fixes #1086.
  • Loading branch information
Christopher M. Wolff committed Apr 12, 2019
1 parent 90ce58c commit 0e244f9
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 87 deletions.
2 changes: 1 addition & 1 deletion cmd/flux/cmd/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
var replCmd = &cobra.Command{
Use: "repl",
Short: "Launch a Flux REPL",
Long: "Launch a Flux REPL (Run-Execute-Print-Loop)",
Long: "Launch a Flux REPL (Read-Eval-Print-Loop)",
Run: func(cmd *cobra.Command, args []string) {
q := NewQuerier()
r := repl.New(q)
Expand Down
12 changes: 11 additions & 1 deletion compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package flux
import (
"context"
"fmt"

"github.com/influxdata/flux/memory"
)

// Compiler produces a specification for the query.
type Compiler interface {
// Compile produces a specification for the query.
Compile(ctx context.Context) (*Spec, error)
Compile(ctx context.Context) (Program, error)
CompilerType() CompilerType
}

Expand All @@ -24,3 +26,11 @@ func (m CompilerMappings) Add(t CompilerType, c CreateCompiler) error {
m[t] = c
return nil
}

// Program defines a Flux script which has been compiled.
type Program interface {
// Start begins execution of the program and returns immediately.
// As results are produced they arrive on the channel.
// The program is finished once the result channel is closed and all results have been consumed.
Start(context.Context, *memory.Allocator) (Query, error)
}
54 changes: 8 additions & 46 deletions control/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,13 @@ func New(c Config) *Controller {
// Query submits a query for execution returning immediately.
// Done must be called on any returned Query objects.
func (c *Controller) Query(ctx context.Context, compiler flux.Compiler) (flux.Query, error) {
q := c.createQuery(ctx, compiler.CompilerType())
if err := c.compileQuery(q, compiler); err != nil {
q.setErr(err)
c.countQueryRequest(q, labelCompileError)
return nil, q.Err()
}
if err := c.enqueueQuery(q); err != nil {
q.setErr(err)
c.countQueryRequest(q, labelQueueError)
return nil, q.Err()
prog, err := compiler.Compile(ctx)
if err != nil {
return nil, err
}
c.countQueryRequest(q, labelSuccess)
return q, nil

a := new(memory.Allocator)
return prog.Start(ctx, a)
}

type Stringer interface {
Expand Down Expand Up @@ -171,38 +165,6 @@ func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) *Que
}

func (c *Controller) compileQuery(q *Query, compiler flux.Compiler) error {
if !q.tryCompile() {
return errors.New("failed to transition query to compiling state")
}
spec, err := compiler.Compile(q.currentCtx)
if err != nil {
return errors.Wrap(err, "failed to compile query")
}

// Incoming query spec may have been produced by an entity other than the
// Flux interpreter, so we must set the default Now time if not already set.
if spec.Now.IsZero() {
spec.Now = q.now
}

q.spec = *spec

if q.tryPlan() {
// Plan query to determine needed resources
p, err := c.planner.Plan(&q.spec)
if err != nil {
return errors.Wrap(err, "failed to plan query")
}
q.plan = p
q.concurrency = p.Resources.ConcurrencyQuota
if q.concurrency > c.maxConcurrency {
q.concurrency = c.maxConcurrency
}
q.memory = p.Resources.MemoryBytesQuota
if entry := c.logger.Check(zapcore.DebugLevel, "physical plan"); entry != nil {
entry.Write(zap.String("plan", fmt.Sprint(plan.Formatted(q.plan))))
}
}
return nil
}

Expand Down Expand Up @@ -507,7 +469,7 @@ func (q *Query) Cancel() {
q.cancel()
}

// Ready returns a channel that will deliver the query results.
// Results returns a channel that will deliver the query results.
//
// It's possible that the channel is closed before any results arrive.
// In particular, if a query's context or the query itself is canceled,
Expand All @@ -522,7 +484,7 @@ func (q *Query) Ready() <-chan map[string]flux.Result {
// Done signals to the Controller that this query is no longer
// being used and resources related to the query may be freed.
//
// The Ready method must have returned a result before calling
// The Results method must have returned a result before calling
// this method either by the query executing, being canceled, or
// an error occurring.
func (q *Query) Done() {
Expand Down
20 changes: 10 additions & 10 deletions control/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func TestController_ExecuteQuery_Failure(t *testing.T) {
}

// We do not care about the results, just that the query is ready.
<-q.Ready()
<-q.Results()

if err := q.Err(); err == nil {
t.Fatal("expected error")
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestController_CancelQuery_Ready(t *testing.T) {
}

// We do not care about the results, just that the query is ready.
<-q.Ready()
<-q.Results()

// Cancel the query. This is after the executor has already run,
// but before we finalize the query. This ensures that canceling
Expand Down Expand Up @@ -294,7 +294,7 @@ func TestController_CancelQuery_Execute(t *testing.T) {
// We should not receive any results as the cancellation should
// have signaled to the executor to cancel the query.
select {
case <-q.Ready():
case <-q.Results():
// The execute function should have received the cancel signal and exited
// with an error.
case <-ctx.Done():
Expand Down Expand Up @@ -387,7 +387,7 @@ func TestController_CancelQuery_Concurrent(t *testing.T) {
// change.
query := q
doneWaitGroup.Do(func() error {
<-query.Ready()
<-query.Results()
query.Done()
return nil
})
Expand Down Expand Up @@ -439,7 +439,7 @@ func TestController_BlockedExecutor(t *testing.T) {
}
defer func() {
close(done)
<-q.Ready()
<-q.Results()
q.Done()
}()

Expand Down Expand Up @@ -494,8 +494,8 @@ func TestController_CancelledContextPropagatesToExecutor(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %s", err)
}
// Ready will unblock when executor unblocks
<-q.Ready()
// Results will unblock when executor unblocks
<-q.Results()
// TODO(jlapacik): query should expose error if cancelled during execution
// if q.Err() == nil {
// t.Errorf("expected error; cancelled query context before execution finished")
Expand Down Expand Up @@ -536,7 +536,7 @@ func TestController_Shutdown(t *testing.T) {
ctrl := New(Config{})
ctrl.executor = executor

// Create a bunch of queries and never call Ready which should leave them in the controller.
// Create a bunch of queries and never call Results which should leave them in the controller.
queries := make([]flux.Query, 0, 15)
for i := 0; i < 10; i++ {
q, err := ctrl.Query(context.Background(), mockCompiler)
Expand Down Expand Up @@ -593,7 +593,7 @@ func TestController_Shutdown(t *testing.T) {
for _, q := range queries {
q := q
wg.Do(func() error {
<-q.Ready()
<-q.Results()
q.Done()
return nil
})
Expand Down Expand Up @@ -645,7 +645,7 @@ func TestController_Statistics(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}

<-q.Ready()
<-q.Results()
time.Sleep(time.Millisecond)
q.Done()

Expand Down
2 changes: 1 addition & 1 deletion internal/cmd/refactortests/cmd/refactortests.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func executeScript(pkg *ast.Package) (string, string, error) {
return "", "", errors.Wrap(err, "error during compilation, check your script and retry")
}
defer r.Done()
results, ok := <-r.Ready()
results, ok := <-r.Results()
if !ok {
return "", "", errors.Wrap(r.Err(), "error retrieving query result")
}
Expand Down
80 changes: 72 additions & 8 deletions lang/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (

"github.com/influxdata/flux"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/plan"
)

const (
Expand Down Expand Up @@ -38,8 +41,21 @@ type FluxCompiler struct {
Query string `json:"query"`
}

func (c FluxCompiler) Compile(ctx context.Context) (*flux.Spec, error) {
return flux.Compile(ctx, c.Query, time.Now())
func (c FluxCompiler) Compile(ctx context.Context) (flux.Program, error) {
spec, err := flux.Compile(ctx, c.Query, time.Now())
if err != nil {
return nil, err
}

planner := (&plan.PlannerBuilder{}).Build()
ps, err := planner.Plan(spec)
if err != nil {
return nil, err
}

return Program{
ps: ps,
}, err
}

func (c FluxCompiler) CompilerType() flux.CompilerType {
Expand All @@ -51,9 +67,18 @@ type SpecCompiler struct {
Spec *flux.Spec `json:"spec"`
}

func (c SpecCompiler) Compile(ctx context.Context) (*flux.Spec, error) {
return c.Spec, nil
func (c SpecCompiler) Compile(ctx context.Context) (flux.Program, error) {
planner := (&plan.PlannerBuilder{}).Build()
ps, err := planner.Plan(c.Spec)
if err != nil {
return nil, err
}

return Program{
ps: ps,
}, err
}

func (c SpecCompiler) CompilerType() flux.CompilerType {
return SpecCompilerType
}
Expand All @@ -64,11 +89,24 @@ type ASTCompiler struct {
Now time.Time
}

func (c ASTCompiler) Compile(ctx context.Context) (*flux.Spec, error) {
if c.Now.IsZero() {
return flux.CompileAST(ctx, c.AST, time.Now())
func (c ASTCompiler) Compile(ctx context.Context) (flux.Program, error) {
now := c.Now
if now.IsZero() {
now = time.Now()
}
return flux.CompileAST(ctx, c.AST, c.Now)

spec, err := flux.CompileAST(ctx, c.AST, now)
if err != nil {
return Program{}, err
}

planner := (&plan.PlannerBuilder{}).Build()
ps, err := planner.Plan(spec)
if err != nil {
return Program{}, err
}

return Program{ps: ps}, err
}

func (ASTCompiler) CompilerType() flux.CompilerType {
Expand All @@ -79,3 +117,29 @@ func (ASTCompiler) CompilerType() flux.CompilerType {
func (c *ASTCompiler) PrependFile(file *ast.File) {
c.AST.Files = append([]*ast.File{file}, c.AST.Files...)
}

// Program implements the flux.Program interface
type Program struct {
deps execute.Dependencies
ps *plan.Spec
}

func (p Program) Start(ctx context.Context, allocator *memory.Allocator) (flux.Query, error) {
e := execute.NewExecutor(p.deps, nil)
results, _, err := e.Execute(ctx, p.ps, allocator)
if err != nil {
return nil, err
}

ch := make(chan flux.Result)
go func() {
for _, r := range results {
ch <- r
}
close(ch)
}()

return &Query{
ch: ch,
}, nil
}
30 changes: 30 additions & 0 deletions lang/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package lang

import (
"github.com/influxdata/flux"
)

// Query implements the flux.Query interface.
type Query struct {
ch chan flux.Result
}

func (q *Query) Results() <-chan flux.Result {
return q.ch
}

func (q *Query) Done() {
// consume all remaining elements so channel can be closed
for ok := true; ok == true; _, ok = <-q.ch {}
}

func (*Query) Cancel() {
}

func (*Query) Err() error {
return nil
}

func (*Query) Statistics() flux.Statistics {
return flux.Statistics{}
}
8 changes: 2 additions & 6 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,10 @@ import (

// Query represents an active query.
type Query interface {
// Spec returns the spec used to execute this query.
// Spec must not be modified.
Spec() *Spec

// Ready returns a channel that will deliver the query results.
// Results returns a channel that will deliver the query results.
// Its possible that the channel is closed before any results arrive,
// in which case the query should be inspected for an error using Err().
Ready() <-chan map[string]Result
Results() <-chan Result

// Done must always be called to free resources. It is safe to call Done
// multiple times.
Expand Down
Loading

0 comments on commit 0e244f9

Please sign in to comment.