Skip to content

Commit

Permalink
feat(execute): refactor the operator profile to be in the statistics (#…
Browse files Browse the repository at this point in the history
…4791)

The operator profile is now always computed and it is part of the
`flux.Statistics` struct that is returned by the query. The operator
profile now uses that to return the profile instead of using Go channels
and the execution dependency injected into the context.

This allows the results of the operator profile to be present without it
being tied directly to the profiler return.

It has also been refactored to remove the use of channels for sending
span data and aggregating in a goroutine to instead just aggregate the
results as part of the source/transport code. This should simplify and
speed up those calculations as the overall time should be equivalent but
without the overhead of Go channels and only requiring a short-lived
lock for appending the source profiles in the executor.
  • Loading branch information
jsternberg authored May 26, 2022
1 parent 6acd449 commit b66bfb9
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 208 deletions.
59 changes: 43 additions & 16 deletions execute/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/metadata"
"github.com/influxdata/flux/plan"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
Expand All @@ -26,7 +25,7 @@ type Executor interface {
// may return zero or more values. The returned channel must not require itself to
// be read so the executor must allocate enough space in the channel so if the channel
// is unread that it will not block.
Execute(ctx context.Context, p *plan.Spec, a memory.Allocator) (map[string]flux.Result, <-chan metadata.Metadata, error)
Execute(ctx context.Context, p *plan.Spec, a memory.Allocator) (map[string]flux.Result, <-chan flux.Statistics, error)
}

type executor struct {
Expand Down Expand Up @@ -62,21 +61,21 @@ type executionState struct {

results map[string]flux.Result
sources []Source
metaCh chan metadata.Metadata
statsCh chan flux.Statistics

transports []AsyncTransport

dispatcher *poolDispatcher
logger *zap.Logger
}

func (e *executor) Execute(ctx context.Context, p *plan.Spec, a memory.Allocator) (map[string]flux.Result, <-chan metadata.Metadata, error) {
func (e *executor) Execute(ctx context.Context, p *plan.Spec, a memory.Allocator) (map[string]flux.Result, <-chan flux.Statistics, error) {
es, err := e.createExecutionState(ctx, p, a)
if err != nil {
return nil, nil, errors.Wrap(err, codes.Inherit, "failed to initialize execute state")
}
es.do()
return es.results, es.metaCh, nil
return es.results, es.statsCh, nil
}

func (e *executor) createExecutionState(ctx context.Context, p *plan.Spec, a memory.Allocator) (*executionState, error) {
Expand All @@ -101,10 +100,9 @@ func (e *executor) createExecutionState(ctx context.Context, p *plan.Spec, a mem
return nil, err
}

// Only sources can be a MetadataNode at the moment so allocate enough
// space for all of them to report metadata. Not all of them will necessarily
// report metadata.
es.metaCh = make(chan metadata.Metadata, len(es.sources))
// Only one statistics struct will be sent. Allocate space for it
// so we don't block on its creation.
es.statsCh = make(chan flux.Statistics, 1)

// Choose some default resource limits based on execution options, if necessary.
es.chooseDefaultResources(ctx, p)
Expand Down Expand Up @@ -424,17 +422,30 @@ func (es *executionState) abort(err error) {
}

func (es *executionState) do() {
var wg sync.WaitGroup
var (
wg sync.WaitGroup
stats flux.Statistics
statsMu sync.Mutex
)

updateStats := func(fn func(stats *flux.Statistics)) {
statsMu.Lock()
defer statsMu.Unlock()
fn(&stats)
}

for _, src := range es.sources {
wg.Add(1)
go func(src Source) {
ctx := es.ctx
opName := reflect.TypeOf(src).String()

// If operator profiling is enabled for this execution, begin profiling
if opState := NewOperatorProfilingState(ctx, opName, src.Label()); opState != nil {
defer opState.Finish()
profile := flux.TransportProfile{
NodeType: opName,
Label: src.Label(),
}
profileSpan := profile.StartSpan()

if span, spanCtx := opentracing.StartSpanFromContext(ctx, opName, opentracing.Tag{Key: "label", Value: src.Label()}); span != nil {
ctx = spanCtx
Expand All @@ -446,22 +457,32 @@ func (es *executionState) do() {
// Setup panic handling on the source goroutines
defer es.recover()
src.Run(ctx)
profileSpan.Finish()

if mdn, ok := src.(MetadataNode); ok {
es.metaCh <- mdn.Metadata()
}
updateStats(func(stats *flux.Statistics) {
stats.Profiles = append(stats.Profiles, profile)
if mdn, ok := src.(MetadataNode); ok {
stats.Metadata.AddAll(mdn.Metadata())
}
})
}(src)
}

wg.Add(1)
es.dispatcher.Start(es.resources.ConcurrencyQuota, es.ctx)

// Keep the transport profiles in a separate array from the source profiles.
// This ensures that sources are before transformations.
profiles := make([]flux.TransportProfile, 0, len(es.transports))
go func() {
defer wg.Done()

// Wait for all transports to finish
for _, t := range es.transports {
select {
case <-t.Finished():
tp := t.TransportProfile()
profiles = append(profiles, tp)
case <-es.ctx.Done():
es.abort(es.ctx.Err())
case err := <-es.dispatcher.Err():
Expand All @@ -478,8 +499,14 @@ func (es *executionState) do() {
}()

go func() {
defer close(es.metaCh)
defer close(es.statsCh)
wg.Wait()

// Merge the transport profiles in with the ones already filled
// by the sources.
stats.Profiles = append(stats.Profiles, profiles...)

es.statsCh <- stats
}()
}

Expand Down
150 changes: 15 additions & 135 deletions execute/profiler.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package execute

import (
"context"
"fmt"
"strings"
"time"

"github.com/influxdata/flux"
"github.com/influxdata/flux/memory"
Expand Down Expand Up @@ -35,111 +33,19 @@ func init() {
)
}

type OperatorProfilingResult struct {
Type string
// Those labels are actually their operation name. See flux/internal/spec.buildSpec.
// Some examples are:
// merged_fromRemote_range1_filter2_filter3_filter4, window5, window8, generated_yield, etc.
Label string
Start time.Time
Stop time.Time
}

type OperatorProfilingState struct {
profiler *OperatorProfiler
Result OperatorProfilingResult
}

func (t *OperatorProfilingState) Finish() {
t.FinishWithTime(time.Now())
}

func (t *OperatorProfilingState) FinishWithTime(finishTime time.Time) {
t.Result.Stop = finishTime
if t.profiler != nil && t.profiler.chIn != nil {
t.profiler.chIn <- t.Result
}
}

type operatorProfilingResultAggregate struct {
operationType string
label string
resultCount int64
resultMin int64
resultMax int64
resultSum int64
resultMean float64
}

type operatorProfilerLabelGroup = map[string]*operatorProfilingResultAggregate
type operatorProfilerTypeGroup = map[string]operatorProfilerLabelGroup

type OperatorProfiler struct {
// Receive the profiling results from the operator states.
chIn chan OperatorProfilingResult
chOut chan operatorProfilingResultAggregate
}
type OperatorProfiler struct{}

func createOperatorProfiler() Profiler {
p := &OperatorProfiler{
chIn: make(chan OperatorProfilingResult),
chOut: make(chan operatorProfilingResultAggregate),
}
go func(p *OperatorProfiler) {
aggs := make(operatorProfilerTypeGroup)
for result := range p.chIn {
_, ok := aggs[result.Type]
if !ok {
aggs[result.Type] = make(operatorProfilerLabelGroup)
}
_, ok = aggs[result.Type][result.Label]
if !ok {
aggs[result.Type][result.Label] = &operatorProfilingResultAggregate{}
}
a := aggs[result.Type][result.Label]

// Aggregate the results
a.resultCount++
duration := result.Stop.Sub(result.Start).Nanoseconds()
if duration > a.resultMax {
a.resultMax = duration
}
if duration < a.resultMin || a.resultMin == 0 {
a.resultMin = duration
}
a.resultSum += duration
}

// Write the aggregated results to chOut, where they'll be
// converted into rows and appended to the final table
for typ, labels := range aggs {
for label, agg := range labels {
agg.resultMean = float64(agg.resultSum) / float64(agg.resultCount)
agg.operationType = typ
agg.label = label
p.chOut <- *agg
}
}
close(p.chOut)
}(p)

return p
return &OperatorProfiler{}
}

func (o *OperatorProfiler) Name() string {
return "operator"
}

func (o *OperatorProfiler) closeIncomingChannel() {
if o.chIn != nil {
close(o.chIn)
o.chIn = nil
}
}

func (o *OperatorProfiler) GetResult(q flux.Query, alloc memory.Allocator) (flux.Table, error) {
o.closeIncomingChannel()
b, err := o.getTableBuilder(alloc)
stats := q.Statistics()
b, err := o.getTableBuilder(stats, alloc)
if err != nil {
return nil, err
}
Expand All @@ -154,8 +60,8 @@ func (o *OperatorProfiler) GetResult(q flux.Query, alloc memory.Allocator) (flux
// on the ColListTableBuilder to make testing easier.
// sortKeys and desc are passed directly into the Sort() call
func (o *OperatorProfiler) GetSortedResult(q flux.Query, alloc memory.Allocator, desc bool, sortKeys ...string) (flux.Table, error) {
o.closeIncomingChannel()
b, err := o.getTableBuilder(alloc)
stats := q.Statistics()
b, err := o.getTableBuilder(stats, alloc)
if err != nil {
return nil, err
}
Expand All @@ -167,7 +73,7 @@ func (o *OperatorProfiler) GetSortedResult(q flux.Query, alloc memory.Allocator,
return tbl, nil
}

func (o *OperatorProfiler) getTableBuilder(alloc memory.Allocator) (*ColListTableBuilder, error) {
func (o *OperatorProfiler) getTableBuilder(stats flux.Statistics, alloc memory.Allocator) (*ColListTableBuilder, error) {
groupKey := NewGroupKey(
[]flux.ColMeta{
{
Expand Down Expand Up @@ -220,45 +126,19 @@ func (o *OperatorProfiler) getTableBuilder(alloc memory.Allocator) (*ColListTabl
}
}

for agg := range o.chOut {
for _, profile := range stats.Profiles {
b.AppendString(0, "profiler/operator")
b.AppendString(1, agg.operationType)
b.AppendString(2, agg.label)
b.AppendInt(3, agg.resultCount)
b.AppendInt(4, agg.resultMin)
b.AppendInt(5, agg.resultMax)
b.AppendInt(6, agg.resultSum)
b.AppendFloat(7, agg.resultMean)
b.AppendString(1, profile.NodeType)
b.AppendString(2, profile.Label)
b.AppendInt(3, profile.Count)
b.AppendInt(4, profile.Min)
b.AppendInt(5, profile.Max)
b.AppendInt(6, profile.Sum)
b.AppendFloat(7, profile.Mean)
}
return b, nil
}

// NewOperatorProfilingState creates a new state instance for the given operation name
// and label, provided an operator profiler exists in the execution options.
// If there is no operator profiler, this function returns nil.
func NewOperatorProfilingState(ctx context.Context, operationName string, label string, start ...time.Time) *OperatorProfilingState {
opStart := time.Now()
if len(start) > 0 {
opStart = start[0]
}
var state *OperatorProfilingState
if HaveExecutionDependencies(ctx) {
deps := GetExecutionDependencies(ctx)
if deps.ExecutionOptions.OperatorProfiler != nil {
tfp := deps.ExecutionOptions.OperatorProfiler
state = &OperatorProfilingState{
profiler: tfp,
Result: OperatorProfilingResult{
Type: operationName,
Label: label,
Start: opStart,
},
}
}
}
return state
}

type QueryProfiler struct{}

func createQueryProfiler() Profiler {
Expand Down
Loading

0 comments on commit b66bfb9

Please sign in to comment.