From 236f91d2b6ea07e382accf961121b53670bf7cc2 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 8 May 2024 18:10:01 +0200 Subject: [PATCH] Add support for docker containers restart in loki.source.docker (#742) (#803) * Attempt to restart docker target for loki source at a fixed interval to support containers restart * add test * add missing target cleanup * improve test readability * rename config option (cherry picked from commit 5d777a5d1e101857631378e50e3ea7ac16ee8ea4) Co-authored-by: William Dumont --- CHANGELOG.md | 2 + .../component/loki/source/docker/docker.go | 7 +- .../loki/source/docker/docker_test.go | 105 ++++++++++++++++++ .../docker/internal/dockertarget/target.go | 2 - .../component/loki/source/docker/runner.go | 41 ++++--- 5 files changed, 134 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f8cbdb423..7d36de020 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -79,6 +79,8 @@ v1.1.0-rc.0 - Fix a bug where custom components would not shadow the stdlib. If you have a module whose name conflicts with an stdlib function and if you use this exact function in your config, then you will need to rename your module. (@wildum) +- Fix an issue where `loki.source.docker` stops collecting logs after a container restart. (@wildum) + ### Other changes - Update `alloy-mixin` to use more specific alert group names (for example, diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index b7d06b91b..a64b142d8 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -305,9 +305,10 @@ func (c *Component) getManagerOptions(args Arguments) (*options, error) { } return &options{ - client: client, - handler: loki.NewEntryHandler(c.handler.Chan(), func() {}), - positions: c.posFile, + client: client, + handler: loki.NewEntryHandler(c.handler.Chan(), func() {}), + positions: c.posFile, + targetRestartInterval: 5 * time.Second, }, nil } diff --git a/internal/component/loki/source/docker/docker_test.go b/internal/component/loki/source/docker/docker_test.go index e020bcda0..96d847a66 100644 --- a/internal/component/loki/source/docker/docker_test.go +++ b/internal/component/loki/source/docker/docker_test.go @@ -4,17 +4,32 @@ package docker import ( "context" + "io" + "os" + "strings" "testing" "time" + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/client" + "github.com/go-kit/log" "github.com/grafana/alloy/internal/alloy/componenttest" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/common/loki/client/fake" + "github.com/grafana/alloy/internal/component/common/loki/positions" + dt "github.com/grafana/alloy/internal/component/loki/source/docker/internal/dockertarget" "github.com/grafana/alloy/internal/util" "github.com/grafana/alloy/syntax" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/relabel" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +const targetRestartInterval = 20 * time.Millisecond + func Test(t *testing.T) { // Use host that works on all platforms (including Windows). var cfg = ` @@ -73,3 +88,93 @@ func TestDuplicateTargets(t *testing.T) { require.Len(t, cmp.manager.tasks, 1) } + +func TestRestart(t *testing.T) { + runningState := true + client := clientMock{ + logLine: "2024-05-02T13:11:55.879889Z caller=module_service.go:114 msg=\"module stopped\" module=distributor", + running: func() bool { return runningState }, + } + expectedLogLine := "caller=module_service.go:114 msg=\"module stopped\" module=distributor" + + tailer, entryHandler := setupTailer(t, client) + go tailer.Run(context.Background()) + + // The container is already running, expect log lines. + assert.EventuallyWithT(t, func(c *assert.CollectT) { + logLines := entryHandler.Received() + if assert.NotEmpty(c, logLines) { + assert.Equal(c, expectedLogLine, logLines[0].Line) + } + }, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit.") + + // Stops the container. + runningState = false + time.Sleep(targetRestartInterval + 10*time.Millisecond) // Sleep for a duration greater than targetRestartInterval to make sure it stops sending log lines. + entryHandler.Clear() + time.Sleep(targetRestartInterval + 10*time.Millisecond) + assert.Empty(t, entryHandler.Received()) // No log lines because the container was not running. + + // Restart the container and expect log lines. + runningState = true + assert.EventuallyWithT(t, func(c *assert.CollectT) { + logLines := entryHandler.Received() + if assert.NotEmpty(c, logLines) { + assert.Equal(c, expectedLogLine, logLines[0].Line) + } + }, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit after restart.") +} + +func setupTailer(t *testing.T, client clientMock) (tailer *tailer, entryHandler *fake.Client) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + entryHandler = fake.NewClient(func() {}) + + ps, err := positions.New(logger, positions.Config{ + SyncPeriod: 10 * time.Second, + PositionsFile: t.TempDir() + "/positions.yml", + }) + require.NoError(t, err) + + tgt, err := dt.NewTarget( + dt.NewMetrics(prometheus.NewRegistry()), + logger, + entryHandler, + ps, + "flog", + model.LabelSet{"job": "docker"}, + []*relabel.Config{}, + client, + ) + require.NoError(t, err) + tailerTask := &tailerTask{ + options: &options{ + client: client, + targetRestartInterval: targetRestartInterval, + }, + target: tgt, + } + return newTailer(logger, tailerTask), entryHandler +} + +type clientMock struct { + client.APIClient + logLine string + running func() bool +} + +func (mock clientMock) ContainerInspect(ctx context.Context, c string) (types.ContainerJSON, error) { + return types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: c, + State: &types.ContainerState{ + Running: mock.running(), + }, + }, + Config: &container.Config{Tty: true}, + }, nil +} + +func (mock clientMock) ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader(mock.logLine)), nil +} diff --git a/internal/component/loki/source/docker/internal/dockertarget/target.go b/internal/component/loki/source/docker/internal/dockertarget/target.go index b30241ab7..02d48b31c 100644 --- a/internal/component/loki/source/docker/internal/dockertarget/target.go +++ b/internal/component/loki/source/docker/internal/dockertarget/target.go @@ -230,8 +230,6 @@ func (t *Target) StartIfNotRunning() { ctx, cancel := context.WithCancel(context.Background()) t.cancel = cancel go t.processLoop(ctx) - } else { - level.Debug(t.logger).Log("msg", "attempted to start process loop but it's already running", "container", t.containerName) } } diff --git a/internal/component/loki/source/docker/runner.go b/internal/component/loki/source/docker/runner.go index 2afe48ade..7f3dfdd0c 100644 --- a/internal/component/loki/source/docker/runner.go +++ b/internal/component/loki/source/docker/runner.go @@ -5,8 +5,8 @@ package docker import ( "context" "sync" + "time" - "github.com/docker/docker/api/types/container" "github.com/docker/docker/client" "github.com/go-kit/log" "github.com/grafana/alloy/internal/alloy/logging/level" @@ -52,6 +52,9 @@ type options struct { // positions interface so tailers can save/restore offsets in log files. positions positions.Positions + + // targetRestartInterval to restart task that has stopped running. + targetRestartInterval time.Duration } // tailerTask is the payload used to create tailers. It implements runner.Task. @@ -95,23 +98,25 @@ func newTailer(l log.Logger, task *tailerTask) *tailer { } func (t *tailer) Run(ctx context.Context) { - ch, chErr := t.opts.client.ContainerWait(ctx, t.target.Name(), container.WaitConditionNextExit) - - t.target.StartIfNotRunning() - - select { - case err := <-chErr: - // Error setting up the Wait request from the client; either failed to - // read from /containers/{containerID}/wait, or couldn't parse the - // response. Stop the target and exit the task after logging; if it was - // a transient error, the target will be retried on the next discovery - // refresh. - level.Error(t.log).Log("msg", "could not set up a wait request to the Docker client", "error", err) - t.target.Stop() - return - case <-ch: - t.target.Stop() - return + ticker := time.NewTicker(t.opts.targetRestartInterval) + tickerC := ticker.C + + for { + select { + case <-tickerC: + res, err := t.opts.client.ContainerInspect(ctx, t.target.Name()) + if err != nil { + level.Error(t.log).Log("msg", "error inspecting Docker container", "id", t.target.Name(), "error", err) + continue + } + if res.State.Running { + t.target.StartIfNotRunning() + } + case <-ctx.Done(): + t.target.Stop() + ticker.Stop() + return + } } }