diff --git a/.gitignore b/.gitignore index 85e65191b..07f5d9b45 100644 --- a/.gitignore +++ b/.gitignore @@ -82,4 +82,7 @@ dist/ /conduit.yaml # Escape Analysis Report -escape_analysis.txt \ No newline at end of file +escape_analysis.txt + +# Profiles +*.prof diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go index 75ef704d0..e2b92b808 100644 --- a/pkg/conduit/config.go +++ b/pkg/conduit/config.go @@ -77,8 +77,9 @@ type Config struct { ProcessorBuilderRegistry *processor.BuilderRegistry dev struct { - cpuprofile string - memprofile string + cpuprofile string + memprofile string + blockprofile string } } diff --git a/pkg/conduit/entrypoint.go b/pkg/conduit/entrypoint.go index 15691b25a..11167b444 100644 --- a/pkg/conduit/entrypoint.go +++ b/pkg/conduit/entrypoint.go @@ -98,6 +98,7 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet { showDevHelp := flags.Bool("dev", false, "used together with the dev flag it shows dev flags") flags.StringVar(&cfg.dev.cpuprofile, "dev.cpuprofile", "", "write cpu profile to file") flags.StringVar(&cfg.dev.memprofile, "dev.memprofile", "", "write memory profile to file") + flags.StringVar(&cfg.dev.blockprofile, "dev.blockprofile", "", "write block profile to file") // show user or dev flags flags.Usage = func() { diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index fde35b61b..189093803 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -201,33 +201,13 @@ func newServices( // HTTP APIs. This function blocks until the supplied context is cancelled or // one of the services experiences a fatal error. func (r *Runtime) Run(ctx context.Context) (err error) { - t, ctx := tomb.WithContext(ctx) - - if r.Config.dev.cpuprofile != "" { - f, err := os.Create(r.Config.dev.cpuprofile) - if err != nil { - return cerrors.Errorf("could not create CPU profile: %w", err) - } - defer f.Close() - if err := pprof.StartCPUProfile(f); err != nil { - return cerrors.Errorf("could not start CPU profile: %w", err) - } - defer pprof.StopCPUProfile() - } - if r.Config.dev.memprofile != "" { - defer func() { - f, err := os.Create(r.Config.dev.memprofile) - if err != nil { - r.logger.Err(ctx, err).Msg("could not create memory profile") - return - } - defer f.Close() - runtime.GC() // get up-to-date statistics - if err := pprof.WriteHeapProfile(f); err != nil { - r.logger.Err(ctx, err).Msg("could not write memory profile") - } - }() + cleanup, err := r.initProfiling(ctx) + if err != nil { + return err } + defer cleanup() + + t, ctx := tomb.WithContext(ctx) defer func() { if err != nil { @@ -314,6 +294,75 @@ func (r *Runtime) Run(ctx context.Context) (err error) { return nil } +func (r *Runtime) initProfiling(ctx context.Context) (deferred func(), err error) { + deferred = func() {} + + // deferFunc adds the func into deferred so it can be executed by the caller + // in a defer statement + deferFunc := func(f func()) { + oldDeferred := deferred + deferred = func() { + oldDeferred() + f() + } + } + // ignoreErr returns a function that executes f and ignores the returned error + ignoreErr := func(f func() error) func() { + return func() { + _ = f() // ignore error + } + } + defer func() { + if err != nil { + // on error we make sure deferred functions are executed and return + // an empty function as deferred instead + deferred() + deferred = func() {} + } + }() + + if r.Config.dev.cpuprofile != "" { + f, err := os.Create(r.Config.dev.cpuprofile) + if err != nil { + return deferred, cerrors.Errorf("could not create CPU profile: %w", err) + } + deferFunc(ignoreErr(f.Close)) + if err := pprof.StartCPUProfile(f); err != nil { + return deferred, cerrors.Errorf("could not start CPU profile: %w", err) + } + deferFunc(pprof.StopCPUProfile) + } + if r.Config.dev.memprofile != "" { + deferFunc(func() { + f, err := os.Create(r.Config.dev.memprofile) + if err != nil { + r.logger.Err(ctx, err).Msg("could not create memory profile") + return + } + defer f.Close() + runtime.GC() // get up-to-date statistics + if err := pprof.WriteHeapProfile(f); err != nil { + r.logger.Err(ctx, err).Msg("could not write memory profile") + } + }) + } + if r.Config.dev.blockprofile != "" { + runtime.SetBlockProfileRate(1) + deferFunc(func() { + f, err := os.Create(r.Config.dev.blockprofile) + if err != nil { + r.logger.Err(ctx, err).Msg("could not create block profile") + return + } + defer f.Close() + if err := pprof.Lookup("block").WriteTo(f, 0); err != nil { + r.logger.Err(ctx, err).Msg("could not write block profile") + } + }) + } + return +} + func (r *Runtime) registerCleanup(t *tomb.Tomb) { t.Go(func() error { <-t.Dying()