From d4bffe21a3e31dc847f7eb887a599a8c997b092c Mon Sep 17 00:00:00 2001 From: Louis Date: Sun, 18 Aug 2024 20:10:59 -0700 Subject: [PATCH] goflow2: use slog instead of logrus (#199) This removes the `logrus` dependency as it's in maintenance-only mode. --- cmd/enricher/main.go | 43 ++++++++----- cmd/goflow2/main.go | 141 +++++++++++++++++++++++++------------------ go.mod | 1 - go.sum | 4 -- 4 files changed, 110 insertions(+), 79 deletions(-) diff --git a/cmd/enricher/main.go b/cmd/enricher/main.go index 71bfd1b2..f03358a9 100644 --- a/cmd/enricher/main.go +++ b/cmd/enricher/main.go @@ -7,6 +7,8 @@ import ( "flag" "fmt" "io" + "log" + "log/slog" "net" "os" "strings" @@ -25,7 +27,6 @@ import ( _ "github.com/netsampler/goflow2/v2/transport/kafka" "github.com/oschwald/geoip2-golang" - log "github.com/sirupsen/logrus" "google.golang.org/protobuf/encoding/protodelim" ) @@ -92,15 +93,30 @@ func main() { os.Exit(0) } - lvl, _ := log.ParseLevel(*LogLevel) - log.SetLevel(lvl) + var loglevel slog.Level + if err := loglevel.UnmarshalText([]byte(*LogLevel)); err != nil { + log.Fatal("error parsing log level") + } + + lo := slog.HandlerOptions{ + Level: loglevel, + } + logger := slog.New(slog.NewTextHandler(os.Stderr, &lo)) + + switch *LogFmt { + case "json": + logger = slog.New(slog.NewJSONHandler(os.Stderr, &lo)) + } + + slog.SetDefault(logger) var dbAsn, dbCountry *geoip2.Reader var err error if *DbAsn != "" { dbAsn, err = geoip2.Open(*DbAsn) if err != nil { - log.Fatal(err) + slog.Error("error opening asn db", slog.String("error", err.Error())) + os.Exit(1) } defer dbAsn.Close() } @@ -108,7 +124,8 @@ func main() { if *DbCountry != "" { dbCountry, err = geoip2.Open(*DbCountry) if err != nil { - log.Fatal(err) + slog.Error("error opening country db", slog.String("error", err.Error())) + os.Exit(1) } defer dbCountry.Close() } @@ -120,16 +137,12 @@ func main() { transporter, err := transport.FindTransport(*Transport) if err != nil { - log.Fatal(err) + slog.Error("error transporter", slog.String("error", err.Error())) + os.Exit(1) } defer transporter.Close() - switch *LogFmt { - case "json": - log.SetFormatter(&log.JSONFormatter{}) - } - - log.Info("starting enricher") + logger.Info("starting enricher") rdr := bufio.NewReader(os.Stdin) @@ -138,7 +151,7 @@ func main() { if err := protodelim.UnmarshalFrom(rdr, &msg); err != nil && errors.Is(err, io.EOF) { return } else if err != nil { - log.Error(err) + slog.Error("error unmarshalling message", slog.String("error", err.Error())) continue } @@ -150,13 +163,13 @@ func main() { key, data, err := formatter.Format(&msg) if err != nil { - log.Error(err) + slog.Error("error formatting message", slog.String("error", err.Error())) continue } err = transporter.Send(key, data) if err != nil { - log.Error(err) + slog.Error("error sending message", slog.String("error", err.Error())) continue } diff --git a/cmd/goflow2/main.go b/cmd/goflow2/main.go index 7b3f5aca..88f909cd 100644 --- a/cmd/goflow2/main.go +++ b/cmd/goflow2/main.go @@ -6,6 +6,8 @@ import ( "flag" "fmt" "io" + "log" + "log/slog" "net" "net/http" "net/url" @@ -42,7 +44,6 @@ import ( "github.com/netsampler/goflow2/v2/utils/debug" "github.com/prometheus/client_golang/prometheus/promhttp" - log "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" ) @@ -87,22 +88,33 @@ func main() { os.Exit(0) } - lvl, _ := log.ParseLevel(*LogLevel) - log.SetLevel(lvl) + var loglevel slog.Level + if err := loglevel.UnmarshalText([]byte(*LogLevel)); err != nil { + log.Fatal("error parsing log level") + } + + lo := slog.HandlerOptions{ + Level: loglevel, + } + logger := slog.New(slog.NewTextHandler(os.Stderr, &lo)) switch *LogFmt { case "json": - log.SetFormatter(&log.JSONFormatter{}) + logger = slog.New(slog.NewJSONHandler(os.Stderr, &lo)) } + slog.SetDefault(logger) + formatter, err := format.FindFormat(*Format) if err != nil { - log.Fatal(err) + slog.Error("error formatter", slog.String("error", err.Error())) + os.Exit(1) } transporter, err := transport.FindTransport(*Transport) if err != nil { - log.Fatal(err) + slog.Error("error transporter", slog.String("error", err.Error())) + os.Exit(1) } var flowProducer producer.ProducerInterface @@ -113,12 +125,14 @@ func main() { if *MappingFile != "" { f, err := os.Open(*MappingFile) if err != nil { - log.Fatal(err) + slog.Error("error opening mapping", slog.String("error", err.Error())) + os.Exit(1) } cfgProducer, err = LoadMapping(f) f.Close() if err != nil { - log.Fatal(err) + slog.Error("error loading mapping", slog.String("error", err.Error())) + os.Exit(1) } } @@ -129,12 +143,14 @@ func main() { flowProducer, err = protoproducer.CreateProtoProducer(cfgm, protoproducer.CreateSamplingSystem) if err != nil { - log.Fatal(err) + slog.Error("error producer", slog.String("error", err.Error())) + os.Exit(1) } } else if *Produce == "raw" { flowProducer = &rawproducer.RawProducer{} } else { - log.Fatalf("producer %s does not exist", *Produce) + slog.Error("producer does not exist", slog.String("error", err.Error()), slog.String("producer", *Produce)) + os.Exit(1) } // intercept panic and generate an error @@ -150,12 +166,13 @@ func main() { if !collecting { wr.WriteHeader(http.StatusServiceUnavailable) if _, err := wr.Write([]byte("Not OK\n")); err != nil { - log.WithError(err).Error("error writing HTTP") + slog.Error("error writing HTTP", slog.String("error", err.Error())) } } else { wr.WriteHeader(http.StatusOK) if _, err := wr.Write([]byte("OK\n")); err != nil { - log.WithError(err).Error("error writing HTTP") + slog.Error("error writing HTTP", slog.String("error", err.Error())) + } } }) @@ -167,18 +184,17 @@ func main() { wg.Add(1) go func() { defer wg.Done() - l := log.WithFields(log.Fields{ - "http": *Addr, - }) + logger := logger.With(slog.String("http", *Addr)) err := srv.ListenAndServe() if err != nil && !errors.Is(err, http.ErrServerClosed) { - l.WithError(err).Fatal("HTTP server error") + slog.Error("HTTP server error", slog.String("error", err.Error())) + os.Exit(1) } - l.Info("closed HTTP server") + logger.Info("closed HTTP server") }() } - log.Info("starting GoFlow2") + logger.Info("starting GoFlow2") c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, syscall.SIGTERM) @@ -190,12 +206,14 @@ func main() { for _, listenAddress := range strings.Split(*ListenAddresses, ",") { listenAddrUrl, err := url.Parse(listenAddress) if err != nil { - log.Fatal(err) + logger.Error("error parsing address", slog.String("error", err.Error())) + os.Exit(1) } numSockets := 1 if listenAddrUrl.Query().Has("count") { if numSocketsTmp, err := strconv.ParseUint(listenAddrUrl.Query().Get("count"), 10, 64); err != nil { - log.Fatal(err) + slog.Error("error parsing count of sockets in URL", slog.String("error", err.Error())) + os.Exit(1) } else { numSockets = int(numSocketsTmp) } @@ -207,7 +225,8 @@ func main() { var numWorkers int if listenAddrUrl.Query().Has("workers") { if numWorkersTmp, err := strconv.ParseUint(listenAddrUrl.Query().Get("workers"), 10, 64); err != nil { - log.Fatal(err) + slog.Error("error parsing workers in URL", slog.String("error", err.Error())) + os.Exit(1) } else { numWorkers = int(numWorkersTmp) } @@ -219,14 +238,16 @@ func main() { var isBlocking bool if listenAddrUrl.Query().Has("blocking") { if isBlocking, err = strconv.ParseBool(listenAddrUrl.Query().Get("blocking")); err != nil { - log.Fatal(err) + slog.Error("error parsing blocking in URL", slog.String("error", err.Error())) + os.Exit(1) } } var queueSize int if listenAddrUrl.Query().Has("queue_size") { if queueSizeTmp, err := strconv.ParseUint(listenAddrUrl.Query().Get("queue_size"), 10, 64); err != nil { - log.Fatal(err) + slog.Error("error parsing queue_size in URL", slog.String("error", err.Error())) + os.Exit(1) } else { queueSize = int(queueSizeTmp) } @@ -237,22 +258,21 @@ func main() { hostname := listenAddrUrl.Hostname() port, err := strconv.ParseUint(listenAddrUrl.Port(), 10, 64) if err != nil { - log.Errorf("Port %s could not be converted to integer", listenAddrUrl.Port()) - return + slog.Error("port could not be converted to integer", slog.String("port", listenAddrUrl.Port())) + os.Exit(1) } - logFields := log.Fields{ - "scheme": listenAddrUrl.Scheme, - "hostname": hostname, - "port": port, - "count": numSockets, - "workers": numWorkers, - "blocking": isBlocking, - "queue_size": queueSize, + logAttr := []any{ + slog.String("scheme", listenAddrUrl.Scheme), + slog.String("hostname", hostname), + slog.Int64("port", int64(port)), + slog.Int("count", numSockets), + slog.Int64("workers", int64(numWorkers)), + slog.Bool("blocking", isBlocking), + slog.Int64("queue_size", int64(queueSize)), } - l := log.WithFields(logFields) - - l.Info("starting collection") + logger := logger.With(logAttr...) + logger.Info("starting collection") cfg := &utils.UDPReceiverConfig{ Sockets: numSockets, @@ -263,7 +283,8 @@ func main() { } recv, err := utils.NewUDPReceiver(cfg) if err != nil { - log.WithError(err).Fatal("error creating UDP receiver") + logger.Error("error creating UDP receiver", slog.String("error", err.Error())) + os.Exit(1) } cfgPipe := &utils.PipeConfig{ @@ -282,8 +303,8 @@ func main() { } else if listenAddrUrl.Scheme == "flow" { p = utils.NewFlowPipe(cfgPipe) } else { - l.Errorf("scheme %s does not exist", listenAddrUrl.Scheme) - return + logger.Error("scheme does not exist", slog.String("error", listenAddrUrl.Scheme)) + os.Exit(1) } decodeFunc = p.DecodeFlow @@ -298,7 +319,8 @@ func main() { // starts receivers // the function either returns an error if err := recv.Start(hostname, int(port), decodeFunc); err != nil { - l.Fatal(err) + logger.Error("error starting", slog.String("error", listenAddrUrl.Scheme)) + os.Exit(1) } else { wg.Add(1) go func() { @@ -310,31 +332,34 @@ func main() { return case err := <-recv.Errors(): if errors.Is(err, net.ErrClosed) { - l.Info("closed receiver") + logger.Info("closed receiver") continue } else if !errors.Is(err, netflow.ErrorTemplateNotFound) && !errors.Is(err, debug.PanicError) { - l.WithError(err).Error("error") + logger.Error("error", slog.String("error", err.Error())) continue } muted, skipped := bm.Increment() if muted && skipped == 0 { - log.Warn("too many receiver messages, muting") + logger.Warn("too many receiver messages, muting") } else if !muted && skipped > 0 { - log.Warnf("skipped %d receiver messages", skipped) + logger.Warn("skipped receiver messages", slog.Int("count", skipped)) } else if !muted { - l := l.WithError(err) + attrs := []any{ + slog.String("error", err.Error()), + } + if errors.Is(err, netflow.ErrorTemplateNotFound) { - l.Warn("template error") + logger.Warn("template error") } else if errors.Is(err, debug.PanicError) { var pErrMsg *debug.PanicErrorMessage if errors.As(err, &pErrMsg) { - l = l.WithFields(log.Fields{ - "message": pErrMsg.Msg, - "stacktrace": string(pErrMsg.Stacktrace), - }) + attrs = append(attrs, + slog.Any("message", pErrMsg.Msg), + slog.String("stacktrace", string(pErrMsg.Stacktrace)), + ) } - l.Error("intercepted panic") + logger.Error("intercepted panic", attrs...) } } @@ -367,15 +392,13 @@ func main() { if err == nil { return } - muted, skipped := bm.Increment() if muted && skipped == 0 { - log.Warn("too many transport errors, muting") + logger.Warn("too many transport errors, muting") } else if !muted && skipped > 0 { - log.Warnf("skipped %d transport errors", skipped) + logger.Warn("skipped transport errors", slog.Int("count", skipped)) } else if !muted { - l := log.WithError(err) - l.Error("transport error") + logger.Error("transport error", slog.String("error", err.Error())) } } @@ -391,7 +414,7 @@ func main() { // stops receivers first, udp sockets will be down for _, recv := range receivers { if err := recv.Stop(); err != nil { - log.WithError(err).Error("error stopping receiver") + logger.Error("error stopping receiver", slog.String("error", err.Error())) } } // then stop pipe @@ -402,11 +425,11 @@ func main() { flowProducer.Close() // close transporter (eg: flushes message to Kafka) transporter.Close() - log.Info("closed transporter") + logger.Info("transporter closed") // close http server (prometheus + health check) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) if err := srv.Shutdown(ctx); err != nil { - log.WithError(err).Error("error shutting-down HTTP server") + logger.Error("error shutting-down HTTP server", slog.String("error", err.Error())) } cancel() close(q) // close errors diff --git a/go.mod b/go.mod index b3e63307..fc3f7cc3 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/libp2p/go-reuseport v0.4.0 github.com/oschwald/geoip2-golang v1.11.0 github.com/prometheus/client_golang v1.20.0 - github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.9.0 github.com/xdg-go/scram v1.1.2 google.golang.org/protobuf v1.34.2 diff --git a/go.sum b/go.sum index 840f3c02..6ff74f65 100644 --- a/go.sum +++ b/go.sum @@ -75,12 +75,9 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5X github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= -github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= -github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= @@ -115,7 +112,6 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=