From a3ae0c0e1ad245a62cbde15962a07614fe446a02 Mon Sep 17 00:00:00 2001 From: "Christopher M. Wolff" Date: Tue, 30 Apr 2019 13:00:36 -0700 Subject: [PATCH] refactor: add runtime errors to flux.Statistics (#1227) Also make the controller append any runtime errors to stats. --- control/controller.go | 54 +++++++++++++++++++++--- control/controller_test.go | 85 ++++++++++++++++++++++++++++++++++++++ query.go | 7 ++++ 3 files changed, 141 insertions(+), 5 deletions(-) diff --git a/control/controller.go b/control/controller.go index e25fc936c7..b953383fc6 100644 --- a/control/controller.go +++ b/control/controller.go @@ -282,6 +282,7 @@ func (c *Controller) executeQuery(q *Query) { q.alloc.Limit = func(v int64) *int64 { return &v }(c.memoryBytesQuotaPerQuery) exec, err := q.program.Start(ctx, q.alloc) if err != nil { + q.addRuntimeError(err) q.setErr(err) return } @@ -362,10 +363,11 @@ type Query struct { c *Controller // query state. The stateMu protects access for the group below. - stateMu sync.RWMutex - state State - err error - cancel func() + stateMu sync.RWMutex + state State + err error + runtimeErrs []error + cancel func() parentCtx context.Context parentSpan, currentSpan *span @@ -420,6 +422,12 @@ func (q *Query) Done() { q.stats.Metadata = stats.Metadata } + errMsgs := make([]string, 0, len(q.runtimeErrs)) + for _, e := range q.runtimeErrs { + errMsgs = append(errMsgs, e.Error()) + } + q.stats.RuntimeErrors = errMsgs + q.transitionTo(Finished) q.c.finish(q) }) @@ -585,6 +593,13 @@ func (q *Query) setErr(err error) { close(q.results) } +func (q *Query) addRuntimeError(e error) { + q.stateMu.Lock() + defer q.stateMu.Unlock() + + q.runtimeErrs = append(q.runtimeErrs, e) +} + // pump will read from the executing query results and pump the // results to our destination. // When there are no more results, then this will close our own @@ -612,9 +627,13 @@ func (q *Query) pump(exec flux.Query, done <-chan struct{}) { // case, but if the query has been canceled or finished with // done, nobody is going to read these values so we need // to avoid blocking. + ecr := &errorCollectingResult{ + Result: res, + q: q, + } select { case <-done: - case q.results <- res: + case q.results <- ecr: } case <-signalCh: // Signal to the underlying executor that the query @@ -657,6 +676,31 @@ func (q *Query) tryExec() (context.Context, bool) { return q.transitionTo(Executing, Queueing) } +type errorCollectingResult struct { + flux.Result + q *Query +} + +func (r *errorCollectingResult) Tables() flux.TableIterator { + return &errorCollectingTableIterator{ + TableIterator: r.Result.Tables(), + q: r.q, + } +} + +type errorCollectingTableIterator struct { + flux.TableIterator + q *Query +} + +func (ti *errorCollectingTableIterator) Do(f func(t flux.Table) error) error { + err := ti.TableIterator.Do(f) + if err != nil { + ti.q.addRuntimeError(err) + } + return err +} + // State is the query state. type State int diff --git a/control/controller_test.go b/control/controller_test.go index b9682ec0eb..821198ace6 100644 --- a/control/controller_test.go +++ b/control/controller_test.go @@ -3,6 +3,7 @@ package control_test import ( "context" "errors" + "strings" "sync" "testing" "time" @@ -10,11 +11,21 @@ import ( "github.com/influxdata/flux" _ "github.com/influxdata/flux/builtin" "github.com/influxdata/flux/control" + "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/flux/lang" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/mock" + "github.com/influxdata/flux/plan" + "github.com/influxdata/flux/plan/plantest" + "github.com/influxdata/flux/stdlib/universe" + "go.uber.org/zap/zaptest" ) +func init() { + execute.RegisterSource(executetest.AllocatingFromTestKind, executetest.CreateAllocatingFromSource) +} + var ( mockCompiler = &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { @@ -147,6 +158,80 @@ func TestController_ExecuteError(t *testing.T) { } } +func TestController_LimitExceededError(t *testing.T) { + const memoryBytesQuotaPerQuery = 64 + config := config + config.MemoryBytesQuotaPerQuery = memoryBytesQuotaPerQuery + ctrl, err := control.New(config) + if err != nil { + t.Fatal(err) + } + defer shutdown(t, ctrl) + + compiler := &mock.Compiler{ + CompileFn: func(ctx context.Context) (flux.Program, error) { + // Return a program that will allocate one more byte than is allowed. + pts := plantest.PlanSpec{ + Nodes: []plan.Node{ + plan.CreatePhysicalNode("allocating-from-test", &executetest.AllocatingFromProcedureSpec{ + ByteCount: memoryBytesQuotaPerQuery + 1, + }), + plan.CreatePhysicalNode("yield", &universe.YieldProcedureSpec{Name: "_result"}), + }, + Edges: [][2]int{ + {0, 1}, + }, + Resources: flux.ResourceManagement{ + ConcurrencyQuota: 1, + }, + } + + ps := plantest.CreatePlanSpec(&pts) + prog := &lang.Program{ + Logger: zaptest.NewLogger(t), + PlanSpec: ps, + } + + return prog, nil + }, + } + + q, err := ctrl.Query(context.Background(), compiler) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + ri := flux.NewResultIteratorFromQuery(q) + defer ri.Release() + for ri.More() { + res := ri.Next() + err = res.Tables().Do(func(t flux.Table) error { + return nil + }) + if err != nil { + break + } + } + ri.Release() + + if err == nil { + t.Fatal("expected an error") + } + + if !strings.Contains(err.Error(), "memory") { + t.Fatalf("expected an error about memory limit exceeded, got %v", err) + } + + stats := ri.Statistics() + if len(stats.RuntimeErrors) != 1 { + t.Fatal("expected one runtime error reported in stats") + } + + if !strings.Contains(stats.RuntimeErrors[0], "memory") { + t.Fatalf("expected an error about memory limit exceeded, got %v", err) + } +} + func TestController_ShutdownWithRunningQuery(t *testing.T) { ctrl, err := control.New(config) if err != nil { diff --git a/query.go b/query.go index d4e89643eb..541295e2b0 100644 --- a/query.go +++ b/query.go @@ -77,12 +77,18 @@ type Statistics struct { // MaxAllocated is the maximum number of bytes the query allocated. MaxAllocated int64 `json:"max_allocated"` + // RuntimeErrors contains error messages that happened during the execution of the query. + RuntimeErrors []string `json:"runtime_errors"` + // Metadata contains metadata key/value pairs that have been attached during execution. Metadata Metadata `json:"metadata"` } // Add returns the sum of s and other. func (s Statistics) Add(other Statistics) Statistics { + errs := make([]string, len(s.RuntimeErrors), len(s.RuntimeErrors)+len(other.RuntimeErrors)) + copy(errs, s.RuntimeErrors) + errs = append(errs, other.RuntimeErrors...) md := make(Metadata) md.AddAll(s.Metadata) md.AddAll(other.Metadata) @@ -94,6 +100,7 @@ func (s Statistics) Add(other Statistics) Statistics { RequeueDuration: s.RequeueDuration + other.RequeueDuration, ExecuteDuration: s.ExecuteDuration + other.ExecuteDuration, Concurrency: s.Concurrency + other.Concurrency, + RuntimeErrors: errs, MaxAllocated: s.MaxAllocated + other.MaxAllocated, Metadata: md, }