loki.source.kubernetes: restart tailers less aggressively #601

Merged
merged 1 commit into from
Apr 24, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -25,6 +25,9 @@ Main (unreleased)
- In `prometheus.exporter.kafka`, the interpolation table used to compute estimated lag metrics is now pruned
on `metadata_refresh_interval` instead of `prune_interval_seconds`. (@wildum)

- In `loki.source.kubernetes`, don't restart tailers based on above-average
  time deltas between log lines if the K8s version is >= 1.29.1. (@hainenber)

### Bugfixes

- Fixed issue with defaults for Beyla component not being applied correctly. (marctc)
81 changes: 49 additions & 32 deletions internal/component/loki/source/kubernetes/kubetail/tailer.go
@@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/blang/semver/v4"
"github.com/go-kit/log"
"github.com/grafana/alloy/internal/alloy/logging/level"
"github.com/grafana/alloy/internal/component/common/loki"
@@ -170,6 +171,15 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {
return err
}

k8sServerVersion, err := t.opts.Client.Discovery().ServerVersion()
if err != nil {
return err
}
k8sComparableServerVersion, err := semver.ParseTolerant(k8sServerVersion.GitVersion)
if err != nil {
return err
}

// Create a new rolling average calculator to determine the average delta
// time between log entries.
//
@@ -180,41 +190,48 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {
// The computed average will never be less than the minimum of 2s.
calc := newRollingAverageCalculator(10000, 100, 2*time.Second, maxTailerLifetime)

go func() {
rolledFileTicker := time.NewTicker(1 * time.Second)
defer func() {
rolledFileTicker.Stop()
_ = stream.Close()
}()
for {
select {
case <-ctx.Done():
return
case <-rolledFileTicker.C:
// Versions of Kubernetes which do not contain
// kubernetes/kubernetes#115702 will fail to detect rolled log files
// and stop sending logs to us.
//
// To work around this, we use a rolling average to determine how
// frequent we usually expect to see entries. If 3x the normal delta has
// elapsed, we'll restart the tailer.
//
// False positives here are acceptable, but false negatives mean that
// we'll have a larger spike of missing logs until we detect a rolled
// file.
avg := calc.GetAverage()
last := calc.GetLast()
if last.IsZero() {
continue
}
s := time.Since(last)
if s > avg*3 {
level.Info(t.log).Log("msg", "have not seen a log line in 3x average time between lines, closing and re-opening tailer", "rolling_average", avg, "time_since_last", s)
// Versions of Kubernetes which do not contain
// kubernetes/kubernetes#115702 (< v1.29.0) will fail to detect rotated log files
// and stop sending logs to us.
//
// To work around this, we use a rolling average to determine how
// frequent we usually expect to see entries. If 3x the normal delta has
// elapsed, we'll restart the tailer.
//
// False positives here are acceptable, but false negatives mean that
// we'll have a larger spike of missing logs until we detect a rolled
// file.
if k8sComparableServerVersion.LT(semver.Version{Major: 1, Minor: 29, Patch: 0}) {
go func() {
rolledFileTicker := time.NewTicker(1 * time.Second)
defer func() {
rolledFileTicker.Stop()
_ = stream.Close()
}()
for {
select {
case <-ctx.Done():
return
case <-rolledFileTicker.C:
avg := calc.GetAverage()
last := calc.GetLast()
if last.IsZero() {
continue
}
s := time.Since(last)
if s > avg*3 {
level.Debug(t.log).Log("msg", "have not seen a log line in 3x average time between lines, closing and re-opening tailer", "rolling_average", avg, "time_since_last", s)
return
}
}
}
}
}()
}()
} else {
go func() {
<-ctx.Done()
_ = stream.Close()
}()
}

level.Info(t.log).Log("msg", "opened log stream", "start time", lastReadTime)
