Skip to content

Commit

Permalink
Support multiple L2 nodes for monitoring process (#297)
Browse files Browse the repository at this point in the history
Now it's possible to analyze multiple L2 nodes. Several bugs fixed.
  • Loading branch information
nickeskov authored Oct 10, 2024
1 parent 744d591 commit d318200
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 75 deletions.
136 changes: 92 additions & 44 deletions cmd/nodemon/nodemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ package main

import (
"context"
"errors"
stderrs "errors"
"flag"
"log"
"net/url"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

"github.com/pkg/errors"

"nodemon/pkg/analysis/l2"

"go.uber.org/zap"
Expand All @@ -38,7 +40,7 @@ const (
)

var (
errInvalidParameters = errors.New("invalid parameters")
errInvalidParameters = stderrs.New("invalid parameters")
)

func main() {
Expand All @@ -48,16 +50,89 @@ func main() {
)
if err := run(); err != nil {
switch {
case errors.Is(err, context.Canceled):
case stderrs.Is(err, context.Canceled):
os.Exit(contextCanceledExitCode)
case errors.Is(err, errInvalidParameters):
case stderrs.Is(err, errInvalidParameters):
os.Exit(invalidParametersExitCode)
default:
log.Fatal(err)
}
}
}

type nodemonL2Config struct {
L2nodeURLs string
L2nodeNames string
}

func newNodemonL2Config() *nodemonL2Config {
c := new(nodemonL2Config)
tools.StringVarFlagWithEnv(&c.L2nodeURLs, "l2-urls", "",
"List of Waves L2 Blockchain nodes URLs to monitor. Provide space separated list of REST API URLs here.")
tools.StringVarFlagWithEnv(&c.L2nodeNames, "l2-names", "",
"List of Waves L2 Blockchain nodes names to monitor. Provide space separated list of nodes names here. "+
"If not provided, URLs will be used as names.")
return c
}

func (c *nodemonL2Config) present() bool {
return c.L2nodeURLs != ""
}

func (c *nodemonL2Config) Nodes() []l2.Node {
if !c.present() {
return nil
}
urls := strings.Fields(c.L2nodeURLs)
names := strings.Fields(c.L2nodeNames)
out := make([]l2.Node, 0, len(urls))
for i, nodeURL := range urls {
name := nodeURL
if len(names) == len(urls) {
name = names[i]
}
out = append(out, l2.Node{URL: nodeURL, Name: name})
}
return out
}

func validateURLs(urls []string) error {
var errs []error
for i, nodeURL := range urls {
if _, err := url.ParseRequestURI(nodeURL); err != nil {
errs = append(errs, errors.Wrapf(err, "%d-th URL '%s' is invalid", i+1, nodeURL))
}
}
return stderrs.Join(errs...)
}

func (c *nodemonL2Config) validate(logger *zap.Logger) error {
if !c.present() {
return nil
}
var (
urls = strings.Fields(c.L2nodeURLs)
names = strings.Fields(c.L2nodeNames)
)
if len(names) != 0 && len(urls) != len(names) {
logger.Sugar().Errorf("L2 node URLs and names should have the same length: names=%d, URLs=%d",
len(names), len(urls),
)
return errInvalidParameters
}
if err := validateURLs(urls); err != nil {
logger.Error("Invalid L2 node URL", zap.Error(err))
return errInvalidParameters
}
for i, nodeName := range names {
if nodeName == "" {
logger.Sugar().Errorf("%d-th node name is empty", i+1)
return errInvalidParameters
}
}
return nil
}

type nodemonVaultConfig struct {
address string
user string
Expand All @@ -66,10 +141,6 @@ type nodemonVaultConfig struct {
secretPath string
}

func (n *nodemonVaultConfig) present() bool {
return n.address != ""
}

func newNodemonVaultConfig() *nodemonVaultConfig {
c := new(nodemonVaultConfig)
tools.StringVarFlagWithEnv(&c.address, "vault-address", "", "Vault server address.")
Expand All @@ -82,6 +153,10 @@ func newNodemonVaultConfig() *nodemonVaultConfig {
return c
}

func (n *nodemonVaultConfig) present() bool {
return n.address != ""
}

func (n *nodemonVaultConfig) validate(logger *zap.Logger) error {
if n.address == "" { // skip further validation
return nil
Expand Down Expand Up @@ -122,14 +197,15 @@ type nodemonConfig struct {
logLevel string
development bool
vault *nodemonVaultConfig
l2 *nodemonL2Config
}

func newNodemonConfig() *nodemonConfig {
c := new(nodemonConfig)
tools.StringVarFlagWithEnv(&c.storage, "storage",
".nodes.json", "Path to storage. Default value is \".nodes.json\"")
tools.StringVarFlagWithEnv(&c.nodes, "nodes", "",
"Initial list of Waves Blockchain nodes to monitor. Provide comma separated list of REST API URLs here.")
"Initial list of Waves Blockchain nodes to monitor. Provide space separated list of REST API URLs here.")
tools.StringVarFlagWithEnv(&c.bindAddress, "bind", ":8080",
"Local network address to bind the HTTP API of the service on. Default value is \":8080\".")
tools.DurationVarFlagWithEnv(&c.interval, "interval",
Expand All @@ -151,12 +227,8 @@ func newNodemonConfig() *nodemonConfig {
tools.BoolVarFlagWithEnv(&c.development, "development", false, "Development mode.")
tools.StringVarFlagWithEnv(&c.logLevel, "log-level", "INFO",
"Logging level. Supported levels: DEBUG, INFO, WARN, ERROR, FATAL. Default logging level INFO.")
tools.StringVarFlagWithEnv(&c.L2nodeURL, "l2-node-url", "",
"")
tools.StringVarFlagWithEnv(&c.L2nodeName, "l2-nodes", "",
"List of Waves L2 Blockchain nodes to monitor. Provide comma separated list of REST API URLs here.")

c.vault = newNodemonVaultConfig()
c.l2 = newNodemonL2Config()
return c
}

Expand All @@ -183,32 +255,7 @@ func (c *nodemonConfig) validate(logger *zap.Logger) error {
logger.Error("Invalid base target threshold", zap.Uint64("threshold", c.baseTargetThreshold))
return errInvalidParameters
}
return c.vault.validate(logger)
}

func merge(channels ...<-chan entities.Alert) <-chan entities.Alert {
fanInFunc := func(wg *sync.WaitGroup, out chan<- entities.Alert, in <-chan entities.Alert) {
defer wg.Done()
// will keep working until the input channels are closed
for alert := range in {
out <- alert
}
}

wg := new(sync.WaitGroup)
wg.Add(len(channels))
out := make(chan entities.Alert)

for _, ch := range channels {
go fanInFunc(wg, out, ch)
}

go func() {
wg.Wait()
close(out)
}()

return out
return stderrs.Join(c.vault.validate(logger), c.l2.validate(logger))
}

func (c *nodemonConfig) runDiscordPairServer() bool { return c.nanomsgPairDiscordURL != "" }
Expand All @@ -226,9 +273,10 @@ func (c *nodemonConfig) runAnalyzers(
) {
alerts := runAnalyzer(cfg, es, logger, notifications)
// L2 analyzer will only be run if the arguments are set
if cfg.L2nodeURL != "" && cfg.L2nodeName != "" {
alertL2 := l2.RunL2Analyzer(ctx, logger, cfg.L2nodeName, cfg.L2nodeURL)
mergedAlerts := merge(alerts, alertL2)
if cfg.l2.present() {
alertL2 := l2.RunL2Analyzers(ctx, logger, cfg.l2.Nodes())
// merge alerts from different analyzers, wait till both are done
mergedAlerts := tools.FanIn(alerts, alertL2)
alerts = mergedAlerts
}
runMessagingServices(ctx, cfg, alerts, logger, ns, es, pew)
Expand Down
Loading

0 comments on commit d318200

Please sign in to comment.