Skip to content

Commit

Permalink
Fix memory leak in loki.process on config update (#1431)
Browse files Browse the repository at this point in the history
* Cleanup loki.process on update

* Fix goroutine leaks in other unit tests

* Refactor unit test

* Cleanup unit test code
* close output channels
* stop the updating process first

* Increase timeout for Mimir ruler test
  • Loading branch information
ptodev authored Aug 20, 2024
1 parent 9d98894 commit 5bca979
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 63 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ Main (unreleased)
for `discovery.*` is reloaded in such a way that no new targets were
discovered. (@ptodev, @thampiotr)

- Fix a memory leak which would occur any time `loki.process` had its configuration reloaded. (@ptodev)

v1.3.0
-----------------

Expand Down
6 changes: 3 additions & 3 deletions internal/component/loki/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ func (c *Component) Run(ctx context.Context) error {
if c.entryHandler != nil {
c.entryHandler.Stop()
}
close(c.processIn)
c.mut.RUnlock()
}()
wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -138,8 +137,9 @@ func (c *Component) Update(args component.Arguments) error {
if err != nil {
return err
}
c.entryHandler = loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() })
c.processIn = pipeline.Wrap(c.entryHandler).Chan()
entryHandler := loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() })
c.entryHandler = pipeline.Wrap(entryHandler)
c.processIn = c.entryHandler.Chan()
c.stages = newArgs.Stages
}

Expand Down
272 changes: 213 additions & 59 deletions internal/component/loki/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -321,6 +322,8 @@ stage.labels {
}

func TestEntrySentToTwoProcessComponents(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

// Set up two different loki.process components.
stg1 := `
forward_to = []
Expand All @@ -342,13 +345,16 @@ stage.static_labels {
args1.ForwardTo = []loki.LogsReceiver{ch1}
args2.ForwardTo = []loki.LogsReceiver{ch2}

ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()

// Start the loki.process components.
tc1, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.process")
require.NoError(t, err)
tc2, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.process")
require.NoError(t, err)
go func() { require.NoError(t, tc1.Run(componenttest.TestContext(t), args1)) }()
go func() { require.NoError(t, tc2.Run(componenttest.TestContext(t), args2)) }()
go func() { require.NoError(t, tc1.Run(ctx, args1)) }()
go func() { require.NoError(t, tc2.Run(ctx, args2)) }()
require.NoError(t, tc1.WaitExports(time.Second))
require.NoError(t, tc2.WaitExports(time.Second))

Expand All @@ -362,7 +368,7 @@ stage.static_labels {
require.NoError(t, err)

go func() {
err := ctrl.Run(context.Background(), lsf.Arguments{
err := ctrl.Run(ctx, lsf.Arguments{
Targets: []discovery.Target{{"__path__": f.Name(), "somelbl": "somevalue"}},
ForwardTo: []loki.LogsReceiver{
tc1.Exports().(Exports).Receiver,
Expand Down Expand Up @@ -400,69 +406,103 @@ stage.static_labels {
}
}

func TestDeadlockWithFrequentUpdates(t *testing.T) {
stg := `stage.json {
expressions = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" }
drop_malformed = true
}
stage.json {
expressions = { "user" = "" }
source = "extra"
}
stage.labels {
values = {
stream = "",
user = "",
ts = "timestamp",
}
}`
type testFrequentUpdate struct {
t *testing.T
c *Component

// Unmarshal the Alloy relabel rules into a custom struct, as we don't have
// an easy way to refer to a loki.LogsReceiver value for the forward_to
// argument.
type cfg struct {
Stages []stages.StageConfig `alloy:"stage,enum"`
receiver1 loki.LogsReceiver
receiver2 loki.LogsReceiver

keepSending atomic.Bool
keepReceiving atomic.Bool
keepUpdating atomic.Bool

wgLogSend sync.WaitGroup
wgRun sync.WaitGroup
wgUpdate sync.WaitGroup

lastSend atomic.Value

stop func()
}

func startTestFrequentUpdate(t *testing.T, cfg string) *testFrequentUpdate {
res := testFrequentUpdate{
t: t,
receiver1: loki.NewLogsReceiver(),
receiver2: loki.NewLogsReceiver(),
}
var stagesCfg cfg
err := syntax.Unmarshal([]byte(stg), &stagesCfg)

ctx, cancel := context.WithCancel(context.Background())

res.keepSending.Store(true)
res.keepReceiving.Store(true)
res.keepUpdating.Store(true)

res.stop = func() {
res.keepUpdating.Store(false)
res.wgUpdate.Wait()

res.keepSending.Store(false)
res.wgLogSend.Wait()

cancel()
res.wgRun.Wait()

close(res.receiver1.Chan())
close(res.receiver2.Chan())
}

var args Arguments
err := syntax.Unmarshal([]byte(cfg), &args)
require.NoError(t, err)

ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver()
args.ForwardTo = []loki.LogsReceiver{res.receiver1, res.receiver2}

// Create and run the component, so that it can process and forward logs.
opts := component.Options{
Logger: util.TestAlloyLogger(t),
Registerer: prometheus.NewRegistry(),
OnStateChange: func(e component.Exports) {},
GetServiceData: getServiceData,
}
args := Arguments{
ForwardTo: []loki.LogsReceiver{ch1, ch2},
Stages: stagesCfg.Stages,
}

c, err := New(opts, args)
res.c, err = New(opts, args)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go c.Run(ctx)

var lastSend atomic.Value
// Drain received logs
res.wgRun.Add(1)
go func() {
for {
res.c.Run(ctx)
res.wgRun.Done()
}()

return &res
}

// Continuously receive the logs from both channels
func (r *testFrequentUpdate) drainLogs() {
drainLogs := func() {
r.lastSend.Store(time.Now())
}

r.wgLogSend.Add(1)
go func() {
for r.keepReceiving.Load() {
select {
case <-ch1.Chan():
lastSend.Store(time.Now())
case <-ch2.Chan():
lastSend.Store(time.Now())
case <-r.receiver1.Chan():
drainLogs()
case <-r.receiver2.Chan():
drainLogs()
}
}
r.wgLogSend.Done()
}()
}

// Continuously send entries to both channels
// Continuously send entries to both channels
func (r *testFrequentUpdate) sendLogs() {
r.wgLogSend.Add(1)
go func() {
for {
for r.keepSending.Load() {
ts := time.Now()
logEntry := loki.Entry{
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
Expand All @@ -471,29 +511,87 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) {
Line: logline,
},
}
c.receiver.Chan() <- logEntry
select {
case r.c.receiver.Chan() <- logEntry:
default:
// continue
}
}
r.keepReceiving.Store(false)
r.wgLogSend.Done()
}()
}

// Call Updates
args1 := Arguments{
ForwardTo: []loki.LogsReceiver{ch1},
Stages: stagesCfg.Stages,
}
args2 := Arguments{
ForwardTo: []loki.LogsReceiver{ch2},
Stages: stagesCfg.Stages,
}
func (r *testFrequentUpdate) updateContinuously(cfg1, cfg2 string) {
var args1 Arguments
err := syntax.Unmarshal([]byte(cfg1), &args1)
require.NoError(r.t, err)
args1.ForwardTo = []loki.LogsReceiver{r.receiver1}

var args2 Arguments
err = syntax.Unmarshal([]byte(cfg2), &args2)
require.NoError(r.t, err)
args2.ForwardTo = []loki.LogsReceiver{r.receiver2}

r.wgUpdate.Add(1)
go func() {
for {
c.Update(args1)
c.Update(args2)
for r.keepUpdating.Load() {
r.c.Update(args1)
r.c.Update(args2)
}
r.wgUpdate.Done()
}()
}

// TestDeadlockWithFrequentUpdates checks that log entries keep reaching the
// receivers while the component's configuration is being swapped back and
// forth between two pipelines. It runs senders, drainers and updaters
// concurrently for one second and then requires that the most recently drained
// entry is at most 300ms old.
func TestDeadlockWithFrequentUpdates(t *testing.T) {
// Deferred first, so it runs last (after r.stop() at the end of the test).
// The opencensus worker goroutine is excluded from the leak check.
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

// cfg1: two stage.json blocks followed by stage.labels with stream, user and ts.
cfg1 := `stage.json {
expressions = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" }
drop_malformed = true
}
stage.json {
expressions = { "user" = "" }
source = "extra"
}
stage.labels {
values = {
stream = "",
user = "",
ts = "timestamp",
}
}
forward_to = []`

// cfg2: a single stage.json block followed by stage.labels with stream and ts
// only, so alternating cfg1/cfg2 changes the stage list on every Update.
cfg2 := `stage.json {
expressions = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" }
drop_malformed = true
}
stage.labels {
values = {
stream = "",
ts = "timestamp",
}
}
forward_to = []`

// Start the component with a config that has no stages; the helper sets
// ForwardTo to its two receivers and runs the component in a goroutine.
r := startTestFrequentUpdate(t, `forward_to = []`)

// Continuously send entries to the component's receiver
r.sendLogs()

// Continuously receive entries on both forward_to receivers, recording the
// time of the latest receipt in r.lastSend
r.drainLogs()

// Call Update in a loop, alternating between cfg1 and cfg2
r.updateContinuously(cfg1, cfg2)

// Run everything for a while
time.Sleep(1 * time.Second)
// NOTE(review): the next line reads a local `lastSend`, which this function
// does not declare as shown; it looks like the pre-change version of the
// r.lastSend check below — confirm against the diff.
require.WithinDuration(t, time.Now(), lastSend.Load().(time.Time), 300*time.Millisecond)
// An entry must have been drained within the last 300ms, i.e. the pipeline
// was still delivering entries at the end of the run.
// NOTE(review): if nothing was ever drained, Load() presumably returns nil and
// the type assertion panics rather than failing the assertion — confirm.
require.WithinDuration(t, time.Now(), r.lastSend.Load().(time.Time), 300*time.Millisecond)

// Clean up: stops the updater, the sender/drainer and the component, in that
// order, then closes both receiver channels.
r.stop()
}

func getServiceData(name string) (interface{}, error) {
Expand All @@ -505,6 +603,62 @@ func getServiceData(name string) (interface{}, error) {
}
}

// TestLeakyUpdate makes sure there are no goroutine leaks when the config is
// updated. Goroutine leaks often cause memory leaks.
//
// It alternates between two configs that differ only in the name of the
// counter metric, calling tester.updateAndTest after each switch, and relies
// on the deferred goleak check to flag goroutines left behind.
func TestLeakyUpdate(t *testing.T) {
// Deferred first, so it runs last (after tester.stop() below). The
// opencensus worker goroutine is excluded from the leak check.
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

tester := newTester(t)
defer tester.stop()

// Appended to both configs; forward_to is left empty in the config text.
// NOTE(review): presumably updateAndTest fills in the receivers — confirm.
forwardArgs := `
// This will be filled later
forward_to = []`

// Passed as the first argument to every updateAndTest call and used as the
// counter value when formatting the metrics strings below.
numLogsToSend := 1

// cfg1 and cfg2 are identical except for the counter name
// (paulin_test1 vs paulin_test2).
cfg1 := `
stage.metrics {
metric.counter {
name = "paulin_test1"
action = "inc"
match_all = true
}
}` + forwardArgs

cfg2 := `
stage.metrics {
metric.counter {
name = "paulin_test2"
action = "inc"
match_all = true
}
}` + forwardArgs

// Metrics text for each config; %d is replaced with numLogsToSend.
// NOTE(review): presumably the expected exposition output checked by
// updateAndTest — confirm.
metricsTemplate1 := `
# HELP loki_process_custom_paulin_test1
# TYPE loki_process_custom_paulin_test1 counter
loki_process_custom_paulin_test1{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d
`

metricsTemplate2 := `
# HELP loki_process_custom_paulin_test2
# TYPE loki_process_custom_paulin_test2 counter
loki_process_custom_paulin_test2{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d
`

metrics1 := fmt.Sprintf(metricsTemplate1, numLogsToSend)
metrics2 := fmt.Sprintf(metricsTemplate2, numLogsToSend)

// One initial pass over both configs.
tester.updateAndTest(numLogsToSend, cfg1, "", metrics1)
tester.updateAndTest(numLogsToSend, cfg2, "", metrics2)

// Then switch back and forth 100 more times. The same metrics strings (counter
// value numLogsToSend) are passed on every call, regardless of how many
// updates came before.
for i := 0; i < 100; i++ {
tester.updateAndTest(numLogsToSend, cfg1, "", metrics1)
tester.updateAndTest(numLogsToSend, cfg2, "", metrics2)
}
}

func TestMetricsStageRefresh(t *testing.T) {
tester := newTester(t)
defer tester.stop()
Expand Down
2 changes: 1 addition & 1 deletion internal/component/mimir/rules/kubernetes/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func TestAdditionalLabels(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(rules))
return len(rules) == 1
}, time.Second, 10*time.Millisecond)
}, 3*time.Second, 10*time.Millisecond)

// The map of rules has only one element.
for ruleName, rule := range rules {
Expand Down

0 comments on commit 5bca979

Please sign in to comment.