diff --git a/CHANGELOG.md b/CHANGELOG.md index 88887d0722..1c83dfd0e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,8 @@ Main (unreleased) for `discovery.*` is reloaded in such a way that no new targets were discovered. (@ptodev, @thampiotr) +- Fix a memory leak which would occur any time `loki.process` had its configuration reloaded. (@ptodev) + v1.3.0 ----------------- diff --git a/internal/component/loki/process/process.go b/internal/component/loki/process/process.go index 35785fe020..d78405030a 100644 --- a/internal/component/loki/process/process.go +++ b/internal/component/loki/process/process.go @@ -101,7 +101,6 @@ func (c *Component) Run(ctx context.Context) error { if c.entryHandler != nil { c.entryHandler.Stop() } - close(c.processIn) c.mut.RUnlock() }() wg := &sync.WaitGroup{} @@ -138,8 +137,9 @@ func (c *Component) Update(args component.Arguments) error { if err != nil { return err } - c.entryHandler = loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() }) - c.processIn = pipeline.Wrap(c.entryHandler).Chan() + entryHandler := loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() }) + c.entryHandler = pipeline.Wrap(entryHandler) + c.processIn = c.entryHandler.Chan() c.stages = newArgs.Stages } diff --git a/internal/component/loki/process/process_test.go b/internal/component/loki/process/process_test.go index 92825ec2f8..c64bdfd40f 100644 --- a/internal/component/loki/process/process_test.go +++ b/internal/component/loki/process/process_test.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "strings" + "sync" "testing" "time" @@ -321,6 +322,8 @@ stage.labels { } func TestEntrySentToTwoProcessComponents(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + // Set up two different loki.process components. stg1 := ` forward_to = [] @@ -342,13 +345,16 @@ stage.static_labels { args1.ForwardTo = []loki.LogsReceiver{ch1} args2.ForwardTo = []loki.LogsReceiver{ch2} + ctx, ctxCancel := context.WithCancel(context.Background()) + defer ctxCancel() + // Start the loki.process components. tc1, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.process") require.NoError(t, err) tc2, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.process") require.NoError(t, err) - go func() { require.NoError(t, tc1.Run(componenttest.TestContext(t), args1)) }() - go func() { require.NoError(t, tc2.Run(componenttest.TestContext(t), args2)) }() + go func() { require.NoError(t, tc1.Run(ctx, args1)) }() + go func() { require.NoError(t, tc2.Run(ctx, args2)) }() require.NoError(t, tc1.WaitExports(time.Second)) require.NoError(t, tc2.WaitExports(time.Second)) @@ -362,7 +368,7 @@ stage.static_labels { require.NoError(t, err) go func() { - err := ctrl.Run(context.Background(), lsf.Arguments{ + err := ctrl.Run(ctx, lsf.Arguments{ Targets: []discovery.Target{{"__path__": f.Name(), "somelbl": "somevalue"}}, ForwardTo: []loki.LogsReceiver{ tc1.Exports().(Exports).Receiver, @@ -400,69 +406,103 @@ stage.static_labels { } } -func TestDeadlockWithFrequentUpdates(t *testing.T) { - stg := `stage.json { - expressions = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" } - drop_malformed = true - } - stage.json { - expressions = { "user" = "" } - source = "extra" - } - stage.labels { - values = { - stream = "", - user = "", - ts = "timestamp", - } - }` +type testFrequentUpdate struct { + t *testing.T + c *Component - // Unmarshal the Alloy relabel rules into a custom struct, as we don't have - // an easy way to refer to a loki.LogsReceiver value for the forward_to - // argument. - type cfg struct { - Stages []stages.StageConfig `alloy:"stage,enum"` + receiver1 loki.LogsReceiver + receiver2 loki.LogsReceiver + + keepSending atomic.Bool + keepReceiving atomic.Bool + keepUpdating atomic.Bool + + wgLogSend sync.WaitGroup + wgRun sync.WaitGroup + wgUpdate sync.WaitGroup + + lastSend atomic.Value + + stop func() +} + +func startTestFrequentUpdate(t *testing.T, cfg string) *testFrequentUpdate { + res := testFrequentUpdate{ + t: t, + receiver1: loki.NewLogsReceiver(), + receiver2: loki.NewLogsReceiver(), } - var stagesCfg cfg - err := syntax.Unmarshal([]byte(stg), &stagesCfg) + + ctx, cancel := context.WithCancel(context.Background()) + + res.keepSending.Store(true) + res.keepReceiving.Store(true) + res.keepUpdating.Store(true) + + res.stop = func() { + res.keepUpdating.Store(false) + res.wgUpdate.Wait() + + res.keepSending.Store(false) + res.wgLogSend.Wait() + + cancel() + res.wgRun.Wait() + + close(res.receiver1.Chan()) + close(res.receiver2.Chan()) + } + + var args Arguments + err := syntax.Unmarshal([]byte(cfg), &args) require.NoError(t, err) - ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver() + args.ForwardTo = []loki.LogsReceiver{res.receiver1, res.receiver2} - // Create and run the component, so that it can process and forwards logs. opts := component.Options{ Logger: util.TestAlloyLogger(t), Registerer: prometheus.NewRegistry(), OnStateChange: func(e component.Exports) {}, GetServiceData: getServiceData, } - args := Arguments{ - ForwardTo: []loki.LogsReceiver{ch1, ch2}, - Stages: stagesCfg.Stages, - } - c, err := New(opts, args) + res.c, err = New(opts, args) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go c.Run(ctx) - var lastSend atomic.Value - // Drain received logs + res.wgRun.Add(1) go func() { - for { + res.c.Run(ctx) + res.wgRun.Done() + }() + + return &res +} + +// Continuously receive the logs from both channels +func (r *testFrequentUpdate) drainLogs() { + drainLogs := func() { + r.lastSend.Store(time.Now()) + } + + r.wgLogSend.Add(1) + go func() { + for r.keepReceiving.Load() { select { - case <-ch1.Chan(): - lastSend.Store(time.Now()) - case <-ch2.Chan(): - lastSend.Store(time.Now()) + case <-r.receiver1.Chan(): + drainLogs() + case <-r.receiver2.Chan(): + drainLogs() } } + r.wgLogSend.Done() }() +} - // Continuously send entries to both channels +// Continuously send entries to both channels +func (r *testFrequentUpdate) sendLogs() { + r.wgLogSend.Add(1) go func() { - for { + for r.keepSending.Load() { ts := time.Now() logEntry := loki.Entry{ Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"}, @@ -471,29 +511,87 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) { Line: logline, }, } - c.receiver.Chan() <- logEntry + select { + case r.c.receiver.Chan() <- logEntry: + default: + // continue + } } + r.keepReceiving.Store(false) + r.wgLogSend.Done() }() +} - // Call Updates - args1 := Arguments{ - ForwardTo: []loki.LogsReceiver{ch1}, - Stages: stagesCfg.Stages, - } - args2 := Arguments{ - ForwardTo: []loki.LogsReceiver{ch2}, - Stages: stagesCfg.Stages, - } +func (r *testFrequentUpdate) updateContinuously(cfg1, cfg2 string) { + var args1 Arguments + err := syntax.Unmarshal([]byte(cfg1), &args1) + require.NoError(r.t, err) + args1.ForwardTo = []loki.LogsReceiver{r.receiver1} + + var args2 Arguments + err = syntax.Unmarshal([]byte(cfg2), &args2) + require.NoError(r.t, err) + args2.ForwardTo = []loki.LogsReceiver{r.receiver2} + + r.wgUpdate.Add(1) go func() { - for { - c.Update(args1) - c.Update(args2) + for r.keepUpdating.Load() { + r.c.Update(args1) + r.c.Update(args2) } + r.wgUpdate.Done() }() +} + +func TestDeadlockWithFrequentUpdates(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + + cfg1 := `stage.json { + expressions = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" } + drop_malformed = true + } + stage.json { + expressions = { "user" = "" } + source = "extra" + } + stage.labels { + values = { + stream = "", + user = "", + ts = "timestamp", + } + } + forward_to = []` + + cfg2 := `stage.json { + expressions = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" } + drop_malformed = true + } + stage.labels { + values = { + stream = "", + ts = "timestamp", + } + } + forward_to = []` + + r := startTestFrequentUpdate(t, `forward_to = []`) + + // Continuously send entries to both channels + r.sendLogs() + + // Continuously receive entries on both channels + r.drainLogs() + + // Call Updates + r.updateContinuously(cfg1, cfg2) // Run everything for a while time.Sleep(1 * time.Second) - require.WithinDuration(t, time.Now(), lastSend.Load().(time.Time), 300*time.Millisecond) + require.WithinDuration(t, time.Now(), r.lastSend.Load().(time.Time), 300*time.Millisecond) + + // Clean up + r.stop() } func getServiceData(name string) (interface{}, error) { @@ -505,6 +603,62 @@ func getServiceData(name string) (interface{}, error) { } } +// Make sure there are no goroutine leaks when the config is updated. +// Goroutine leaks often cause memory leaks. +func TestLeakyUpdate(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + + tester := newTester(t) + defer tester.stop() + + forwardArgs := ` + // This will be filled later + forward_to = []` + + numLogsToSend := 1 + + cfg1 := ` + stage.metrics { + metric.counter { + name = "paulin_test1" + action = "inc" + match_all = true + } + }` + forwardArgs + + cfg2 := ` + stage.metrics { + metric.counter { + name = "paulin_test2" + action = "inc" + match_all = true + } + }` + forwardArgs + + metricsTemplate1 := ` + # HELP loki_process_custom_paulin_test1 + # TYPE loki_process_custom_paulin_test1 counter + loki_process_custom_paulin_test1{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d + ` + + metricsTemplate2 := ` + # HELP loki_process_custom_paulin_test2 + # TYPE loki_process_custom_paulin_test2 counter + loki_process_custom_paulin_test2{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d + ` + + metrics1 := fmt.Sprintf(metricsTemplate1, numLogsToSend) + metrics2 := fmt.Sprintf(metricsTemplate2, numLogsToSend) + + tester.updateAndTest(numLogsToSend, cfg1, "", metrics1) + tester.updateAndTest(numLogsToSend, cfg2, "", metrics2) + + for i := 0; i < 100; i++ { + tester.updateAndTest(numLogsToSend, cfg1, "", metrics1) + tester.updateAndTest(numLogsToSend, cfg2, "", metrics2) + } +} + func TestMetricsStageRefresh(t *testing.T) { tester := newTester(t) defer tester.stop() diff --git a/internal/component/mimir/rules/kubernetes/events_test.go b/internal/component/mimir/rules/kubernetes/events_test.go index eb2c268650..8bd09bd6b8 100644 --- a/internal/component/mimir/rules/kubernetes/events_test.go +++ b/internal/component/mimir/rules/kubernetes/events_test.go @@ -275,7 +275,7 @@ func TestAdditionalLabels(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(rules)) return len(rules) == 1 - }, time.Second, 10*time.Millisecond) + }, 3*time.Second, 10*time.Millisecond) // The map of rules has only one element. for ruleName, rule := range rules {