Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix loki source file windows #2282

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ Main (unreleased)

- Update `prometheus.write.queue` library for performance increases in cpu. (@mattdurham)

- Add an argument `retry_interval` to allow `loki.source.file` to try re-opening deleted files on Windows. (@wildum)

### Bugfixes

- Fixed issue with automemlimit logging bad messages and trying to access cgroup on non-linux builds (@dehaansa)
Expand Down
4 changes: 4 additions & 0 deletions docs/sources/reference/components/loki/loki.source.file.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,17 @@ The component starts a new reader for each of the given `targets` and fans out l
| `encoding` | `string` | The encoding to convert from when reading files. | `""` | no |
| `tail_from_end` | `bool` | Whether a log file is tailed from the end if a stored position isn't found. | `false` | no |
| `legacy_positions_file` | `string` | Allows conversion from legacy positions file. | `""` | no |
| `retry_interval` | `duration` | Frequency to try re-opening files that were closed. | `"0s"` | no |

The `encoding` argument must be a valid [IANA encoding][] name. If not set, it
defaults to UTF-8.

You can use the `tail_from_end` argument when you want to tail a large file without reading its entire content.
When set to true, only new logs will be read, ignoring the existing ones.

`retry_interval` is deactivated by default (`"0s"`). This should be set on Windows-based systems when rotating files with the same names because the
component will no try to re-open the files and the targets won't be updated because of the cache.
This is not needed on Unix-like systems because the component will always try to re-open the deleted files.
Copy link
Contributor

@clayton-cornell clayton-cornell Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drafting up a new suggestion.

We can state the specific OSes we support.

This sentence (and the overall description) isn't really clear to me. Why will loki.source_file try to reopen deleted files? The description in the table says the arg will tell the component to try to reopen closed files. Which is it? I assume it's closed files since trying to reopen deleted files doesn't really make sense. Why won't the component try to reopen closed files on Windows? It's also not really clear to me. Windows caches the files? And the other OSes do not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a case where a user is rotating a "app.log" file everytime it reaches 5MB by deleting it and creating a new app.log file.

On Windows you must first close the file before deleting it. Closing the file will stop the tailer in the loki.source.file component. If the new file would have a different name, the local.file_match would send it to the loki.source.file component and it would work fine. But for the local.file_match component nothing changed because it only checks at a regular interval and it will just see that there is still an "app.log" file. Because of this, it won't notify the loki.source.file component to start a new tailer and the file won't be tailed (it only notifies the next component when the set of files changed).

On Unix-like systems, you can delete the file without closing it. Because the file is not closed, the tailer is not stopped, it continuously tries to read the file until the local.file_match sends an update. In the case of the user, it will restart the tailer once the new "app.log" file is created because of the names it thinks that it's the same file.


{{< admonition type="note" >}}
The `legacy_positions_file` argument is used when you are transitioning from legacy. The legacy positions file is rewritten into the new format.
Expand Down
16 changes: 11 additions & 5 deletions internal/component/loki/source/file/decompresser.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ type decompressor struct {
handler loki.EntryHandler
positions positions.Positions

path string
labels string
path string
labels model.LabelSet
labelsStr string

posAndSizeMtx sync.Mutex
stopOnce sync.Once
Expand All @@ -70,14 +71,14 @@ func newDecompressor(
handler loki.EntryHandler,
positions positions.Positions,
path string,
labels string,
labels model.LabelSet,
encodingFormat string,
cfg DecompressionConfig,
) (*decompressor, error) {

logger = log.With(logger, "component", "decompressor")

pos, err := positions.Get(path, labels)
pos, err := positions.Get(path, labels.String())
if err != nil {
return nil, fmt.Errorf("failed to get positions: %w", err)
}
Expand All @@ -99,6 +100,7 @@ func newDecompressor(
positions: positions,
path: path,
labels: labels,
labelsStr: labels.String(),
running: atomic.NewBool(false),
posquit: make(chan struct{}),
posdone: make(chan struct{}),
Expand Down Expand Up @@ -268,7 +270,7 @@ func (d *decompressor) MarkPositionAndSize() error {

d.metrics.totalBytes.WithLabelValues(d.path).Set(float64(d.size))
d.metrics.readBytes.WithLabelValues(d.path).Set(float64(d.position))
d.positions.Put(d.path, d.labels, d.position)
d.positions.Put(d.path, d.labelsStr, d.position)

return nil
}
Expand Down Expand Up @@ -318,3 +320,7 @@ func (d *decompressor) cleanupMetrics() {
func (d *decompressor) Path() string {
return d.path
}

func (d *decompressor) Labels() model.LabelSet {
return d.labels
}
75 changes: 59 additions & 16 deletions internal/component/loki/source/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Arguments struct {
FileWatch FileWatch `alloy:"file_watch,block,optional"`
TailFromEnd bool `alloy:"tail_from_end,attr,optional"`
LegacyPositionsFile string `alloy:"legacy_positions_file,attr,optional"`
RetryInterval time.Duration `alloy:"retry_interval,attr,optional"`
}

type FileWatch struct {
Expand Down Expand Up @@ -143,6 +144,24 @@ func (c *Component) Run(ctx context.Context) error {
c.mut.RUnlock()
}()

tickerChan := make(chan struct{})
if c.args.RetryInterval > 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesnt handle updating the RetryInterval after running, can we add a test for that?

ticker := time.NewTicker(c.args.RetryInterval)
defer ticker.Stop()

go func() {
for {
select {
case <-ticker.C:
tickerChan <- struct{}{}
case <-ctx.Done():
close(tickerChan)
return
}
}
}()
}

for {
select {
case <-ctx.Done():
Expand All @@ -153,6 +172,28 @@ func (c *Component) Run(ctx context.Context) error {
receiver.Chan() <- entry
}
c.mut.RUnlock()
case <-tickerChan:
c.mut.Lock()
// Find readers that are stopped and re-create them if the files that they were tailing are back.
// This helps for log rotation on Windows because the tailer is closed as soon as the file is removed.
// On Unix-like systems, it won't re-create any reader because the reader will stay open till the next Update call.
restartReaders := make(map[positions.Entry]reader)
for key, reader := range c.readers {
if !reader.IsRunning() {
_, err := os.Stat(reader.Path())
if err != nil {
continue
}
restartReaders[key] = reader
}
}
for key, reader := range restartReaders {
level.Debug(c.opts.Logger).Log("msg", "recreate reader", "path", reader.Path())
reader.Stop()
delete(c.readers, key)
c.addReader(key, reader.Path(), reader.Labels())
}
c.mut.Unlock()
}
}
}
Expand Down Expand Up @@ -200,7 +241,6 @@ func (c *Component) Update(args component.Arguments) error {

for _, target := range newArgs.Targets {
path := target[pathLabel]

labels := make(model.LabelSet)
for k, v := range target {
if strings.HasPrefix(k, model.ReservedLabelPrefix) {
Expand All @@ -214,19 +254,7 @@ func (c *Component) Update(args component.Arguments) error {
if _, exist := c.readers[readersKey]; exist {
continue
}

c.reportSize(path, labels.String())

handler := loki.AddLabelsMiddleware(labels).Wrap(loki.NewEntryHandler(c.handler.Chan(), func() {}))
reader, err := c.startTailing(path, labels, handler)
if err != nil {
continue
}

c.readers[readersKey] = readerWithHandler{
reader: reader,
handler: handler,
}
c.addReader(readersKey, path, labels)
}

// Remove from the positions file any entries that had a Reader before, but
Expand All @@ -238,6 +266,21 @@ func (c *Component) Update(args component.Arguments) error {
return nil
}

func (c *Component) addReader(key positions.Entry, path string, labels model.LabelSet) {
c.reportSize(path, labels.String())

handler := loki.AddLabelsMiddleware(labels).Wrap(loki.NewEntryHandler(c.handler.Chan(), func() {}))
reader, err := c.startTailing(path, labels, handler)
if err != nil {
return
}

c.readers[key] = readerWithHandler{
reader: reader,
handler: handler,
}
}

// readerWithHandler combines a reader with an entry handler associated with
// it. Closing the reader will also close the handler.
type readerWithHandler struct {
Expand Down Expand Up @@ -331,7 +374,7 @@ func (c *Component) startTailing(path string, labels model.LabelSet, handler lok
handler,
c.posFile,
path,
labels.String(),
labels,
c.args.Encoding,
c.args.DecompressionConfig,
)
Expand All @@ -352,7 +395,7 @@ func (c *Component) startTailing(path string, labels model.LabelSet, handler lok
handler,
c.posFile,
path,
labels.String(),
labels,
c.args.Encoding,
pollOptions,
c.args.TailFromEnd,
Expand Down
70 changes: 70 additions & 0 deletions internal/component/loki/source/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,73 @@ func TestEncoding(t *testing.T) {
"expected positions.yml file to be written eventually",
)
}

func TestDeleteRecreateFile(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

filename := "example"

ctx, cancel := context.WithCancel(componenttest.TestContext(t))
defer cancel()

// Create file to log to.
f, err := os.Create(filename)
require.NoError(t, err)

ctrl, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.source.file")
require.NoError(t, err)

ch1 := loki.NewLogsReceiver()

go func() {
err := ctrl.Run(ctx, Arguments{
Targets: []discovery.Target{{
"__path__": f.Name(),
"foo": "bar",
}},
ForwardTo: []loki.LogsReceiver{ch1},
RetryInterval: 1 * time.Second,
})
require.NoError(t, err)
}()

ctrl.WaitRunning(time.Minute)

_, err = f.Write([]byte("writing some text\n"))
require.NoError(t, err)

wantLabelSet := model.LabelSet{
"filename": model.LabelValue(f.Name()),
"foo": "bar",
}

select {
case logEntry := <-ch1.Chan():
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
require.Equal(t, "writing some text", logEntry.Line)
require.Equal(t, wantLabelSet, logEntry.Labels)
case <-time.After(5 * time.Second):
require.FailNow(t, "failed waiting for log line")
}

require.NoError(t, f.Close())
require.NoError(t, os.Remove(f.Name()))

// Create a file with the same name
f, err = os.Create(filename)
require.NoError(t, err)
defer os.Remove(f.Name())
defer f.Close()

_, err = f.Write([]byte("writing some new text\n"))
require.NoError(t, err)

select {
case logEntry := <-ch1.Chan():
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
require.Equal(t, "writing some new text", logEntry.Line)
require.Equal(t, wantLabelSet, logEntry.Labels)
case <-time.After(5 * time.Second):
require.FailNow(t, "failed waiting for log line")
}
}
86 changes: 86 additions & 0 deletions internal/component/loki/source/file/file_win_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//go:build windows

package file

import (
"context"
"os"
"testing"
"time"

"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/alloy/internal/component/discovery"
"github.com/grafana/alloy/internal/runtime/componenttest"
"github.com/grafana/alloy/internal/util"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)

func TestDeleteRecreateFileNoRetry(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

filename := "example"

ctx, cancel := context.WithCancel(componenttest.TestContext(t))
defer cancel()

// Create file to log to.
f, err := os.Create(filename)
require.NoError(t, err)

ctrl, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.source.file")
require.NoError(t, err)

ch1 := loki.NewLogsReceiver()

go func() {
err := ctrl.Run(ctx, Arguments{
Targets: []discovery.Target{{
"__path__": f.Name(),
"foo": "bar",
}},
ForwardTo: []loki.LogsReceiver{ch1},
})
require.NoError(t, err)
}()

ctrl.WaitRunning(time.Minute)

_, err = f.Write([]byte("writing some text\n"))
require.NoError(t, err)

wantLabelSet := model.LabelSet{
"filename": model.LabelValue(f.Name()),
"foo": "bar",
}

select {
case logEntry := <-ch1.Chan():
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
require.Equal(t, "writing some text", logEntry.Line)
require.Equal(t, wantLabelSet, logEntry.Labels)
case <-time.After(5 * time.Second):
require.FailNow(t, "failed waiting for log line")
}

require.NoError(t, f.Close())
require.NoError(t, os.Remove(f.Name()))

// Create a file with the same name
f, err = os.Create(filename)
require.NoError(t, err)
defer os.Remove(f.Name())
defer f.Close()

_, err = f.Write([]byte("writing some new text\n"))
require.NoError(t, err)

select {
case <-ch1.Chan():
t.Fatalf("Unexpected log entry received")
case <-time.After(2 * time.Second):
// Test passes if no log entry is received within the timeout
// This indicates that the log source does not retry reading the file
}
}
3 changes: 3 additions & 0 deletions internal/component/loki/source/file/reader.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package file

import "github.com/prometheus/common/model"

// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5.
// This code accommodates the tailer and decompressor implementations as readers.

Expand All @@ -8,5 +10,6 @@ type reader interface {
Stop()
IsRunning() bool
Path() string
Labels() model.LabelSet
MarkPositionAndSize() error
}
Loading
Loading