diff --git a/CHANGELOG.md b/CHANGELOG.md index e4bb8db9cc..23e8ffd5a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,9 @@ Main (unreleased) - In `prometheus.exporter.kafka`, the interpolation table used to compute estimated lag metrics is now pruned on `metadata_refresh_interval` instead of `prune_interval_seconds`. (@wildum) +- Don't restart tailers in `loki.source.kubernetes` component by above-average + time deltas if K8s version is >= 1.29.1 (@hainenber) + - Add support for configuring CPU profile's duration scraped by `pyroscope.scrape`. (@hainenber) ### Bugfixes diff --git a/internal/component/loki/source/kubernetes/kubetail/tailer.go b/internal/component/loki/source/kubernetes/kubetail/tailer.go index 01fcc24fcf..1fb6280c69 100644 --- a/internal/component/loki/source/kubernetes/kubetail/tailer.go +++ b/internal/component/loki/source/kubernetes/kubetail/tailer.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/blang/semver/v4" "github.com/go-kit/log" "github.com/grafana/alloy/internal/alloy/logging/level" "github.com/grafana/alloy/internal/component/common/loki" @@ -170,6 +171,15 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error { return err } + k8sServerVersion, err := t.opts.Client.Discovery().ServerVersion() + if err != nil { + return err + } + k8sComparableServerVersion, err := semver.ParseTolerant(k8sServerVersion.GitVersion) + if err != nil { + return err + } + // Create a new rolling average calculator to determine the average delta // time between log entries. // @@ -180,41 +190,48 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error { // The computed average will never be less than the minimum of 2s. calc := newRollingAverageCalculator(10000, 100, 2*time.Second, maxTailerLifetime) - go func() { - rolledFileTicker := time.NewTicker(1 * time.Second) - defer func() { - rolledFileTicker.Stop() - _ = stream.Close() - }() - for { - select { - case <-ctx.Done(): - return - case <-rolledFileTicker.C: - // Versions of Kubernetes which do not contain - // kubernetes/kubernetes#115702 will fail to detect rolled log files - // and stop sending logs to us. - // - // To work around this, we use a rolling average to determine how - // frequent we usually expect to see entries. If 3x the normal delta has - // elapsed, we'll restart the tailer. - // - // False positives here are acceptable, but false negatives mean that - // we'll have a larger spike of missing logs until we detect a rolled - // file. - avg := calc.GetAverage() - last := calc.GetLast() - if last.IsZero() { - continue - } - s := time.Since(last) - if s > avg*3 { - level.Info(t.log).Log("msg", "have not seen a log line in 3x average time between lines, closing and re-opening tailer", "rolling_average", avg, "time_since_last", s) + // Versions of Kubernetes which do not contain + // kubernetes/kubernetes#115702 (<= v1.29.1) will fail to detect rotated log files + // and stop sending logs to us. + // + // To work around this, we use a rolling average to determine how + // frequent we usually expect to see entries. If 3x the normal delta has + // elapsed, we'll restart the tailer. + // + // False positives here are acceptable, but false negatives mean that + // we'll have a larger spike of missing logs until we detect a rolled + // file. + if k8sComparableServerVersion.LT(semver.Version{Major: 1, Minor: 29, Patch: 0}) { + go func() { + rolledFileTicker := time.NewTicker(1 * time.Second) + defer func() { + rolledFileTicker.Stop() + _ = stream.Close() + }() + for { + select { + case <-ctx.Done(): return + case <-rolledFileTicker.C: + avg := calc.GetAverage() + last := calc.GetLast() + if last.IsZero() { + continue + } + s := time.Since(last) + if s > avg*3 { + level.Debug(t.log).Log("msg", "have not seen a log line in 3x average time between lines, closing and re-opening tailer", "rolling_average", avg, "time_since_last", s) + return + } } } - } - }() + }() + } else { + go func() { + <-ctx.Done() + _ = stream.Close() + }() + } level.Info(t.log).Log("msg", "opened log stream", "start time", lastReadTime)