From 375025025a073dbfe19bbe83ef2c96350a327a73 Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Thu, 16 Nov 2023 17:52:27 -0300 Subject: [PATCH 01/13] drain test and routing --- component/common/loki/client/manager.go | 14 +-- component/common/loki/client/queue_client.go | 4 +- component/common/loki/wal/watcher.go | 6 +- component/common/loki/wal/watcher_test.go | 94 +++++++++++++++++++- component/loki/write/write.go | 15 +++- 5 files changed, 121 insertions(+), 12 deletions(-) diff --git a/component/common/loki/client/manager.go b/component/common/loki/client/manager.go index 6683e9e5772b..a239e9a90091 100644 --- a/component/common/loki/client/manager.go +++ b/component/common/loki/client/manager.go @@ -35,8 +35,8 @@ func (n nilNotifier) SubscribeCleanup(_ wal.CleanupEventSubscriber) {} func (n nilNotifier) SubscribeWrite(_ wal.WriteEventSubscriber) {} -type Stoppable interface { - Stop() +type Drainable interface { + Stop(drain bool) } type StoppableClient interface { @@ -53,7 +53,7 @@ type StoppableClient interface { type Manager struct { name string clients []Client - walWatchers []Stoppable + walWatchers []Drainable // stoppableClients is kept separate from clients for avoiding having to couple queueClient to the Client interface stoppableClients []StoppableClient @@ -78,7 +78,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr clientsCheck := make(map[string]struct{}) clients := make([]Client, 0, len(clientCfgs)) - watchers := make([]Stoppable, 0, len(clientCfgs)) + watchers := make([]Drainable, 0, len(clientCfgs)) stoppableClients := make([]StoppableClient, 0, len(clientCfgs)) for _, cfg := range clientCfgs { // Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name). @@ -188,12 +188,16 @@ func (m *Manager) Chan() chan<- loki.Entry { } func (m *Manager) Stop() { + m.StopWithDrain(false) +} + +func (m *Manager) StopWithDrain(drain bool) { // first stop the receiving channel m.once.Do(func() { close(m.entries) }) m.wg.Wait() // close wal watchers for _, walWatcher := range m.walWatchers { - walWatcher.Stop() + walWatcher.Stop(drain) } // close clients for _, c := range m.stoppableClients { diff --git a/component/common/loki/client/queue_client.go b/component/common/loki/client/queue_client.go index 871880cad17b..edd9c25bbd37 100644 --- a/component/common/loki/client/queue_client.go +++ b/component/common/loki/client/queue_client.go @@ -29,7 +29,7 @@ import ( // StoppableWriteTo is a mixing of the WAL's WriteTo interface, that is Stoppable as well. type StoppableWriteTo interface { agentWal.WriteTo - Stoppable + Stop() StopNow() } @@ -38,7 +38,7 @@ type StoppableWriteTo interface { type MarkerHandler interface { UpdateReceivedData(segmentId, dataCount int) // Data queued for sending UpdateSentData(segmentId, dataCount int) // Data which was sent or given up on sending - Stoppable + Stop() } // queuedBatch is a batch specific to a tenant, that is considered ready to be sent. diff --git a/component/common/loki/wal/watcher.go b/component/common/loki/wal/watcher.go index 0972f32f8f8a..84cf88df0664 100644 --- a/component/common/loki/wal/watcher.go +++ b/component/common/loki/wal/watcher.go @@ -331,7 +331,11 @@ func (w *Watcher) decodeAndDispatch(b []byte, segmentNum int) (bool, error) { return readData, firstErr } -func (w *Watcher) Stop() { +// Stop stops the Watcher, draining the WAL until the end of the last segment if drain is true. 
Since the writer of the WAL +// is expected to have stopped before the Watcher, the last segment will be drained completely before the end of Stop. +// +// Note if drain is enabled, the caller routine of Stop will block executing the drain procedure. +func (w *Watcher) Stop(drain bool) { // first close the quit channel to order main mainLoop routine to stop close(w.quit) // upon calling stop, wait for main mainLoop execution to stop diff --git a/component/common/loki/wal/watcher_test.go b/component/common/loki/wal/watcher_test.go index 97de19748dee..e256ec49e9bc 100644 --- a/component/common/loki/wal/watcher_test.go +++ b/component/common/loki/wal/watcher_test.go @@ -2,6 +2,7 @@ package wal import ( "fmt" + "go.uber.org/atomic" "os" "testing" "time" @@ -341,7 +342,7 @@ func TestWatcher(t *testing.T) { } // create new watcher, and defer stop watcher := NewWatcher(dir, "test", metrics, writeTo, logger, DefaultWatchConfig, noMarker{}) - defer watcher.Stop() + defer watcher.Stop(false) wl, err := New(Config{ Enabled: true, Dir: dir, @@ -419,7 +420,7 @@ func TestWatcher_Replay(t *testing.T) { return 0 }, }) - defer watcher.Stop() + defer watcher.Stop(false) wl, err := New(Config{ Enabled: true, Dir: dir, @@ -501,7 +502,7 @@ func TestWatcher_Replay(t *testing.T) { return -1 }, }) - defer watcher.Stop() + defer watcher.Stop(false) wl, err := New(Config{ Enabled: true, Dir: dir, @@ -568,3 +569,90 @@ func TestWatcher_Replay(t *testing.T) { writeTo.AssertContainsLines(t, segment2Lines...) }) } + +// slowWriteTo mimics the combination of a WriteTo and a slow remote write client. This will allow us to have a writer +// that moves faster than the WAL watcher, and therefore, test the draining procedure. +type slowWriteTo struct { + entriesReceived atomic.Uint64 + sleepAfterAppendEntries time.Duration +} + +func (s *slowWriteTo) SeriesReset(segmentNum int) { +} + +func (s *slowWriteTo) StoreSeries(series []record.RefSeries, segmentNum int) { +} + +func (s *slowWriteTo) AppendEntries(entries wal.RefEntries, segmentNum int) error { + s.entriesReceived.Add(uint64(len(entries.Entries))) + time.Sleep(s.sleepAfterAppendEntries) + return nil +} + +func TestWatcher_StopAndDrainWAL(t *testing.T) { + labels := model.LabelSet{ + "app": "test", + } + + reg := prometheus.NewRegistry() + logger := level.NewFilter(log.NewLogfmtLogger(os.Stdout), level.AllowDebug()) + dir := t.TempDir() + metrics := NewWatcherMetrics(reg) + // the slow write to will take one second on each AppendEntries operation + writeTo := &slowWriteTo{ + sleepAfterAppendEntries: time.Second, + } + // create new watcher, and defer stop + watcher := NewWatcher(dir, "test", metrics, writeTo, logger, DefaultWatchConfig, mockMarker{ + LastMarkedSegmentFunc: func() int { + // Ignore marker to read from last segment, which is none + return -1 + }, + }) + + // start watcher, and burn through WAL as we write to it + watcher.Start() + + wl, err := New(Config{ + Enabled: true, + Dir: dir, + }, logger, reg) + require.NoError(t, err) + defer wl.Close() + + ew := newEntryWriter() + + writeNLines := func(t *testing.T, n int) { + for i := 0; i < n; i++ { + // First, write to segment 0. This will be the last "marked" segment + err = ew.WriteEntry(loki.Entry{ + Labels: labels, + Entry: logproto.Entry{ + Timestamp: time.Now(), + Line: "test line", + }, + }, wl, logger) + require.NoError(t, err) + } + } + + // The test will write the WAL while the Watcher is running. 
First, 10 lines will be written to a segment, and the test + // will wait for the Watcher to have read 5 lines. After, a new segment will be cut, 10 other lines written, and the + // Watcher stopped with drain. The test will expect all 20 lines in total to have been received. + + writeNLines(t, 10) + + require.Eventually(t, func() bool { + return writeTo.entriesReceived.Load() >= 5 + }, time.Second*11, time.Millisecond*500, "expected the write to catch up to half of the first segment") + + _, err = wl.NextSegment() + require.NoError(t, err) + writeNLines(t, 10) + require.NoError(t, wl.Sync()) + + // Upon calling Stop drain, the Watcher should finish burning through segment 0, and also consume segment 1 + watcher.Stop(true) + + require.Equal(t, 20, int(writeTo.entriesReceived.Load()), "expected the watcher to drain the whole WAL") +} diff --git a/component/loki/write/write.go b/component/loki/write/write.go index efda7b597c6f..53a3b9821a08 100644 --- a/component/loki/write/write.go +++ b/component/loki/write/write.go @@ -81,7 +81,7 @@ type Component struct { receiver loki.LogsReceiver // remote write components - clientManger client.Client + clientManger *client.Manager walWriter *wal.Writer // sink is the place where log entries received by this component should be written to. If WAL @@ -111,6 +111,18 @@ func New(o component.Options, args Arguments) (*Component, error) { // Run implements component.Component. func (c *Component) Run(ctx context.Context) error { + defer func() { + // when exiting Run, proceed to shut down first the writer component, and then + // the client manager, with the WAL and remote-write client inside + if c.walWriter != nil { + c.walWriter.Stop() + } + if c.clientManger != nil { + // drain, since the component is shutting down. That means the agent is shutting down as well + c.clientManger.StopWithDrain(true) + } + }() + for { select { case <-ctx.Done(): @@ -140,6 +152,7 @@ func (c *Component) Update(args component.Arguments) error { c.walWriter.Stop() } if c.clientManger != nil { + // only drain on component shutdown c.clientManger.Stop() } From 8244c995bc8818ff2f02b93a0d469cbd28cc7dde Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Fri, 17 Nov 2023 12:34:32 -0300 Subject: [PATCH 02/13] refactoring watcher to use state --- component/common/loki/wal/watcher.go | 71 ++++++++++++++++++++++++---- 1 file changed, 63 insertions(+), 8 deletions(-) diff --git a/component/common/loki/wal/watcher.go b/component/common/loki/wal/watcher.go index 84cf88df0664..84a89ac211a3 100644 --- a/component/common/loki/wal/watcher.go +++ b/component/common/loki/wal/watcher.go @@ -7,6 +7,7 @@ import ( "math" "os" "strconv" + "sync" "time" "github.com/go-kit/log" @@ -72,6 +73,60 @@ type Marker interface { LastMarkedSegment() int } +// wState represents the possible states the Watcher can be in. +type wState int64 + +const ( + // stateRunning is the main functioning state of the watcher. It will keep tailing head segments, consuming closed + // ones, and checking for new ones. + stateRunning wState = iota + + stateDraining + + // stateStopping means the Watcher is being stopped. It should drop all segment read activity, and exit promptly. + stateStopping +) + +// watcherState is a holder for the state the Watcher is in. It provides handy methods for checking it it's stopping, getting +// the current state, or blocking until it has stopped. 
+type watcherState struct { + s wState + mut sync.RWMutex + quit chan struct{} +} + +func newWatcherState() *watcherState { + return &watcherState{ + s: stateRunning, + quit: make(chan struct{}), + } +} + +func (s *watcherState) Transition(ns wState) { + s.mut.Lock() + defer s.mut.Unlock() + s.s = ns + if ns == stateStopping { + close(s.quit) + } +} + +func (s *watcherState) Get() wState { + s.mut.RLock() + defer s.mut.RUnlock() + return s.s +} + +func (s *watcherState) IsStopping() bool { + s.mut.RLock() + defer s.mut.RUnlock() + return s.s == stateStopping +} + +func (s *watcherState) WaitForStopping() <-chan struct{} { + return s.quit +} + type Watcher struct { // id identifies the Watcher. Used when one Watcher is instantiated per remote write client, to be able to track to whom // the metric/log line corresponds. @@ -80,7 +135,7 @@ type Watcher struct { actions WriteTo readNotify chan struct{} done chan struct{} - quit chan struct{} + state *watcherState walDir string logger log.Logger MaxSegment int @@ -99,7 +154,7 @@ func NewWatcher(walDir, id string, metrics *WatcherMetrics, writeTo WriteTo, log id: id, actions: writeTo, readNotify: make(chan struct{}), - quit: make(chan struct{}), + state: newWatcherState(), done: make(chan struct{}), MaxSegment: -1, marker: marker, @@ -121,7 +176,7 @@ func (w *Watcher) Start() { // retries. func (w *Watcher) mainLoop() { defer close(w.done) - for !isClosed(w.quit) { + for !w.state.IsStopping() { if w.marker != nil { w.savedSegment = w.marker.LastMarkedSegment() level.Debug(w.logger).Log("msg", "last saved segment", "segment", w.savedSegment) @@ -132,7 +187,7 @@ func (w *Watcher) mainLoop() { } select { - case <-w.quit: + case <-w.state.WaitForStopping(): return case <-time.After(5 * time.Second): } @@ -160,7 +215,7 @@ func (w *Watcher) run() error { } level.Debug(w.logger).Log("msg", "Tailing WAL", "currentSegment", currentSegment, "lastSegment", lastSegment) - for !isClosed(w.quit) { + for !w.state.IsStopping() { w.metrics.currentSegment.WithLabelValues(w.id).Set(float64(currentSegment)) level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment) @@ -215,7 +270,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { for { select { - case <-w.quit: + case <-w.state.WaitForStopping(): return nil case <-segmentTicker.C: @@ -293,7 +348,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { func (w *Watcher) readSegment(r *wlog.LiveReader, segmentNum int) (bool, error) { var readData bool - for r.Next() && !isClosed(w.quit) { + for r.Next() && !w.state.IsStopping() { rec := r.Record() w.metrics.recordsRead.WithLabelValues(w.id).Inc() read, err := w.decodeAndDispatch(rec, segmentNum) @@ -337,7 +392,7 @@ func (w *Watcher) decodeAndDispatch(b []byte, segmentNum int) (bool, error) { // Note if drain is enabled, the caller routine of Stop will block executing the drain procedure. 
func (w *Watcher) Stop(drain bool) { // first close the quit channel to order main mainLoop routine to stop - close(w.quit) + w.state.Transition(stateStopping) // upon calling stop, wait for main mainLoop execution to stop <-w.done From 7180974ead3b2cbaaa5046dd370e1e2dda21d38f Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Fri, 17 Nov 2023 12:54:09 -0300 Subject: [PATCH 03/13] drain working --- component/common/loki/wal/config.go | 4 +++ component/common/loki/wal/watcher.go | 43 +++++++++++++++++++---- component/common/loki/wal/watcher_test.go | 22 ++++++++++-- 3 files changed, 60 insertions(+), 9 deletions(-) diff --git a/component/common/loki/wal/config.go b/component/common/loki/wal/config.go index c0d6c7ae2752..d25e2426923f 100644 --- a/component/common/loki/wal/config.go +++ b/component/common/loki/wal/config.go @@ -12,6 +12,7 @@ const ( var DefaultWatchConfig = WatchConfig{ MinReadFrequency: time.Millisecond * 250, MaxReadFrequency: time.Second, + DrainTimeout: time.Second * 30, } // Config contains all WAL-related settings. @@ -49,6 +50,9 @@ type WatchConfig struct { // MaxReadFrequency controls the maximum read frequency the Watcher polls the WAL for new records. As mentioned above // it caps the polling frequency to a maximum, to prevent to exponential backoff from making it too high. MaxReadFrequency time.Duration + + // DrainTimeout is the maximum amount of time that the Watcher can spend draining the remaining segments in the WAL. + DrainTimeout time.Duration } // UnmarshalYAML implement YAML Unmarshaler diff --git a/component/common/loki/wal/watcher.go b/component/common/loki/wal/watcher.go index 84a89ac211a3..07a2afd46781 100644 --- a/component/common/loki/wal/watcher.go +++ b/component/common/loki/wal/watcher.go @@ -90,21 +90,39 @@ const ( // watcherState is a holder for the state the Watcher is in. It provides handy methods for checking it it's stopping, getting // the current state, or blocking until it has stopped. 
type watcherState struct { - s wState - mut sync.RWMutex - quit chan struct{} + s wState + mut sync.RWMutex + quit chan struct{} + logger log.Logger } -func newWatcherState() *watcherState { +func newWatcherState(l log.Logger) *watcherState { return &watcherState{ - s: stateRunning, - quit: make(chan struct{}), + s: stateRunning, + quit: make(chan struct{}), + logger: l, + } +} + +func printState(s wState) string { + switch s { + case stateRunning: + return "running" + case stateDraining: + return "draining" + case stateStopping: + return "stopping" + default: + return "unknown" } } func (s *watcherState) Transition(ns wState) { s.mut.Lock() defer s.mut.Unlock() + + level.Debug(s.logger).Log("msg", "Watcher transitioning state", "currentState", printState(s.s), "nextState", printState(ns)) + s.s = ns if ns == stateStopping { close(s.quit) @@ -143,6 +161,7 @@ type Watcher struct { metrics *WatcherMetrics minReadFreq time.Duration maxReadFreq time.Duration + drainTimeout time.Duration marker Marker savedSegment int } @@ -154,7 +173,7 @@ func NewWatcher(walDir, id string, metrics *WatcherMetrics, writeTo WriteTo, log id: id, actions: writeTo, readNotify: make(chan struct{}), - state: newWatcherState(), + state: newWatcherState(logger), done: make(chan struct{}), MaxSegment: -1, marker: marker, @@ -163,6 +182,7 @@ func NewWatcher(walDir, id string, metrics *WatcherMetrics, writeTo WriteTo, log metrics: metrics, minReadFreq: config.MinReadFrequency, maxReadFreq: config.MaxReadFrequency, + drainTimeout: config.DrainTimeout, } } @@ -186,6 +206,10 @@ func (w *Watcher) mainLoop() { level.Error(w.logger).Log("msg", "error tailing WAL", "err", err) } + if w.state.Get() == stateDraining { + level.Warn(w.logger).Log("msg", "exited from run with error while draining") + } + select { case <-w.state.WaitForStopping(): return @@ -391,6 +415,11 @@ func (w *Watcher) decodeAndDispatch(b []byte, segmentNum int) (bool, error) { // // Note if drain is enabled, the caller routine of Stop will block executing the drain procedure. func (w *Watcher) Stop(drain bool) { + if drain { + w.state.Transition(stateDraining) + // wait for 10 seconds for the watcher to drain + <-time.NewTimer(w.drainTimeout).C + } // first close the quit channel to order main mainLoop routine to stop w.state.Transition(stateStopping) // upon calling stop, wait for main mainLoop execution to stop diff --git a/component/common/loki/wal/watcher_test.go b/component/common/loki/wal/watcher_test.go index e256ec49e9bc..db7a53d47eca 100644 --- a/component/common/loki/wal/watcher_test.go +++ b/component/common/loki/wal/watcher_test.go @@ -4,6 +4,7 @@ import ( "fmt" "go.uber.org/atomic" "os" + "strings" "testing" "time" @@ -573,6 +574,7 @@ func TestWatcher_Replay(t *testing.T) { // slowWriteTo mimics the combination of a WriteTo and a slow remote write client. This will allow us to have a writer // that moves faster than the WAL watcher, and therefore, test the draining procedure. 
type slowWriteTo struct { + t *testing.T entriesReceived atomic.Uint64 sleepAfterAppendEntries time.Duration } @@ -584,6 +586,12 @@ func (s *slowWriteTo) StoreSeries(series []record.RefSeries, segmentNum int) { } func (s *slowWriteTo) AppendEntries(entries wal.RefEntries, segmentNum int) error { + var allLines strings.Builder + for _, e := range entries.Entries { + allLines.WriteString(e.Line) + allLines.WriteString("/") + } + s.t.Logf("AppendEntries called from segment %d - %s", segmentNum, allLines.String()) s.entriesReceived.Add(uint64(len(entries.Entries))) time.Sleep(s.sleepAfterAppendEntries) return nil @@ -598,11 +606,18 @@ func TestWatcher_StopAndDrainWAL(t *testing.T) { logger := level.NewFilter(log.NewLogfmtLogger(os.Stdout), level.AllowDebug()) dir := t.TempDir() metrics := NewWatcherMetrics(reg) + // the slow write to will take one second on each AppendEntries operation writeTo := &slowWriteTo{ + t: t, sleepAfterAppendEntries: time.Second, } - // create new watcher, and defer stop + + cfg := DefaultWatchConfig + // since the watcher would have consumed already 5 log entries, and it needs to consume a remaining of 15, with a WriteTo + // taking 1 second to respond to each AppendEntries call, this test will use the smallest timeout possible for the Watcher + // to fully drain the WAL. + cfg.DrainTimeout = time.Second * 16 watcher := NewWatcher(dir, "test", metrics, writeTo, logger, DefaultWatchConfig, mockMarker{ LastMarkedSegmentFunc: func() int { // Ignore marker to read from last segment, which is none @@ -622,6 +637,8 @@ func TestWatcher_StopAndDrainWAL(t *testing.T) { ew := newEntryWriter() + // helper to add context to each written line + var lineCounter atomic.Int64 writeNLines := func(t *testing.T, n int) { for i := 0; i < n; i++ { // First, write to segment 0. This will be the last "marked" segment @@ -629,9 +646,10 @@ func TestWatcher_StopAndDrainWAL(t *testing.T) { Labels: labels, Entry: logproto.Entry{ Timestamp: time.Now(), - Line: "test line", + Line: fmt.Sprintf("test line %d", lineCounter.Load()), }, }, wl, logger) + lineCounter.Add(1) require.NoError(t, err) } } From 8e4bb05c77f2499df3cd049846a782e8b6cb1ac6 Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Fri, 17 Nov 2023 13:08:13 -0300 Subject: [PATCH 04/13] fine-tuned test case --- component/common/loki/wal/watcher_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/component/common/loki/wal/watcher_test.go b/component/common/loki/wal/watcher_test.go index db7a53d47eca..fffe7ded4d02 100644 --- a/component/common/loki/wal/watcher_test.go +++ b/component/common/loki/wal/watcher_test.go @@ -618,7 +618,7 @@ func TestWatcher_StopAndDrainWAL(t *testing.T) { // taking 1 second to respond to each AppendEntries call, this test will use the smallest timeout possible for the Watcher // to fully drain the WAL. 
cfg.DrainTimeout = time.Second * 16 - watcher := NewWatcher(dir, "test", metrics, writeTo, logger, DefaultWatchConfig, mockMarker{ + watcher := NewWatcher(dir, "test", metrics, writeTo, logger, cfg, mockMarker{ LastMarkedSegmentFunc: func() int { // Ignore marker to read from last segment, which is none return -1 @@ -670,7 +670,11 @@ func TestWatcher_StopAndDrainWAL(t *testing.T) { require.NoError(t, wl.Sync()) // Upon calling Stop drain, the Watcher should finish burning through segment 0, and also consume segment 1 + now := time.Now() watcher.Stop(true) + // expecting 15s (missing 15 entries * 1 sec delay in AppendEntries) +/- 1.1s (taking into account the drain timeout + // has one extra second. + require.InDelta(t, time.Second*15, time.Since(now), float64(time.Millisecond*1100), "expected the drain procedure to take around 15s") require.Equal(t, 20, int(writeTo.entriesReceived.Load()), "expected the watcher to drain the whole WAL") } From 185c72a58809295787020f13472196834cf09f7f Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Fri, 17 Nov 2023 13:32:58 -0300 Subject: [PATCH 05/13] added short timeout test --- component/common/loki/wal/watcher_test.go | 181 ++++++++++++++-------- 1 file changed, 120 insertions(+), 61 deletions(-) diff --git a/component/common/loki/wal/watcher_test.go b/component/common/loki/wal/watcher_test.go index fffe7ded4d02..aa29a7ccfa26 100644 --- a/component/common/loki/wal/watcher_test.go +++ b/component/common/loki/wal/watcher_test.go @@ -601,80 +601,139 @@ func TestWatcher_StopAndDrainWAL(t *testing.T) { labels := model.LabelSet{ "app": "test", } - - reg := prometheus.NewRegistry() logger := level.NewFilter(log.NewLogfmtLogger(os.Stdout), level.AllowDebug()) - dir := t.TempDir() - metrics := NewWatcherMetrics(reg) - // the slow write to will take one second on each AppendEntries operation - writeTo := &slowWriteTo{ - t: t, - sleepAfterAppendEntries: time.Second, + // newTestingResources is a helper for bootstrapping all required testing resources + newTestingResources := func(t *testing.T, cfg WatchConfig) (*slowWriteTo, *Watcher, WAL) { + reg := prometheus.NewRegistry() + dir := t.TempDir() + metrics := NewWatcherMetrics(reg) + + // the slow write to will take one second on each AppendEntries operation + writeTo := &slowWriteTo{ + t: t, + sleepAfterAppendEntries: time.Second, + } + + watcher := NewWatcher(dir, "test", metrics, writeTo, logger, cfg, mockMarker{ + LastMarkedSegmentFunc: func() int { + // Ignore marker to read from last segment, which is none + return -1 + }, + }) + + // start watcher, and burn through WAL as we write to it + watcher.Start() + + wl, err := New(Config{ + Enabled: true, + Dir: dir, + }, logger, reg) + require.NoError(t, err) + return writeTo, watcher, wl } - cfg := DefaultWatchConfig - // since the watcher would have consumed already 5 log entries, and it needs to consume a remaining of 15, with a WriteTo - // taking 1 second to respond to each AppendEntries call, this test will use the smallest timeout possible for the Watcher - // to fully drain the WAL. 
- cfg.DrainTimeout = time.Second * 16 - watcher := NewWatcher(dir, "test", metrics, writeTo, logger, cfg, mockMarker{ - LastMarkedSegmentFunc: func() int { - // Ignore marker to read from last segment, which is none - return -1 - }, - }) + t.Run("watcher drains WAL just in time", func(t *testing.T) { + cfg := DefaultWatchConfig + // considering the slow write to has a 1 second delay when Appending an entry, and before the draining begins, + // the watcher would have consumed only 5 entries, this timeout will give the Watcher just enough time to fully + // drain the WAL. + cfg.DrainTimeout = time.Second * 16 + writeTo, watcher, wl := newTestingResources(t, cfg) + defer wl.Close() - // start watcher, and burn through WAL as we write to it - watcher.Start() + ew := newEntryWriter() - wl, err := New(Config{ - Enabled: true, - Dir: dir, - }, logger, reg) - require.NoError(t, err) - defer wl.Close() + // helper to add context to each written line + var lineCounter atomic.Int64 + writeNLines := func(t *testing.T, n int) { + for i := 0; i < n; i++ { + // First, write to segment 0. This will be the last "marked" segment + err := ew.WriteEntry(loki.Entry{ + Labels: labels, + Entry: logproto.Entry{ + Timestamp: time.Now(), + Line: fmt.Sprintf("test line %d", lineCounter.Load()), + }, + }, wl, logger) + lineCounter.Add(1) + require.NoError(t, err) + } + } - ew := newEntryWriter() + // The test will write the WAL while the Watcher is running. First, 10 lines will be written to a segment, and the test + // will wait for the Watcher to have read 5 lines. After, a new segment will be cut, 10 other lines written, and the + // Watcher stopped with drain. The test will expect all 20 lines in total to have been received. - // helper to add context to each written line - var lineCounter atomic.Int64 - writeNLines := func(t *testing.T, n int) { - for i := 0; i < n; i++ { - // First, write to segment 0. This will be the last "marked" segment - err = ew.WriteEntry(loki.Entry{ - Labels: labels, - Entry: logproto.Entry{ - Timestamp: time.Now(), - Line: fmt.Sprintf("test line %d", lineCounter.Load()), - }, - }, wl, logger) - lineCounter.Add(1) - require.NoError(t, err) + writeNLines(t, 10) + + require.Eventually(t, func() bool { + return writeTo.entriesReceived.Load() >= 5 + }, time.Second*11, time.Millisecond*500, "expected the write to catch up to half of the first segment") + + _, err := wl.NextSegment() + require.NoError(t, err) + writeNLines(t, 10) + require.NoError(t, wl.Sync()) + + // Upon calling Stop drain, the Watcher should finish burning through segment 0, and also consume segment 1 + now := time.Now() + watcher.Stop(true) + + // expecting 15s (missing 15 entries * 1 sec delay in AppendEntries) +/- 1.1s (taking into account the drain timeout + // has one extra second. 
+ require.InDelta(t, time.Second*15, time.Since(now), float64(time.Millisecond*1100), "expected the drain procedure to take around 15s") + require.Equal(t, int(writeTo.entriesReceived.Load()), 20, "expected the watcher to fully drain the WAL") + }) + + t.Run("watcher drain timeout too short, should exit promptly", func(t *testing.T) { + cfg := DefaultWatchConfig + // having a 10 seconds timeout should give the watcher enough time to only consume ~10 entries, and be missing ~5 + // from the last segment + cfg.DrainTimeout = time.Second * 10 + writeTo, watcher, wl := newTestingResources(t, cfg) + defer wl.Close() + + ew := newEntryWriter() + + // helper to add context to each written line + var lineCounter atomic.Int64 + writeNLines := func(t *testing.T, n int) { + for i := 0; i < n; i++ { + // First, write to segment 0. This will be the last "marked" segment + err := ew.WriteEntry(loki.Entry{ + Labels: labels, + Entry: logproto.Entry{ + Timestamp: time.Now(), + Line: fmt.Sprintf("test line %d", lineCounter.Load()), + }, + }, wl, logger) + lineCounter.Add(1) + require.NoError(t, err) + } } - } - // The test will write the WAL while the Watcher is running. First, 10 lines will be written to a segment, and the test - // will wait for the Watcher to have read 5 lines. After, a new segment will be cut, 10 other lines written, and the - // Watcher stopped with drain. The test will expect all 20 lines in total to have been received. + // The test will write the WAL while the Watcher is running. First, 10 lines will be written to a segment, and the test + // will wait for the Watcher to have read 5 lines. After, a new segment will be cut, 10 other lines written, and the + // Watcher stopped with drain. The test will expect all 20 lines in total to have been received. - writeNLines(t, 10) + writeNLines(t, 10) - require.Eventually(t, func() bool { - return writeTo.entriesReceived.Load() >= 5 - }, time.Second*11, time.Millisecond*500, "expected the write to catch up to half of the first segment") + require.Eventually(t, func() bool { + return writeTo.entriesReceived.Load() >= 5 + }, time.Second*11, time.Millisecond*500, "expected the write to catch up to half of the first segment") - _, err = wl.NextSegment() - require.NoError(t, err) - writeNLines(t, 10) - require.NoError(t, wl.Sync()) + _, err := wl.NextSegment() + require.NoError(t, err) + writeNLines(t, 10) + require.NoError(t, wl.Sync()) - // Upon calling Stop drain, the Watcher should finish burning through segment 0, and also consume segment 1 - now := time.Now() - watcher.Stop(true) + // Upon calling Stop drain, the Watcher should finish burning through segment 0, and also consume segment 1 + now := time.Now() + watcher.Stop(true) - // expecting 15s (missing 15 entries * 1 sec delay in AppendEntries) +/- 1.1s (taking into account the drain timeout - // has one extra second. 
- require.InDelta(t, time.Second*15, time.Since(now), float64(time.Millisecond*1100), "expected the drain procedure to take around 15s") - require.Equal(t, 20, int(writeTo.entriesReceived.Load()), "expected the watcher to drain the whole WAL") + require.InDelta(t, time.Second*10, time.Since(now), float64(time.Millisecond*1100), "expected the drain procedure to take around 15s") + require.Less(t, int(writeTo.entriesReceived.Load()), 20, "expected watcher to have not consumed WAL fully") + require.InDelta(t, 15, int(writeTo.entriesReceived.Load()), 1.0, "expected Watcher to consume at most +/- 1 entry from the WAL") + }) } From 19dffffc25d7950eee876d7d59a6cef1ec0319d5 Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Fri, 17 Nov 2023 16:41:53 -0300 Subject: [PATCH 06/13] prompt exit test --- component/common/loki/wal/watcher_test.go | 51 +++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/component/common/loki/wal/watcher_test.go b/component/common/loki/wal/watcher_test.go index aa29a7ccfa26..7d36f6942333 100644 --- a/component/common/loki/wal/watcher_test.go +++ b/component/common/loki/wal/watcher_test.go @@ -686,6 +686,57 @@ func TestWatcher_StopAndDrainWAL(t *testing.T) { require.Equal(t, int(writeTo.entriesReceived.Load()), 20, "expected the watcher to fully drain the WAL") }) + t.Run("watcher should exit promptly after draining completely", func(t *testing.T) { + cfg := DefaultWatchConfig + // the drain timeout will be too long, for the amount of data remaining in the WAL (~15 entries more) + cfg.DrainTimeout = time.Second * 30 + writeTo, watcher, wl := newTestingResources(t, cfg) + defer wl.Close() + + ew := newEntryWriter() + + // helper to add context to each written line + var lineCounter atomic.Int64 + writeNLines := func(t *testing.T, n int) { + for i := 0; i < n; i++ { + // First, write to segment 0. This will be the last "marked" segment + err := ew.WriteEntry(loki.Entry{ + Labels: labels, + Entry: logproto.Entry{ + Timestamp: time.Now(), + Line: fmt.Sprintf("test line %d", lineCounter.Load()), + }, + }, wl, logger) + lineCounter.Add(1) + require.NoError(t, err) + } + } + + // The test will write the WAL while the Watcher is running. First, 10 lines will be written to a segment, and the test + // will wait for the Watcher to have read 5 lines. After, a new segment will be cut, 10 other lines written, and the + // Watcher stopped with drain. The test will expect all 20 lines in total to have been received. + + writeNLines(t, 10) + + require.Eventually(t, func() bool { + return writeTo.entriesReceived.Load() >= 5 + }, time.Second*11, time.Millisecond*500, "expected the write to catch up to half of the first segment") + + _, err := wl.NextSegment() + require.NoError(t, err) + writeNLines(t, 10) + require.NoError(t, wl.Sync()) + + // Upon calling Stop drain, the Watcher should finish burning through segment 0, and also consume segment 1 + now := time.Now() + watcher.Stop(true) + + // expecting 15s (missing 15 entries * 1 sec delay in AppendEntries) +/- 1.1s (taking into account the drain timeout + // has one extra second. 
+ require.InDelta(t, time.Second*15, time.Since(now), float64(time.Millisecond*1100), "expected the drain procedure to take around 15s") + require.Equal(t, int(writeTo.entriesReceived.Load()), 20, "expected the watcher to fully drain the WAL") + }) + t.Run("watcher drain timeout too short, should exit promptly", func(t *testing.T) { cfg := DefaultWatchConfig // having a 10 seconds timeout should give the watcher enough time to only consume ~10 entries, and be missing ~5 From da31c8a53c1f844957e436244fe1ec02f696e74a Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Fri, 17 Nov 2023 16:42:06 -0300 Subject: [PATCH 07/13] prompt exit passing --- component/common/loki/wal/watcher.go | 77 ++++++++++++++++++---------- 1 file changed, 49 insertions(+), 28 deletions(-) diff --git a/component/common/loki/wal/watcher.go b/component/common/loki/wal/watcher.go index 07a2afd46781..c4e4e6135e48 100644 --- a/component/common/loki/wal/watcher.go +++ b/component/common/loki/wal/watcher.go @@ -90,17 +90,17 @@ const ( // watcherState is a holder for the state the Watcher is in. It provides handy methods for checking it it's stopping, getting // the current state, or blocking until it has stopped. type watcherState struct { - s wState - mut sync.RWMutex - quit chan struct{} - logger log.Logger + current wState + mut sync.RWMutex + stoppingSignal chan struct{} + logger log.Logger } func newWatcherState(l log.Logger) *watcherState { return &watcherState{ - s: stateRunning, - quit: make(chan struct{}), - logger: l, + current: stateRunning, + stoppingSignal: make(chan struct{}), + logger: l, } } @@ -117,32 +117,36 @@ func printState(s wState) string { } } -func (s *watcherState) Transition(ns wState) { +func (s *watcherState) Transition(next wState) { s.mut.Lock() defer s.mut.Unlock() - level.Debug(s.logger).Log("msg", "Watcher transitioning state", "currentState", printState(s.s), "nextState", printState(ns)) + level.Debug(s.logger).Log("msg", "Watcher transitioning state", "currentState", printState(s.current), "nextState", printState(next)) - s.s = ns - if ns == stateStopping { - close(s.quit) + // only perform channel close if the state is not already stopping + // expect s.s to be either draining ro running to perform a close + if next == stateStopping && s.current != next { + close(s.stoppingSignal) } + + // update state + s.current = next } func (s *watcherState) Get() wState { s.mut.RLock() defer s.mut.RUnlock() - return s.s + return s.current } func (s *watcherState) IsStopping() bool { s.mut.RLock() defer s.mut.RUnlock() - return s.s == stateStopping + return s.current == stateStopping } func (s *watcherState) WaitForStopping() <-chan struct{} { - return s.quit + return s.stoppingSignal } type Watcher struct { @@ -202,12 +206,16 @@ func (w *Watcher) mainLoop() { level.Debug(w.logger).Log("msg", "last saved segment", "segment", w.savedSegment) } - if err := w.run(); err != nil { + err := w.run() + if err != nil { level.Error(w.logger).Log("msg", "error tailing WAL", "err", err) } - if w.state.Get() == stateDraining { - level.Warn(w.logger).Log("msg", "exited from run with error while draining") + if w.state.Get() == stateDraining && errors.Is(err, os.ErrNotExist) { + level.Info(w.logger).Log("msg", "Reached non existing segment while draining, assuming end of WAL") + // since we've reached the end of the WAL, and the Watcher is draining, promptly transition to stopping state + // so the watcher can stoppingSignal early + w.state.Transition(stateStopping) } select { @@ -241,7 +249,6 @@ func (w *Watcher) 
run() error { level.Debug(w.logger).Log("msg", "Tailing WAL", "currentSegment", currentSegment, "lastSegment", lastSegment) for !w.state.IsStopping() { w.metrics.currentSegment.WithLabelValues(w.id).Set(float64(currentSegment)) - level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment) // On start, we have a pointer to what is the latest segment. On subsequent calls to this function, // currentSegment will have been incremented, and we should open that segment. @@ -266,6 +273,8 @@ func (w *Watcher) run() error { // If tail is false, we know the segment we are "watching" over is closed (no further write will occur to it). Then, the // segment is read fully, any errors are logged as Warnings, and no error is returned. func (w *Watcher) watch(segmentNum int, tail bool) error { + level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", segmentNum, "tail", tail) + segment, err := wlog.OpenReadSegment(wlog.SegmentName(w.walDir, segmentNum)) if err != nil { return err @@ -303,24 +312,30 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { return fmt.Errorf("segments: %w", err) } - // Check if new segments exists. - if last <= segmentNum { + // Check if new segments exists, or we are draining the WAL, which means that either: + // - This is the last segment, and we can consume it fully + // - There's some other segment, and we can consume this segment fully as well + if last <= segmentNum && w.state.Get() != stateDraining { continue } - // Since we know last > segmentNum, there must be a new segment. Read the remaining from the segmentNum segment - // and return from `watch` to read the next one + if w.state.Get() == stateDraining { + level.Debug(w.logger).Log("msg", "Draining segment completely", "segment", segmentNum, "lastSegment", last) + } + + // We now that there's either a new segment (last > segmentNum), or we are draining the WAL. Either case, read + // the remaining from the segmentNum segment and return from `watch` to read the next one _, err = w.readSegment(reader, segmentNum) if debug { level.Warn(w.logger).Log("msg", "Error reading segment inside segmentTicker", "segment", segmentNum, "read", reader.Offset(), "err", err) } - // io.EOF error are non-fatal since we are tailing the wal + // io.EOF error are non-fatal since we consuming the segment till the end if errors.Unwrap(err) != io.EOF { return err } - // return after reading the whole segment for creating a new LiveReader from the newly created segment + // return after reading the whole segment return nil // the cases below will unlock the select block, and execute the block below @@ -416,12 +431,18 @@ func (w *Watcher) decodeAndDispatch(b []byte, segmentNum int) (bool, error) { // Note if drain is enabled, the caller routine of Stop will block executing the drain procedure. 
func (w *Watcher) Stop(drain bool) { if drain { + level.Info(w.logger).Log("msg", "Draining Watcher") w.state.Transition(stateDraining) - // wait for 10 seconds for the watcher to drain - <-time.NewTimer(w.drainTimeout).C + // wait for drain timeout, or stopping state, in case the Watcher does the transition itself promptly + select { + case <-time.NewTimer(w.drainTimeout).C: + level.Warn(w.logger).Log("msg", "Watcher drain timeout occurred, transitioning to Stopping") + case <-w.state.WaitForStopping(): + } } - // first close the quit channel to order main mainLoop routine to stop + w.state.Transition(stateStopping) + // upon calling stop, wait for main mainLoop execution to stop <-w.done From dd3f4d80f93ef6da23503182ab544be4408264b5 Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Fri, 17 Nov 2023 17:48:13 -0300 Subject: [PATCH 08/13] refactoring watcher --- component/common/loki/client/manager.go | 12 +- component/common/loki/wal/config.go | 1 + .../common/loki/wal/internal/watcher_state.go | 88 +++++++++++++++ component/common/loki/wal/watcher.go | 104 ++---------------- 4 files changed, 106 insertions(+), 99 deletions(-) create mode 100644 component/common/loki/wal/internal/watcher_state.go diff --git a/component/common/loki/client/manager.go b/component/common/loki/client/manager.go index a239e9a90091..3e3d8ff78fd1 100644 --- a/component/common/loki/client/manager.go +++ b/component/common/loki/client/manager.go @@ -35,7 +35,7 @@ func (n nilNotifier) SubscribeCleanup(_ wal.CleanupEventSubscriber) {} func (n nilNotifier) SubscribeWrite(_ wal.WriteEventSubscriber) {} -type Drainable interface { +type Stoppable interface { Stop(drain bool) } @@ -53,7 +53,7 @@ type StoppableClient interface { type Manager struct { name string clients []Client - walWatchers []Drainable + walWatchers []Stoppable // stoppableClients is kept separate from clients for avoiding having to couple queueClient to the Client interface stoppableClients []StoppableClient @@ -78,7 +78,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr clientsCheck := make(map[string]struct{}) clients := make([]Client, 0, len(clientCfgs)) - watchers := make([]Drainable, 0, len(clientCfgs)) + watchers := make([]Stoppable, 0, len(clientCfgs)) stoppableClients := make([]StoppableClient, 0, len(clientCfgs)) for _, cfg := range clientCfgs { // Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name). @@ -187,15 +187,19 @@ func (m *Manager) Chan() chan<- loki.Entry { return m.entries } +// Stop the manager, not draining the Write-Ahead Log, if that mode is enabled. func (m *Manager) Stop() { m.StopWithDrain(false) } +// StopWithDrain will stop the manager, its Write-Ahead Log watchers, and clients accordingly. If drain is enabled, +// the Watchers will attempt to drain the WAL completely. func (m *Manager) StopWithDrain(drain bool) { // first stop the receiving channel m.once.Do(func() { close(m.entries) }) m.wg.Wait() - // close wal watchers + + // stop wal watchers for _, walWatcher := range m.walWatchers { walWatcher.Stop(drain) } diff --git a/component/common/loki/wal/config.go b/component/common/loki/wal/config.go index d25e2426923f..160bca2b90a9 100644 --- a/component/common/loki/wal/config.go +++ b/component/common/loki/wal/config.go @@ -52,6 +52,7 @@ type WatchConfig struct { MaxReadFrequency time.Duration // DrainTimeout is the maximum amount of time that the Watcher can spend draining the remaining segments in the WAL. 
+ // After that time, the Watcher is stopped immediately, dropping all the work in process. DrainTimeout time.Duration } diff --git a/component/common/loki/wal/internal/watcher_state.go b/component/common/loki/wal/internal/watcher_state.go new file mode 100644 index 000000000000..c81413dfd230 --- /dev/null +++ b/component/common/loki/wal/internal/watcher_state.go @@ -0,0 +1,88 @@ +package internal + +import ( + "sync" + + "github.com/go-kit/log" + "github.com/grafana/agent/pkg/flow/logging/level" +) + +const ( + // StateRunning is the main functioning state of the watcher. It will keep tailing head segments, consuming closed + // ones, and checking for new ones. + StateRunning = iota + + // StateDraining is an intermediary state between running and stopping. The watcher will attempt to consume all the data + // found in the WAL, omitting errors and assuming all segments found are "closed", that is, no longer being written. + StateDraining + + // StateStopping means the Watcher is being stopped. It should drop all segment read activity, and exit promptly. + StateStopping +) + +// WatcherState is a holder for the state the Watcher is in. It provides handy methods for checking it it's stopping, getting +// the current state, or blocking until it has stopped. +type WatcherState struct { + current int + mut sync.RWMutex + stoppingSignal chan struct{} + logger log.Logger +} + +func NewWatcherState(l log.Logger) *WatcherState { + return &WatcherState{ + current: StateRunning, + stoppingSignal: make(chan struct{}), + logger: l, + } +} + +// Transition changes the state of WatcherState to next, reacting accordingly. +func (s *WatcherState) Transition(next int) { + s.mut.Lock() + defer s.mut.Unlock() + + level.Debug(s.logger).Log("msg", "watcher transitioning state", "currentState", printState(s.current), "nextState", printState(next)) + + // only perform channel close if the state is not already stopping + // expect s.s to be either draining ro running to perform a close + if next == StateStopping && s.current != next { + close(s.stoppingSignal) + } + + // update state + s.current = next +} + +// IsDraining evaluates to true if the current state is StateDraining. +func (s *WatcherState) IsDraining() bool { + s.mut.RLock() + defer s.mut.RUnlock() + return s.current == StateDraining +} + +// IsStopping evaluates to true if the current state is StateStopping. +func (s *WatcherState) IsStopping() bool { + s.mut.RLock() + defer s.mut.RUnlock() + return s.current == StateStopping +} + +// WaitForStopping returns a channel in which the called can read, effectively waiting until the state changes to stopping. +func (s *WatcherState) WaitForStopping() <-chan struct{} { + return s.stoppingSignal +} + +// printState prints a user-friendly name of the possible Watcher states. +func printState(state int) string { + switch state { + case StateRunning: + return "running" + case StateDraining: + return "draining" + case StateStopping: + return "stopping" + default: + return "unknown" + } +} diff --git a/component/common/loki/wal/watcher.go b/component/common/loki/wal/watcher.go index c4e4e6135e48..856d185d2327 100644 --- a/component/common/loki/wal/watcher.go +++ b/component/common/loki/wal/watcher.go @@ -3,11 +3,11 @@ package wal import ( "errors" "fmt" + "github.com/grafana/agent/component/common/loki/wal/internal" "io" "math" "os" "strconv" - "sync" "time" "github.com/go-kit/log" @@ -73,82 +73,6 @@ type Marker interface { LastMarkedSegment() int } -// wState represents the possible states the Watcher can be in. 
-type wState int64 - -const ( - // stateRunning is the main functioning state of the watcher. It will keep tailing head segments, consuming closed - // ones, and checking for new ones. - stateRunning wState = iota - - stateDraining - - // stateStopping means the Watcher is being stopped. It should drop all segment read activity, and exit promptly. - stateStopping -) - -// watcherState is a holder for the state the Watcher is in. It provides handy methods for checking it it's stopping, getting -// the current state, or blocking until it has stopped. -type watcherState struct { - current wState - mut sync.RWMutex - stoppingSignal chan struct{} - logger log.Logger -} - -func newWatcherState(l log.Logger) *watcherState { - return &watcherState{ - current: stateRunning, - stoppingSignal: make(chan struct{}), - logger: l, - } -} - -func printState(s wState) string { - switch s { - case stateRunning: - return "running" - case stateDraining: - return "draining" - case stateStopping: - return "stopping" - default: - return "unknown" - } -} - -func (s *watcherState) Transition(next wState) { - s.mut.Lock() - defer s.mut.Unlock() - - level.Debug(s.logger).Log("msg", "Watcher transitioning state", "currentState", printState(s.current), "nextState", printState(next)) - - // only perform channel close if the state is not already stopping - // expect s.s to be either draining ro running to perform a close - if next == stateStopping && s.current != next { - close(s.stoppingSignal) - } - - // update state - s.current = next -} - -func (s *watcherState) Get() wState { - s.mut.RLock() - defer s.mut.RUnlock() - return s.current -} - -func (s *watcherState) IsStopping() bool { - s.mut.RLock() - defer s.mut.RUnlock() - return s.current == stateStopping -} - -func (s *watcherState) WaitForStopping() <-chan struct{} { - return s.stoppingSignal -} - type Watcher struct { // id identifies the Watcher. Used when one Watcher is instantiated per remote write client, to be able to track to whom // the metric/log line corresponds. 
@@ -157,7 +81,7 @@ type Watcher struct { actions WriteTo readNotify chan struct{} done chan struct{} - state *watcherState + state *internal.WatcherState walDir string logger log.Logger MaxSegment int @@ -177,7 +101,7 @@ func NewWatcher(walDir, id string, metrics *WatcherMetrics, writeTo WriteTo, log id: id, actions: writeTo, readNotify: make(chan struct{}), - state: newWatcherState(logger), + state: internal.NewWatcherState(logger), done: make(chan struct{}), MaxSegment: -1, marker: marker, @@ -211,11 +135,11 @@ func (w *Watcher) mainLoop() { level.Error(w.logger).Log("msg", "error tailing WAL", "err", err) } - if w.state.Get() == stateDraining && errors.Is(err, os.ErrNotExist) { + if w.state.IsDraining() && errors.Is(err, os.ErrNotExist) { level.Info(w.logger).Log("msg", "Reached non existing segment while draining, assuming end of WAL") // since we've reached the end of the WAL, and the Watcher is draining, promptly transition to stopping state // so the watcher can stoppingSignal early - w.state.Transition(stateStopping) + w.state.Transition(internal.StateStopping) } select { @@ -315,11 +239,11 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { // Check if new segments exists, or we are draining the WAL, which means that either: // - This is the last segment, and we can consume it fully // - There's some other segment, and we can consume this segment fully as well - if last <= segmentNum && w.state.Get() != stateDraining { + if last <= segmentNum && !w.state.IsDraining() { continue } - if w.state.Get() == stateDraining { + if w.state.IsDraining() { level.Debug(w.logger).Log("msg", "Draining segment completely", "segment", segmentNum, "lastSegment", last) } @@ -432,7 +356,7 @@ func (w *Watcher) decodeAndDispatch(b []byte, segmentNum int) (bool, error) { func (w *Watcher) Stop(drain bool) { if drain { level.Info(w.logger).Log("msg", "Draining Watcher") - w.state.Transition(stateDraining) + w.state.Transition(internal.StateDraining) // wait for drain timeout, or stopping state, in case the Watcher does the transition itself promptly select { case <-time.NewTimer(w.drainTimeout).C: @@ -441,7 +365,7 @@ func (w *Watcher) Stop(drain bool) { } } - w.state.Transition(stateStopping) + w.state.Transition(internal.StateStopping) // upon calling stop, wait for main mainLoop execution to stop <-w.done @@ -506,16 +430,6 @@ func (w *Watcher) findNextSegmentFor(index int) (int, error) { return -1, errors.New("failed to find segment for index") } -// isClosed checks in a non-blocking manner if a channel is closed or not. -func isClosed(c chan struct{}) bool { - select { - case <-c: - return true - default: - return false - } -} - // readSegmentNumbers reads the given directory and returns all segment identifiers, that is, the index of each segment // file. 
func readSegmentNumbers(dir string) ([]int, error) { From 7d2e197c3e5fe5c36694e074a021b2ca3e1f1198 Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Fri, 17 Nov 2023 17:57:18 -0300 Subject: [PATCH 09/13] some comments --- component/common/loki/client/manager.go | 2 ++ component/common/loki/wal/watcher.go | 14 +++++++------- component/common/loki/wal/watcher_test.go | 16 ++++++++++------ 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/component/common/loki/client/manager.go b/component/common/loki/client/manager.go index 3e3d8ff78fd1..d8caad75d120 100644 --- a/component/common/loki/client/manager.go +++ b/component/common/loki/client/manager.go @@ -194,6 +194,8 @@ func (m *Manager) Stop() { // StopWithDrain will stop the manager, its Write-Ahead Log watchers, and clients accordingly. If drain is enabled, // the Watchers will attempt to drain the WAL completely. +// The shutdown procedure first stops the Watchers, allowing them to flush as much data into the clients as possible. Then +// the clients are shut down accordingly. func (m *Manager) StopWithDrain(drain bool) { // first stop the receiving channel m.once.Do(func() { close(m.entries) }) diff --git a/component/common/loki/wal/watcher.go b/component/common/loki/wal/watcher.go index 856d185d2327..a92a62631f7e 100644 --- a/component/common/loki/wal/watcher.go +++ b/component/common/loki/wal/watcher.go @@ -3,7 +3,6 @@ package wal import ( "errors" "fmt" - "github.com/grafana/agent/component/common/loki/wal/internal" "io" "math" "os" @@ -11,6 +10,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/agent/component/common/loki/wal/internal" "github.com/grafana/agent/pkg/flow/logging/level" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/wlog" @@ -197,7 +197,7 @@ func (w *Watcher) run() error { // If tail is false, we know the segment we are "watching" over is closed (no further write will occur to it). Then, the // segment is read fully, any errors are logged as Warnings, and no error is returned. func (w *Watcher) watch(segmentNum int, tail bool) error { - level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", segmentNum, "tail", tail) + level.Debug(w.logger).Log("msg", "Watching WAL segment", "currentSegment", segmentNum, "tail", tail) segment, err := wlog.OpenReadSegment(wlog.SegmentName(w.walDir, segmentNum)) if err != nil { @@ -237,8 +237,8 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { } // Check if new segments exists, or we are draining the WAL, which means that either: - // - This is the last segment, and we can consume it fully - // - There's some other segment, and we can consume this segment fully as well + // - This is the last segment, and we can consume it fully because we are draining the WAL + // - There's a segment after the current one, and we can consume this segment fully as well if last <= segmentNum && !w.state.IsDraining() { continue } @@ -248,13 +248,13 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { } // We now that there's either a new segment (last > segmentNum), or we are draining the WAL. Either case, read - // the remaining from the segmentNum segment and return from `watch` to read the next one + // the remaining data from the segmentNum and return from `watch` to read the next one. 
_, err = w.readSegment(reader, segmentNum) if debug { level.Warn(w.logger).Log("msg", "Error reading segment inside segmentTicker", "segment", segmentNum, "read", reader.Offset(), "err", err) } - // io.EOF error are non-fatal since we consuming the segment till the end + // io.EOF error are non-fatal since we are consuming the segment till the end if errors.Unwrap(err) != io.EOF { return err } @@ -350,7 +350,7 @@ func (w *Watcher) decodeAndDispatch(b []byte, segmentNum int) (bool, error) { } // Stop stops the Watcher, draining the WAL until the end of the last segment if drain is true. Since the writer of the WAL -// is expected to have stopped before the Watcher, the last segment will be drained completely before the end of Stop. +// is expected to have stopped before the Watcher, no further writes are expected, so segments can be safely consumed. // // Note if drain is enabled, the caller routine of Stop will block executing the drain procedure. func (w *Watcher) Stop(drain bool) { diff --git a/component/common/loki/wal/watcher_test.go b/component/common/loki/wal/watcher_test.go index 7d36f6942333..dbdef8a5d3bb 100644 --- a/component/common/loki/wal/watcher_test.go +++ b/component/common/loki/wal/watcher_test.go @@ -2,7 +2,6 @@ package wal import ( "fmt" - "go.uber.org/atomic" "os" "strings" "testing" @@ -14,6 +13,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/tsdb/record" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "github.com/grafana/agent/component/common/loki" "github.com/grafana/agent/component/common/loki/utils" @@ -586,12 +586,16 @@ func (s *slowWriteTo) StoreSeries(series []record.RefSeries, segmentNum int) { } func (s *slowWriteTo) AppendEntries(entries wal.RefEntries, segmentNum int) error { - var allLines strings.Builder - for _, e := range entries.Entries { - allLines.WriteString(e.Line) - allLines.WriteString("/") + // only log on development debug flag + if debug { + var allLines strings.Builder + for _, e := range entries.Entries { + allLines.WriteString(e.Line) + allLines.WriteString("/") + } + s.t.Logf("AppendEntries called from segment %d - %s", segmentNum, allLines.String()) } - s.t.Logf("AppendEntries called from segment %d - %s", segmentNum, allLines.String()) + s.entriesReceived.Add(uint64(len(entries.Entries))) time.Sleep(s.sleepAfterAppendEntries) return nil From fcf3ca87331a472bf91d710c4c1112d6986fcd6e Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Fri, 17 Nov 2023 18:02:10 -0300 Subject: [PATCH 10/13] map river configs --- CHANGELOG.md | 2 ++ component/loki/write/write.go | 3 +++ component/loki/write/write_test.go | 4 ++++ docs/sources/flow/reference/components/loki.write.md | 1 + 4 files changed, 10 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e532812bb40c..7472af4fdbd2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,8 @@ Main (unreleased) - Added links between compatible components in the documentation to make it easier to discover them. (@thampiotr) +- Added support for `loki.write` to flush WAL on agent shutdown. 
(@thepalbi) + ### Bugfixes - Update `pyroscope.ebpf` to fix a logical bug causing to profile to many kthreads instead of regular processes https://github.com/grafana/pyroscope/pull/2778 (@korniltsev) diff --git a/component/loki/write/write.go b/component/loki/write/write.go index 53a3b9821a08..a31cb0745976 100644 --- a/component/loki/write/write.go +++ b/component/loki/write/write.go @@ -41,6 +41,7 @@ type WalArguments struct { MaxSegmentAge time.Duration `river:"max_segment_age,attr,optional"` MinReadFrequency time.Duration `river:"min_read_frequency,attr,optional"` MaxReadFrequency time.Duration `river:"max_read_frequency,attr,optional"` + DrainTimeout time.Duration `river:"drain_timeout,attr,optional"` } func (wa *WalArguments) Validate() error { @@ -58,6 +59,7 @@ func (wa *WalArguments) SetToDefault() { MaxSegmentAge: wal.DefaultMaxSegmentAge, MinReadFrequency: wal.DefaultWatchConfig.MinReadFrequency, MaxReadFrequency: wal.DefaultWatchConfig.MaxReadFrequency, + DrainTimeout: wal.DefaultWatchConfig.DrainTimeout, } } @@ -163,6 +165,7 @@ func (c *Component) Update(args component.Arguments) error { WatchConfig: wal.WatchConfig{ MinReadFrequency: newArgs.WAL.MinReadFrequency, MaxReadFrequency: newArgs.WAL.MaxReadFrequency, + DrainTimeout: newArgs.WAL.DrainTimeout, }, } diff --git a/component/loki/write/write_test.go b/component/loki/write/write_test.go index 642b53703e0c..d77bebe21c0f 100644 --- a/component/loki/write/write_test.go +++ b/component/loki/write/write_test.go @@ -79,6 +79,7 @@ func TestUnmarshallWalAttrributes(t *testing.T) { MaxSegmentAge: wal.DefaultMaxSegmentAge, MinReadFrequency: wal.DefaultWatchConfig.MinReadFrequency, MaxReadFrequency: wal.DefaultWatchConfig.MaxReadFrequency, + DrainTimeout: wal.DefaultWatchConfig.DrainTimeout, }, }, "wal enabled with defaults": { @@ -90,6 +91,7 @@ func TestUnmarshallWalAttrributes(t *testing.T) { MaxSegmentAge: wal.DefaultMaxSegmentAge, MinReadFrequency: wal.DefaultWatchConfig.MinReadFrequency, MaxReadFrequency: wal.DefaultWatchConfig.MaxReadFrequency, + DrainTimeout: wal.DefaultWatchConfig.DrainTimeout, }, }, "wal enabled with some overrides": { @@ -97,12 +99,14 @@ func TestUnmarshallWalAttrributes(t *testing.T) { enabled = true max_segment_age = "10m" min_read_frequency = "11ms" + drain_timeout = "5m" `, expected: WalArguments{ Enabled: true, MaxSegmentAge: time.Minute * 10, MinReadFrequency: time.Millisecond * 11, MaxReadFrequency: wal.DefaultWatchConfig.MaxReadFrequency, + DrainTimeout: time.Minute * 5, }, }, } { diff --git a/docs/sources/flow/reference/components/loki.write.md b/docs/sources/flow/reference/components/loki.write.md index efbcdf34eabc..d776ef9efe60 100644 --- a/docs/sources/flow/reference/components/loki.write.md +++ b/docs/sources/flow/reference/components/loki.write.md @@ -166,6 +166,7 @@ Name | Type | Description `max_segment_age` | `duration` | Maximum time a WAL segment should be allowed to live. Segments older than this setting will be eventually deleted. | `"1h"` | no `min_read_frequency` | `duration` | Minimum backoff time in the backup read mechanism. | `"250ms"` | no `max_read_frequency` | `duration` | Maximum backoff time in the backup read mechanism. | `"1s"` | no +`max_read_frequency` | `duration` | Maximum backoff time in the backup read mechanism. 
| `"1s"` | no [run]: {{< relref "../cli/run.md" >}} From d45d5bf95574636d49a4e71dc195d16cc4c74d3e Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Fri, 17 Nov 2023 18:03:06 -0300 Subject: [PATCH 11/13] add docs --- docs/sources/flow/reference/components/loki.write.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/sources/flow/reference/components/loki.write.md b/docs/sources/flow/reference/components/loki.write.md index d776ef9efe60..75aad04f3f2a 100644 --- a/docs/sources/flow/reference/components/loki.write.md +++ b/docs/sources/flow/reference/components/loki.write.md @@ -162,11 +162,11 @@ The following arguments are supported: Name | Type | Description | Default | Required --------------------- |------------|--------------------------------------------------------------------------------------------------------------------|-----------| -------- -`enabled` | `bool` | Whether to enable the WAL. | false | no +`enabled` | `bool` | Whether to enable the WAL. | false | no `max_segment_age` | `duration` | Maximum time a WAL segment should be allowed to live. Segments older than this setting will be eventually deleted. | `"1h"` | no `min_read_frequency` | `duration` | Minimum backoff time in the backup read mechanism. | `"250ms"` | no `max_read_frequency` | `duration` | Maximum backoff time in the backup read mechanism. | `"1s"` | no -`max_read_frequency` | `duration` | Maximum backoff time in the backup read mechanism. | `"1s"` | no +`drain_timeout` | `duration` | Maximum time the WAL drain procedure can take, before being forcefully stopped. | `"30s"` | no [run]: {{< relref "../cli/run.md" >}} From ed3190b9e83c6a6a3f95508ca6720c0d8353dfb1 Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Thu, 23 Nov 2023 11:13:00 -0300 Subject: [PATCH 12/13] splitting apart Stop and Drain --- component/common/loki/client/manager.go | 15 ++++++++---- component/common/loki/wal/config.go | 4 ++-- component/common/loki/wal/watcher.go | 28 +++++++++++------------ component/common/loki/wal/watcher_test.go | 15 +++++++----- component/loki/write/types.go | 2 +- 5 files changed, 36 insertions(+), 28 deletions(-) diff --git a/component/common/loki/client/manager.go b/component/common/loki/client/manager.go index d8caad75d120..14fbf92be6df 100644 --- a/component/common/loki/client/manager.go +++ b/component/common/loki/client/manager.go @@ -35,8 +35,9 @@ func (n nilNotifier) SubscribeCleanup(_ wal.CleanupEventSubscriber) {} func (n nilNotifier) SubscribeWrite(_ wal.WriteEventSubscriber) {} -type Stoppable interface { - Stop(drain bool) +type StoppableWatcher interface { + Stop() + Drain() } type StoppableClient interface { @@ -53,7 +54,7 @@ type StoppableClient interface { type Manager struct { name string clients []Client - walWatchers []Stoppable + walWatchers []StoppableWatcher // stoppableClients is kept separate from clients for avoiding having to couple queueClient to the Client interface stoppableClients []StoppableClient @@ -78,7 +79,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr clientsCheck := make(map[string]struct{}) clients := make([]Client, 0, len(clientCfgs)) - watchers := make([]Stoppable, 0, len(clientCfgs)) + watchers := make([]StoppableWatcher, 0, len(clientCfgs)) stoppableClients := make([]StoppableClient, 0, len(clientCfgs)) for _, cfg := range clientCfgs { // Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name). 
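For reference, the caller side of this API is small: whoever owns the Manager decides at shutdown time whether the WAL should be drained. A minimal sketch of that caller, assuming the manager package is imported as `client`, and using an illustrative component type and field name that are not part of this patch:

package example

import (
	"github.com/grafana/agent/component/common/loki/client"
)

// exampleComponent is a hypothetical owner of a client.Manager.
type exampleComponent struct {
	manager *client.Manager
}

// shutdown stops the manager. When drain is true, the WAL watchers attempt to
// push the data already written to disk into the clients before everything is
// stopped; otherwise the manager stops immediately.
func (c *exampleComponent) shutdown(drain bool) {
	c.manager.StopWithDrain(drain)
}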
@@ -203,7 +204,11 @@ func (m *Manager) StopWithDrain(drain bool) { // stop wal watchers for _, walWatcher := range m.walWatchers { - walWatcher.Stop(drain) + // if drain enabled, drain the WAL + if drain { + walWatcher.Drain() + } + walWatcher.Stop() } // close clients for _, c := range m.stoppableClients { diff --git a/component/common/loki/wal/config.go b/component/common/loki/wal/config.go index 160bca2b90a9..7c22d747c13d 100644 --- a/component/common/loki/wal/config.go +++ b/component/common/loki/wal/config.go @@ -10,9 +10,9 @@ const ( // DefaultWatchConfig is the opinionated defaults for operating the Watcher. var DefaultWatchConfig = WatchConfig{ - MinReadFrequency: time.Millisecond * 250, + MinReadFrequency: 250 * time.Millisecond, MaxReadFrequency: time.Second, - DrainTimeout: time.Second * 30, + DrainTimeout: 15 * time.Second, } // Config contains all WAL-related settings. diff --git a/component/common/loki/wal/watcher.go b/component/common/loki/wal/watcher.go index a92a62631f7e..f91e71b856dc 100644 --- a/component/common/loki/wal/watcher.go +++ b/component/common/loki/wal/watcher.go @@ -349,22 +349,22 @@ func (w *Watcher) decodeAndDispatch(b []byte, segmentNum int) (bool, error) { return readData, firstErr } -// Stop stops the Watcher, draining the WAL until the end of the last segment if drain is true. Since the writer of the WAL -// is expected to have stopped before the Watcher, no further writes are expected, so segments can be safely consumed. -// -// Note if drain is enabled, the caller routine of Stop will block executing the drain procedure. -func (w *Watcher) Stop(drain bool) { - if drain { - level.Info(w.logger).Log("msg", "Draining Watcher") - w.state.Transition(internal.StateDraining) - // wait for drain timeout, or stopping state, in case the Watcher does the transition itself promptly - select { - case <-time.NewTimer(w.drainTimeout).C: - level.Warn(w.logger).Log("msg", "Watcher drain timeout occurred, transitioning to Stopping") - case <-w.state.WaitForStopping(): - } +// Drain moves the Watcher to a draining state, which will assume no more data is being written to the WAL, and it will +// attempt to read until the end of the last written segment. The calling routine of Drain will block until all data is +// read, or a timeout occurs. +func (w *Watcher) Drain() { + level.Info(w.logger).Log("msg", "Draining Watcher") + w.state.Transition(internal.StateDraining) + // wait for drain timeout, or stopping state, in case the Watcher does the transition itself promptly + select { + case <-time.NewTimer(w.drainTimeout).C: + level.Warn(w.logger).Log("msg", "Watcher drain timeout occurred, transitioning to Stopping") + case <-w.state.WaitForStopping(): } +} +// Stop stops the Watcher, shutting down the main routine. 
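+// Stop does not drain: it only shuts down the Watcher's main routine. Callers that also need the remaining WAL data
+// to be consumed should call Drain before calling Stop, which is what the Manager does when drain is enabled.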
+func (w *Watcher) Stop() { w.state.Transition(internal.StateStopping) // upon calling stop, wait for main mainLoop execution to stop diff --git a/component/common/loki/wal/watcher_test.go b/component/common/loki/wal/watcher_test.go index dbdef8a5d3bb..15644d740a28 100644 --- a/component/common/loki/wal/watcher_test.go +++ b/component/common/loki/wal/watcher_test.go @@ -343,7 +343,7 @@ func TestWatcher(t *testing.T) { } // create new watcher, and defer stop watcher := NewWatcher(dir, "test", metrics, writeTo, logger, DefaultWatchConfig, noMarker{}) - defer watcher.Stop(false) + defer watcher.Stop() wl, err := New(Config{ Enabled: true, Dir: dir, @@ -421,7 +421,7 @@ func TestWatcher_Replay(t *testing.T) { return 0 }, }) - defer watcher.Stop(false) + defer watcher.Stop() wl, err := New(Config{ Enabled: true, Dir: dir, @@ -503,7 +503,7 @@ func TestWatcher_Replay(t *testing.T) { return -1 }, }) - defer watcher.Stop(false) + defer watcher.Stop() wl, err := New(Config{ Enabled: true, Dir: dir, @@ -682,7 +682,8 @@ func TestWatcher_StopAndDrainWAL(t *testing.T) { // Upon calling Stop drain, the Watcher should finish burning through segment 0, and also consume segment 1 now := time.Now() - watcher.Stop(true) + watcher.Drain() + watcher.Stop() // expecting 15s (missing 15 entries * 1 sec delay in AppendEntries) +/- 1.1s (taking into account the drain timeout // has one extra second. @@ -733,7 +734,8 @@ func TestWatcher_StopAndDrainWAL(t *testing.T) { // Upon calling Stop drain, the Watcher should finish burning through segment 0, and also consume segment 1 now := time.Now() - watcher.Stop(true) + watcher.Drain() + watcher.Stop() // expecting 15s (missing 15 entries * 1 sec delay in AppendEntries) +/- 1.1s (taking into account the drain timeout // has one extra second. @@ -785,7 +787,8 @@ func TestWatcher_StopAndDrainWAL(t *testing.T) { // Upon calling Stop drain, the Watcher should finish burning through segment 0, and also consume segment 1 now := time.Now() - watcher.Stop(true) + watcher.Drain() + watcher.Stop() require.InDelta(t, time.Second*10, time.Since(now), float64(time.Millisecond*1100), "expected the drain procedure to take around 15s") require.Less(t, int(writeTo.entriesReceived.Load()), 20, "expected watcher to have not consumed WAL fully") diff --git a/component/loki/write/types.go b/component/loki/write/types.go index 2959f1b681a1..dc240c675e98 100644 --- a/component/loki/write/types.go +++ b/component/loki/write/types.go @@ -82,7 +82,7 @@ type QueueConfig struct { func (q *QueueConfig) SetToDefault() { *q = QueueConfig{ Capacity: 10 * units.MiB, // considering the default BatchSize of 1MiB, this gives us a default buffered channel of size 10 - DrainTimeout: time.Minute, + DrainTimeout: 15 * time.Second, } } From 5cde5fc54fba16f172e61aa7cba3bd9821bbae0f Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Wed, 6 Dec 2023 13:57:11 -0300 Subject: [PATCH 13/13] minimize manager stop time --- component/common/loki/client/manager.go | 79 ++++++++++++++++--------- 1 file changed, 52 insertions(+), 27 deletions(-) diff --git a/component/common/loki/client/manager.go b/component/common/loki/client/manager.go index 14fbf92be6df..244aa587a81f 100644 --- a/component/common/loki/client/manager.go +++ b/component/common/loki/client/manager.go @@ -45,6 +45,27 @@ type StoppableClient interface { StopNow() } +// watcherClientPair represents a pair of watcher and client, which are coupled together, or just a single client. 
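+// If the WAL is disabled for a client configuration, no watcher is created, so only the client field is set and
+// watcher is nil.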
+type watcherClientPair struct { + watcher StoppableWatcher + client StoppableClient +} + +// Stop will proceed to stop, in order, the possibly-nil watcher and the client. +func (p watcherClientPair) Stop(drain bool) { + // if the config has WAL disabled, there will be no watcher per client config + if p.watcher != nil { + // if drain enabled, drain the WAL + if drain { + p.watcher.Drain() + } + p.watcher.Stop() + } + + // subsequently stop the client + p.client.Stop() +} + // Manager manages remote write client instantiation, and connects the related components to orchestrate the flow of loki.Entry // from the scrape targets, to the remote write clients themselves. // @@ -52,12 +73,10 @@ type StoppableClient interface { // work, tracked in https://github.com/grafana/loki/issues/8197, this Manager will be responsible for instantiating all client // types: Logger, Multi and WAL. type Manager struct { - name string - clients []Client - walWatchers []StoppableWatcher + name string - // stoppableClients is kept separate from clients for avoiding having to couple queueClient to the Client interface - stoppableClients []StoppableClient + clients []Client + pairs []watcherClientPair entries chan loki.Entry once sync.Once @@ -79,8 +98,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr clientsCheck := make(map[string]struct{}) clients := make([]Client, 0, len(clientCfgs)) - watchers := make([]StoppableWatcher, 0, len(clientCfgs)) - stoppableClients := make([]StoppableClient, 0, len(clientCfgs)) + pairs := make([]watcherClientPair, 0, len(clientCfgs)) for _, cfg := range clientCfgs { // Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name). clientName := GetClientName(cfg) @@ -104,7 +122,6 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr if err != nil { return nil, fmt.Errorf("error starting queue client: %w", err) } - stoppableClients = append(stoppableClients, queue) // subscribe watcher's wal.WriteTo to writer events. This will make the writer trigger the cleanup of the wal.WriteTo // series cache whenever a segment is deleted. @@ -117,7 +134,10 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr level.Debug(logger).Log("msg", "starting WAL watcher for client", "client", clientName) watcher.Start() - watchers = append(watchers, watcher) + pairs = append(pairs, watcherClientPair{ + watcher: watcher, + client: queue, + }) } else { client, err := New(metrics, cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger) if err != nil { @@ -125,14 +145,16 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr } clients = append(clients, client) - stoppableClients = append(stoppableClients, client) + + pairs = append(pairs, watcherClientPair{ + client: client, + }) } } manager := &Manager{ - clients: clients, - stoppableClients: stoppableClients, - walWatchers: watchers, - entries: make(chan loki.Entry), + clients: clients, + pairs: pairs, + entries: make(chan loki.Entry), } if walCfg.Enabled { manager.name = buildManagerName("wal", clientCfgs...) 
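To make the stop-time concern concrete: with the defaults adjusted in this series, both the watcher drain timeout and the queue client drain timeout are 15 seconds, so a pair that hits both timeouts takes roughly 30 seconds to stop. Stopping pairs one after another multiplies that by the number of client configurations, while stopping them concurrently keeps the worst case close to the slowest single pair. A small illustrative calculation (the pair count is made up):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Defaults after this series: 15s drain timeout for the WAL watcher and
	// 15s drain timeout for the queue client.
	const watcherDrain = 15 * time.Second
	const clientDrain = 15 * time.Second
	const pairs = 3 // illustrative number of client configurations

	perPair := watcherDrain + clientDrain
	sequential := time.Duration(pairs) * perPair // stopping one pair after another
	concurrent := perPair                        // the slowest pair dominates

	fmt.Printf("worst case per pair: %s, sequential: %s, concurrent: %s\n",
		perPair, sequential, concurrent)
}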
@@ -175,8 +197,8 @@ func (m *Manager) startWithForward() {
 }
 
 func (m *Manager) StopNow() {
-	for _, c := range m.stoppableClients {
-		c.StopNow()
+	for _, pair := range m.pairs {
+		pair.client.StopNow()
 	}
 }
 
@@ -202,18 +224,21 @@ func (m *Manager) StopWithDrain(drain bool) {
 	m.once.Do(func() { close(m.entries) })
 	m.wg.Wait()
 
-	// stop wal watchers
-	for _, walWatcher := range m.walWatchers {
-		// if drain enabled, drain the WAL
-		if drain {
-			walWatcher.Drain()
-		}
-		walWatcher.Stop()
-	}
-	// close clients
-	for _, c := range m.stoppableClients {
-		c.Stop()
+	var stopWG sync.WaitGroup
+
+	// Depending on whether drain is enabled, the maximum time stopping a watcher and its client can take is
+	// the drain time of the watcher plus the drain time of the client. To minimize this, and since we keep a separate
+	// WAL for each client config, each (watcher, client) pair is stopped concurrently.
+	for _, pair := range m.pairs {
+		stopWG.Add(1)
+		go func(pair watcherClientPair) {
+			defer stopWG.Done()
+			pair.Stop(drain)
+		}(pair)
 	}
+
+	// wait for all pairs to be stopped
+	stopWG.Wait()
 }
 
 // GetClientName computes the specific name for each client config. The name is either the configured Name setting in Config,