From 637e42796e9e2561b2dec61d6118afc29fd328e1 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Wed, 27 Nov 2024 12:50:41 +0100 Subject: [PATCH 1/4] chore: flush the local tracer every 30s --- node/node.go | 1 + pkg/trace/buffered_file.go | 23 +++++++++++++++++++++-- pkg/trace/local_tracer.go | 5 +++-- pkg/trace/tracer.go | 5 +++-- 4 files changed, 28 insertions(+), 6 deletions(-) diff --git a/node/node.go b/node/node.go index 810d3264d4..c37f016366 100644 --- a/node/node.go +++ b/node/node.go @@ -883,6 +883,7 @@ func NewNodeWithContext(ctx context.Context, // create an optional tracer client to collect trace data. tracer, err := trace.NewTracer( + ctx, config, logger, genDoc.ChainID, diff --git a/pkg/trace/buffered_file.go b/pkg/trace/buffered_file.go index 9b228e3f9e..97f5eec404 100644 --- a/pkg/trace/buffered_file.go +++ b/pkg/trace/buffered_file.go @@ -2,11 +2,14 @@ package trace import ( "bufio" + "context" "errors" + "github.com/tendermint/tendermint/libs/log" "io" "os" "sync" "sync/atomic" + "time" ) // bufferedFile is a file that is being written to and read from. It is thread @@ -28,10 +31,26 @@ type bufferedFile struct { } // newbufferedFile creates a new buffered file that writes to the given file. -func newbufferedFile(file *os.File) *bufferedFile { +func newbufferedFile(ctx context.Context, logger log.Logger, file *os.File) *bufferedFile { + bufferedWriter := bufio.NewWriter(file) + go func() { + ticker := time.NewTicker(30 * time.Second) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + err := bufferedWriter.Flush() + if err != nil { + logger.Error("error flushing buffered file", "err", err) + return + } + } + } + }() return &bufferedFile{ file: file, - wr: bufio.NewWriter(file), + wr: bufferedWriter, reading: atomic.Bool{}, mut: &sync.Mutex{}, } diff --git a/pkg/trace/local_tracer.go b/pkg/trace/local_tracer.go index 0d48515eda..2da8853cef 100644 --- a/pkg/trace/local_tracer.go +++ b/pkg/trace/local_tracer.go @@ -1,6 +1,7 @@ package trace import ( + "context" "encoding/json" "fmt" "os" @@ -69,7 +70,7 @@ type LocalTracer struct { // safe to avoid the overhead of locking with each event save. Only pass events // to the returned channel. Call CloseAll to close all open files. Goroutine to // save events is started in this function. -func NewLocalTracer(cfg *config.Config, logger log.Logger, chainID, nodeID string) (*LocalTracer, error) { +func NewLocalTracer(ctx context.Context, cfg *config.Config, logger log.Logger, chainID, nodeID string) (*LocalTracer, error) { fm := make(map[string]*bufferedFile) p := path.Join(cfg.RootDir, "data", "traces") for _, table := range splitAndTrimEmpty(cfg.Instrumentation.TracingTables, ",", " ") { @@ -82,7 +83,7 @@ func NewLocalTracer(cfg *config.Config, logger log.Logger, chainID, nodeID strin if err != nil { return nil, fmt.Errorf("failed to open or create file %s: %w", fileName, err) } - fm[table] = newbufferedFile(file) + fm[table] = newbufferedFile(ctx, logger, file) } lt := &LocalTracer{ diff --git a/pkg/trace/tracer.go b/pkg/trace/tracer.go index 6c4be62c4c..c1c3723bc6 100644 --- a/pkg/trace/tracer.go +++ b/pkg/trace/tracer.go @@ -1,6 +1,7 @@ package trace import ( + "context" "errors" "os" @@ -22,10 +23,10 @@ type Tracer interface { Stop() } -func NewTracer(cfg *config.Config, logger log.Logger, chainID, nodeID string) (Tracer, error) { +func NewTracer(ctx context.Context, cfg *config.Config, logger log.Logger, chainID, nodeID string) (Tracer, error) { switch cfg.Instrumentation.TraceType { case "local": - return NewLocalTracer(cfg, logger, chainID, nodeID) + return NewLocalTracer(ctx, cfg, logger, chainID, nodeID) case "noop": return NoOpTracer(), nil default: From 20c282cd10e1818336a3e9c21aa53d054b78af02 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Wed, 27 Nov 2024 16:06:21 +0100 Subject: [PATCH 2/4] chore: lint --- pkg/trace/buffered_file.go | 3 ++- pkg/trace/local_tracer_test.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/trace/buffered_file.go b/pkg/trace/buffered_file.go index 97f5eec404..03d35537f8 100644 --- a/pkg/trace/buffered_file.go +++ b/pkg/trace/buffered_file.go @@ -4,12 +4,13 @@ import ( "bufio" "context" "errors" - "github.com/tendermint/tendermint/libs/log" "io" "os" "sync" "sync/atomic" "time" + + "github.com/tendermint/tendermint/libs/log" ) // bufferedFile is a file that is being written to and read from. It is thread diff --git a/pkg/trace/local_tracer_test.go b/pkg/trace/local_tracer_test.go index 68841a34b7..526e9e8e56 100644 --- a/pkg/trace/local_tracer_test.go +++ b/pkg/trace/local_tracer_test.go @@ -1,6 +1,7 @@ package trace import ( + "context" "fmt" "io" "net" @@ -159,7 +160,7 @@ func setupLocalTracer(t *testing.T, port int) *LocalTracer { cfg.Instrumentation.TracingTables = testEventTable cfg.Instrumentation.TracePullAddress = fmt.Sprintf(":%d", port) - client, err := NewLocalTracer(cfg, logger, "test_chain", "test_node") + client, err := NewLocalTracer(context.Background(), cfg, logger, "test_chain", "test_node") if err != nil { t.Fatalf("failed to create local client: %v", err) } From cc84c566bbfc5a49cd030decf1efdeba4ae22e55 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Wed, 27 Nov 2024 19:41:25 +0100 Subject: [PATCH 3/4] chore: increase wait for node e2e test --- test/e2e/runner/start.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/runner/start.go b/test/e2e/runner/start.go index c08afec2f5..f87f3e9f7f 100644 --- a/test/e2e/runner/start.go +++ b/test/e2e/runner/start.go @@ -105,7 +105,7 @@ func Start(testnet *e2e.Testnet) error { if err := execCompose(testnet.Dir, "up", "-d", node.Name); err != nil { return err } - status, err := waitForNode(node, node.StartAt, 3*time.Minute) + status, err := waitForNode(node, node.StartAt, 5*time.Minute) if err != nil { return err } From ed37081c5f956f165fe4a753ec104360c5a36bd4 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Thu, 28 Nov 2024 18:21:24 +0100 Subject: [PATCH 4/4] fix: synchronise flushing the buffer --- pkg/trace/buffered_file.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pkg/trace/buffered_file.go b/pkg/trace/buffered_file.go index 03d35537f8..fa92f6bfee 100644 --- a/pkg/trace/buffered_file.go +++ b/pkg/trace/buffered_file.go @@ -34,6 +34,12 @@ type bufferedFile struct { // newbufferedFile creates a new buffered file that writes to the given file. func newbufferedFile(ctx context.Context, logger log.Logger, file *os.File) *bufferedFile { bufferedWriter := bufio.NewWriter(file) + bf := bufferedFile{ + file: file, + wr: bufferedWriter, + reading: atomic.Bool{}, + mut: &sync.Mutex{}, + } go func() { ticker := time.NewTicker(30 * time.Second) for { @@ -41,7 +47,9 @@ func newbufferedFile(ctx context.Context, logger log.Logger, file *os.File) *buf case <-ctx.Done(): return case <-ticker.C: + bf.mut.Lock() err := bufferedWriter.Flush() + bf.mut.Unlock() if err != nil { logger.Error("error flushing buffered file", "err", err) return @@ -49,12 +57,7 @@ func newbufferedFile(ctx context.Context, logger log.Logger, file *os.File) *buf } } }() - return &bufferedFile{ - file: file, - wr: bufferedWriter, - reading: atomic.Bool{}, - mut: &sync.Mutex{}, - } + return &bf } // Write writes the given bytes to the file. If the file is currently being read