forked from influxdata/telegraf-operator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
watcher.go
129 lines (106 loc) · 4 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package main
import (
"io/ioutil"
"path/filepath"
"strings"
"sync/atomic"
"time"
"github.com/fsnotify/fsnotify"
"github.com/go-logr/logr"
)
// telegrafClassesOnChange is the callback type stored in telegrafClassesWatcher
// and invoked, without arguments, after a batch of file system events has been collected.
type telegrafClassesOnChange func()
// telegrafClassesWatcher allows monitoring a directory with telegraf classes using
// fsnotify package and batching multiple events to reduce number of Kubernetes API calls.
type telegrafClassesWatcher struct {
	// eventCount is the number of fsnotify events seen so far. It is only
	// accessed through sync/atomic functions and is kept as the first field
	// so that it is 64-bit aligned on 32-bit platforms, where sync/atomic
	// only guarantees alignment for the first word of an allocated struct.
	eventCount uint64

	// logger is the logger passed to the constructor.
	logger logr.Logger

	// watcherEvents is the channel of fsnotify events consumed by monitorForChanges().
	watcherEvents chan fsnotify.Event

	// onChange is the callback invoked by batchChanges() once per batch of events.
	onChange telegrafClassesOnChange

	// eventChannel carries one signal per fsnotify event from
	// monitorForChanges() to batchChanges().
	eventChannel chan struct{}

	// eventDelay is how long batchChanges() waits after noticing new events
	// before invoking onChange, so that events arriving close together are grouped.
	eventDelay time.Duration
}
// newTelegrafClassesWatcher creates a new instance of telegrafClassesWatcher.
//
// It registers telegrafClassesDirectory and its relevant child entries with a
// new fsnotify watcher and then starts the goroutines that batch file system
// events into invocations of onChange.
//
// An error is returned when the fsnotify watcher cannot be created, when the
// directory cannot be listed, or when any path cannot be added to the watcher.
// In the latter cases the fsnotify watcher is closed before returning so that
// its resources are not leaked.
func newTelegrafClassesWatcher(logger logr.Logger, telegrafClassesDirectory string, onChange telegrafClassesOnChange) (*telegrafClassesWatcher, error) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}

	// fail releases the fsnotify watcher before reporting err to the caller;
	// the error from Close is not actionable at this point and is ignored.
	fail := func(err error) (*telegrafClassesWatcher, error) {
		_ = watcher.Close()
		return nil, err
	}

	// watching the contents of classes directory requires adding the directory as well as most child elements
	logger.Info("adding directory to watcher", "directory", telegrafClassesDirectory)
	if err := watcher.Add(telegrafClassesDirectory); err != nil {
		return fail(err)
	}

	items, err := ioutil.ReadDir(telegrafClassesDirectory)
	if err != nil {
		return fail(err)
	}

	for _, item := range items {
		name := item.Name()

		// Add all items in the classes directory except for current/previous secret contents that begin with "..", "." and ".."
		// explicitly add "..data" directory as this is the directory that maps current state of the secret.
		//
		// Example listing of classes directory:
		//
		// drwxrwxrwt 3 root root  100 Jul 29 12:27 .
		// drwxr-xr-x 1 root root 4096 Jul 29 12:26 ..
		// drwxr-xr-x 2 root root   60 Jul 29 12:27 ..2021_07_29_12_27_39.113045998
		// lrwxrwxrwx 1 root root   31 Jul 29 12:27 ..data -> ..2021_07_29_12_27_39.113045998
		// lrwxrwxrwx 1 root root   20 Jul 29 12:26 app -> ..data/app
		// lrwxrwxrwx 1 root root   20 Jul 29 12:26 basic -> ..data/basic
		//
		// in the above case, we want to match "..data", "app" and "basic", but skip ".", ".." and "..2021_07_29_12_27_39.113045998"
		if name == "..data" || (name != "." && !strings.HasPrefix(name, "..")) {
			p := filepath.Join(telegrafClassesDirectory, name)
			logger.Info("adding item to watch", "path", p)
			if err := watcher.Add(p); err != nil {
				return fail(err)
			}
		}
	}

	// drain the fsnotify error channel; if nobody reads it, the watcher can
	// block while reporting an error and stop delivering further events
	go func() {
		for watchErr := range watcher.Errors {
			logger.Error(watchErr, "error reported by telegraf classes watcher")
		}
	}()

	w := &telegrafClassesWatcher{
		watcherEvents: watcher.Events,
		logger:        logger,
		onChange:      onChange,

		// allow large number of messages in the channel to avoid blocking
		eventChannel: make(chan struct{}, 100),

		// delay by 10 seconds to group multiple fsnotify events into single invocation of callback
		eventDelay: 10 * time.Second,
	}

	w.createGoroutines()

	return w, nil
}
// createGoroutines launches the background workers of the watcher, each in
// its own goroutine: first the batching worker, then the event monitor.
func (w *telegrafClassesWatcher) createGoroutines() {
	workers := []func(){
		w.batchChanges,
		w.monitorForChanges,
	}
	for _, worker := range workers {
		go worker()
	}
}
// batchChanges is the goroutine body that turns the per-event signals sent by
// monitorForChanges() into batched invocations of onChange().
//
// For every signal it compares the event counter with the value recorded at
// the previous invocation; when they differ it waits for eventDelay, re-reads
// the counter, calls onChange() and records the re-read value.
func (w *telegrafClassesWatcher) batchChanges() {
	var handled uint64

	for range w.eventChannel {
		// nothing new since the last callback: this signal was already
		// covered by an earlier batch
		if atomic.LoadUint64(&w.eventCount) == handled {
			continue
		}

		// give further events from the same change time to arrive
		time.Sleep(w.eventDelay)

		// take the counter as of now, which may have advanced during the delay
		latest := atomic.LoadUint64(&w.eventCount)

		w.onChange()

		handled = latest
	}
}
// monitorForChanges helps batch events from fsnotify by incrementing a counter and
// sending events using an internal channel, then handled by batchChanges().
//
// It returns once the fsnotify events channel has been closed.
func (w *telegrafClassesWatcher) monitorForChanges() {
	// ranging over the channel terminates the loop when the channel is closed;
	// a plain receive on a closed channel returns immediately and would make
	// this goroutine spin forever
	for range w.watcherEvents {
		// increase the event counter and send a message to goroutine
		// that batches invocations of onChange()
		atomic.AddUint64(&w.eventCount, 1)
		w.eventChannel <- struct{}{}
	}
}