Skip to content

Commit

Permalink
remotecfg: add jitter to polling frequency (#961)
Browse files Browse the repository at this point in the history
Signed-off-by: Paschalis Tsilias <[email protected]>
  • Loading branch information
tpaschalis authored Jun 21, 2024
1 parent a31c1f5 commit 67a6239
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 7 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ v1.2.0-rc.0
- Add an initial lower limit of 10 seconds for the `poll_frequency`
argument in the `remotecfg` block. (@tpaschalis)

- Add a constant jitter to `remotecfg` service's polling. (@tpaschalis)

- Added support for NS records to `discovery.dns`. (@djcode)

- Improved clustering use cases for tracking GCP delta metrics in the `prometheus.exporter.gcp` (@kgeckhart)
Expand Down
14 changes: 7 additions & 7 deletions internal/service/remotecfg/remotecfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/service"
"github.com/grafana/alloy/internal/util/jitter"
"github.com/grafana/alloy/syntax"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -32,6 +33,8 @@ func getHash(in []byte) string {
return fmt.Sprintf("%x", fnvHash.Sum(nil))
}

const baseJitter = 100 * time.Millisecond

// Service implements a service for remote configuration.
// The default value of ch is nil; this means it will block forever if the
// remotecfg service is not configured. In addition, we're keeping track of
Expand All @@ -46,8 +49,7 @@ type Service struct {

mut sync.RWMutex
asClient collectorv1connect.CollectorServiceClient
ch <-chan time.Time
ticker *time.Ticker
ticker *jitter.Ticker
dataPath string
currentConfigHash string
metrics *metrics
Expand Down Expand Up @@ -131,7 +133,7 @@ func New(opts Options) (*Service, error) {

return &Service{
opts: opts,
ticker: time.NewTicker(math.MaxInt64),
ticker: jitter.NewTicker(math.MaxInt64-baseJitter, baseJitter), // first argument is set as-is to avoid overflowing
}, nil
}

Expand Down Expand Up @@ -210,7 +212,7 @@ func (s *Service) Run(ctx context.Context, host service.Host) error {

for {
select {
case <-s.ch:
case <-s.ticker.C:
err := s.fetchRemote()
if err != nil {
level.Error(s.opts.Logger).Log("msg", "failed to fetch remote configuration from the API", "err", err)
Expand All @@ -230,8 +232,7 @@ func (s *Service) Update(newConfig any) error {
// it. Make sure we stop everything gracefully before returning.
if newArgs.URL == "" {
s.mut.Lock()
s.ch = nil
s.ticker.Reset(math.MaxInt64)
s.ticker.Reset(math.MaxInt64 - baseJitter) // avoid overflowing
s.asClient = noopClient{}
s.args.HTTPClientConfig = config.CloneDefaultHTTPClientConfig()
s.mut.Unlock()
Expand All @@ -247,7 +248,6 @@ func (s *Service) Update(newConfig any) error {
}
s.dataPath = filepath.Join(s.opts.StoragePath, ServiceName, hash)
s.ticker.Reset(newArgs.PollFrequency)
s.ch = s.ticker.C
// Update the HTTP client last since it might fail.
if !reflect.DeepEqual(s.args.HTTPClientConfig, newArgs.HTTPClientConfig) {
httpClient, err := commonconfig.NewClientFromConfig(*newArgs.HTTPClientConfig.Convert(), "remoteconfig")
Expand Down
100 changes: 100 additions & 0 deletions internal/util/jitter/jitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package jitter

import (
"math/rand"
"sync"
"time"
)

// Ticker is a time.Ticker-like type whose period is re-drawn after every tick
// from the half-open interval [d-j, d+j), where d is the base duration and j
// is the jitter. Ticks are delivered on C.
type Ticker struct {
	// C delivers the ticks. It is buffered with capacity 1 and is never
	// closed, not even by Stop.
	C <-chan time.Time

	// stop is closed (exactly once) by Stop to terminate the background
	// goroutine; reset wakes that goroutine up so it re-arms the underlying
	// timer with a freshly computed period.
	stop     chan struct{}
	reset    chan struct{}
	stopOnce sync.Once

	// mut guards d (base duration) and j (jitter).
	mut sync.RWMutex
	d   time.Duration
	j   time.Duration
}

// NewTicker creates a Ticker that works similar to time.Ticker, but sends the
// time with a period specified by `duration` adjusted by a pseudorandom jitter
// in the range of [duration-jitter, duration+jitter).
// The very first tick fires after exactly `duration`; jitter is applied to
// every period after that.
// Following the behavior of time.Ticker, we use a 1-buffer channel, so if the
// client falls behind while reading, we'll drop ticks on the floor until the
// client catches up.
// Callers have to make sure that both duration and the [d-j, d+j) intervals
// are valid positive int64 values (non-negative and non-overflowing). A
// jitter of zero (or less) disables jittering altogether.
// Use Stop to release associated resources and the Reset methods to modify the
// duration and jitter.
func NewTicker(duration time.Duration, jitter time.Duration) *Ticker {
	ticker := time.NewTicker(duration)
	c := make(chan time.Time, 1)
	t := &Ticker{
		C: c,

		stop:  make(chan struct{}),
		reset: make(chan struct{}),
		d:     duration,
		j:     jitter,
	}

	go func() {
		// Release the underlying timer on every exit path.
		defer ticker.Stop()
		for {
			select {
			case tc := <-ticker.C:
				// Re-arm with a new jittered period before forwarding the
				// tick.
				ticker.Reset(t.getNextPeriod())
				select {
				case c <- tc:
				default:
					// The consumer is behind; drop the tick like time.Ticker.
				}
			case <-t.stop:
				return
			case <-t.reset:
				ticker.Reset(t.getNextPeriod())
			}
		}
	}()
	return t
}

// Stop turns off the Ticker; no more ticks will be sent. Stop does not close
// Ticker's channel, to prevent a concurrent goroutine from seeing an erroneous
// "tick".
// Stop is idempotent: calling it more than once is safe. Unlike time.Ticker, a
// stopped Ticker cannot be restarted; later Reset and ResetJitter calls only
// update the stored values.
func (t *Ticker) Stop() {
	t.stopOnce.Do(func() { close(t.stop) })
}

// Reset stops the Ticker, resets its base duration to the specified argument
// and re-calculates the period with a jitter.
// The next tick will arrive after the new period elapses.
// Reset blocks until the background goroutine has picked up the change, or
// returns immediately if the Ticker has been stopped.
func (t *Ticker) Reset(d time.Duration) {
	t.mut.Lock()
	t.d = d
	t.mut.Unlock()
	t.signalReset()
}

// ResetJitter stops the Ticker, resets its jitter to the specified argument
// and re-calculates the period with the new jitter.
// The next tick will arrive after the new period elapses.
// ResetJitter blocks until the background goroutine has picked up the change,
// or returns immediately if the Ticker has been stopped.
func (t *Ticker) ResetJitter(d time.Duration) {
	t.mut.Lock()
	t.j = d
	t.mut.Unlock()
	t.signalReset()
}

// signalReset hands a reset request to the background goroutine. Selecting on
// stop as well keeps the call from blocking forever once that goroutine has
// exited.
func (t *Ticker) signalReset() {
	select {
	case t.reset <- struct{}{}:
	case <-t.stop:
	}
}

// getNextPeriod is used to calculate the period for the Ticker.
func (t *Ticker) getNextPeriod() time.Duration {
	t.mut.RLock()
	d, j := t.d, t.j
	t.mut.RUnlock()

	// rand.Int63n panics when its argument is <= 0, and that panic would
	// happen on the background goroutine and take the whole process down.
	// Treat a non-positive jitter as "no jitter".
	if j <= 0 {
		return d
	}

	// offset is a random value in [0, 2j); the returned period is then
	// d-j + offset, which results in [d-j, d+j).
	offset := rand.Int63n(2 * int64(j))
	return d - j + time.Duration(offset)
}
88 changes: 88 additions & 0 deletions internal/util/jitter/jitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package jitter

import (
"math"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestTicker checks that the wall-clock time needed to receive n ticks lies
// within n*(d-j) and n*(d+j), widened by a small tolerance on each side.
// Inspired by a test on github.com/mroth/jitter
func TestTicker(t *testing.T) {
	var (
		d     = 10 * time.Millisecond
		j     = 3 * time.Millisecond
		n     = 10
		delta = 1 * time.Millisecond
		// lower/upper are the bounds for n consecutive periods; named so
		// they do not shadow the min/max builtins.
		lower = time.Duration(math.Floor(float64(d)-float64(j)))*time.Duration(n) - delta
		upper = time.Duration(math.Ceil(float64(d)+float64(j)))*time.Duration(n) + delta
	)

	// Check that the time required for N ticks is within expected range.
	ticker := NewTicker(d, j)
	// Release the ticker's goroutine and timer once the test is done.
	defer ticker.Stop()

	start := time.Now()
	for i := 0; i < n; i++ {
		<-ticker.C
	}

	elapsed := time.Since(start)
	if elapsed < lower || elapsed > upper {
		require.Fail(t, "ticker didn't meet timing criteria", "time needed for %d ticks %v outside of expected range [%v - %v]", n, elapsed, lower, upper)
	}
}

// TestTickerStop drains a few ticks, stops the ticker and then verifies that
// nothing else shows up on the channel for ten base periods.
func TestTickerStop(t *testing.T) {
	t.Parallel()

	const (
		period       = 5 * time.Millisecond
		spread       = 1 * time.Millisecond
		ticksToDrain = 3           // ticks consumed before stopping
		quietWindow  = 10 * period // how long to watch the channel afterwards
	)

	tk := NewTicker(period, spread)
	for remaining := ticksToDrain; remaining > 0; remaining-- {
		<-tk.C
	}
	tk.Stop()

	deadline := time.After(quietWindow)
	select {
	case <-deadline:
		// The channel stayed silent for the whole window, as expected.
	case <-tk.C:
		require.Fail(t, "Got tick after Stop()")
	}
}

// TestTickerReset receives n ticks with the initial duration/jitter, switches
// both via Reset and ResetJitter, receives n more ticks, and checks that the
// total elapsed time lies within the sum of the two expected ranges.
func TestTickerReset(t *testing.T) {
	var (
		d1    = 10 * time.Millisecond
		d2    = 20 * time.Millisecond
		j1    = 3 * time.Millisecond
		j2    = 9 * time.Millisecond
		n     = 10
		delta = 1 * time.Millisecond
		min1  = time.Duration(math.Floor(float64(d1)-float64(j1)))*time.Duration(n) - delta
		max1  = time.Duration(math.Ceil(float64(d1)+float64(j1)))*time.Duration(n) + delta
		min2  = time.Duration(math.Floor(float64(d2)-float64(j2)))*time.Duration(n) - delta
		max2  = time.Duration(math.Ceil(float64(d2)+float64(j2)))*time.Duration(n) + delta
	)

	// Check that the time required for N ticks is within expected range.
	ticker := NewTicker(d1, j1)
	// Release the ticker's goroutine and timer once the test is done.
	defer ticker.Stop()

	start := time.Now()
	for i := 0; i < n; i++ {
		<-ticker.C
	}
	ticker.Reset(d2)
	ticker.ResetJitter(j2)
	for i := 0; i < n; i++ {
		<-ticker.C
	}

	elapsed := time.Since(start)
	lower, upper := min1+min2, max1+max2
	if elapsed < lower || elapsed > upper {
		// Two rounds of n ticks were timed, so report 2*n in the message.
		require.Fail(t, "ticker didn't meet timing criteria", "time needed for %d ticks %v outside of expected range [%v - %v]", 2*n, elapsed, lower, upper)
	}
}

0 comments on commit 67a6239

Please sign in to comment.