Skip to content

Commit

Permalink
feat: add the profiler package (#3129)
Browse files Browse the repository at this point in the history
  • Loading branch information
ethanyzhang authored Aug 24, 2020
1 parent 0b17fcf commit 625b6f2
Show file tree
Hide file tree
Showing 11 changed files with 449 additions and 25 deletions.
140 changes: 140 additions & 0 deletions execute/profiler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package execute

import (
"fmt"
"strings"

"github.com/influxdata/flux"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/values"
)

type Profiler interface {
Name() string
GetResult(q flux.Query, alloc *memory.Allocator) (flux.Table, error)
}

var AllProfilers map[string]Profiler = make(map[string]Profiler)

func RegisterProfilers(ps ...Profiler) {
for _, p := range ps {
AllProfilers[p.Name()] = p
}
}

type FluxStatisticsProfiler struct{}

func init() {
RegisterProfilers(FluxStatisticsProfiler{})
}

func (s FluxStatisticsProfiler) Name() string {
return "FluxStatistics"
}

func (s FluxStatisticsProfiler) GetResult(q flux.Query, alloc *memory.Allocator) (flux.Table, error) {
groupKey := NewGroupKey(
[]flux.ColMeta{
{
Label: "_measurement",
Type: flux.TString,
},
},
[]values.Value{
values.NewString("profiler/FluxStatistics"),
},
)
b := NewColListTableBuilder(groupKey, alloc)
stats := q.Statistics()
colMeta := []flux.ColMeta{
{
Label: "_measurement",
Type: flux.TString,
},
{
Label: "TotalDuration",
Type: flux.TInt,
},
{
Label: "CompileDuration",
Type: flux.TInt,
},
{
Label: "QueueDuration",
Type: flux.TInt,
},
{
Label: "PlanDuration",
Type: flux.TInt,
},
{
Label: "RequeueDuration",
Type: flux.TInt,
},
{
Label: "ExecuteDuration",
Type: flux.TInt,
},
{
Label: "Concurrency",
Type: flux.TInt,
},
{
Label: "MaxAllocated",
Type: flux.TInt,
},
{
Label: "TotalAllocated",
Type: flux.TInt,
},
{
Label: "RuntimeErrors",
Type: flux.TString,
},
}
colData := []interface{}{
"profiler/FluxStatistics",
stats.TotalDuration.Nanoseconds(),
stats.CompileDuration.Nanoseconds(),
stats.QueueDuration.Nanoseconds(),
stats.PlanDuration.Nanoseconds(),
stats.RequeueDuration.Nanoseconds(),
stats.ExecuteDuration.Nanoseconds(),
int64(stats.Concurrency),
stats.MaxAllocated,
stats.TotalAllocated,
strings.Join(stats.RuntimeErrors, "\n"),
}
stats.Metadata.Range(func(key string, value interface{}) bool {
var ty flux.ColType
if intValue, ok := value.(int); ok {
ty = flux.TInt
colData = append(colData, int64(intValue))
} else {
ty = flux.TString
colData = append(colData, fmt.Sprintf("%v", value))
}
colMeta = append(colMeta, flux.ColMeta{
Label: key,
Type: ty,
})
return true
})
for _, col := range colMeta {
if _, err := b.AddCol(col); err != nil {
return nil, err
}
}
for i := 0; i < len(colData); i++ {
if intValue, ok := colData[i].(int64); ok {
b.AppendInt(i, intValue)
} else {
b.AppendString(i, colData[i].(string))
}
}
tbl, err := b.Table()
if err != nil {
return nil, err
}
return tbl, nil
}
63 changes: 63 additions & 0 deletions execute/profiler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package execute_test

import (
"io/ioutil"
"strings"
"testing"

"github.com/influxdata/flux"
"github.com/influxdata/flux/csv"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/execute/executetest"
"github.com/influxdata/flux/execute/table"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/metadata"
"github.com/influxdata/flux/mock"
)

func TestFluxStatisticsProfiler_GetResult(t *testing.T) {
p := &execute.FluxStatisticsProfiler{}
q := &mock.Query{}
q.SetStatistics(flux.Statistics{
TotalDuration: 1,
CompileDuration: 2,
QueueDuration: 3,
PlanDuration: 4,
RequeueDuration: 5,
ExecuteDuration: 6,
Concurrency: 7,
MaxAllocated: 8,
TotalAllocated: 9,
RuntimeErrors: []string{"1", "2"},
Metadata: metadata.Metadata{
"influxdb/scanned-bytes": []interface{}{10},
"influxdb/scanned-values": []interface{}{11},
"flux/query-plan": []interface{}{"query plan"},
},
})
wantStr := `
#datatype,string,long,string,long,long,long,long,long,long,long,long,long,string,string,long,long
#group,false,false,true,false,false,false,false,false,false,false,false,false,false,false,false,false
#default,_profiler,,,,,,,,,,,,,,,
,result,table,_measurement,TotalDuration,CompileDuration,QueueDuration,PlanDuration,RequeueDuration,ExecuteDuration,Concurrency,MaxAllocated,TotalAllocated,RuntimeErrors,flux/query-plan,influxdb/scanned-bytes,influxdb/scanned-values
,,0,profiler/FluxStatistics,1,2,3,4,5,6,7,8,9,"1
2","query plan",10,11
`
q.Done()
tbl, err := p.GetResult(q, &memory.Allocator{})
if err != nil {
t.Error(err)
}
result := table.NewProfilerResult(tbl)
got := flux.NewSliceResultIterator([]flux.Result{&result})
dec := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{})
want, e := dec.Decode(ioutil.NopCloser(strings.NewReader(wantStr)))
if e != nil {
t.Error(err)
}
defer want.Release()

if err := executetest.EqualResultIterators(want, got); err != nil {
t.Fatal(err)
}
}
21 changes: 21 additions & 0 deletions execute/table/profiler_result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package table

import (
"github.com/influxdata/flux"
)

type ProfilerResult struct {
tables Iterator
}

func NewProfilerResult(tables ...flux.Table) ProfilerResult {
return ProfilerResult{tables}
}

func (r *ProfilerResult) Name() string {
return "_profiler"
}

func (r *ProfilerResult) Tables() flux.TableIterator {
return r.tables
}
68 changes: 44 additions & 24 deletions lang/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ type Program struct {
PlanSpec *plan.Spec
Runtime flux.Runtime

opts *compileOptions
opts *compileOptions
Profilers []execute.Profiler
}

func (p *Program) SetLogger(logger *zap.Logger) {
Expand Down Expand Up @@ -424,6 +425,9 @@ func (p *AstProgram) Start(ctx context.Context, alloc *memory.Allocator) (flux.Q
if err := p.updateOpts(scope); err != nil {
return nil, errors.Wrap(err, codes.Inherit, "error in reading options while starting program")
}
if err := p.updateProfilers(scope); err != nil {
return nil, errors.Wrap(err, codes.Inherit, "error in reading profiler settings while starting program")
}
ps, err := buildPlan(cctx, sp, p.opts)
if err != nil {
return nil, errors.Wrap(err, codes.Inherit, "error in building plan while starting program")
Expand All @@ -437,8 +441,40 @@ func (p *AstProgram) Start(ctx context.Context, alloc *memory.Allocator) (flux.Q
return p.Program.Start(cctx, alloc)
}

func (p *AstProgram) updateProfilers(scope values.Scope) error {
pkg, ok := getPackageFromScope("profiler", scope)
if !ok {
return nil
}
if pkg.Type().Nature() != semantic.Object {
// No import for profiler, this is useless.
return nil
}

profilerNames, err := getOptionValues(pkg.Object(), "enabledProfilers")
if err != nil {
return err
}
dedupeMap := make(map[string]bool)
p.Profilers = make([]execute.Profiler, 0)
for _, profilerName := range profilerNames {
if profiler, exists := execute.AllProfilers[profilerName]; !exists {
// profiler does not exist
continue
} else {
if _, exists := dedupeMap[profilerName]; exists {
// Ignore duplicates
continue
}
dedupeMap[profilerName] = true
p.Profilers = append(p.Profilers, profiler)
}
}
return nil
}

func (p *AstProgram) updateOpts(scope values.Scope) error {
pkg, ok := getPlannerPkg(scope)
pkg, ok := getPackageFromScope("planner", scope)
if !ok {
return nil
}
Expand All @@ -455,15 +491,15 @@ func (p *AstProgram) updateOpts(scope values.Scope) error {
return nil
}

func getPlannerPkg(scope values.Scope) (values.Package, bool) {
func getPackageFromScope(pkgName string, scope values.Scope) (values.Package, bool) {
found := false
var foundPkg values.Package
scope.Range(func(k string, v values.Value) {
if found {
return
}
if pkg, ok := v.(values.Package); ok {
if pkg.Name() == "planner" {
if pkg.Name() == pkgName {
found = true
foundPkg = pkg
}
Expand All @@ -478,41 +514,25 @@ func getPlanOptions(plannerPkg values.Package) (plan.LogicalOption, plan.Physica
return nil, nil, nil
}

ls, err := getRules(plannerPkg.Object(), "disableLogicalRules")
ls, err := getOptionValues(plannerPkg.Object(), "disableLogicalRules")
if err != nil {
return nil, nil, err
}
ps, err := getRules(plannerPkg.Object(), "disablePhysicalRules")
ps, err := getOptionValues(plannerPkg.Object(), "disablePhysicalRules")
if err != nil {
return nil, nil, err
}
return plan.RemoveLogicalRules(ls...), plan.RemovePhysicalRules(ps...), nil
}

func getRules(plannerPkg values.Object, optionName string) ([]string, error) {
value, ok := plannerPkg.Get(optionName)
func getOptionValues(pkg values.Object, optionName string) ([]string, error) {
value, ok := pkg.Get(optionName)
if !ok {
// No value in package.
return []string{}, nil
}

// TODO(affo): the rules are arrays of strings as defined in the 'planner' package.
// During evaluation, the interpreter should raise an error if the user tries to assign
// an option of a type to another. So we should be able to rely on the fact that the type
// for value is fixed. At the moment is it not so.
// So, we have to check and return an error to avoid a panic.
// See (https://github.com/influxdata/flux/issues/1829).
if t := value.Type().Nature(); t != semantic.Array {
return nil, fmt.Errorf("'planner.%s' must be an array of string, got %s", optionName, t.String())
}
rules := value.Array()
et, err := rules.Type().ElemType()
if err != nil {
return nil, err
}
if et.Nature() != semantic.String {
return nil, fmt.Errorf("'planner.%s' must be an array of string, got an array of %s", optionName, et.String())
}
noRules := rules.Len()
rs := make([]string, noRules)
rules.Range(func(i int, v values.Value) {
Expand Down
4 changes: 4 additions & 0 deletions lang/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,7 @@ func (q *query) Err() error {
func (q *query) Statistics() flux.Statistics {
return q.stats
}

func (q *query) ProfilerResults() (flux.ResultIterator, error) {
return nil, nil
}
1 change: 1 addition & 0 deletions libflux/go/libflux/buildinfo.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ var sourceHashes = map[string]string{
"stdlib/planner/window_push_test.flux": "2df784f60da893fa0ad0cee12c017343f305bdbc7cf786c986b61add2768e17b",
"stdlib/planner/window_sum_eval_test.flux": "cf202a46c2154eea78a395a969988bd03d3f26182c39aa7906f73071bce5d9ca",
"stdlib/planner/window_sum_push_test.flux": "6df79df4810d81b011defbef6440334e8fb6a4a1b9a71e59f74eebb62644ffe3",
"stdlib/profiler/profiler.flux": "2c716863a6dba86c4071b3d9b4578edc025b63efe4d3b9fd4ed763e65165af8b",
"stdlib/pushbullet/pushbullet.flux": "37342f80bd11f92360be89c1ce85ba662f8171a4c60b7059421e34926e7374f4",
"stdlib/regexp/regexp.flux": "7ca6dc639c7178c6772004a36800d861c5500ca28f7f426144c86735cf4ed4b2",
"stdlib/regexp/replaceAllString_test.flux": "15d2027aa0dd0160adf61ff4952f644fd2ad4b5f2df4c56ff7214f520762e16a",
Expand Down
4 changes: 4 additions & 0 deletions mock/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (q *Query) Statistics() flux.Statistics {
return q.stats
}

func (q *Query) ProfilerResults() (flux.ResultIterator, error) {
return nil, nil
}

// ProduceResults lets the user provide a function to produce results on the channel returned by `Results`.
// `resultProvider` should check if `canceled` has been closed before sending results. E.g.:
// ```
Expand Down
Loading

0 comments on commit 625b6f2

Please sign in to comment.