diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9da7c5266b..5d1ebcfd53 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -55,6 +55,8 @@ Main (unreleased)
 
 - Update `prometheus.write.queue` library for performance increases in cpu. (@mattdurham)
 
+- Add an argument `retry_interval` to allow `loki.source.file` to try re-opening deleted files on Windows. (@wildum)
+
 ### Bugfixes
 
 - Fixed issue with automemlimit logging bad messages and trying to access cgroup on non-linux builds (@dehaansa)
diff --git a/docs/sources/reference/components/loki/loki.source.file.md b/docs/sources/reference/components/loki/loki.source.file.md
index 33eb867a8a..dfca155b2e 100644
--- a/docs/sources/reference/components/loki/loki.source.file.md
+++ b/docs/sources/reference/components/loki/loki.source.file.md
@@ -40,6 +40,7 @@ The component starts a new reader for each of the given `targets` and fans out l
 | `encoding` | `string` | The encoding to convert from when reading files. | `""` | no |
 | `tail_from_end` | `bool` | Whether a log file is tailed from the end if a stored position isn't found. | `false` | no |
 | `legacy_positions_file` | `string` | Allows conversion from legacy positions file. | `""` | no |
+| `retry_interval` | `duration` | How often to try to re-open files that have been closed. | `"0s"` | no |
 
 The `encoding` argument must be a valid [IANA encoding][] name. If not set, it
 defaults to UTF-8.
@@ -47,6 +48,9 @@ defaults to UTF-8.
 
 You can use the `tail_from_end` argument when you want to tail a large file without reading its entire content.
 When set to true, only new logs will be read, ignoring the existing ones.
+`retry_interval` is disabled by default (`"0s"`). Set it on Windows-based systems when log files are rotated under the same name, because
+the component won't otherwise try to re-open the deleted files and the targets won't be updated because of the cache.
+This isn't needed on Unix-like systems because the component always tries to re-open the deleted files.
 
 {{< admonition type="note" >}}
 The `legacy_positions_file` argument is used when you are transitioning from legacy. The legacy positions file is rewritten into the new format.
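To make the documented behaviour above concrete, here is a minimal sketch, written in the style of this PR's tests, of how the new argument is populated on the component's `Arguments`. The helper name, target path, label, and 5-second interval are hypothetical; in an Alloy configuration this corresponds to setting `retry_interval = "5s"` on the `loki.source.file` block.

```go
package file

import (
	"time"

	"github.com/grafana/alloy/internal/component/common/loki"
	"github.com/grafana/alloy/internal/component/discovery"
)

// exampleArguments is a hypothetical helper (not part of this PR) showing where the
// new RetryInterval field fits into Arguments. All values are illustrative only.
func exampleArguments(receiver loki.LogsReceiver) Arguments {
	return Arguments{
		Targets: []discovery.Target{{
			"__path__": `C:\logs\app.log`, // a log file rotated in place under the same name
			"job":      "app",
		}},
		ForwardTo: []loki.LogsReceiver{receiver},
		// The default of 0 keeps the retry loop disabled; any positive duration makes
		// the component periodically try to re-open files whose readers have stopped.
		RetryInterval: 5 * time.Second,
	}
}
```

Keeping the default at `"0s"` leaves existing Unix-like setups unaffected, since their readers stay open across deletion and are only rebuilt on the next `Update` call.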
diff --git a/internal/component/loki/source/file/decompresser.go b/internal/component/loki/source/file/decompresser.go index f9dd45d4ad..0569ae2666 100644 --- a/internal/component/loki/source/file/decompresser.go +++ b/internal/component/loki/source/file/decompresser.go @@ -46,8 +46,9 @@ type decompressor struct { handler loki.EntryHandler positions positions.Positions - path string - labels string + path string + labels model.LabelSet + labelsStr string posAndSizeMtx sync.Mutex stopOnce sync.Once @@ -70,14 +71,14 @@ func newDecompressor( handler loki.EntryHandler, positions positions.Positions, path string, - labels string, + labels model.LabelSet, encodingFormat string, cfg DecompressionConfig, ) (*decompressor, error) { logger = log.With(logger, "component", "decompressor") - pos, err := positions.Get(path, labels) + pos, err := positions.Get(path, labels.String()) if err != nil { return nil, fmt.Errorf("failed to get positions: %w", err) } @@ -99,6 +100,7 @@ func newDecompressor( positions: positions, path: path, labels: labels, + labelsStr: labels.String(), running: atomic.NewBool(false), posquit: make(chan struct{}), posdone: make(chan struct{}), @@ -268,7 +270,7 @@ func (d *decompressor) MarkPositionAndSize() error { d.metrics.totalBytes.WithLabelValues(d.path).Set(float64(d.size)) d.metrics.readBytes.WithLabelValues(d.path).Set(float64(d.position)) - d.positions.Put(d.path, d.labels, d.position) + d.positions.Put(d.path, d.labelsStr, d.position) return nil } @@ -318,3 +320,7 @@ func (d *decompressor) cleanupMetrics() { func (d *decompressor) Path() string { return d.path } + +func (d *decompressor) Labels() model.LabelSet { + return d.labels +} diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index dc1222e1fd..47186c19d5 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -46,6 +46,7 @@ type Arguments struct { FileWatch FileWatch `alloy:"file_watch,block,optional"` TailFromEnd bool `alloy:"tail_from_end,attr,optional"` LegacyPositionsFile string `alloy:"legacy_positions_file,attr,optional"` + RetryInterval time.Duration `alloy:"retry_interval,attr,optional"` } type FileWatch struct { @@ -86,6 +87,11 @@ type Component struct { receivers []loki.LogsReceiver posFile positions.Positions readers map[positions.Entry]reader + + tickerChan chan struct{} + ticker *time.Ticker + restartReadersCancel context.CancelFunc + restartReadersContext context.Context } // New creates a new loki.source.file component. @@ -134,13 +140,14 @@ func New(o component.Options, args Arguments) (*Component, error) { func (c *Component) Run(ctx context.Context) error { defer func() { level.Info(c.opts.Logger).Log("msg", "loki.source.file component shutting down, stopping readers and positions file") - c.mut.RLock() + c.mut.Lock() for _, r := range c.readers { r.Stop() } c.posFile.Stop() close(c.handler.Chan()) - c.mut.RUnlock() + c.cleanupReadersRestart() + c.mut.Unlock() }() for { @@ -153,6 +160,28 @@ func (c *Component) Run(ctx context.Context) error { receiver.Chan() <- entry } c.mut.RUnlock() + case <-c.tickerChan: + c.mut.Lock() + // Find readers that are stopped and re-create them if the files that they were tailing are back. + // This helps for log rotation on Windows because the tailer is closed as soon as the file is removed. + // On Unix-like systems, it won't re-create any reader because the reader will stay open till the next Update call. 
+ restartReaders := make(map[positions.Entry]reader) + for key, reader := range c.readers { + if !reader.IsRunning() { + _, err := os.Stat(reader.Path()) + if err != nil { + continue + } + restartReaders[key] = reader + } + } + for key, reader := range restartReaders { + level.Debug(c.opts.Logger).Log("msg", "recreate reader", "path", reader.Path()) + reader.Stop() + delete(c.readers, key) + c.addReader(key, reader.Path(), reader.Labels()) + } + c.mut.Unlock() } } } @@ -185,6 +214,7 @@ func (c *Component) Update(args component.Arguments) error { oldPaths := c.stopReaders() newArgs := args.(Arguments) + previousRetryInterval := c.args.RetryInterval c.mut.Lock() defer c.mut.Unlock() @@ -200,7 +230,6 @@ func (c *Component) Update(args component.Arguments) error { for _, target := range newArgs.Targets { path := target[pathLabel] - labels := make(model.LabelSet) for k, v := range target { if strings.HasPrefix(k, model.ReservedLabelPrefix) { @@ -214,19 +243,7 @@ func (c *Component) Update(args component.Arguments) error { if _, exist := c.readers[readersKey]; exist { continue } - - c.reportSize(path, labels.String()) - - handler := loki.AddLabelsMiddleware(labels).Wrap(loki.NewEntryHandler(c.handler.Chan(), func() {})) - reader, err := c.startTailing(path, labels, handler) - if err != nil { - continue - } - - c.readers[readersKey] = readerWithHandler{ - reader: reader, - handler: handler, - } + c.addReader(readersKey, path, labels) } // Remove from the positions file any entries that had a Reader before, but @@ -235,9 +252,28 @@ func (c *Component) Update(args component.Arguments) error { c.posFile.Remove(r.Path, r.Labels) } + if newArgs.RetryInterval != previousRetryInterval { + c.handleReadersRestart(newArgs.RetryInterval) + } + return nil } +func (c *Component) addReader(key positions.Entry, path string, labels model.LabelSet) { + c.reportSize(path, labels.String()) + + handler := loki.AddLabelsMiddleware(labels).Wrap(loki.NewEntryHandler(c.handler.Chan(), func() {})) + reader, err := c.startTailing(path, labels, handler) + if err != nil { + return + } + + c.readers[key] = readerWithHandler{ + reader: reader, + handler: handler, + } +} + // readerWithHandler combines a reader with an entry handler associated with // it. Closing the reader will also close the handler. 
type readerWithHandler struct { @@ -331,7 +367,7 @@ func (c *Component) startTailing(path string, labels model.LabelSet, handler lok handler, c.posFile, path, - labels.String(), + labels, c.args.Encoding, c.args.DecompressionConfig, ) @@ -352,7 +388,7 @@ func (c *Component) startTailing(path string, labels model.LabelSet, handler lok handler, c.posFile, path, - labels.String(), + labels, c.args.Encoding, pollOptions, c.args.TailFromEnd, @@ -385,3 +421,35 @@ func (c *Component) reportSize(path, labels string) { c.metrics.totalBytes.WithLabelValues(path).Set(float64(fi.Size())) } } + +func (c *Component) handleReadersRestart(retryInterval time.Duration) { + c.cleanupReadersRestart() + + if retryInterval > 0 { + c.restartReadersContext, c.restartReadersCancel = context.WithCancel(context.Background()) + c.ticker = time.NewTicker(retryInterval) + c.tickerChan = make(chan struct{}) + go func() { + for { + select { + case <-c.ticker.C: + c.tickerChan <- struct{}{} + case <-c.restartReadersContext.Done(): + return + } + } + }() + } +} + +func (c *Component) cleanupReadersRestart() { + if c.restartReadersCancel != nil { + c.restartReadersCancel() + c.restartReadersCancel = nil + } + if c.ticker != nil { + c.ticker.Stop() + close(c.tickerChan) + c.ticker = nil + } +} diff --git a/internal/component/loki/source/file/file_test.go b/internal/component/loki/source/file/file_test.go index fcd418b6f1..cd144c4b76 100644 --- a/internal/component/loki/source/file/file_test.go +++ b/internal/component/loki/source/file/file_test.go @@ -314,3 +314,93 @@ func TestEncoding(t *testing.T) { "expected positions.yml file to be written eventually", ) } + +func TestDeleteRecreateFile(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + + filename := "example" + + ctx, cancel := context.WithCancel(componenttest.TestContext(t)) + defer cancel() + + // Create file to log to. 
+	f, err := os.Create(filename)
+	require.NoError(t, err)
+
+	ctrl, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.source.file")
+	require.NoError(t, err)
+
+	ch1 := loki.NewLogsReceiver()
+
+	go func() {
+		err := ctrl.Run(ctx, Arguments{
+			Targets: []discovery.Target{{
+				"__path__": f.Name(),
+				"foo": "bar",
+			}},
+			ForwardTo: []loki.LogsReceiver{ch1},
+			RetryInterval: 1 * time.Second,
+		})
+		require.NoError(t, err)
+	}()
+
+	ctrl.WaitRunning(time.Minute)
+
+	_, err = f.Write([]byte("writing some text\n"))
+	require.NoError(t, err)
+
+	wantLabelSet := model.LabelSet{
+		"filename": model.LabelValue(f.Name()),
+		"foo": "bar",
+	}
+
+	checkMsg(t, ch1, "writing some text", 5*time.Second, wantLabelSet)
+
+	require.NoError(t, f.Close())
+	require.NoError(t, os.Remove(f.Name()))
+
+	// Create a file with the same name
+	f, err = os.Create(filename)
+	require.NoError(t, err)
+
+	_, err = f.Write([]byte("writing some new text\n"))
+	require.NoError(t, err)
+
+	checkMsg(t, ch1, "writing some new text", 5*time.Second, wantLabelSet)
+
+	// Change the retry interval to 500ms
+	ctrl.Update(Arguments{
+		Targets: []discovery.Target{{
+			"__path__": f.Name(),
+			"foo": "bar",
+		}},
+		ForwardTo: []loki.LogsReceiver{ch1},
+		RetryInterval: 500 * time.Millisecond,
+	})
+
+	require.NoError(t, f.Close())
+	require.NoError(t, os.Remove(f.Name()))
+
+	// Create a file with the same name
+	f, err = os.Create(filename)
+	require.NoError(t, err)
+	defer os.Remove(f.Name())
+	defer f.Close()
+
+	_, err = f.Write([]byte("writing some new new text\n"))
+	require.NoError(t, err)
+
+	// The timeout is 800ms and the retry interval is now 500ms. If the retry interval had not been updated, this would fail on Windows.
+	checkMsg(t, ch1, "writing some new new text", 800*time.Millisecond, wantLabelSet)
+}
+
+func checkMsg(t *testing.T, ch loki.LogsReceiver, msg string, timeout time.Duration, labelSet model.LabelSet) {
+	select {
+	case logEntry := <-ch.Chan():
+		require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
+		require.Equal(t, msg, logEntry.Line)
+		require.Equal(t, labelSet, logEntry.Labels)
+	case <-time.After(timeout):
+		require.FailNow(t, "failed waiting for log line")
+	}
+}
diff --git a/internal/component/loki/source/file/file_win_test.go b/internal/component/loki/source/file/file_win_test.go
new file mode 100644
index 0000000000..d2a3b5621c
--- /dev/null
+++ b/internal/component/loki/source/file/file_win_test.go
@@ -0,0 +1,130 @@
+//go:build windows
+
+package file
+
+import (
+	"context"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/grafana/alloy/internal/component/common/loki"
+	"github.com/grafana/alloy/internal/component/discovery"
+	"github.com/grafana/alloy/internal/runtime/componenttest"
+	"github.com/grafana/alloy/internal/util"
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/goleak"
+)
+
+// This test:
+// - create a file without retry interval -> successful read
+// - delete the file, recreate it -> no read
+// - update the component with retry interval -> successful read
+// - update the component without retry interval, delete the file, recreate it -> no read
+func TestDeleteRecreateFileWindows(t *testing.T) {
+	defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
+
+	filename := "example"
+
+	ctx, cancel := context.WithCancel(componenttest.TestContext(t))
+	defer cancel()
+
+	// Create file to log to.
+ f, err := os.Create(filename) + require.NoError(t, err) + + ctrl, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.source.file") + require.NoError(t, err) + + ch1 := loki.NewLogsReceiver() + + go func() { + err := ctrl.Run(ctx, Arguments{ + Targets: []discovery.Target{{ + "__path__": f.Name(), + "foo": "bar", + }}, + ForwardTo: []loki.LogsReceiver{ch1}, + }) + require.NoError(t, err) + }() + + ctrl.WaitRunning(time.Minute) + + _, err = f.Write([]byte("writing some text\n")) + require.NoError(t, err) + + wantLabelSet := model.LabelSet{ + "filename": model.LabelValue(f.Name()), + "foo": "bar", + } + + checkMsg(t, ch1, "writing some text", 5*time.Second, wantLabelSet) + + require.NoError(t, f.Close()) + require.NoError(t, os.Remove(f.Name())) + + // Create a file with the same name + f, err = os.Create(filename) + require.NoError(t, err) + + _, err = f.Write([]byte("writing some new text\n")) + require.NoError(t, err) + + select { + case <-ch1.Chan(): + t.Fatalf("Unexpected log entry received") + case <-time.After(1 * time.Second): + // Test passes if no log entry is received within the timeout + // This indicates that the log source does not retry reading the file + } + + // Start the retry interval + ctrl.Update(Arguments{ + Targets: []discovery.Target{{ + "__path__": f.Name(), + "foo": "bar", + }}, + ForwardTo: []loki.LogsReceiver{ch1}, + RetryInterval: 200 * time.Millisecond, + }) + + require.NoError(t, f.Close()) + require.NoError(t, os.Remove(f.Name())) + + // Create a file with the same name + f, err = os.Create(filename) + require.NoError(t, err) + + checkMsg(t, ch1, "writing some new text", 1*time.Second, wantLabelSet) + + // Stop the retry interval + ctrl.Update(Arguments{ + Targets: []discovery.Target{{ + "__path__": f.Name(), + "foo": "bar", + }}, + ForwardTo: []loki.LogsReceiver{ch1}, + }) + + require.NoError(t, f.Close()) + require.NoError(t, os.Remove(f.Name())) + + // Create a file with the same name + f, err = os.Create(filename) + require.NoError(t, err) + defer os.Remove(f.Name()) + defer f.Close() + + _, err = f.Write([]byte("writing some new new text\n")) + require.NoError(t, err) + + select { + case <-ch1.Chan(): + t.Fatalf("Unexpected log entry received") + case <-time.After(1 * time.Second): + // Test passes if no log entry is received within the timeout + // This indicates that the log source does not retry reading the file + } +} diff --git a/internal/component/loki/source/file/reader.go b/internal/component/loki/source/file/reader.go index 04016dc6c1..5aca328c69 100644 --- a/internal/component/loki/source/file/reader.go +++ b/internal/component/loki/source/file/reader.go @@ -1,5 +1,7 @@ package file +import "github.com/prometheus/common/model" + // This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5. // This code accommodates the tailer and decompressor implementations as readers. 
@@ -8,5 +10,6 @@ type reader interface { Stop() IsRunning() bool Path() string + Labels() model.LabelSet MarkPositionAndSize() error } diff --git a/internal/component/loki/source/file/tailer.go b/internal/component/loki/source/file/tailer.go index d925d2ec5c..955e020a79 100644 --- a/internal/component/loki/source/file/tailer.go +++ b/internal/component/loki/source/file/tailer.go @@ -34,9 +34,10 @@ type tailer struct { handler loki.EntryHandler positions positions.Positions - path string - labels string - tail *tail.Tail + path string + labels model.LabelSet + labelsStr string + tail *tail.Tail posAndSizeMtx sync.Mutex stopOnce sync.Once @@ -50,20 +51,21 @@ type tailer struct { } func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, positions positions.Positions, path string, - labels string, encoding string, pollOptions watch.PollingFileWatcherOptions, tailFromEnd bool) (*tailer, error) { + labels model.LabelSet, encoding string, pollOptions watch.PollingFileWatcherOptions, tailFromEnd bool) (*tailer, error) { // Simple check to make sure the file we are tailing doesn't // have a position already saved which is past the end of the file. fi, err := os.Stat(path) if err != nil { return nil, err } - pos, err := positions.Get(path, labels) + labelsStr := labels.String() + pos, err := positions.Get(path, labelsStr) if err != nil { return nil, err } if fi.Size() < pos { - positions.Remove(path, labels) + positions.Remove(path, labelsStr) } // If no cached position is found and the tailFromEnd option is enabled. @@ -72,7 +74,7 @@ func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, p if err != nil { level.Error(logger).Log("msg", "failed to get a position from the end of the file, default to start of file", err) } else { - positions.Put(path, labels, pos) + positions.Put(path, labelsStr, pos) level.Info(logger).Log("msg", "retrieved and stored the position of the last line") } } @@ -101,6 +103,7 @@ func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, p positions: positions, path: path, labels: labels, + labelsStr: labelsStr, tail: tail, running: atomic.NewBool(false), posquit: make(chan struct{}), @@ -295,7 +298,7 @@ func (t *tailer) MarkPositionAndSize() error { // Update metrics and positions file all together to avoid race conditions when `t.tail` is stopped. t.metrics.totalBytes.WithLabelValues(t.path).Set(float64(size)) t.metrics.readBytes.WithLabelValues(t.path).Set(float64(pos)) - t.positions.Put(t.path, t.labels, pos) + t.positions.Put(t.path, t.labelsStr, pos) return nil } @@ -356,3 +359,7 @@ func (t *tailer) cleanupMetrics() { func (t *tailer) Path() string { return t.path } + +func (t *tailer) Labels() model.LabelSet { + return t.labels +}
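Taken together, the tailer and decompressor now expose their parsed `model.LabelSet`, which is what lets the component re-create a stopped reader with the same path and labels when the retry ticker fires. As a standalone illustration of that ticker-driven check, here is a self-contained sketch; it is not code from this PR, and the file name, interval, and `fakeReader` type are invented for the example.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// fakeReader stands in for the component's reader: it remembers the path it was
// tailing and whether it is still running.
type fakeReader struct {
	path    string
	running bool
}

func main() {
	// Pretend this reader stopped because its file was deleted during rotation.
	readers := []*fakeReader{{path: "app.log", running: false}}

	ticker := time.NewTicker(2 * time.Second) // plays the role of retry_interval
	defer ticker.Stop()

	for range ticker.C {
		for _, r := range readers {
			if r.running {
				continue
			}
			// On Windows the tailer closes as soon as the file is removed, so the only
			// way to resume is to notice that a file with the same name exists again
			// and start a fresh reader for it.
			if _, err := os.Stat(r.path); err == nil {
				fmt.Println("file is back, re-creating reader for", r.path)
				r.running = true // the real component calls addReader with the stored path and labels
			}
		}
	}
}
```

The real implementation routes the ticks through `tickerChan` into the component's `Run` select loop so the check happens under the component mutex, and `cleanupReadersRestart` tears the ticker down when the interval changes or the component shuts down.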