From 625b6f2c15d08ca4c5be7d09fb2b5d8d007273b2 Mon Sep 17 00:00:00 2001 From: "Yiqun (Ethan) Zhang" Date: Mon, 24 Aug 2020 10:14:47 -0500 Subject: [PATCH] feat: add the profiler package (#3129) --- execute/profiler.go | 140 ++++++++++++++++++++++++ execute/profiler_test.go | 63 +++++++++++ execute/table/profiler_result.go | 21 ++++ lang/compiler.go | 68 ++++++++---- lang/query.go | 4 + libflux/go/libflux/buildinfo.gen.go | 1 + mock/query.go | 4 + query.go | 6 +- stdlib/packages.go | 1 + stdlib/profiler/flux_gen.go | 163 ++++++++++++++++++++++++++++ stdlib/profiler/profiler.flux | 3 + 11 files changed, 449 insertions(+), 25 deletions(-) create mode 100644 execute/profiler.go create mode 100644 execute/profiler_test.go create mode 100644 execute/table/profiler_result.go create mode 100644 stdlib/profiler/flux_gen.go create mode 100644 stdlib/profiler/profiler.flux diff --git a/execute/profiler.go b/execute/profiler.go new file mode 100644 index 0000000000..511255e9e8 --- /dev/null +++ b/execute/profiler.go @@ -0,0 +1,140 @@ +package execute + +import ( + "fmt" + "strings" + + "github.com/influxdata/flux" + "github.com/influxdata/flux/memory" + "github.com/influxdata/flux/values" +) + +type Profiler interface { + Name() string + GetResult(q flux.Query, alloc *memory.Allocator) (flux.Table, error) +} + +var AllProfilers map[string]Profiler = make(map[string]Profiler) + +func RegisterProfilers(ps ...Profiler) { + for _, p := range ps { + AllProfilers[p.Name()] = p + } +} + +type FluxStatisticsProfiler struct{} + +func init() { + RegisterProfilers(FluxStatisticsProfiler{}) +} + +func (s FluxStatisticsProfiler) Name() string { + return "FluxStatistics" +} + +func (s FluxStatisticsProfiler) GetResult(q flux.Query, alloc *memory.Allocator) (flux.Table, error) { + groupKey := NewGroupKey( + []flux.ColMeta{ + { + Label: "_measurement", + Type: flux.TString, + }, + }, + []values.Value{ + values.NewString("profiler/FluxStatistics"), + }, + ) + b := NewColListTableBuilder(groupKey, alloc) + stats := q.Statistics() + colMeta := []flux.ColMeta{ + { + Label: "_measurement", + Type: flux.TString, + }, + { + Label: "TotalDuration", + Type: flux.TInt, + }, + { + Label: "CompileDuration", + Type: flux.TInt, + }, + { + Label: "QueueDuration", + Type: flux.TInt, + }, + { + Label: "PlanDuration", + Type: flux.TInt, + }, + { + Label: "RequeueDuration", + Type: flux.TInt, + }, + { + Label: "ExecuteDuration", + Type: flux.TInt, + }, + { + Label: "Concurrency", + Type: flux.TInt, + }, + { + Label: "MaxAllocated", + Type: flux.TInt, + }, + { + Label: "TotalAllocated", + Type: flux.TInt, + }, + { + Label: "RuntimeErrors", + Type: flux.TString, + }, + } + colData := []interface{}{ + "profiler/FluxStatistics", + stats.TotalDuration.Nanoseconds(), + stats.CompileDuration.Nanoseconds(), + stats.QueueDuration.Nanoseconds(), + stats.PlanDuration.Nanoseconds(), + stats.RequeueDuration.Nanoseconds(), + stats.ExecuteDuration.Nanoseconds(), + int64(stats.Concurrency), + stats.MaxAllocated, + stats.TotalAllocated, + strings.Join(stats.RuntimeErrors, "\n"), + } + stats.Metadata.Range(func(key string, value interface{}) bool { + var ty flux.ColType + if intValue, ok := value.(int); ok { + ty = flux.TInt + colData = append(colData, int64(intValue)) + } else { + ty = flux.TString + colData = append(colData, fmt.Sprintf("%v", value)) + } + colMeta = append(colMeta, flux.ColMeta{ + Label: key, + Type: ty, + }) + return true + }) + for _, col := range colMeta { + if _, err := b.AddCol(col); err != nil { + return nil, err + } + } + for i := 0; i < len(colData); i++ { + if intValue, ok := colData[i].(int64); ok { + b.AppendInt(i, intValue) + } else { + b.AppendString(i, colData[i].(string)) + } + } + tbl, err := b.Table() + if err != nil { + return nil, err + } + return tbl, nil +} diff --git a/execute/profiler_test.go b/execute/profiler_test.go new file mode 100644 index 0000000000..806122111e --- /dev/null +++ b/execute/profiler_test.go @@ -0,0 +1,63 @@ +package execute_test + +import ( + "io/ioutil" + "strings" + "testing" + + "github.com/influxdata/flux" + "github.com/influxdata/flux/csv" + "github.com/influxdata/flux/execute" + "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/flux/execute/table" + "github.com/influxdata/flux/memory" + "github.com/influxdata/flux/metadata" + "github.com/influxdata/flux/mock" +) + +func TestFluxStatisticsProfiler_GetResult(t *testing.T) { + p := &execute.FluxStatisticsProfiler{} + q := &mock.Query{} + q.SetStatistics(flux.Statistics{ + TotalDuration: 1, + CompileDuration: 2, + QueueDuration: 3, + PlanDuration: 4, + RequeueDuration: 5, + ExecuteDuration: 6, + Concurrency: 7, + MaxAllocated: 8, + TotalAllocated: 9, + RuntimeErrors: []string{"1", "2"}, + Metadata: metadata.Metadata{ + "influxdb/scanned-bytes": []interface{}{10}, + "influxdb/scanned-values": []interface{}{11}, + "flux/query-plan": []interface{}{"query plan"}, + }, + }) + wantStr := ` +#datatype,string,long,string,long,long,long,long,long,long,long,long,long,string,string,long,long +#group,false,false,true,false,false,false,false,false,false,false,false,false,false,false,false,false +#default,_profiler,,,,,,,,,,,,,,, +,result,table,_measurement,TotalDuration,CompileDuration,QueueDuration,PlanDuration,RequeueDuration,ExecuteDuration,Concurrency,MaxAllocated,TotalAllocated,RuntimeErrors,flux/query-plan,influxdb/scanned-bytes,influxdb/scanned-values +,,0,profiler/FluxStatistics,1,2,3,4,5,6,7,8,9,"1 +2","query plan",10,11 +` + q.Done() + tbl, err := p.GetResult(q, &memory.Allocator{}) + if err != nil { + t.Error(err) + } + result := table.NewProfilerResult(tbl) + got := flux.NewSliceResultIterator([]flux.Result{&result}) + dec := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{}) + want, e := dec.Decode(ioutil.NopCloser(strings.NewReader(wantStr))) + if e != nil { + t.Error(err) + } + defer want.Release() + + if err := executetest.EqualResultIterators(want, got); err != nil { + t.Fatal(err) + } +} diff --git a/execute/table/profiler_result.go b/execute/table/profiler_result.go new file mode 100644 index 0000000000..5f176e2594 --- /dev/null +++ b/execute/table/profiler_result.go @@ -0,0 +1,21 @@ +package table + +import ( + "github.com/influxdata/flux" +) + +type ProfilerResult struct { + tables Iterator +} + +func NewProfilerResult(tables ...flux.Table) ProfilerResult { + return ProfilerResult{tables} +} + +func (r *ProfilerResult) Name() string { + return "_profiler" +} + +func (r *ProfilerResult) Tables() flux.TableIterator { + return r.tables +} diff --git a/lang/compiler.go b/lang/compiler.go index dc6db3c298..b182f4912e 100644 --- a/lang/compiler.go +++ b/lang/compiler.go @@ -274,7 +274,8 @@ type Program struct { PlanSpec *plan.Spec Runtime flux.Runtime - opts *compileOptions + opts *compileOptions + Profilers []execute.Profiler } func (p *Program) SetLogger(logger *zap.Logger) { @@ -424,6 +425,9 @@ func (p *AstProgram) Start(ctx context.Context, alloc *memory.Allocator) (flux.Q if err := p.updateOpts(scope); err != nil { return nil, errors.Wrap(err, codes.Inherit, "error in reading options while starting program") } + if err := p.updateProfilers(scope); err != nil { + return nil, errors.Wrap(err, codes.Inherit, "error in reading profiler settings while starting program") + } ps, err := buildPlan(cctx, sp, p.opts) if err != nil { return nil, errors.Wrap(err, codes.Inherit, "error in building plan while starting program") @@ -437,8 +441,40 @@ func (p *AstProgram) Start(ctx context.Context, alloc *memory.Allocator) (flux.Q return p.Program.Start(cctx, alloc) } +func (p *AstProgram) updateProfilers(scope values.Scope) error { + pkg, ok := getPackageFromScope("profiler", scope) + if !ok { + return nil + } + if pkg.Type().Nature() != semantic.Object { + // No import for profiler, this is useless. + return nil + } + + profilerNames, err := getOptionValues(pkg.Object(), "enabledProfilers") + if err != nil { + return err + } + dedupeMap := make(map[string]bool) + p.Profilers = make([]execute.Profiler, 0) + for _, profilerName := range profilerNames { + if profiler, exists := execute.AllProfilers[profilerName]; !exists { + // profiler does not exist + continue + } else { + if _, exists := dedupeMap[profilerName]; exists { + // Ignore duplicates + continue + } + dedupeMap[profilerName] = true + p.Profilers = append(p.Profilers, profiler) + } + } + return nil +} + func (p *AstProgram) updateOpts(scope values.Scope) error { - pkg, ok := getPlannerPkg(scope) + pkg, ok := getPackageFromScope("planner", scope) if !ok { return nil } @@ -455,7 +491,7 @@ func (p *AstProgram) updateOpts(scope values.Scope) error { return nil } -func getPlannerPkg(scope values.Scope) (values.Package, bool) { +func getPackageFromScope(pkgName string, scope values.Scope) (values.Package, bool) { found := false var foundPkg values.Package scope.Range(func(k string, v values.Value) { @@ -463,7 +499,7 @@ func getPlannerPkg(scope values.Scope) (values.Package, bool) { return } if pkg, ok := v.(values.Package); ok { - if pkg.Name() == "planner" { + if pkg.Name() == pkgName { found = true foundPkg = pkg } @@ -478,41 +514,25 @@ func getPlanOptions(plannerPkg values.Package) (plan.LogicalOption, plan.Physica return nil, nil, nil } - ls, err := getRules(plannerPkg.Object(), "disableLogicalRules") + ls, err := getOptionValues(plannerPkg.Object(), "disableLogicalRules") if err != nil { return nil, nil, err } - ps, err := getRules(plannerPkg.Object(), "disablePhysicalRules") + ps, err := getOptionValues(plannerPkg.Object(), "disablePhysicalRules") if err != nil { return nil, nil, err } return plan.RemoveLogicalRules(ls...), plan.RemovePhysicalRules(ps...), nil } -func getRules(plannerPkg values.Object, optionName string) ([]string, error) { - value, ok := plannerPkg.Get(optionName) +func getOptionValues(pkg values.Object, optionName string) ([]string, error) { + value, ok := pkg.Get(optionName) if !ok { // No value in package. return []string{}, nil } - // TODO(affo): the rules are arrays of strings as defined in the 'planner' package. - // During evaluation, the interpreter should raise an error if the user tries to assign - // an option of a type to another. So we should be able to rely on the fact that the type - // for value is fixed. At the moment is it not so. - // So, we have to check and return an error to avoid a panic. - // See (https://github.com/influxdata/flux/issues/1829). - if t := value.Type().Nature(); t != semantic.Array { - return nil, fmt.Errorf("'planner.%s' must be an array of string, got %s", optionName, t.String()) - } rules := value.Array() - et, err := rules.Type().ElemType() - if err != nil { - return nil, err - } - if et.Nature() != semantic.String { - return nil, fmt.Errorf("'planner.%s' must be an array of string, got an array of %s", optionName, et.String()) - } noRules := rules.Len() rs := make([]string, noRules) rules.Range(func(i int, v values.Value) { diff --git a/lang/query.go b/lang/query.go index b689b8c89c..d86a72bc69 100644 --- a/lang/query.go +++ b/lang/query.go @@ -45,3 +45,7 @@ func (q *query) Err() error { func (q *query) Statistics() flux.Statistics { return q.stats } + +func (q *query) ProfilerResults() (flux.ResultIterator, error) { + return nil, nil +} diff --git a/libflux/go/libflux/buildinfo.gen.go b/libflux/go/libflux/buildinfo.gen.go index b9278b214c..7f32ac7298 100644 --- a/libflux/go/libflux/buildinfo.gen.go +++ b/libflux/go/libflux/buildinfo.gen.go @@ -221,6 +221,7 @@ var sourceHashes = map[string]string{ "stdlib/planner/window_push_test.flux": "2df784f60da893fa0ad0cee12c017343f305bdbc7cf786c986b61add2768e17b", "stdlib/planner/window_sum_eval_test.flux": "cf202a46c2154eea78a395a969988bd03d3f26182c39aa7906f73071bce5d9ca", "stdlib/planner/window_sum_push_test.flux": "6df79df4810d81b011defbef6440334e8fb6a4a1b9a71e59f74eebb62644ffe3", + "stdlib/profiler/profiler.flux": "2c716863a6dba86c4071b3d9b4578edc025b63efe4d3b9fd4ed763e65165af8b", "stdlib/pushbullet/pushbullet.flux": "37342f80bd11f92360be89c1ce85ba662f8171a4c60b7059421e34926e7374f4", "stdlib/regexp/regexp.flux": "7ca6dc639c7178c6772004a36800d861c5500ca28f7f426144c86735cf4ed4b2", "stdlib/regexp/replaceAllString_test.flux": "15d2027aa0dd0160adf61ff4952f644fd2ad4b5f2df4c56ff7214f520762e16a", diff --git a/mock/query.go b/mock/query.go index 2f4b08e98d..aa643b6a21 100644 --- a/mock/query.go +++ b/mock/query.go @@ -51,6 +51,10 @@ func (q *Query) Statistics() flux.Statistics { return q.stats } +func (q *Query) ProfilerResults() (flux.ResultIterator, error) { + return nil, nil +} + // ProduceResults lets the user provide a function to produce results on the channel returned by `Results`. // `resultProvider` should check if `canceled` has been closed before sending results. E.g.: // ``` diff --git a/query.go b/query.go index a2ed6f2612..da3ddfbff8 100644 --- a/query.go +++ b/query.go @@ -1,8 +1,9 @@ package flux import ( - "github.com/influxdata/flux/metadata" "time" + + "github.com/influxdata/flux/metadata" ) // Query represents an active query. @@ -27,6 +28,9 @@ type Query interface { // Statistics reports the statistics for the query. // The statistics are not complete until Done is called. Statistics() Statistics + + // ProfilerResults returns profiling results for the query + ProfilerResults() (ResultIterator, error) } // Statistics is a collection of statistics about the processing of a query. diff --git a/stdlib/packages.go b/stdlib/packages.go index 7f62ace8e6..3d1ca9f92b 100644 --- a/stdlib/packages.go +++ b/stdlib/packages.go @@ -39,6 +39,7 @@ import ( _ "github.com/influxdata/flux/stdlib/math" _ "github.com/influxdata/flux/stdlib/pagerduty" _ "github.com/influxdata/flux/stdlib/planner" + _ "github.com/influxdata/flux/stdlib/profiler" _ "github.com/influxdata/flux/stdlib/pushbullet" _ "github.com/influxdata/flux/stdlib/regexp" _ "github.com/influxdata/flux/stdlib/runtime" diff --git a/stdlib/profiler/flux_gen.go b/stdlib/profiler/flux_gen.go new file mode 100644 index 0000000000..ac0f7cd5e0 --- /dev/null +++ b/stdlib/profiler/flux_gen.go @@ -0,0 +1,163 @@ +// DO NOT EDIT: This file is autogenerated via the builtin command. + +package profiler + +import ( + ast "github.com/influxdata/flux/ast" + runtime "github.com/influxdata/flux/runtime" +) + +func init() { + runtime.RegisterPackage(pkgAST) +} + +var pkgAST = &ast.Package{ + BaseNode: ast.BaseNode{ + Errors: nil, + Loc: nil, + }, + Files: []*ast.File{&ast.File{ + BaseNode: ast.BaseNode{ + Errors: nil, + Loc: &ast.SourceLocation{ + End: ast.Position{ + Column: 31, + Line: 3, + }, + File: "profiler.flux", + Source: "package profiler\n\noption enabledProfilers = [\"\"]", + Start: ast.Position{ + Column: 1, + Line: 1, + }, + }, + }, + Body: []ast.Statement{&ast.OptionStatement{ + Assignment: &ast.VariableAssignment{ + BaseNode: ast.BaseNode{ + Errors: nil, + Loc: &ast.SourceLocation{ + End: ast.Position{ + Column: 31, + Line: 3, + }, + File: "profiler.flux", + Source: "enabledProfilers = [\"\"]", + Start: ast.Position{ + Column: 8, + Line: 3, + }, + }, + }, + ID: &ast.Identifier{ + BaseNode: ast.BaseNode{ + Errors: nil, + Loc: &ast.SourceLocation{ + End: ast.Position{ + Column: 24, + Line: 3, + }, + File: "profiler.flux", + Source: "enabledProfilers", + Start: ast.Position{ + Column: 8, + Line: 3, + }, + }, + }, + Name: "enabledProfilers", + }, + Init: &ast.ArrayExpression{ + BaseNode: ast.BaseNode{ + Errors: nil, + Loc: &ast.SourceLocation{ + End: ast.Position{ + Column: 31, + Line: 3, + }, + File: "profiler.flux", + Source: "[\"\"]", + Start: ast.Position{ + Column: 27, + Line: 3, + }, + }, + }, + Elements: []ast.Expression{&ast.StringLiteral{ + BaseNode: ast.BaseNode{ + Errors: nil, + Loc: &ast.SourceLocation{ + End: ast.Position{ + Column: 30, + Line: 3, + }, + File: "profiler.flux", + Source: "\"\"", + Start: ast.Position{ + Column: 28, + Line: 3, + }, + }, + }, + Value: "", + }}, + }, + }, + BaseNode: ast.BaseNode{ + Errors: nil, + Loc: &ast.SourceLocation{ + End: ast.Position{ + Column: 31, + Line: 3, + }, + File: "profiler.flux", + Source: "option enabledProfilers = [\"\"]", + Start: ast.Position{ + Column: 1, + Line: 3, + }, + }, + }, + }}, + Imports: nil, + Metadata: "parser-type=rust", + Name: "profiler.flux", + Package: &ast.PackageClause{ + BaseNode: ast.BaseNode{ + Errors: nil, + Loc: &ast.SourceLocation{ + End: ast.Position{ + Column: 17, + Line: 1, + }, + File: "profiler.flux", + Source: "package profiler", + Start: ast.Position{ + Column: 1, + Line: 1, + }, + }, + }, + Name: &ast.Identifier{ + BaseNode: ast.BaseNode{ + Errors: nil, + Loc: &ast.SourceLocation{ + End: ast.Position{ + Column: 17, + Line: 1, + }, + File: "profiler.flux", + Source: "profiler", + Start: ast.Position{ + Column: 9, + Line: 1, + }, + }, + }, + Name: "profiler", + }, + }, + }}, + Package: "profiler", + Path: "profiler", +} diff --git a/stdlib/profiler/profiler.flux b/stdlib/profiler/profiler.flux new file mode 100644 index 0000000000..e8fcf5e2ef --- /dev/null +++ b/stdlib/profiler/profiler.flux @@ -0,0 +1,3 @@ +package profiler + +option enabledProfilers = [""]