Skip to content

Commit

Permalink
chore: flush the local tracer every 30s
Browse files Browse the repository at this point in the history
  • Loading branch information
rach-id committed Nov 27, 2024
1 parent a1268f7 commit 637e427
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 6 deletions.
1 change: 1 addition & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,7 @@ func NewNodeWithContext(ctx context.Context,

// create an optional tracer client to collect trace data.
tracer, err := trace.NewTracer(
ctx,
config,
logger,
genDoc.ChainID,
Expand Down
23 changes: 21 additions & 2 deletions pkg/trace/buffered_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package trace

import (
"bufio"
"context"
"errors"
"github.com/tendermint/tendermint/libs/log"
"io"
"os"
"sync"
"sync/atomic"
"time"
)

// bufferedFile is a file that is being written to and read from. It is thread
Expand All @@ -28,10 +31,26 @@ type bufferedFile struct {
}

// newbufferedFile creates a new buffered file that writes to the given file.
func newbufferedFile(file *os.File) *bufferedFile {
func newbufferedFile(ctx context.Context, logger log.Logger, file *os.File) *bufferedFile {
bufferedWriter := bufio.NewWriter(file)
go func() {
ticker := time.NewTicker(30 * time.Second)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := bufferedWriter.Flush()
if err != nil {
logger.Error("error flushing buffered file", "err", err)
return
}
}
}
}()
return &bufferedFile{
file: file,
wr: bufio.NewWriter(file),
wr: bufferedWriter,
reading: atomic.Bool{},
mut: &sync.Mutex{},
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/trace/local_tracer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package trace

import (
"context"
"encoding/json"
"fmt"
"os"
Expand Down Expand Up @@ -69,7 +70,7 @@ type LocalTracer struct {
// safe to avoid the overhead of locking with each event save. Only pass events
// to the returned channel. Call CloseAll to close all open files. Goroutine to
// save events is started in this function.
func NewLocalTracer(cfg *config.Config, logger log.Logger, chainID, nodeID string) (*LocalTracer, error) {
func NewLocalTracer(ctx context.Context, cfg *config.Config, logger log.Logger, chainID, nodeID string) (*LocalTracer, error) {
fm := make(map[string]*bufferedFile)
p := path.Join(cfg.RootDir, "data", "traces")
for _, table := range splitAndTrimEmpty(cfg.Instrumentation.TracingTables, ",", " ") {
Expand All @@ -82,7 +83,7 @@ func NewLocalTracer(cfg *config.Config, logger log.Logger, chainID, nodeID strin
if err != nil {
return nil, fmt.Errorf("failed to open or create file %s: %w", fileName, err)
}
fm[table] = newbufferedFile(file)
fm[table] = newbufferedFile(ctx, logger, file)
}

lt := &LocalTracer{
Expand Down
5 changes: 3 additions & 2 deletions pkg/trace/tracer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package trace

import (
"context"
"errors"
"os"

Expand All @@ -22,10 +23,10 @@ type Tracer interface {
Stop()
}

func NewTracer(cfg *config.Config, logger log.Logger, chainID, nodeID string) (Tracer, error) {
func NewTracer(ctx context.Context, cfg *config.Config, logger log.Logger, chainID, nodeID string) (Tracer, error) {
switch cfg.Instrumentation.TraceType {
case "local":
return NewLocalTracer(cfg, logger, chainID, nodeID)
return NewLocalTracer(ctx, cfg, logger, chainID, nodeID)
case "noop":
return NoOpTracer(), nil
default:
Expand Down

0 comments on commit 637e427

Please sign in to comment.