diff --git a/cmd/goflow2/main.go b/cmd/goflow2/main.go index 1e68f693..53f46eb1 100644 --- a/cmd/goflow2/main.go +++ b/cmd/goflow2/main.go @@ -60,6 +60,9 @@ var ( Format = flag.String("format", "json", fmt.Sprintf("Choose the format (available: %s)", strings.Join(format.GetFormats(), ", "))) Transport = flag.String("transport", "file", fmt.Sprintf("Choose the transport (available: %s)", strings.Join(transport.GetTransports(), ", "))) + ErrCnt = flag.Int("err.cnt", 10, "Maximum errors per batch for muting") + ErrInt = flag.Duration("err.int", time.Second*10, "Maximum errors interval for muting") + Addr = flag.String("addr", ":8080", "HTTP server address") TemplatePath = flag.String("templates.path", "/templates", "NetFlow/IPFIX templates list") @@ -285,6 +288,8 @@ func main() { decodeFunc = metrics.PromDecoderWrapper(decodeFunc, listenAddrUrl.Scheme) pipes = append(pipes, p) + bm := utils.NewBatchMute(*ErrInt, *ErrCnt) + // starts receivers // the function either returns an error if err := recv.Start(hostname, int(port), decodeFunc); err != nil { @@ -299,22 +304,33 @@ func main() { case <-q: return case err := <-recv.Errors(): - l := l.WithError(err) - if errors.Is(err, netflow.ErrorTemplateNotFound) { - l.Warn("template error") - } else if errors.Is(err, debug.PanicError) { - var pErrMsg *debug.PanicErrorMessage - if errors.As(err, &pErrMsg) { - l = l.WithFields(log.Fields{ - "message": pErrMsg.Msg, - "stacktrace": string(pErrMsg.Stacktrace), - }) - } - l.Error("intercepted panic") - } else if errors.Is(err, net.ErrClosed) { + if errors.Is(err, net.ErrClosed) { l.Info("closed receiver") - } else { + continue + } else if !errors.Is(err, netflow.ErrorTemplateNotFound) && !errors.Is(err, debug.PanicError) { l.Error("error") + continue + } + + muted, skipped := bm.Increment() + if muted && skipped == 0 { + log.Warn("too many receiver messages, muting") + } else if !muted && skipped > 0 { + log.Warnf("skipped %d receiver messages", skipped) + } else if !muted { + l := l.WithError(err) + if errors.Is(err, netflow.ErrorTemplateNotFound) { + l.Warn("template error") + } else if errors.Is(err, debug.PanicError) { + var pErrMsg *debug.PanicErrorMessage + if errors.As(err, &pErrMsg) { + l = l.WithFields(log.Fields{ + "message": pErrMsg.Msg, + "stacktrace": string(pErrMsg.Stacktrace), + }) + } + l.Error("intercepted panic") + } } } @@ -336,6 +352,8 @@ func main() { transportErr = transportErrorFct.Errors() } + bm := utils.NewBatchMute(*ErrInt, *ErrCnt) + for { select { case <-q: @@ -344,8 +362,17 @@ func main() { if err == nil { return } - l := log.WithError(err) - l.Error("transport error") + + muted, skipped := bm.Increment() + if muted && skipped == 0 { + log.Warn("too many transport errors, muting") + } else if !muted && skipped > 0 { + log.Warnf("skipped %d transport errors", skipped) + } else if !muted { + l := log.WithError(err) + l.Error("transport error") + } + } } }() diff --git a/utils/mute.go b/utils/mute.go new file mode 100644 index 00000000..9e5624d5 --- /dev/null +++ b/utils/mute.go @@ -0,0 +1,43 @@ +package utils + +import ( + "time" +) + +type BatchMute struct { + batchTime time.Time + resetInterval time.Duration + ctr int + max int +} + +func (b *BatchMute) increment(val int, t time.Time) (muted bool, skipped int) { + + if b.max == 0 || b.resetInterval == 0 { + return muted, skipped + } + + if b.ctr >= b.max { + skipped = b.ctr - b.max + } + + if t.Sub(b.batchTime) > b.resetInterval { + b.ctr = 0 + b.batchTime = t + } + b.ctr += val + + return b.max > 0 && b.ctr > b.max, skipped +} + +func (b *BatchMute) Increment() (muting bool, skipped int) { + return b.increment(1, time.Now().UTC()) +} + +func NewBatchMute(resetInterval time.Duration, max int) *BatchMute { + return &BatchMute{ + batchTime: time.Now().UTC(), + resetInterval: resetInterval, + max: max, + } +} diff --git a/utils/mute_test.go b/utils/mute_test.go new file mode 100644 index 00000000..b4327b70 --- /dev/null +++ b/utils/mute_test.go @@ -0,0 +1,51 @@ +package utils + +import ( + "testing" + "time" +) + +func TestBatchMute(t *testing.T) { + tm := time.Date(2023, time.November, 10, 23, 0, 0, 0, time.UTC) + bm := BatchMute{ + batchTime: tm, + resetInterval: time.Second * 10, + max: 5, + } + + for i := 0; i < 20; i++ { + tm = tm.Add(time.Second) + t.Log(bm.increment(1, tm)) + } + +} + +func TestBatchMuteZero(t *testing.T) { + tm := time.Date(2023, time.November, 10, 23, 0, 0, 0, time.UTC) + bm := BatchMute{ + batchTime: tm, + resetInterval: time.Second * 10, + max: 0, + } + + for i := 0; i < 20; i++ { + tm = tm.Add(time.Second) + t.Log(bm.increment(1, tm)) + } + +} + +func TestBatchMuteInterval(t *testing.T) { + tm := time.Date(2023, time.November, 10, 23, 0, 0, 0, time.UTC) + bm := BatchMute{ + batchTime: tm, + resetInterval: 0, + max: 5, + } + + for i := 0; i < 20; i++ { + tm = tm.Add(time.Second) + t.Log(bm.increment(1, tm)) + } + +}