diff --git a/node/node.go b/node/node.go index 810d3264d4..c37f016366 100644 --- a/node/node.go +++ b/node/node.go @@ -883,6 +883,7 @@ func NewNodeWithContext(ctx context.Context, // create an optional tracer client to collect trace data. tracer, err := trace.NewTracer( + ctx, config, logger, genDoc.ChainID, diff --git a/pkg/trace/buffered_file.go b/pkg/trace/buffered_file.go index 9b228e3f9e..97f5eec404 100644 --- a/pkg/trace/buffered_file.go +++ b/pkg/trace/buffered_file.go @@ -2,11 +2,14 @@ package trace import ( "bufio" + "context" "errors" + "github.com/tendermint/tendermint/libs/log" "io" "os" "sync" "sync/atomic" + "time" ) // bufferedFile is a file that is being written to and read from. It is thread @@ -28,10 +31,26 @@ type bufferedFile struct { } // newbufferedFile creates a new buffered file that writes to the given file. -func newbufferedFile(file *os.File) *bufferedFile { +func newbufferedFile(ctx context.Context, logger log.Logger, file *os.File) *bufferedFile { + bufferedWriter := bufio.NewWriter(file) + go func() { + ticker := time.NewTicker(30 * time.Second) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + err := bufferedWriter.Flush() + if err != nil { + logger.Error("error flushing buffered file", "err", err) + return + } + } + } + }() return &bufferedFile{ file: file, - wr: bufio.NewWriter(file), + wr: bufferedWriter, reading: atomic.Bool{}, mut: &sync.Mutex{}, } diff --git a/pkg/trace/local_tracer.go b/pkg/trace/local_tracer.go index 0d48515eda..2da8853cef 100644 --- a/pkg/trace/local_tracer.go +++ b/pkg/trace/local_tracer.go @@ -1,6 +1,7 @@ package trace import ( + "context" "encoding/json" "fmt" "os" @@ -69,7 +70,7 @@ type LocalTracer struct { // safe to avoid the overhead of locking with each event save. Only pass events // to the returned channel. Call CloseAll to close all open files. Goroutine to // save events is started in this function. -func NewLocalTracer(cfg *config.Config, logger log.Logger, chainID, nodeID string) (*LocalTracer, error) { +func NewLocalTracer(ctx context.Context, cfg *config.Config, logger log.Logger, chainID, nodeID string) (*LocalTracer, error) { fm := make(map[string]*bufferedFile) p := path.Join(cfg.RootDir, "data", "traces") for _, table := range splitAndTrimEmpty(cfg.Instrumentation.TracingTables, ",", " ") { @@ -82,7 +83,7 @@ func NewLocalTracer(cfg *config.Config, logger log.Logger, chainID, nodeID strin if err != nil { return nil, fmt.Errorf("failed to open or create file %s: %w", fileName, err) } - fm[table] = newbufferedFile(file) + fm[table] = newbufferedFile(ctx, logger, file) } lt := &LocalTracer{ diff --git a/pkg/trace/tracer.go b/pkg/trace/tracer.go index 6c4be62c4c..c1c3723bc6 100644 --- a/pkg/trace/tracer.go +++ b/pkg/trace/tracer.go @@ -1,6 +1,7 @@ package trace import ( + "context" "errors" "os" @@ -22,10 +23,10 @@ type Tracer interface { Stop() } -func NewTracer(cfg *config.Config, logger log.Logger, chainID, nodeID string) (Tracer, error) { +func NewTracer(ctx context.Context, cfg *config.Config, logger log.Logger, chainID, nodeID string) (Tracer, error) { switch cfg.Instrumentation.TraceType { case "local": - return NewLocalTracer(cfg, logger, chainID, nodeID) + return NewLocalTracer(ctx, cfg, logger, chainID, nodeID) case "noop": return NoOpTracer(), nil default: