Skip to content

Commit

Permalink
refactor: add runtime errors to flux.Statistics (#1227)
Browse files Browse the repository at this point in the history
Also make the controller append any runtime errors to stats.
  • Loading branch information
Christopher M. Wolff authored Apr 30, 2019
1 parent 840ca21 commit a3ae0c0
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 5 deletions.
54 changes: 49 additions & 5 deletions control/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func (c *Controller) executeQuery(q *Query) {
q.alloc.Limit = func(v int64) *int64 { return &v }(c.memoryBytesQuotaPerQuery)
exec, err := q.program.Start(ctx, q.alloc)
if err != nil {
q.addRuntimeError(err)
q.setErr(err)
return
}
Expand Down Expand Up @@ -362,10 +363,11 @@ type Query struct {
c *Controller

// query state. The stateMu protects access for the group below.
stateMu sync.RWMutex
state State
err error
cancel func()
stateMu sync.RWMutex
state State
err error
runtimeErrs []error
cancel func()

parentCtx context.Context
parentSpan, currentSpan *span
Expand Down Expand Up @@ -420,6 +422,12 @@ func (q *Query) Done() {
q.stats.Metadata = stats.Metadata
}

errMsgs := make([]string, 0, len(q.runtimeErrs))
for _, e := range q.runtimeErrs {
errMsgs = append(errMsgs, e.Error())
}
q.stats.RuntimeErrors = errMsgs

q.transitionTo(Finished)
q.c.finish(q)
})
Expand Down Expand Up @@ -585,6 +593,13 @@ func (q *Query) setErr(err error) {
close(q.results)
}

func (q *Query) addRuntimeError(e error) {
q.stateMu.Lock()
defer q.stateMu.Unlock()

q.runtimeErrs = append(q.runtimeErrs, e)
}

// pump will read from the executing query results and pump the
// results to our destination.
// When there are no more results, then this will close our own
Expand Down Expand Up @@ -612,9 +627,13 @@ func (q *Query) pump(exec flux.Query, done <-chan struct{}) {
// case, but if the query has been canceled or finished with
// done, nobody is going to read these values so we need
// to avoid blocking.
ecr := &errorCollectingResult{
Result: res,
q: q,
}
select {
case <-done:
case q.results <- res:
case q.results <- ecr:
}
case <-signalCh:
// Signal to the underlying executor that the query
Expand Down Expand Up @@ -657,6 +676,31 @@ func (q *Query) tryExec() (context.Context, bool) {
return q.transitionTo(Executing, Queueing)
}

type errorCollectingResult struct {
flux.Result
q *Query
}

func (r *errorCollectingResult) Tables() flux.TableIterator {
return &errorCollectingTableIterator{
TableIterator: r.Result.Tables(),
q: r.q,
}
}

type errorCollectingTableIterator struct {
flux.TableIterator
q *Query
}

func (ti *errorCollectingTableIterator) Do(f func(t flux.Table) error) error {
err := ti.TableIterator.Do(f)
if err != nil {
ti.q.addRuntimeError(err)
}
return err
}

// State is the query state.
type State int

Expand Down
85 changes: 85 additions & 0 deletions control/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,29 @@ package control_test
import (
"context"
"errors"
"strings"
"sync"
"testing"
"time"

"github.com/influxdata/flux"
_ "github.com/influxdata/flux/builtin"
"github.com/influxdata/flux/control"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/execute/executetest"
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/mock"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/plan/plantest"
"github.com/influxdata/flux/stdlib/universe"
"go.uber.org/zap/zaptest"
)

func init() {
execute.RegisterSource(executetest.AllocatingFromTestKind, executetest.CreateAllocatingFromSource)
}

var (
mockCompiler = &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
Expand Down Expand Up @@ -147,6 +158,80 @@ func TestController_ExecuteError(t *testing.T) {
}
}

func TestController_LimitExceededError(t *testing.T) {
const memoryBytesQuotaPerQuery = 64
config := config
config.MemoryBytesQuotaPerQuery = memoryBytesQuotaPerQuery
ctrl, err := control.New(config)
if err != nil {
t.Fatal(err)
}
defer shutdown(t, ctrl)

compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
// Return a program that will allocate one more byte than is allowed.
pts := plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("allocating-from-test", &executetest.AllocatingFromProcedureSpec{
ByteCount: memoryBytesQuotaPerQuery + 1,
}),
plan.CreatePhysicalNode("yield", &universe.YieldProcedureSpec{Name: "_result"}),
},
Edges: [][2]int{
{0, 1},
},
Resources: flux.ResourceManagement{
ConcurrencyQuota: 1,
},
}

ps := plantest.CreatePlanSpec(&pts)
prog := &lang.Program{
Logger: zaptest.NewLogger(t),
PlanSpec: ps,
}

return prog, nil
},
}

q, err := ctrl.Query(context.Background(), compiler)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

ri := flux.NewResultIteratorFromQuery(q)
defer ri.Release()
for ri.More() {
res := ri.Next()
err = res.Tables().Do(func(t flux.Table) error {
return nil
})
if err != nil {
break
}
}
ri.Release()

if err == nil {
t.Fatal("expected an error")
}

if !strings.Contains(err.Error(), "memory") {
t.Fatalf("expected an error about memory limit exceeded, got %v", err)
}

stats := ri.Statistics()
if len(stats.RuntimeErrors) != 1 {
t.Fatal("expected one runtime error reported in stats")
}

if !strings.Contains(stats.RuntimeErrors[0], "memory") {
t.Fatalf("expected an error about memory limit exceeded, got %v", err)
}
}

func TestController_ShutdownWithRunningQuery(t *testing.T) {
ctrl, err := control.New(config)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,18 @@ type Statistics struct {
// MaxAllocated is the maximum number of bytes the query allocated.
MaxAllocated int64 `json:"max_allocated"`

// RuntimeErrors contains error messages that happened during the execution of the query.
RuntimeErrors []string `json:"runtime_errors"`

// Metadata contains metadata key/value pairs that have been attached during execution.
Metadata Metadata `json:"metadata"`
}

// Add returns the sum of s and other.
func (s Statistics) Add(other Statistics) Statistics {
errs := make([]string, len(s.RuntimeErrors), len(s.RuntimeErrors)+len(other.RuntimeErrors))
copy(errs, s.RuntimeErrors)
errs = append(errs, other.RuntimeErrors...)
md := make(Metadata)
md.AddAll(s.Metadata)
md.AddAll(other.Metadata)
Expand All @@ -94,6 +100,7 @@ func (s Statistics) Add(other Statistics) Statistics {
RequeueDuration: s.RequeueDuration + other.RequeueDuration,
ExecuteDuration: s.ExecuteDuration + other.ExecuteDuration,
Concurrency: s.Concurrency + other.Concurrency,
RuntimeErrors: errs,
MaxAllocated: s.MaxAllocated + other.MaxAllocated,
Metadata: md,
}
Expand Down

0 comments on commit a3ae0c0

Please sign in to comment.