Skip to content

Commit

Permalink
Block profile (#1301)
Browse files Browse the repository at this point in the history
* add flag -dev.blockprofile

* reduce cyclomatic complexity by extracting profiling

---------

Co-authored-by: Haris Osmanagić <[email protected]>
  • Loading branch information
lovromazgon and hariso authored Dec 5, 2023
1 parent e666f44 commit 96481ff
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 29 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,7 @@ dist/
/conduit.yaml

# Escape Analysis Report
escape_analysis.txt
escape_analysis.txt

# Profiles
*.prof
5 changes: 3 additions & 2 deletions pkg/conduit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ type Config struct {
ProcessorBuilderRegistry *processor.BuilderRegistry

dev struct {
cpuprofile string
memprofile string
cpuprofile string
memprofile string
blockprofile string
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/conduit/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet {
showDevHelp := flags.Bool("dev", false, "used together with the dev flag it shows dev flags")
flags.StringVar(&cfg.dev.cpuprofile, "dev.cpuprofile", "", "write cpu profile to file")
flags.StringVar(&cfg.dev.memprofile, "dev.memprofile", "", "write memory profile to file")
flags.StringVar(&cfg.dev.blockprofile, "dev.blockprofile", "", "write block profile to file")

// show user or dev flags
flags.Usage = func() {
Expand Down
101 changes: 75 additions & 26 deletions pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,33 +201,13 @@ func newServices(
// HTTP APIs. This function blocks until the supplied context is cancelled or
// one of the services experiences a fatal error.
func (r *Runtime) Run(ctx context.Context) (err error) {
t, ctx := tomb.WithContext(ctx)

if r.Config.dev.cpuprofile != "" {
f, err := os.Create(r.Config.dev.cpuprofile)
if err != nil {
return cerrors.Errorf("could not create CPU profile: %w", err)
}
defer f.Close()
if err := pprof.StartCPUProfile(f); err != nil {
return cerrors.Errorf("could not start CPU profile: %w", err)
}
defer pprof.StopCPUProfile()
}
if r.Config.dev.memprofile != "" {
defer func() {
f, err := os.Create(r.Config.dev.memprofile)
if err != nil {
r.logger.Err(ctx, err).Msg("could not create memory profile")
return
}
defer f.Close()
runtime.GC() // get up-to-date statistics
if err := pprof.WriteHeapProfile(f); err != nil {
r.logger.Err(ctx, err).Msg("could not write memory profile")
}
}()
cleanup, err := r.initProfiling(ctx)
if err != nil {
return err
}
defer cleanup()

t, ctx := tomb.WithContext(ctx)

defer func() {
if err != nil {
Expand Down Expand Up @@ -314,6 +294,75 @@ func (r *Runtime) Run(ctx context.Context) (err error) {
return nil
}

func (r *Runtime) initProfiling(ctx context.Context) (deferred func(), err error) {
deferred = func() {}

// deferFunc adds the func into deferred so it can be executed by the caller
// in a defer statement
deferFunc := func(f func()) {
oldDeferred := deferred
deferred = func() {
oldDeferred()
f()
}
}
// ignoreErr returns a function that executes f and ignores the returned error
ignoreErr := func(f func() error) func() {
return func() {
_ = f() // ignore error
}
}
defer func() {
if err != nil {
// on error we make sure deferred functions are executed and return
// an empty function as deferred instead
deferred()
deferred = func() {}
}
}()

if r.Config.dev.cpuprofile != "" {
f, err := os.Create(r.Config.dev.cpuprofile)
if err != nil {
return deferred, cerrors.Errorf("could not create CPU profile: %w", err)
}
deferFunc(ignoreErr(f.Close))
if err := pprof.StartCPUProfile(f); err != nil {
return deferred, cerrors.Errorf("could not start CPU profile: %w", err)
}
deferFunc(pprof.StopCPUProfile)
}
if r.Config.dev.memprofile != "" {
deferFunc(func() {
f, err := os.Create(r.Config.dev.memprofile)
if err != nil {
r.logger.Err(ctx, err).Msg("could not create memory profile")
return
}
defer f.Close()
runtime.GC() // get up-to-date statistics
if err := pprof.WriteHeapProfile(f); err != nil {
r.logger.Err(ctx, err).Msg("could not write memory profile")
}
})
}
if r.Config.dev.blockprofile != "" {
runtime.SetBlockProfileRate(1)
deferFunc(func() {
f, err := os.Create(r.Config.dev.blockprofile)
if err != nil {
r.logger.Err(ctx, err).Msg("could not create block profile")
return
}
defer f.Close()
if err := pprof.Lookup("block").WriteTo(f, 0); err != nil {
r.logger.Err(ctx, err).Msg("could not write block profile")
}
})
}
return
}

func (r *Runtime) registerCleanup(t *tomb.Tomb) {
t.Go(func() error {
<-t.Dying()
Expand Down

0 comments on commit 96481ff

Please sign in to comment.