From 67a6239152bc456bce99458730a6c216f24dcdf4 Mon Sep 17 00:00:00 2001 From: Paschalis Tsilias Date: Fri, 21 Jun 2024 16:32:37 +0300 Subject: [PATCH] remotecfg: add jitter to polling frequency (#961) Signed-off-by: Paschalis Tsilias --- CHANGELOG.md | 2 + internal/service/remotecfg/remotecfg.go | 14 ++-- internal/util/jitter/jitter.go | 100 ++++++++++++++++++++++++ internal/util/jitter/jitter_test.go | 88 +++++++++++++++++++++ 4 files changed, 197 insertions(+), 7 deletions(-) create mode 100644 internal/util/jitter/jitter.go create mode 100644 internal/util/jitter/jitter_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 4203bd68cd..13b24bdcd4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -91,6 +91,8 @@ v1.2.0-rc.0 - Add an initial lower limit of 10 seconds for the the `poll_frequency` argument in the `remotecfg` block. (@tpaschalis) +- Add a constant jitter to `remotecfg` service's polling. (@tpaschalis) + - Added support for NS records to `discovery.dns`. (@djcode) - Improved clustering use cases for tracking GCP delta metrics in the `prometheus.exporter.gcp` (@kgeckhart) diff --git a/internal/service/remotecfg/remotecfg.go b/internal/service/remotecfg/remotecfg.go index 54db027c70..a56351c1d8 100644 --- a/internal/service/remotecfg/remotecfg.go +++ b/internal/service/remotecfg/remotecfg.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/service" + "github.com/grafana/alloy/internal/util/jitter" "github.com/grafana/alloy/syntax" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -32,6 +33,8 @@ func getHash(in []byte) string { return fmt.Sprintf("%x", fnvHash.Sum(nil)) } +const baseJitter = 100 * time.Millisecond + // Service implements a service for remote configuration. // The default value of ch is nil; this means it will block forever if the // remotecfg service is not configured. In addition, we're keeping track of @@ -46,8 +49,7 @@ type Service struct { mut sync.RWMutex asClient collectorv1connect.CollectorServiceClient - ch <-chan time.Time - ticker *time.Ticker + ticker *jitter.Ticker dataPath string currentConfigHash string metrics *metrics @@ -131,7 +133,7 @@ func New(opts Options) (*Service, error) { return &Service{ opts: opts, - ticker: time.NewTicker(math.MaxInt64), + ticker: jitter.NewTicker(math.MaxInt64-baseJitter, baseJitter), // first argument is set as-is to avoid overflowing }, nil } @@ -210,7 +212,7 @@ func (s *Service) Run(ctx context.Context, host service.Host) error { for { select { - case <-s.ch: + case <-s.ticker.C: err := s.fetchRemote() if err != nil { level.Error(s.opts.Logger).Log("msg", "failed to fetch remote configuration from the API", "err", err) @@ -230,8 +232,7 @@ func (s *Service) Update(newConfig any) error { // it. Make sure we stop everything gracefully before returning. if newArgs.URL == "" { s.mut.Lock() - s.ch = nil - s.ticker.Reset(math.MaxInt64) + s.ticker.Reset(math.MaxInt64 - baseJitter) // avoid overflowing s.asClient = noopClient{} s.args.HTTPClientConfig = config.CloneDefaultHTTPClientConfig() s.mut.Unlock() @@ -247,7 +248,6 @@ func (s *Service) Update(newConfig any) error { } s.dataPath = filepath.Join(s.opts.StoragePath, ServiceName, hash) s.ticker.Reset(newArgs.PollFrequency) - s.ch = s.ticker.C // Update the HTTP client last since it might fail. if !reflect.DeepEqual(s.args.HTTPClientConfig, newArgs.HTTPClientConfig) { httpClient, err := commonconfig.NewClientFromConfig(*newArgs.HTTPClientConfig.Convert(), "remoteconfig") diff --git a/internal/util/jitter/jitter.go b/internal/util/jitter/jitter.go new file mode 100644 index 0000000000..a9525ad9fb --- /dev/null +++ b/internal/util/jitter/jitter.go @@ -0,0 +1,100 @@ +package jitter + +import ( + "math/rand" + "sync" + "time" +) + +type Ticker struct { + C <-chan time.Time + stop chan struct{} + reset chan struct{} + + mut sync.RWMutex + d time.Duration + j time.Duration +} + +// NewTicker creates a Ticker that works similar to time.Ticker, but sends the +// time with a period specified by `duration` adjusted by a pseudorandom jitter +// in the range of [duration-jitter, duration+jitter). +// Following the behavior of time.Ticker, we use a 1-buffer channel, so if the +// client falls behind while reading, we'll drop ticks on the floor until the +// client catches up. +// Callers have to make sure that both duration and the [d-j, d+j) intervals +// are valid positive int64 values (non-negative and non-overflowing). +// Use Stop to release associated resources and the Reset methods to modify the +// duration and jitter. +func NewTicker(duration time.Duration, jitter time.Duration) *Ticker { + ticker := time.NewTicker(duration) + c := make(chan time.Time, 1) + t := &Ticker{ + C: c, + + stop: make(chan struct{}), + reset: make(chan struct{}), + d: duration, + j: jitter, + } + + go func() { + for { + select { + case tc := <-ticker.C: + ticker.Reset(t.getNextPeriod()) + select { + case c <- tc: + default: + } + case <-t.stop: + ticker.Stop() + return + case <-t.reset: + ticker.Reset(t.getNextPeriod()) + } + } + }() + return t +} + +// Stop turns off the Ticker; no more ticks will be sent. Stop does not close +// Ticker's channel, to prevent a concurrent goroutine from seeing an erroneous +// "tick". +func (t *Ticker) Stop() { + close(t.reset) + close(t.stop) +} + +// Reset stops the Ticker, resets its base duration to the specified argument +// and re-calculates the period with a jitter. +// The next tick will arrive after the new period elapses. +func (t *Ticker) Reset(d time.Duration) { + t.mut.Lock() + t.d = d + t.mut.Unlock() + t.reset <- struct{}{} +} + +// Reset stops the Ticker, resets its jitter to the specified argument and +// re-calculates the period with the new jitter. +// The next tick will arrive after the new period elapses. +func (t *Ticker) ResetJitter(d time.Duration) { + t.mut.Lock() + t.j = d + t.mut.Unlock() + t.reset <- struct{}{} +} + +// getNextPeriod is used to calculate the period for the Ticker. +func (t *Ticker) getNextPeriod() time.Duration { + // jitter is a random value between [0, 2j) + // the returned period is then d-j + jitter + // which results in [d-j, d+j). + t.mut.RLock() + jitter := rand.Int63n(2 * int64(t.j)) + period := t.d - t.j + time.Duration(jitter) + t.mut.RUnlock() + + return period +} diff --git a/internal/util/jitter/jitter_test.go b/internal/util/jitter/jitter_test.go new file mode 100644 index 0000000000..ef7dbfef9a --- /dev/null +++ b/internal/util/jitter/jitter_test.go @@ -0,0 +1,88 @@ +package jitter + +import ( + "math" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// Inspired by a test on github.com/mroth/jitter +func TestTicker(t *testing.T) { + var ( + d = 10 * time.Millisecond + j = 3 * time.Millisecond + n = 10 + delta = 1 * time.Millisecond + min = time.Duration(math.Floor(float64(d)-float64(j)))*time.Duration(n) - delta + max = time.Duration(math.Ceil(float64(d)+float64(j)))*time.Duration(n) + delta + ) + + // Check that the time required for N ticks is within expected range. + ticker := NewTicker(d, j) + start := time.Now() + for i := 0; i < n; i++ { + <-ticker.C + } + + elapsed := time.Since(start) + if elapsed < min || elapsed > max { + require.Fail(t, "ticker didn't meet timing criteria", "time needed for %d ticks %v outside of expected range [%v - %v]", n, elapsed, min, max) + } +} + +func TestTickerStop(t *testing.T) { + t.Parallel() + + var ( + d = 5 * time.Millisecond + j = 1 * time.Millisecond + before = 3 // ticks before stop + wait = d * 10 // monitor after stop + ) + + ticker := NewTicker(d, j) + for i := 0; i < before; i++ { + <-ticker.C + } + + ticker.Stop() + select { + case <-ticker.C: + require.Fail(t, "Got tick after Stop()") + case <-time.After(wait): + } +} + +func TestTickerReset(t *testing.T) { + var ( + d1 = 10 * time.Millisecond + d2 = 20 * time.Millisecond + j1 = 3 * time.Millisecond + j2 = 9 * time.Millisecond + n = 10 + delta = 1 * time.Millisecond + min1 = time.Duration(math.Floor(float64(d1)-float64(j1)))*time.Duration(n) - delta + max1 = time.Duration(math.Ceil(float64(d1)+float64(j1)))*time.Duration(n) + delta + min2 = time.Duration(math.Floor(float64(d2)-float64(j2)))*time.Duration(n) - delta + max2 = time.Duration(math.Ceil(float64(d2)+float64(j2)))*time.Duration(n) + delta + ) + + // Check that the time required for N ticks is within expected range. + ticker := NewTicker(d1, j1) + start := time.Now() + for i := 0; i < n; i++ { + <-ticker.C + } + ticker.Reset(d2) + ticker.ResetJitter(j2) + for i := 0; i < n; i++ { + <-ticker.C + } + + elapsed := time.Since(start) + if elapsed < (min1+min2) || elapsed > (max1+max2) { + require.Fail(t, "ticker didn't meet timing criteria", "time needed for %d ticks %v outside of expected range [%v - %v]", n, elapsed, (min1 + min2), (max1 + max2)) + } +}