Skip to content

Commit

Permalink
restart readers that have been stopped
Browse files Browse the repository at this point in the history
  • Loading branch information
wildum committed Dec 12, 2024
1 parent 218a683 commit 27cee0f
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 29 deletions.
16 changes: 11 additions & 5 deletions internal/component/loki/source/file/decompresser.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ type decompressor struct {
handler loki.EntryHandler
positions positions.Positions

path string
labels string
path string
labels model.LabelSet
labelsStr string

posAndSizeMtx sync.Mutex
stopOnce sync.Once
Expand All @@ -70,14 +71,14 @@ func newDecompressor(
handler loki.EntryHandler,
positions positions.Positions,
path string,
labels string,
labels model.LabelSet,
encodingFormat string,
cfg DecompressionConfig,
) (*decompressor, error) {

logger = log.With(logger, "component", "decompressor")

pos, err := positions.Get(path, labels)
pos, err := positions.Get(path, labels.String())
if err != nil {
return nil, fmt.Errorf("failed to get positions: %w", err)
}
Expand All @@ -99,6 +100,7 @@ func newDecompressor(
positions: positions,
path: path,
labels: labels,
labelsStr: labels.String(),
running: atomic.NewBool(false),
posquit: make(chan struct{}),
posdone: make(chan struct{}),
Expand Down Expand Up @@ -268,7 +270,7 @@ func (d *decompressor) MarkPositionAndSize() error {

d.metrics.totalBytes.WithLabelValues(d.path).Set(float64(d.size))
d.metrics.readBytes.WithLabelValues(d.path).Set(float64(d.position))
d.positions.Put(d.path, d.labels, d.position)
d.positions.Put(d.path, d.labelsStr, d.position)

return nil
}
Expand Down Expand Up @@ -318,3 +320,7 @@ func (d *decompressor) cleanupMetrics() {
// Path returns the file path this decompressor was created with.
func (d *decompressor) Path() string {
return d.path
}

// Labels returns the label set this decompressor was created with, so that a
// caller can build an equivalent reader for the same target.
func (d *decompressor) Labels() model.LabelSet {
return d.labels
}
61 changes: 45 additions & 16 deletions internal/component/loki/source/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ func (c *Component) Run(ctx context.Context) error {
c.mut.RUnlock()
}()

// Check every 2 seconds for readers that were stopped
// Should we have a parameter for this?
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
Expand All @@ -153,6 +158,28 @@ func (c *Component) Run(ctx context.Context) error {
receiver.Chan() <- entry
}
c.mut.RUnlock()
case <-ticker.C:
	// This branch deletes from c.readers and, through addReader, inserts
	// into it. Map writes need the exclusive lock: a read lock can be held
	// by several goroutines at once, which would make these writes race
	// with any concurrent reader of c.readers.
	c.mut.Lock()
	// Find readers that are stopped and re-create them if the files that they were tailing are back.
	// This helps for log rotation on Windows because the tailer is closed as soon as the file is removed.
	// On Unix-like systems, it won't re-create any reader because the reader will stay open till the next Update call.
	//
	// Candidates are collected first and restarted in a second pass so that
	// c.readers is never inserted into while it is being ranged over.
	restartReaders := make(map[positions.Entry]reader)
	for key, r := range c.readers {
		if r.IsRunning() {
			continue
		}
		// Only restart when the file exists again; otherwise keep the
		// stopped reader and retry on the next tick.
		if _, err := os.Stat(r.Path()); err != nil {
			continue
		}
		restartReaders[key] = r
	}
	for key, r := range restartReaders {
		level.Debug(c.opts.Logger).Log("msg", "recreate reader", "path", r.Path())
		r.Stop()
		delete(c.readers, key)
		c.addReader(key, r.Path(), r.Labels())
	}
	c.mut.Unlock()
}
}
}
Expand Down Expand Up @@ -200,7 +227,6 @@ func (c *Component) Update(args component.Arguments) error {

for _, target := range newArgs.Targets {
path := target[pathLabel]

labels := make(model.LabelSet)
for k, v := range target {
if strings.HasPrefix(k, model.ReservedLabelPrefix) {
Expand All @@ -214,19 +240,7 @@ func (c *Component) Update(args component.Arguments) error {
if _, exist := c.readers[readersKey]; exist {
continue
}

c.reportSize(path, labels.String())

handler := loki.AddLabelsMiddleware(labels).Wrap(loki.NewEntryHandler(c.handler.Chan(), func() {}))
reader, err := c.startTailing(path, labels, handler)
if err != nil {
continue
}

c.readers[readersKey] = readerWithHandler{
reader: reader,
handler: handler,
}
c.addReader(readersKey, path, labels)
}

// Remove from the positions file any entries that had a Reader before, but
Expand All @@ -238,6 +252,21 @@ func (c *Component) Update(args component.Arguments) error {
return nil
}

// addReader reports the size for path, starts tailing it with a handler that
// attaches labels to every entry, and stores the resulting reader/handler
// pair in c.readers under key. If tailing cannot be started, c.readers is
// left untouched.
func (c *Component) addReader(key positions.Entry, path string, labels model.LabelSet) {
	c.reportSize(path, labels.String())

	// Entries flow through the label middleware into the component's
	// shared handler channel.
	withLabels := loki.AddLabelsMiddleware(labels)
	handler := withLabels.Wrap(loki.NewEntryHandler(c.handler.Chan(), func() {}))

	if r, err := c.startTailing(path, labels, handler); err == nil {
		c.readers[key] = readerWithHandler{reader: r, handler: handler}
	}
}

// readerWithHandler combines a reader with an entry handler associated with
// it. Closing the reader will also close the handler.
type readerWithHandler struct {
Expand Down Expand Up @@ -331,7 +360,7 @@ func (c *Component) startTailing(path string, labels model.LabelSet, handler lok
handler,
c.posFile,
path,
labels.String(),
labels,
c.args.Encoding,
c.args.DecompressionConfig,
)
Expand All @@ -352,7 +381,7 @@ func (c *Component) startTailing(path string, labels model.LabelSet, handler lok
handler,
c.posFile,
path,
labels.String(),
labels,
c.args.Encoding,
pollOptions,
c.args.TailFromEnd,
Expand Down
69 changes: 69 additions & 0 deletions internal/component/loki/source/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,72 @@ func TestEncoding(t *testing.T) {
"expected positions.yml file to be written eventually",
)
}

// TestDeleteRecreateFile checks that a target file which is deleted and then
// re-created under the same name keeps being tailed: a line written to the
// original file and a line written to the re-created file must both be
// forwarded with the expected labels.
func TestDeleteRecreateFile(t *testing.T) {
	defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

	filename := "example"

	ctx, cancel := context.WithCancel(componenttest.TestContext(t))
	defer cancel()

	// Create file to log to.
	f, err := os.Create(filename)
	require.NoError(t, err)
	// Best-effort cleanup registered right away so the file does not linger
	// in the working directory when an assertion below fails early. The error
	// is ignored on purpose: the file may already be gone.
	defer os.Remove(filename)

	ctrl, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.source.file")
	require.NoError(t, err)

	ch1 := loki.NewLogsReceiver()

	go func() {
		err := ctrl.Run(ctx, Arguments{
			Targets: []discovery.Target{{
				"__path__": f.Name(),
				"foo":      "bar",
			}},
			ForwardTo: []loki.LogsReceiver{ch1},
		})
		// require.* ends in t.FailNow, which is only valid on the goroutine
		// running the test function; report the failure with t.Errorf here.
		if err != nil {
			t.Errorf("controller run returned an error: %v", err)
		}
	}()

	ctrl.WaitRunning(time.Minute)

	wantLabelSet := model.LabelSet{
		"filename": model.LabelValue(f.Name()),
		"foo":      "bar",
	}

	// requireLine waits for the next forwarded entry and checks its
	// timestamp, content and labels.
	requireLine := func(wantLine string) {
		t.Helper()
		select {
		case logEntry := <-ch1.Chan():
			require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
			require.Equal(t, wantLine, logEntry.Line)
			require.Equal(t, wantLabelSet, logEntry.Labels)
		case <-time.After(5 * time.Second):
			require.FailNow(t, "failed waiting for log line")
		}
	}

	_, err = f.Write([]byte("writing some text\n"))
	require.NoError(t, err)
	requireLine("writing some text")

	require.NoError(t, f.Close())
	require.NoError(t, os.Remove(f.Name()))

	// Create a file with the same name
	f, err = os.Create(filename)
	require.NoError(t, err)
	defer f.Close()

	_, err = f.Write([]byte("writing some new text\n"))
	require.NoError(t, err)
	requireLine("writing some new text")
}
3 changes: 3 additions & 0 deletions internal/component/loki/source/file/reader.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package file

import "github.com/prometheus/common/model"

// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5.
// This code accommodates the tailer and decompressor implementations as readers.

Expand All @@ -8,5 +10,6 @@ type reader interface {
Stop()
IsRunning() bool
Path() string
Labels() model.LabelSet
MarkPositionAndSize() error
}
23 changes: 15 additions & 8 deletions internal/component/loki/source/file/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ type tailer struct {
handler loki.EntryHandler
positions positions.Positions

path string
labels string
tail *tail.Tail
path string
labels model.LabelSet
labelsStr string
tail *tail.Tail

posAndSizeMtx sync.Mutex
stopOnce sync.Once
Expand All @@ -50,20 +51,21 @@ type tailer struct {
}

func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, positions positions.Positions, path string,
labels string, encoding string, pollOptions watch.PollingFileWatcherOptions, tailFromEnd bool) (*tailer, error) {
labels model.LabelSet, encoding string, pollOptions watch.PollingFileWatcherOptions, tailFromEnd bool) (*tailer, error) {
// Simple check to make sure the file we are tailing doesn't
// have a position already saved which is past the end of the file.
fi, err := os.Stat(path)
if err != nil {
return nil, err
}
pos, err := positions.Get(path, labels)
labelsStr := labels.String()
pos, err := positions.Get(path, labelsStr)
if err != nil {
return nil, err
}

if fi.Size() < pos {
positions.Remove(path, labels)
positions.Remove(path, labelsStr)
}

// If no cached position is found and the tailFromEnd option is enabled.
Expand All @@ -72,7 +74,7 @@ func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, p
if err != nil {
level.Error(logger).Log("msg", "failed to get a position from the end of the file, default to start of file", err)
} else {
positions.Put(path, labels, pos)
positions.Put(path, labelsStr, pos)
level.Info(logger).Log("msg", "retrieved and stored the position of the last line")
}
}
Expand Down Expand Up @@ -101,6 +103,7 @@ func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, p
positions: positions,
path: path,
labels: labels,
labelsStr: labelsStr,
tail: tail,
running: atomic.NewBool(false),
posquit: make(chan struct{}),
Expand Down Expand Up @@ -295,7 +298,7 @@ func (t *tailer) MarkPositionAndSize() error {
// Update metrics and positions file all together to avoid race conditions when `t.tail` is stopped.
t.metrics.totalBytes.WithLabelValues(t.path).Set(float64(size))
t.metrics.readBytes.WithLabelValues(t.path).Set(float64(pos))
t.positions.Put(t.path, t.labels, pos)
t.positions.Put(t.path, t.labelsStr, pos)

return nil
}
Expand Down Expand Up @@ -356,3 +359,7 @@ func (t *tailer) cleanupMetrics() {
// Path returns the path of the file this tailer was created for.
func (t *tailer) Path() string {
return t.path
}

// Labels returns the label set this tailer was created with, so that a caller
// can build an equivalent reader for the same target.
func (t *tailer) Labels() model.LabelSet {
return t.labels
}

0 comments on commit 27cee0f

Please sign in to comment.