From d3182008448964d4a8bac8436e609c4621c71768 Mon Sep 17 00:00:00 2001 From: Nikolay Eskov Date: Thu, 10 Oct 2024 12:32:00 +0300 Subject: [PATCH] Support multiple L2 nodes for monitoring process (#297) Now it's possible to analyze multiple L2 nodes. Several bugs fixed. --- cmd/nodemon/nodemon.go | 136 +++++++++++++++++++++++----------- pkg/analysis/l2/l2analyzer.go | 105 ++++++++++++++++++-------- pkg/tools/processing.go | 56 ++++++++++++++ 3 files changed, 222 insertions(+), 75 deletions(-) create mode 100644 pkg/tools/processing.go diff --git a/cmd/nodemon/nodemon.go b/cmd/nodemon/nodemon.go index 059ac943..24f0c433 100644 --- a/cmd/nodemon/nodemon.go +++ b/cmd/nodemon/nodemon.go @@ -2,16 +2,18 @@ package main import ( "context" - "errors" + stderrs "errors" "flag" "log" + "net/url" "os" "os/signal" "strings" - "sync" "syscall" "time" + "github.com/pkg/errors" + "nodemon/pkg/analysis/l2" "go.uber.org/zap" @@ -38,7 +40,7 @@ const ( ) var ( - errInvalidParameters = errors.New("invalid parameters") + errInvalidParameters = stderrs.New("invalid parameters") ) func main() { @@ -48,9 +50,9 @@ func main() { ) if err := run(); err != nil { switch { - case errors.Is(err, context.Canceled): + case stderrs.Is(err, context.Canceled): os.Exit(contextCanceledExitCode) - case errors.Is(err, errInvalidParameters): + case stderrs.Is(err, errInvalidParameters): os.Exit(invalidParametersExitCode) default: log.Fatal(err) @@ -58,6 +60,79 @@ func main() { } } +type nodemonL2Config struct { + L2nodeURLs string + L2nodeNames string +} + +func newNodemonL2Config() *nodemonL2Config { + c := new(nodemonL2Config) + tools.StringVarFlagWithEnv(&c.L2nodeURLs, "l2-urls", "", + "List of Waves L2 Blockchain nodes URLs to monitor. Provide space separated list of REST API URLs here.") + tools.StringVarFlagWithEnv(&c.L2nodeNames, "l2-names", "", + "List of Waves L2 Blockchain nodes names to monitor. Provide space separated list of nodes names here. "+ + "If not provided, URLs will be used as names.") + return c +} + +func (c *nodemonL2Config) present() bool { + return c.L2nodeURLs != "" +} + +func (c *nodemonL2Config) Nodes() []l2.Node { + if !c.present() { + return nil + } + urls := strings.Fields(c.L2nodeURLs) + names := strings.Fields(c.L2nodeNames) + out := make([]l2.Node, 0, len(urls)) + for i, nodeURL := range urls { + name := nodeURL + if len(names) == len(urls) { + name = names[i] + } + out = append(out, l2.Node{URL: nodeURL, Name: name}) + } + return out +} + +func validateURLs(urls []string) error { + var errs []error + for i, nodeURL := range urls { + if _, err := url.ParseRequestURI(nodeURL); err != nil { + errs = append(errs, errors.Wrapf(err, "%d-th URL '%s' is invalid", i+1, nodeURL)) + } + } + return stderrs.Join(errs...) +} + +func (c *nodemonL2Config) validate(logger *zap.Logger) error { + if !c.present() { + return nil + } + var ( + urls = strings.Fields(c.L2nodeURLs) + names = strings.Fields(c.L2nodeNames) + ) + if len(names) != 0 && len(urls) != len(names) { + logger.Sugar().Errorf("L2 node URLs and names should have the same length: names=%d, URLs=%d", + len(names), len(urls), + ) + return errInvalidParameters + } + if err := validateURLs(urls); err != nil { + logger.Error("Invalid L2 node URL", zap.Error(err)) + return errInvalidParameters + } + for i, nodeName := range names { + if nodeName == "" { + logger.Sugar().Errorf("%d-th node name is empty", i+1) + return errInvalidParameters + } + } + return nil +} + type nodemonVaultConfig struct { address string user string @@ -66,10 +141,6 @@ type nodemonVaultConfig struct { secretPath string } -func (n *nodemonVaultConfig) present() bool { - return n.address != "" -} - func newNodemonVaultConfig() *nodemonVaultConfig { c := new(nodemonVaultConfig) tools.StringVarFlagWithEnv(&c.address, "vault-address", "", "Vault server address.") @@ -82,6 +153,10 @@ func newNodemonVaultConfig() *nodemonVaultConfig { return c } +func (n *nodemonVaultConfig) present() bool { + return n.address != "" +} + func (n *nodemonVaultConfig) validate(logger *zap.Logger) error { if n.address == "" { // skip further validation return nil @@ -122,6 +197,7 @@ type nodemonConfig struct { logLevel string development bool vault *nodemonVaultConfig + l2 *nodemonL2Config } func newNodemonConfig() *nodemonConfig { @@ -129,7 +205,7 @@ func newNodemonConfig() *nodemonConfig { tools.StringVarFlagWithEnv(&c.storage, "storage", ".nodes.json", "Path to storage. Default value is \".nodes.json\"") tools.StringVarFlagWithEnv(&c.nodes, "nodes", "", - "Initial list of Waves Blockchain nodes to monitor. Provide comma separated list of REST API URLs here.") + "Initial list of Waves Blockchain nodes to monitor. Provide space separated list of REST API URLs here.") tools.StringVarFlagWithEnv(&c.bindAddress, "bind", ":8080", "Local network address to bind the HTTP API of the service on. Default value is \":8080\".") tools.DurationVarFlagWithEnv(&c.interval, "interval", @@ -151,12 +227,8 @@ func newNodemonConfig() *nodemonConfig { tools.BoolVarFlagWithEnv(&c.development, "development", false, "Development mode.") tools.StringVarFlagWithEnv(&c.logLevel, "log-level", "INFO", "Logging level. Supported levels: DEBUG, INFO, WARN, ERROR, FATAL. Default logging level INFO.") - tools.StringVarFlagWithEnv(&c.L2nodeURL, "l2-node-url", "", - "") - tools.StringVarFlagWithEnv(&c.L2nodeName, "l2-nodes", "", - "List of Waves L2 Blockchain nodes to monitor. Provide comma separated list of REST API URLs here.") - c.vault = newNodemonVaultConfig() + c.l2 = newNodemonL2Config() return c } @@ -183,32 +255,7 @@ func (c *nodemonConfig) validate(logger *zap.Logger) error { logger.Error("Invalid base target threshold", zap.Uint64("threshold", c.baseTargetThreshold)) return errInvalidParameters } - return c.vault.validate(logger) -} - -func merge(channels ...<-chan entities.Alert) <-chan entities.Alert { - fanInFunc := func(wg *sync.WaitGroup, out chan<- entities.Alert, in <-chan entities.Alert) { - defer wg.Done() - // will keep working until the input channels are closed - for alert := range in { - out <- alert - } - } - - wg := new(sync.WaitGroup) - wg.Add(len(channels)) - out := make(chan entities.Alert) - - for _, ch := range channels { - go fanInFunc(wg, out, ch) - } - - go func() { - wg.Wait() - close(out) - }() - - return out + return stderrs.Join(c.vault.validate(logger), c.l2.validate(logger)) } func (c *nodemonConfig) runDiscordPairServer() bool { return c.nanomsgPairDiscordURL != "" } @@ -226,9 +273,10 @@ func (c *nodemonConfig) runAnalyzers( ) { alerts := runAnalyzer(cfg, es, logger, notifications) // L2 analyzer will only be run if the arguments are set - if cfg.L2nodeURL != "" && cfg.L2nodeName != "" { - alertL2 := l2.RunL2Analyzer(ctx, logger, cfg.L2nodeName, cfg.L2nodeURL) - mergedAlerts := merge(alerts, alertL2) + if cfg.l2.present() { + alertL2 := l2.RunL2Analyzers(ctx, logger, cfg.l2.Nodes()) + // merge alerts from different analyzers, wait till both are done + mergedAlerts := tools.FanIn(alerts, alertL2) alerts = mergedAlerts } runMessagingServices(ctx, cfg, alerts, logger, ns, es, pew) diff --git a/pkg/analysis/l2/l2analyzer.go b/pkg/analysis/l2/l2analyzer.go index 4e1b212d..7d3e6594 100644 --- a/pkg/analysis/l2/l2analyzer.go +++ b/pkg/analysis/l2/l2analyzer.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/json" - "fmt" "io" "net/http" urlPackage "net/url" @@ -13,6 +12,7 @@ import ( "time" "nodemon/pkg/entities" + "nodemon/pkg/tools" "go.uber.org/zap" ) @@ -32,11 +32,11 @@ func hexStringToInt(hexString string) (int64, error) { return strconv.ParseInt(hexString, 16, 64) } -func collectL2Height(ctx context.Context, url string, ch chan<- uint64, logger *zap.Logger) { +func collectL2Height(ctx context.Context, url string, logger *zap.Logger) (uint64, bool) { // Validate the URL if _, err := urlPackage.ParseRequestURI(url); err != nil { - logger.Error("Invalid URL", zap.String("url", url), zap.Error(err)) - return + logger.Error("Invalid node URL", zap.Error(err), zap.String("nodeURL", url)) + return 0, false } requestBody, err := json.Marshal(map[string]interface{}{ @@ -46,60 +46,73 @@ func collectL2Height(ctx context.Context, url string, ch chan<- uint64, logger * "params": []interface{}{}, }) if err != nil { - logger.Error("Failed to build a request body for l2 node", zap.Error(err)) - return + logger.Error("Failed to build a request body for l2 node", zap.Error(err), zap.String("nodeURL", url)) + return 0, false } - req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(requestBody)) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(requestBody)) if err != nil { - logger.Error("Failed to create a HTTP request to l2 node", zap.Error(err)) - return + logger.Error("Failed to create a HTTP request to l2 node", zap.Error(err), zap.String("nodeURL", url)) + return 0, false } httpClient := http.Client{Timeout: l2HeightRequestTimeout} resp, err := httpClient.Do(req) if err != nil { - logger.Error("Failed to send a request to l2 node", zap.Error(err)) - return + logger.Error("Failed to send a request to l2 node", zap.Error(err), zap.String("nodeURL", url)) + return 0, false } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { - logger.Error("Failed to read response body from l2 node", zap.Error(err)) - return + logger.Error("Failed to read response body from l2 node", zap.Error(err), zap.String("nodeURL", url)) + return 0, false } var response Response err = json.Unmarshal(body, &response) if err != nil { - logger.Error("Failed unmarshalling response:", zap.Error(err)) - return + logger.Error("Failed unmarshalling response", zap.Error(err), zap.String("nodeURL", url)) + return 0, false } height, err := hexStringToInt(response.Result) if err != nil { - logger.Error("Failed converting hex string to integer:", zap.Error(err)) - return + logger.Error("Failed converting hex string to integer", zap.Error(err), zap.String("nodeURL", url)) + return 0, false } if height < 0 { - logger.Error("The received height is negative, " + strconv.Itoa(int(height))) + logger.Error("The received height is negative", zap.Int64("height", height), zap.String("nodeURL", url)) + return 0, false } - ch <- uint64(height) + return uint64(height), true } -func RunL2Analyzer( - ctx context.Context, - zap *zap.Logger, - nodeURL string, - nodeName string, -) <-chan entities.Alert { +type Node struct { + URL string + Name string +} + +func runCollector(ctx context.Context, nodeURL string, logger *zap.Logger) <-chan uint64 { + collectAndSend := func(heightCh chan<- uint64) { + height, ok := collectL2Height(ctx, nodeURL, logger) + if !ok { + return // failed to collect height + } + logger.Info("L2 height collected", zap.Uint64("height", height), zap.String("nodeURL", nodeURL)) + select { + case heightCh <- height: + case <-ctx.Done(): + } + } collector := func(heightCh chan<- uint64) { + defer close(heightCh) ticker := time.NewTicker(time.Minute) defer ticker.Stop() - defer close(heightCh) + collectAndSend(heightCh) // collect height just after starting for { select { case <-ticker.C: - collectL2Height(ctx, nodeURL, heightCh, zap) + collectAndSend(heightCh) // collect height every minute case <-ctx.Done(): return } @@ -107,7 +120,10 @@ func RunL2Analyzer( } heightCh := make(chan uint64) go collector(heightCh) + return heightCh +} +func RunL2Analyzer(ctx context.Context, zap *zap.Logger, node Node) <-chan entities.Alert { analyzer := func(alertsL2 chan<- entities.Alert, heightCh <-chan uint64) { alertTimer := time.NewTimer(l2NodesSameHeightTimerDuration) defer alertTimer.Stop() @@ -125,17 +141,44 @@ func RunL2Analyzer( alertTimer.Reset(l2NodesSameHeightTimerDuration) } case <-alertTimer.C: - zap.Info(fmt.Sprintf("Alert: Height of an l2 node %s didn't change in 5 minutes, node url:%s", - nodeName, nodeURL, - )) - alertsL2 <- entities.NewL2StuckAlert(time.Now().Unix(), lastHeight, nodeName) + zap.Sugar().Infof("Alert: Height of an l2 node %s didn't change in 5 minutes, node url:%s", + node.Name, node.URL, + ) + alert := entities.NewL2StuckAlert(time.Now().Unix(), lastHeight, node.Name) + select { + case alertsL2 <- alert: + case <-ctx.Done(): + return + } alertTimer.Reset(l2NodesSameHeightTimerDuration) case <-ctx.Done(): return } } } + + heightCh := runCollector(ctx, node.URL, zap) alertsL2 := make(chan entities.Alert) go analyzer(alertsL2, heightCh) return alertsL2 } + +func RunL2Analyzers( + ctx context.Context, + zap *zap.Logger, + nodes []Node, +) <-chan entities.Alert { + // intentionally not using tools.FanInSeqCtx to avoid context propagation + return tools.FanInSeq(func(yield func(<-chan entities.Alert) bool) { + if len(nodes) == 0 { + zap.Warn("No l2 nodes to analyze") + return + } + for _, node := range nodes { + alertsL2 := RunL2Analyzer(ctx, zap, node) + if !yield(alertsL2) { + return + } + } + }) +} diff --git a/pkg/tools/processing.go b/pkg/tools/processing.go new file mode 100644 index 00000000..df0eb115 --- /dev/null +++ b/pkg/tools/processing.go @@ -0,0 +1,56 @@ +package tools + +import ( + "context" + "iter" + "slices" + "sync" +) + +func FanInSeqCtx[T any](ctx context.Context, inputs iter.Seq[<-chan T]) <-chan T { + fanInFunc := func(ctx context.Context, wg *sync.WaitGroup, out chan<- T, in <-chan T) { + defer wg.Done() + // will keep working until the input channels are closed or the context is done + for { + select { + case <-ctx.Done(): + return + case alert, ok := <-in: + if !ok { + return + } + select { + case <-ctx.Done(): + return + case out <- alert: + } + } + } + } + + wg := new(sync.WaitGroup) + out := make(chan T) + for in := range inputs { + wg.Add(1) + go fanInFunc(ctx, wg, out, in) + } + + go func() { + defer close(out) + wg.Wait() + }() + + return out +} + +func FanInSeq[T any](inputs iter.Seq[<-chan T]) <-chan T { + return FanInSeqCtx(context.Background(), inputs) +} + +func FanInCtx[T any](ctx context.Context, inputs ...<-chan T) <-chan T { + return FanInSeqCtx(ctx, slices.Values(inputs)) +} + +func FanIn[T any](inputs ...<-chan T) <-chan T { + return FanInCtx(context.Background(), inputs...) +}