Skip to content

Commit

Permalink
Refine reporter caches (open-telemetry#215)
Browse files Browse the repository at this point in the history
- Split sizes of reporter caches and size of tracehandler cache
- Lifetime for cache items
- Less memory consumption due to smaller cache sizes
- Periodic purging of expired cache items
  • Loading branch information
rockdaboot authored Oct 30, 2024
1 parent 1e31c69 commit 1618b30
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 19 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.62.0
github.com/cespare/xxhash/v2 v2.3.0
github.com/cilium/ebpf v0.16.0
github.com/elastic/go-freelru v0.13.0
github.com/elastic/go-freelru v0.15.0
github.com/elastic/go-perf v0.0.0-20241016160959-1342461adb4a
github.com/google/uuid v1.6.0
github.com/jsimonetti/rtnetlink v1.4.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/go-freelru v0.13.0 h1:TKKY6yCfNNNky7Pj9xZAOEpBcdNgZJfihEftOb55omg=
github.com/elastic/go-freelru v0.13.0/go.mod h1:bSdWT4M0lW79K8QbX6XY2heQYSCqD7THoYf82pT/H3I=
github.com/elastic/go-freelru v0.15.0 h1:Jo1aY8JAvpyxbTDJEudrsBfjFDaALpfVv8mxuh9sfvI=
github.com/elastic/go-freelru v0.15.0/go.mod h1:bSdWT4M0lW79K8QbX6XY2heQYSCqD7THoYf82pT/H3I=
github.com/elastic/go-perf v0.0.0-20241016160959-1342461adb4a h1:ymmtaN4bVCmKKeu4XEf6JEWNZKRXPMng1zjpKd+8rCU=
github.com/elastic/go-perf v0.0.0-20241016160959-1342461adb4a/go.mod h1:Nt+pnRYvf0POC+7pXsrv8ubsEOSsaipJP0zlz1Ms1RM=
github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI=
Expand Down
31 changes: 18 additions & 13 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"go.opentelemetry.io/ebpf-profiler/util"
)

// MiB is the number of bytes in one mebibyte (2^20).
const MiB = 1024 * 1024

// Controller is an instance that runs, manages and stops the agent.
type Controller struct {
config *Config
Expand Down Expand Up @@ -88,19 +90,22 @@ func (c *Controller) Start(ctx context.Context) error {
var rep reporter.Reporter
// Connect to the collection agent
rep, err = reporter.Start(ctx, &reporter.Config{
CollAgentAddr: c.config.CollAgentAddr,
DisableTLS: c.config.DisableTLS,
MaxRPCMsgSize: 32 << 20, // 32 MiB
MaxGRPCRetries: 5,
GRPCOperationTimeout: intervals.GRPCOperationTimeout(),
GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(),
GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(),
ReportInterval: intervals.ReportInterval(),
CacheSize: traceHandlerCacheSize,
SamplesPerSecond: c.config.SamplesPerSecond,
KernelVersion: kernelVersion,
HostName: hostname,
IPAddress: sourceIP,
CollAgentAddr: c.config.CollAgentAddr,
DisableTLS: c.config.DisableTLS,
MaxRPCMsgSize: 32 * MiB,
MaxGRPCRetries: 5,
GRPCOperationTimeout: intervals.GRPCOperationTimeout(),
GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(),
GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(),
ReportInterval: intervals.ReportInterval(),
ExecutablesCacheElements: 4096,
// Next step: Calculate FramesCacheElements from numCores and samplingRate.
FramesCacheElements: 65536,
CGroupCacheElements: 1024,
SamplesPerSecond: c.config.SamplesPerSecond,
KernelVersion: kernelVersion,
HostName: hostname,
IPAddress: sourceIP,
})
if err != nil {
return fmt.Errorf("failed to start reporting: %w", err)
Expand Down
8 changes: 6 additions & 2 deletions reporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ type Config struct {

// Disable secure communication with Collection Agent.
DisableTLS bool
// CacheSize defines the size of the reporter caches.
CacheSize uint32
// ExecutablesCacheElements defines the item capacity of the executables cache.
ExecutablesCacheElements uint32
// FramesCacheElements defines the item capacity of the frames cache.
FramesCacheElements uint32
// CGroupCacheElements defines the item capacity of the cgroup cache.
CGroupCacheElements uint32
// SamplesPerSecond defines the number of samples per second.
SamplesPerSecond int
// HostID is the host ID to be sent to the collection agent.
Expand Down
17 changes: 14 additions & 3 deletions reporter/otlp_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,18 +304,22 @@ func (r *OTLPReporter) GetMetrics() Metrics {

// Start sets up and manages the reporting connection to an OTLP backend.
func Start(mainCtx context.Context, cfg *Config) (Reporter, error) {
executables, err := lru.NewSynced[libpf.FileID, execInfo](cfg.CacheSize, libpf.FileID.Hash32)
executables, err :=
lru.NewSynced[libpf.FileID, execInfo](cfg.ExecutablesCacheElements, libpf.FileID.Hash32)
if err != nil {
return nil, err
}
executables.SetLifetime(1 * time.Hour) // Allow GC to clean stale items.

frames, err := lru.NewSynced[libpf.FileID,
*xsync.RWMutex[map[libpf.AddressOrLineno]sourceInfo]](cfg.CacheSize, libpf.FileID.Hash32)
*xsync.RWMutex[map[libpf.AddressOrLineno]sourceInfo]](
cfg.FramesCacheElements, libpf.FileID.Hash32)
if err != nil {
return nil, err
}
frames.SetLifetime(1 * time.Hour) // Allow GC to clean stale items.

cgroupv2ID, err := lru.NewSynced[libpf.PID, string](cfg.CacheSize,
cgroupv2ID, err := lru.NewSynced[libpf.PID, string](cfg.CGroupCacheElements,
func(pid libpf.PID) uint32 { return uint32(pid) })
if err != nil {
return nil, err
Expand Down Expand Up @@ -367,6 +371,8 @@ func Start(mainCtx context.Context, cfg *Config) (Reporter, error) {
go func() {
tick := time.NewTicker(cfg.ReportInterval)
defer tick.Stop()
purgeTick := time.NewTicker(5 * time.Minute)
defer purgeTick.Stop()
for {
select {
case <-ctx.Done():
Expand All @@ -378,6 +384,11 @@ func Start(mainCtx context.Context, cfg *Config) (Reporter, error) {
log.Errorf("Request failed: %v", err)
}
tick.Reset(libpf.AddJitter(cfg.ReportInterval, 0.2))
case <-purgeTick.C:
// Purge expired entries so the GC can reclaim them, avoiding memory leaks.
r.executables.PurgeExpired()
r.frames.PurgeExpired()
r.cgroupv2ID.PurgeExpired()
}
}
}()
Expand Down

0 comments on commit 1618b30

Please sign in to comment.