Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: flush the local tracer every 30s #1541

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,7 @@ func NewNodeWithContext(ctx context.Context,

// create an optional tracer client to collect trace data.
tracer, err := trace.NewTracer(
ctx,
config,
logger,
genDoc.ChainID,
Expand Down
29 changes: 26 additions & 3 deletions pkg/trace/buffered_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package trace

import (
"bufio"
"context"
"errors"
"io"
"os"
"sync"
"sync/atomic"
"time"

"github.com/tendermint/tendermint/libs/log"
)

// bufferedFile is a file that is being written to and read from. It is thread
Expand All @@ -28,13 +32,32 @@ type bufferedFile struct {
}

// newbufferedFile creates a new buffered file that writes to the given file.
func newbufferedFile(file *os.File) *bufferedFile {
return &bufferedFile{
func newbufferedFile(ctx context.Context, logger log.Logger, file *os.File) *bufferedFile {
bufferedWriter := bufio.NewWriter(file)
bf := bufferedFile{
file: file,
wr: bufio.NewWriter(file),
wr: bufferedWriter,
reading: atomic.Bool{},
mut: &sync.Mutex{},
}
go func() {
ticker := time.NewTicker(30 * time.Second)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
bf.mut.Lock()
err := bufferedWriter.Flush()
bf.mut.Unlock()
if err != nil {
logger.Error("error flushing buffered file", "err", err)
return
}
}
}
}()
return &bf
}

// Write writes the given bytes to the file. If the file is currently being read
Expand Down
5 changes: 3 additions & 2 deletions pkg/trace/local_tracer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package trace

import (
"context"
"encoding/json"
"fmt"
"os"
Expand Down Expand Up @@ -69,7 +70,7 @@ type LocalTracer struct {
// safe to avoid the overhead of locking with each event save. Only pass events
// to the returned channel. Call CloseAll to close all open files. Goroutine to
// save events is started in this function.
func NewLocalTracer(cfg *config.Config, logger log.Logger, chainID, nodeID string) (*LocalTracer, error) {
func NewLocalTracer(ctx context.Context, cfg *config.Config, logger log.Logger, chainID, nodeID string) (*LocalTracer, error) {
fm := make(map[string]*bufferedFile)
p := path.Join(cfg.RootDir, "data", "traces")
for _, table := range splitAndTrimEmpty(cfg.Instrumentation.TracingTables, ",", " ") {
Expand All @@ -82,7 +83,7 @@ func NewLocalTracer(cfg *config.Config, logger log.Logger, chainID, nodeID strin
if err != nil {
return nil, fmt.Errorf("failed to open or create file %s: %w", fileName, err)
}
fm[table] = newbufferedFile(file)
fm[table] = newbufferedFile(ctx, logger, file)
}

lt := &LocalTracer{
Expand Down
3 changes: 2 additions & 1 deletion pkg/trace/local_tracer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package trace

import (
"context"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -159,7 +160,7 @@ func setupLocalTracer(t *testing.T, port int) *LocalTracer {
cfg.Instrumentation.TracingTables = testEventTable
cfg.Instrumentation.TracePullAddress = fmt.Sprintf(":%d", port)

client, err := NewLocalTracer(cfg, logger, "test_chain", "test_node")
client, err := NewLocalTracer(context.Background(), cfg, logger, "test_chain", "test_node")
if err != nil {
t.Fatalf("failed to create local client: %v", err)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/trace/tracer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package trace

import (
"context"
"errors"
"os"

Expand All @@ -22,10 +23,10 @@ type Tracer interface {
Stop()
}

func NewTracer(cfg *config.Config, logger log.Logger, chainID, nodeID string) (Tracer, error) {
func NewTracer(ctx context.Context, cfg *config.Config, logger log.Logger, chainID, nodeID string) (Tracer, error) {
switch cfg.Instrumentation.TraceType {
case "local":
return NewLocalTracer(cfg, logger, chainID, nodeID)
return NewLocalTracer(ctx, cfg, logger, chainID, nodeID)
case "noop":
return NoOpTracer(), nil
default:
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/runner/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func Start(testnet *e2e.Testnet) error {
if err := execCompose(testnet.Dir, "up", "-d", node.Name); err != nil {
return err
}
status, err := waitForNode(node, node.StartAt, 3*time.Minute)
status, err := waitForNode(node, node.StartAt, 5*time.Minute)
if err != nil {
return err
}
Expand Down
Loading