From da846bb516e8fb1188e618c1ddbc062e0a73d4c8 Mon Sep 17 00:00:00 2001 From: Aleksandr Dolgavin Date: Mon, 23 Dec 2024 12:00:04 -0600 Subject: [PATCH] Network switch mangos to nats (#306) * Changed pubsub communication to nats * Moved the nats server to cmd * Added a nodemon-built-in optional messaging server, dockerfile and make command * Fixed subscription to alerts * Fixes. * Add explicit connection timeout NATS for clients. * Updated telegram and discord bot flags for NATS connection. * Update docs for telegram and discord bots. * Change nodemon flags. * Update 'natsConnectionsTimeoutDefault' const. * Update 'cmd/nodemon/README.md'. * Refactor topics naming - Now topic names can be created only with special functions - Created different topics for telegram and discord bots * Run nats embedded server before messaging services start. * Fix 'RunNatsMessagingServer' func Also changed default param for embedded NATS server * Remove default 'scheme' value for 'nodemon'. * Update 'Dockerfile-messaging-server'. * Remove 'messaging-server' application and its 'Dockerfile-messaging-server'. * Added scheme validation for nodemon --------- Co-authored-by: Nikolay Eskov --- Dockerfile-nodemon | 4 +- cmd/bots/discord/README.md | 8 +- cmd/bots/discord/discord.go | 23 +- cmd/bots/internal/common/environment.go | 92 ++++--- cmd/bots/internal/common/initial/initial.go | 6 +- cmd/bots/internal/common/messaging/bots.go | 5 +- .../internal/common/messaging/pair_client.go | 163 +++++-------- .../common/messaging/pubsub_client.go | 71 ++---- cmd/bots/telegram/README.md | 15 +- cmd/bots/telegram/telegram.go | 25 +- cmd/nodemon/README.md | 16 +- cmd/nodemon/nodemon.go | 224 +++++++++++++----- go.mod | 14 +- go.sum | 36 +-- pkg/messaging/messaging_server.go | 64 +++++ pkg/messaging/pair/server.go | 111 ++++----- pkg/messaging/pubsub/server.go | 41 ++-- pkg/messaging/topics.go | 27 +++ 18 files changed, 541 insertions(+), 404 deletions(-) create mode 100644 pkg/messaging/messaging_server.go create mode 100644 pkg/messaging/topics.go diff --git a/Dockerfile-nodemon b/Dockerfile-nodemon index c114a801..2597e787 100644 --- a/Dockerfile-nodemon +++ b/Dockerfile-nodemon @@ -32,8 +32,8 @@ RUN apk add --no-cache bind-tools USER $APP_USER WORKDIR ${APP} -# Considered as a default HTTP API Port -EXPOSE 8080 +# Considered as a default HTTP API Port, NATS embedded server port +EXPOSE 8080, 4222 COPY --from=builder ${APP}/build/linux-amd64/nodemon ${APP}/nodemon diff --git a/cmd/bots/discord/README.md b/cmd/bots/discord/README.md index 3f748d97..e5b0b793 100644 --- a/cmd/bots/discord/README.md +++ b/cmd/bots/discord/README.md @@ -15,11 +15,9 @@ To do the same as environment variable form use _**UPPER_SNAKE_CASE**_ option na - _-discord-chat-id_ (string) — discord chat ID to send alerts through a specific chat - _-log-level_ (string) — Logging level. Supported levels: DEBUG, INFO, WARN, ERROR, FATAL. Default logging level is INFO. (default "INFO") -- _-nano-msg-pair-discord-url_ (string) — Nanomsg IPC URL for pair socket (default - "ipc:///tmp/nano-msg-nodemon-pair.ipc"). Used for communication between the monitoring and bot services. -- _-nano-msg-pubsub-url_ (string) — Nanomsg IPC URL for pubsub socket (default - "ipc:///tmp/discord/nano-msg-nodemon-pubsub.ipc"). Used by the bot to subscribe to alerts generated by - the monitoring service. +- _-nats-msg-url_ (string) — NATS server URL for messaging (default "nats://127.0.0.1:4222"). + Used by the bot to subscribe to alerts generated by + the monitoring service and for communication between the monitoring and bot services. ## Build requirements diff --git a/cmd/bots/discord/discord.go b/cmd/bots/discord/discord.go index 8e375e30..de4cd0b5 100644 --- a/cmd/bots/discord/discord.go +++ b/cmd/bots/discord/discord.go @@ -15,6 +15,7 @@ import ( "nodemon/cmd/bots/internal/common/messaging" "nodemon/cmd/bots/internal/discord/handlers" "nodemon/internal" + generalMessaging "nodemon/pkg/messaging" "nodemon/pkg/messaging/pair" "nodemon/pkg/tools" @@ -38,21 +39,19 @@ func main() { } type discordBotConfig struct { - nanomsgPubSubURL string - nanomsgPairURL string + natsMessagingURL string discordBotToken string discordChatID string logLevel string development bool bindAddress string + scheme string } func newDiscordBotConfigConfig() *discordBotConfig { c := new(discordBotConfig) - tools.StringVarFlagWithEnv(&c.nanomsgPubSubURL, "nano-msg-pubsub-url", - "ipc:///tmp/discord/nano-msg-nodemon-pubsub.ipc", "Nanomsg IPC URL for pubsub socket") - tools.StringVarFlagWithEnv(&c.nanomsgPairURL, "nano-msg-pair-discord-url", - "ipc:///tmp/nano-msg-nodemon-pair.ipc", "Nanomsg IPC URL for pair socket") + tools.StringVarFlagWithEnv(&c.natsMessagingURL, "nats-msg-url", + "nats://127.0.0.1:4222", "NATS server URL for messaging") tools.StringVarFlagWithEnv(&c.discordBotToken, "discord-bot-token", "", "The secret token used to authenticate the bot") tools.StringVarFlagWithEnv(&c.discordChatID, "discord-chat-id", @@ -62,6 +61,8 @@ func newDiscordBotConfigConfig() *discordBotConfig { tools.BoolVarFlagWithEnv(&c.development, "development", false, "Development mode.") tools.StringVarFlagWithEnv(&c.bindAddress, "bind", "", "Local network address to bind the HTTP API of the service on.") + tools.StringVarFlagWithEnv(&c.scheme, "scheme", "", + "Blockchain scheme i.e. mainnet, testnet, stagenet. Used in messaging service") return c } @@ -70,6 +71,10 @@ func (c *discordBotConfig) validate(zap *zap.Logger) error { zap.Error("discord bot token is required") return common.ErrInvalidParameters } + if c.scheme == "" { + zap.Error("the blockchain scheme must be specified") + return common.ErrInvalidParameters + } if c.discordChatID == "" { zap.Error("discord chat ID is required") return common.ErrInvalidParameters @@ -111,6 +116,7 @@ func runDiscordBot() error { logger, requestChan, responseChan, + cfg.scheme, ) if initErr != nil { return errors.Wrap(initErr, "failed to init discord bot") @@ -178,7 +184,7 @@ func runMessagingClients( responseChan chan pair.Response, ) { go func() { - clientErr := messaging.StartSubMessagingClient(ctx, cfg.nanomsgPubSubURL, discordBotEnv, logger) + clientErr := messaging.StartSubMessagingClient(ctx, cfg.natsMessagingURL, discordBotEnv, logger) if clientErr != nil { logger.Fatal("failed to start sub messaging client", zap.Error(clientErr)) return @@ -186,7 +192,8 @@ func runMessagingClients( }() go func() { - err := messaging.StartPairMessagingClient(ctx, cfg.nanomsgPairURL, requestChan, responseChan, logger) + topic := generalMessaging.DiscordBotRequestsTopic(cfg.scheme) + err := messaging.StartPairMessagingClient(ctx, cfg.natsMessagingURL, requestChan, responseChan, logger, topic) if err != nil { logger.Fatal("failed to start pair messaging client", zap.Error(err)) return diff --git a/cmd/bots/internal/common/environment.go b/cmd/bots/internal/common/environment.go index 1262c687..14536a2e 100644 --- a/cmd/bots/internal/common/environment.go +++ b/cmd/bots/internal/common/environment.go @@ -20,10 +20,9 @@ import ( "codnect.io/chrono" "github.com/bwmarrin/discordgo" + "github.com/nats-io/nats.go" "github.com/pkg/errors" "github.com/wavesplatform/gowaves/pkg/crypto" - "go.nanomsg.org/mangos/v3" - "go.nanomsg.org/mangos/v3/protocol" "go.uber.org/zap" "gopkg.in/telebot.v3" ) @@ -46,19 +45,25 @@ const ( var errUnknownAlertType = errors.New("received unknown alert type") +type AlertSubscription struct { + alertName entities.AlertName + subscription *nats.Subscription +} + type subscriptions struct { mu *sync.RWMutex - subs map[entities.AlertType]entities.AlertName + subs map[entities.AlertType]AlertSubscription } -func (s *subscriptions) Add(alertType entities.AlertType, alertName entities.AlertName) { +func (s *subscriptions) Add(alertType entities.AlertType, alertName entities.AlertName, + subscription *nats.Subscription) { s.mu.Lock() - s.subs[alertType] = alertName + s.subs[alertType] = AlertSubscription{alertName, subscription} s.mu.Unlock() } // Read returns alert name. -func (s *subscriptions) Read(alertType entities.AlertType) (entities.AlertName, bool) { +func (s *subscriptions) Read(alertType entities.AlertType) (AlertSubscription, bool) { s.mu.RLock() elem, ok := s.subs[alertType] s.mu.RUnlock() @@ -80,12 +85,14 @@ func (s *subscriptions) MapR(f func()) { type DiscordBotEnvironment struct { ChatID string Bot *discordgo.Session - subSocket protocol.Socket Subscriptions subscriptions zap *zap.Logger requestType chan<- pair.Request responsePairType <-chan pair.Response unhandledAlertMessages unhandledAlertMessages + scheme string + nc *nats.Conn + alertHandlerFunc func(msg *nats.Msg) } func NewDiscordBotEnvironment( @@ -94,18 +101,20 @@ func NewDiscordBotEnvironment( zap *zap.Logger, requestType chan<- pair.Request, responsePairType <-chan pair.Response, + scheme string, ) *DiscordBotEnvironment { return &DiscordBotEnvironment{ Bot: bot, ChatID: chatID, Subscriptions: subscriptions{ - subs: make(map[entities.AlertType]entities.AlertName), + subs: make(map[entities.AlertType]AlertSubscription), mu: new(sync.RWMutex), }, zap: zap, requestType: requestType, responsePairType: responsePairType, unhandledAlertMessages: newUnhandledAlertMessages(), + scheme: scheme, } } @@ -121,10 +130,6 @@ func (dscBot *DiscordBotEnvironment) Start() error { return nil } -func (dscBot *DiscordBotEnvironment) SetSubSocket(subSocket protocol.Socket) { - dscBot.subSocket = subSocket -} - func (dscBot *DiscordBotEnvironment) SendMessage(msg string) { _, err := dscBot.Bot.ChannelMessageSend(dscBot.ChatID, msg) if err != nil { @@ -189,16 +194,25 @@ func (dscBot *DiscordBotEnvironment) SendAlertMessage(msg generalMessaging.Alert dscBot.unhandledAlertMessages.Add(alertID, messageID) } +func (dscBot *DiscordBotEnvironment) SetNatsConnection(nc *nats.Conn) { + dscBot.nc = nc +} + +func (dscBot *DiscordBotEnvironment) SetAlertHandlerFunc(alertHandlerFunc func(msg *nats.Msg)) { + dscBot.alertHandlerFunc = alertHandlerFunc +} + func (dscBot *DiscordBotEnvironment) SubscribeToAllAlerts() error { for alertType, alertName := range entities.GetAllAlertTypesAndNames() { if dscBot.IsAlreadySubscribed(alertType) { return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName) } - err := dscBot.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}) + topic := generalMessaging.PubSubMsgTopic(dscBot.scheme, alertType) + subscription, err := dscBot.nc.Subscribe(topic, dscBot.alertHandlerFunc) if err != nil { - return err + return errors.Wrap(err, "failed to subscribe to alert") } - dscBot.Subscriptions.Add(alertType, alertName) + dscBot.Subscriptions.Add(alertType, alertName, subscription) dscBot.zap.Sugar().Infof("subscribed to %s", alertName) } return nil @@ -244,13 +258,15 @@ func (m unhandledAlertMessages) Delete(alertID crypto.Digest) { type TelegramBotEnvironment struct { ChatID int64 Bot *telebot.Bot - Mute bool // If it used elsewhere, should be protected by mutex - subSocket protocol.Socket + Mute bool // If it used elsewhere, should be protected by mutex. subscriptions subscriptions zap *zap.Logger requestType chan<- pair.Request responsePairType <-chan pair.Response unhandledAlertMessages unhandledAlertMessages + scheme string + nc *nats.Conn + alertHandlerFunc func(msg *nats.Msg) } func NewTelegramBotEnvironment( @@ -260,19 +276,21 @@ func NewTelegramBotEnvironment( zap *zap.Logger, requestType chan<- pair.Request, responsePairType <-chan pair.Response, + scheme string, ) *TelegramBotEnvironment { return &TelegramBotEnvironment{ Bot: bot, ChatID: chatID, Mute: mute, subscriptions: subscriptions{ - subs: make(map[entities.AlertType]entities.AlertName), + subs: make(map[entities.AlertType]AlertSubscription), mu: new(sync.RWMutex), }, zap: zap, requestType: requestType, responsePairType: responsePairType, unhandledAlertMessages: newUnhandledAlertMessages(), + scheme: scheme, } } @@ -289,10 +307,6 @@ func (tgEnv *TelegramBotEnvironment) Start(ctx context.Context) error { return nil } -func (tgEnv *TelegramBotEnvironment) SetSubSocket(subSocket protocol.Socket) { - tgEnv.subSocket = subSocket -} - func (tgEnv *TelegramBotEnvironment) SendAlertMessage(msg generalMessaging.AlertMessage) { if tgEnv.Mute { tgEnv.zap.Info("received an alert, but asleep now") @@ -425,11 +439,12 @@ func (tgEnv *TelegramBotEnvironment) SubscribeToAllAlerts() error { if tgEnv.IsAlreadySubscribed(alertType) { return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName) } - err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}) + topic := generalMessaging.PubSubMsgTopic(tgEnv.scheme, alertType) + subscription, err := tgEnv.nc.Subscribe(topic, tgEnv.alertHandlerFunc) if err != nil { - return err + return errors.Wrap(err, "failed to subscribe to alert") } - tgEnv.subscriptions.Add(alertType, alertName) + tgEnv.subscriptions.Add(alertType, alertName, subscription) tgEnv.zap.Sugar().Infof("Telegram bot subscribed to %s", alertName) } @@ -446,12 +461,14 @@ func (tgEnv *TelegramBotEnvironment) SubscribeToAlert(alertType entities.AlertTy return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName) } - err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}) + topic := generalMessaging.PubSubMsgTopic(tgEnv.scheme, alertType) + subscription, err := tgEnv.nc.Subscribe(topic, tgEnv.alertHandlerFunc) if err != nil { return errors.Wrap(err, "failed to subscribe to alert") } - tgEnv.subscriptions.Add(alertType, alertName) + tgEnv.subscriptions.Add(alertType, alertName, subscription) tgEnv.zap.Sugar().Infof("Telegram bot subscribed to %s", alertName) + return nil } @@ -464,20 +481,31 @@ func (tgEnv *TelegramBotEnvironment) UnsubscribeFromAlert(alertType entities.Ale if !tgEnv.IsAlreadySubscribed(alertType) { return errors.Errorf("failed to unsubscribe from %s, was not subscribed to it", alertName) } - - err := tgEnv.subSocket.SetOption(mangos.OptionUnsubscribe, []byte{byte(alertType)}) + alertSub, ok := tgEnv.subscriptions.Read(alertType) + if !ok { + return errors.Errorf("subscription didn't exist even though I was subscribed to it") + } + err := alertSub.subscription.Unsubscribe() if err != nil { - return errors.Wrap(err, "failed to unsubscribe from alert") + return errors.New("failed to unsubscribe from alert") } ok = tgEnv.IsAlreadySubscribed(alertType) if !ok { - return errors.New("failed to unsubscribe from alert: was not subscribed to it") + return errors.New("tried to unsubscribe from alert, but still subscribed to it") } tgEnv.subscriptions.Delete(alertType) tgEnv.zap.Sugar().Infof("Telegram bot unsubscribed from %s", alertName) return nil } +func (tgEnv *TelegramBotEnvironment) SetNatsConnection(nc *nats.Conn) { + tgEnv.nc = nc +} + +func (tgEnv *TelegramBotEnvironment) SetAlertHandlerFunc(alertHandlerFunc func(msg *nats.Msg)) { + tgEnv.alertHandlerFunc = alertHandlerFunc +} + type subscribed struct { AlertName string } @@ -501,7 +529,7 @@ func (tgEnv *TelegramBotEnvironment) SubscriptionsList() (string, error) { var subscribedTo []subscribed tgEnv.subscriptions.MapR(func() { for _, alertName := range tgEnv.subscriptions.subs { - s := subscribed{AlertName: string(alertName) + "\n\n"} + s := subscribed{AlertName: string(alertName.alertName) + "\n\n"} subscribedTo = append(subscribedTo, s) } }) diff --git a/cmd/bots/internal/common/initial/initial.go b/cmd/bots/internal/common/initial/initial.go index fcf0d939..de68a9ce 100644 --- a/cmd/bots/internal/common/initial/initial.go +++ b/cmd/bots/internal/common/initial/initial.go @@ -19,6 +19,7 @@ func InitTgBot(behavior string, logger *zap.Logger, requestType chan<- pair.Request, responsePairType <-chan pair.Response, + scheme string, ) (*common.TelegramBotEnvironment, error) { botSettings, err := config.NewTgBotSettings(behavior, webhookLocalAddress, publicURL, botToken) if err != nil { @@ -31,7 +32,7 @@ func InitTgBot(behavior string, logger.Sugar().Debugf("telegram chat id for sending alerts is %d", chatID) - tgBotEnv := common.NewTelegramBotEnvironment(bot, chatID, false, logger, requestType, responsePairType) + tgBotEnv := common.NewTelegramBotEnvironment(bot, chatID, false, logger, requestType, responsePairType, scheme) return tgBotEnv, nil } @@ -41,6 +42,7 @@ func InitDiscordBot( logger *zap.Logger, requestType chan<- pair.Request, responsePairType <-chan pair.Response, + scheme string, ) (*common.DiscordBotEnvironment, error) { bot, err := discordgo.New("Bot " + botToken) if err != nil { @@ -49,6 +51,6 @@ func InitDiscordBot( logger.Sugar().Debugf("discord chat id for sending alerts is %s", chatID) bot.Identify.Intents = discordgo.IntentsGuildMessages | discordgo.IntentsMessageContent - dscBotEnv := common.NewDiscordBotEnvironment(bot, chatID, logger, requestType, responsePairType) + dscBotEnv := common.NewDiscordBotEnvironment(bot, chatID, logger, requestType, responsePairType, scheme) return dscBotEnv, nil } diff --git a/cmd/bots/internal/common/messaging/bots.go b/cmd/bots/internal/common/messaging/bots.go index fe5aad83..56b941be 100644 --- a/cmd/bots/internal/common/messaging/bots.go +++ b/cmd/bots/internal/common/messaging/bots.go @@ -3,13 +3,14 @@ package messaging import ( "nodemon/pkg/messaging" - "go.nanomsg.org/mangos/v3/protocol" + "github.com/nats-io/nats.go" ) type Bot interface { SendAlertMessage(msg messaging.AlertMessage) SendMessage(msg string) + SetNatsConnection(nc *nats.Conn) + SetAlertHandlerFunc(alertHandlerFunc func(msg *nats.Msg)) SubscribeToAllAlerts() error - SetSubSocket(subSocket protocol.Socket) IsEligibleForAction(chatID string) bool } diff --git a/cmd/bots/internal/common/messaging/pair_client.go b/cmd/bots/internal/common/messaging/pair_client.go index 61d69585..370ec2d0 100644 --- a/cmd/bots/internal/common/messaging/pair_client.go +++ b/cmd/bots/internal/common/messaging/pair_client.go @@ -4,87 +4,37 @@ import ( "bytes" "context" "encoding/json" - stderr "errors" "fmt" "strings" "time" - "nodemon/pkg/entities" - "nodemon/pkg/messaging/pair" + "go.uber.org/zap" + "github.com/nats-io/nats.go" "github.com/pkg/errors" - "go.nanomsg.org/mangos/v3/protocol" - pairProtocol "go.nanomsg.org/mangos/v3/protocol/pair" - "go.uber.org/zap" -) -const ( - defaultSendTimeout = 5 * time.Second - defaultRecvTimeout = 5 * time.Second + "nodemon/pkg/entities" + "nodemon/pkg/messaging/pair" ) const defaultResponseTimeout = 5 * time.Second -type sendRecvDeadlineSocketWrapper struct { - protocol.Socket -} - -func (w sendRecvDeadlineSocketWrapper) Send(data []byte) (err error) { - defer func() { // reset deadline in any case - setOptErr := w.Socket.SetOption(protocol.OptionSendDeadline, time.Duration(0)) - if setOptErr != nil { - if err != nil { - err = stderr.Join(err, setOptErr) - } else { - err = setOptErr - } - } - }() - // set deadline for current send - if setOptErr := w.Socket.SetOption(protocol.OptionSendDeadline, defaultSendTimeout); setOptErr != nil { - return setOptErr - } - return w.Socket.Send(data) -} - -func (w sendRecvDeadlineSocketWrapper) Recv() (_ []byte, err error) { - defer func() { // reset deadline in any case - setOptErr := w.Socket.SetOption(protocol.OptionRecvDeadline, time.Duration(0)) - if setOptErr != nil { - if err != nil { - err = stderr.Join(err, setOptErr) - } else { - err = setOptErr - } - } - }() - // set deadline for current recv - if setOptErr := w.Socket.SetOption(protocol.OptionRecvDeadline, defaultRecvTimeout); setOptErr != nil { - return nil, setOptErr - } - return w.Socket.Recv() -} - func StartPairMessagingClient( ctx context.Context, - nanomsgURL string, + natsServerURL string, requestPair <-chan pair.Request, responsePair chan<- pair.Response, logger *zap.Logger, + botRequestsTopic string, ) error { - pairSocket, sockErr := pairProtocol.NewSocket() - if sockErr != nil { - return errors.Wrap(sockErr, "failed to get new pair socket") - } - defer func() { - _ = pairSocket.Close() // can be ignored, only possible error is protocol.ErrClosed - }() - - if err := pairSocket.Dial(nanomsgURL); err != nil { - return errors.Wrap(err, "failed to dial on pair socket") + nc, err := nats.Connect(natsServerURL, nats.Timeout(nats.DefaultTimeout)) + if err != nil { + zap.S().Fatalf("Failed to connect to nats server: %v", err) + return err } + defer nc.Close() - done := runPairLoop(ctx, requestPair, sendRecvDeadlineSocketWrapper{pairSocket}, logger, responsePair) + done := runPairLoop(ctx, requestPair, nc, logger, responsePair, botRequestsTopic) <-ctx.Done() logger.Info("stopping pair messaging service...") @@ -96,9 +46,10 @@ func StartPairMessagingClient( func runPairLoop( ctx context.Context, requestPair <-chan pair.Request, - pairSocket protocol.Socket, + nc *nats.Conn, logger *zap.Logger, responsePair chan<- pair.Response, + botRequestsTopic string, ) <-chan struct{} { ch := make(chan struct{}) go func(done chan<- struct{}) { @@ -114,7 +65,7 @@ func runPairLoop( message := &bytes.Buffer{} message.WriteByte(byte(request.RequestType())) - err := handlePairRequest(ctx, request, pairSocket, message, logger, responsePair) + err := handlePairRequest(ctx, request, nc, message, logger, responsePair, botRequestsTopic) if err != nil { logger.Error("failed to handle pair request", zap.String("request-type", fmt.Sprintf("(%T)", request)), @@ -130,28 +81,25 @@ func runPairLoop( func handlePairRequest( ctx context.Context, request pair.Request, - pairSocket protocol.Socket, + nc *nats.Conn, message *bytes.Buffer, logger *zap.Logger, responsePair chan<- pair.Response, + botRequestsTopic string, ) error { switch r := request.(type) { case *pair.NodesListRequest: - return handleNodesListRequest(ctx, pairSocket, message, logger, responsePair) + return handleNodesListRequest(ctx, nc, message, logger, responsePair, botRequestsTopic) case *pair.InsertNewNodeRequest: - return handleInsertNewNodeRequest(r.URL, message, pairSocket) + return handleInsertNewNodeRequest(r.URL, message, nc, botRequestsTopic) case *pair.UpdateNodeRequest: - return handleUpdateNodeRequest(r.URL, r.Alias, message, pairSocket) + return handleUpdateNodeRequest(r.URL, r.Alias, message, nc, botRequestsTopic) case *pair.DeleteNodeRequest: - message.WriteString(r.URL) - if sendErr := pairSocket.Send(message.Bytes()); sendErr != nil { - return errors.Wrap(sendErr, "failed handle delete node request and send data to a pair socket") - } - return nil + return handleDeleteNodeRequest(r.URL, message, nc, botRequestsTopic) case *pair.NodesStatusRequest: - return handleNodesStatementsRequest(ctx, r.URLs, message, pairSocket, logger, responsePair) + return handleNodesStatementsRequest(ctx, r.URLs, message, nc, logger, responsePair, botRequestsTopic) case *pair.NodeStatementRequest: - return handleNodesStatementRequest(ctx, r.URL, r.Height, logger, message, pairSocket, responsePair) + return handleNodesStatementRequest(ctx, r.URL, r.Height, logger, message, nc, responsePair, botRequestsTopic) default: return errors.New("unknown request type to pair socket") } @@ -163,8 +111,9 @@ func handleNodesStatementRequest( height int, logger *zap.Logger, message *bytes.Buffer, - pairSocket protocol.Socket, + nc *nats.Conn, responsePair chan<- pair.Response, + botRequestsTopic string, ) error { ctx, cancel := context.WithTimeout(ctx, defaultResponseTimeout) defer cancel() @@ -175,17 +124,14 @@ func handleNodesStatementRequest( } message.Write(req) - err = pairSocket.Send(message.Bytes()) - if err != nil { - return errors.Wrap(err, "failed to send a request to pair socket") - } - response, err := pairSocket.Recv() + response, err := nc.Request(botRequestsTopic, message.Bytes(), defaultResponseTimeout) if err != nil { - return errors.Wrap(err, "failed to receive message from pair socket") + return errors.Wrap(err, "failed to receive message from nodemon") } + nodeStatementResp := pair.NodeStatementResponse{} - err = json.Unmarshal(response, &nodeStatementResp) + err = json.Unmarshal(response.Data, &nodeStatementResp) if err != nil { return errors.Wrap(err, "failed to unmarshal message from pair socket") } @@ -195,7 +141,7 @@ func handleNodesStatementRequest( case <-ctx.Done(): logger.Error("failed to send node statement response, timeout exceeded", zap.Duration("timeout", defaultResponseTimeout), - zap.ByteString("node-statement-response", response), + zap.ByteString("node-statement-response", response.Data), zap.Error(ctx.Err()), ) return ctx.Err() @@ -206,25 +152,22 @@ func handleNodesStatementsRequest( ctx context.Context, urls []string, message *bytes.Buffer, - pairSocket protocol.Socket, + nc *nats.Conn, logger *zap.Logger, responsePair chan<- pair.Response, + botRequestsTopic string, ) error { ctx, cancel := context.WithTimeout(ctx, defaultResponseTimeout) defer cancel() message.WriteString(strings.Join(urls, ",")) - err := pairSocket.Send(message.Bytes()) - if err != nil { - return errors.Wrap(err, "failed to send a request to pair socket") - } - response, err := pairSocket.Recv() + response, err := nc.Request(botRequestsTopic, message.Bytes(), defaultResponseTimeout) if err != nil { - return errors.Wrap(err, "failed to receive message from pair socket") + return errors.Wrap(err, "failed to receive message from nodemon") } nodesStatusResp := pair.NodesStatementsResponse{} - err = json.Unmarshal(response, &nodesStatusResp) + err = json.Unmarshal(response.Data, &nodesStatusResp) if err != nil { return errors.Wrap(err, "failed to unmarshal message from pair socket") } @@ -234,30 +177,42 @@ func handleNodesStatementsRequest( case <-ctx.Done(): logger.Error("failed to send nodes status response, timeout exceeded", zap.Duration("timeout", defaultResponseTimeout), - zap.ByteString("nodes-status-response", response), + zap.ByteString("nodes-status-response", response.Data), zap.Error(ctx.Err()), ) return ctx.Err() } } -func handleUpdateNodeRequest(url, alias string, message *bytes.Buffer, pairSocket protocol.Socket) error { +func handleUpdateNodeRequest(url, alias string, message *bytes.Buffer, nc *nats.Conn, botRequestsTopic string) error { node := entities.Node{URL: url, Enabled: true, Alias: alias} nodeInfo, err := json.Marshal(node) if err != nil { return errors.Wrap(err, "failed to marshal node's info") } message.Write(nodeInfo) - err = pairSocket.Send(message.Bytes()) + // ignore a response + _, err = nc.Request(botRequestsTopic, message.Bytes(), defaultResponseTimeout) if err != nil { return errors.Wrap(err, "failed to send message") } return nil } -func handleInsertNewNodeRequest(url string, message *bytes.Buffer, pairSocket protocol.Socket) error { +func handleDeleteNodeRequest(url string, message *bytes.Buffer, nc *nats.Conn, botRequestsTopic string) error { message.WriteString(url) - err := pairSocket.Send(message.Bytes()) + // ignore response + _, err := nc.Request(botRequestsTopic, message.Bytes(), defaultResponseTimeout) + if err != nil { + return errors.Wrap(err, "failed to receive message from nodemon") + } + return nil +} + +func handleInsertNewNodeRequest(url string, message *bytes.Buffer, nc *nats.Conn, botRequestsTopic string) error { + message.WriteString(url) + // ignore a response + _, err := nc.Request(botRequestsTopic, message.Bytes(), defaultResponseTimeout) if err != nil { return errors.Wrap(err, "failed to send message") } @@ -266,25 +221,21 @@ func handleInsertNewNodeRequest(url string, message *bytes.Buffer, pairSocket pr func handleNodesListRequest( ctx context.Context, - pairSocket protocol.Socket, + nc *nats.Conn, message *bytes.Buffer, logger *zap.Logger, responsePair chan<- pair.Response, + botRequestsTopic string, ) error { ctx, cancel := context.WithTimeout(ctx, defaultResponseTimeout) defer cancel() - err := pairSocket.Send(message.Bytes()) - if err != nil { - return errors.Wrap(err, "failed to send message") - } - - response, err := pairSocket.Recv() + response, err := nc.Request(botRequestsTopic, message.Bytes(), defaultResponseTimeout) if err != nil { return errors.Wrap(err, "failed to receive message") } nodeList := pair.NodesListResponse{} - err = json.Unmarshal(response, &nodeList) + err = json.Unmarshal(response.Data, &nodeList) if err != nil { return errors.Wrap(err, "failed to unmarshal message") } @@ -294,7 +245,7 @@ func handleNodesListRequest( case <-ctx.Done(): logger.Error("failed to send nodes list response, timeout exceeded", zap.Duration("timeout", defaultResponseTimeout), - zap.ByteString("nodes-status-response", response), + zap.ByteString("nodes-status-response", response.Data), zap.Error(ctx.Err()), ) return ctx.Err() diff --git a/cmd/bots/internal/common/messaging/pubsub_client.go b/cmd/bots/internal/common/messaging/pubsub_client.go index 40af251b..b7a89579 100644 --- a/cmd/bots/internal/common/messaging/pubsub_client.go +++ b/cmd/bots/internal/common/messaging/pubsub_client.go @@ -3,76 +3,41 @@ package messaging import ( "context" + "github.com/nats-io/nats.go" "github.com/pkg/errors" - "go.nanomsg.org/mangos/v3/protocol" - "go.nanomsg.org/mangos/v3/protocol/sub" - _ "go.nanomsg.org/mangos/v3/transport/all" // registers all transports "go.uber.org/zap" "nodemon/pkg/messaging" ) -func StartSubMessagingClient(ctx context.Context, nanomsgURL string, bot Bot, logger *zap.Logger) error { - subSocket, sockErr := sub.NewSocket() - if sockErr != nil { - return sockErr +func StartSubMessagingClient(ctx context.Context, natsServerURL string, bot Bot, logger *zap.Logger) error { + // Connect to a NATS server + nc, err := nats.Connect(natsServerURL, nats.Timeout(nats.DefaultTimeout)) + if err != nil { + zap.S().Fatalf("Failed to connect to nats server: %v", err) + return err } - defer func() { - _ = subSocket.Close() // can be ignored, only possible error is protocol.ErrClosed - }() - - bot.SetSubSocket(subSocket) - - if dialErr := subSocket.Dial(nanomsgURL); dialErr != nil { - return dialErr + defer nc.Close() + bot.SetNatsConnection(nc) + alertHandlerFunc := func(msg *nats.Msg) { + hndlErr := handleReceivedMessage(msg.Data, bot) + if hndlErr != nil { + zap.S().Errorf("failed to handle received message from pubsub server %v", hndlErr) + } } + bot.SetAlertHandlerFunc(alertHandlerFunc) - if err := bot.SubscribeToAllAlerts(); err != nil { - return err + if subscrErr := bot.SubscribeToAllAlerts(); subscrErr != nil { + return subscrErr } - done := runSubLoop(ctx, subSocket, logger, bot) - <-ctx.Done() logger.Info("stopping sub messaging service...") - <-done logger.Info("sub messaging service finished") return nil } -func runSubLoop(ctx context.Context, subSocket protocol.Socket, logger *zap.Logger, bot Bot) <-chan struct{} { - sockCh := make(chan struct{}) - go func() { // run socket closer goroutine - defer close(sockCh) - <-ctx.Done() - _ = subSocket.Close() // can be ignored, only possible error is protocol.ErrClosed - }() - ch := make(chan struct{}) - go func(done chan<- struct{}, closedSock <-chan struct{}) { - defer func() { - <-closedSock - close(done) - }() - for { - if ctx.Err() != nil { - return - } - if err := recvMessage(subSocket, bot); err != nil { - if errors.Is(err, protocol.ErrClosed) { // socket is closed, this means that context is canceled - return - } - logger.Error("failed to receive message", zap.Error(err)) - } - } - }(ch, sockCh) - return ch -} - -func recvMessage(subSocket protocol.Socket, bot Bot) error { - msg, err := subSocket.Recv() // this operation is blocking, we have to close the socket to interrupt this block - if err != nil { - return errors.Wrap(err, "failed to receive message from sub socket") - } +func handleReceivedMessage(msg []byte, bot Bot) error { alertMsg, err := messaging.NewAlertMessageFromBytes(msg) if err != nil { return errors.Wrap(err, "failed to parse alert message from bytes") diff --git a/cmd/bots/telegram/README.md b/cmd/bots/telegram/README.md index 86452744..c504cc2b 100644 --- a/cmd/bots/telegram/README.md +++ b/cmd/bots/telegram/README.md @@ -9,21 +9,20 @@ To do the same as environment variable form use _**UPPER_SNAKE_CASE**_ option na ### List of supported options in kebab-case form -- _-behavior_ (string) — Behavior is either webhook or polling (default "webhook"). Communication used between - Telegarm and the bot +- _-behavior_ (string) — Behavior is either webhook or polling (default "webhook"). Communication used between + Telegram and the bot - _-bind_ (string) — Local network address to bind the HTTP API of the service on. - _-development_ (bool) — Development mode. It is used for zap logger. - _-log-level_ (string) — Logging level. Supported levels: DEBUG, INFO, WARN, ERROR, FATAL. Default logging level INFO. (default "INFO") -- _-nano-msg-pair-telegram-url_ (string) — Nanomsg IPC URL for pair socket (default "ipc: - ///tmp/nano-msg-nodemon-pair.ipc"). Used for communication between the monitoring and bot services. -- _-nano-msg-pubsub-url_ (string) — Nanomsg IPC URL for pubsub socket (default "ipc: - ///tmp/telegram/nano-msg-nodemon-pubsub.ipc"). Used by the bot to subscribe to alerts generated by - the monitoring service. +- _-nats-msg-url_ (string) — Nats server URL for messaging (default "nats://127.0.0.1:4222"). + Used by the bot to subscribe to alerts generated by the monitoring service and + for communication between the monitoring and bot services. - _-public-url_ (string) — The public url (**for webhook only**) for Telegram to send events to the bot service. - _-telegram-chat-id_ (int) — Telegram chat ID to send alerts through a specific chat. - _-tg-bot-token_ (string) — The secret token used to authenticate the bot in Telegram. -- _-webhook-local-address_ (string) — The port (**for webhook only**) used for the webhook internal server (default ":8081") +- _-webhook-local-address_ (string) — The port (**for webhook only**) used for the webhook + internal server (default ":8081") ## Build requirements diff --git a/cmd/bots/telegram/telegram.go b/cmd/bots/telegram/telegram.go index db4c06ed..129e6eea 100644 --- a/cmd/bots/telegram/telegram.go +++ b/cmd/bots/telegram/telegram.go @@ -17,6 +17,7 @@ import ( "nodemon/cmd/bots/internal/telegram/config" "nodemon/cmd/bots/internal/telegram/handlers" "nodemon/internal" + generalMessaging "nodemon/pkg/messaging" "nodemon/pkg/messaging/pair" "nodemon/pkg/tools" @@ -40,8 +41,7 @@ func main() { } type telegramBotConfig struct { - nanomsgPubSubURL string - nanomsgPairURL string + natsMessagingURL string behavior string webhookLocalAddress string // only for webhook method publicURL string // only for webhook method @@ -50,14 +50,13 @@ type telegramBotConfig struct { logLevel string development bool bindAddress string + scheme string } func newTelegramBotConfig() *telegramBotConfig { c := new(telegramBotConfig) - tools.StringVarFlagWithEnv(&c.nanomsgPubSubURL, "nano-msg-pubsub-url", - "ipc:///tmp/telegram/nano-msg-nodemon-pubsub.ipc", "Nanomsg IPC URL for pubsub socket") - tools.StringVarFlagWithEnv(&c.nanomsgPairURL, "nano-msg-pair-telegram-url", - "ipc:///tmp/nano-msg-nodemon-pair.ipc", "Nanomsg IPC URL for pair socket") + tools.StringVarFlagWithEnv(&c.natsMessagingURL, "nats-msg-url", + "nats://127.0.0.1:4222", "NATS server URL for messaging") tools.StringVarFlagWithEnv(&c.behavior, "behavior", "webhook", "Behavior is either webhook or polling") tools.StringVarFlagWithEnv(&c.webhookLocalAddress, "webhook-local-address", @@ -73,6 +72,8 @@ func newTelegramBotConfig() *telegramBotConfig { tools.BoolVarFlagWithEnv(&c.development, "development", false, "Development mode.") tools.StringVarFlagWithEnv(&c.bindAddress, "bind", "", "Local network address to bind the HTTP API of the service on.") + tools.StringVarFlagWithEnv(&c.scheme, "scheme", + "", "Blockchain scheme i.e. mainnet, testnet, stagenet") return c } @@ -85,6 +86,10 @@ func (c *telegramBotConfig) validate(logger *zap.Logger) error { logger.Error("public url is required for webhook method") return common.ErrInvalidParameters } + if c.scheme == "" { + logger.Error("the blockchain scheme must be specified") + return common.ErrInvalidParameters + } if c.tgChatID == 0 { logger.Error("telegram chat ID is required") return common.ErrInvalidParameters @@ -121,8 +126,7 @@ func runTelegramBot() error { responseChan := make(chan pair.Response) tgBotEnv, initErr := initial.InitTgBot(cfg.behavior, cfg.webhookLocalAddress, cfg.publicURL, - cfg.tgBotToken, cfg.tgChatID, logger, requestChan, responseChan, - ) + cfg.tgBotToken, cfg.tgChatID, logger, requestChan, responseChan, cfg.scheme) if initErr != nil { logger.Fatal("failed to initialize telegram bot", zap.Error(initErr)) } @@ -178,14 +182,15 @@ func runMessagingClients( pairResponse chan<- pair.Response, ) { go func() { - err := messaging.StartSubMessagingClient(ctx, cfg.nanomsgPubSubURL, tgBotEnv, logger) + err := messaging.StartSubMessagingClient(ctx, cfg.natsMessagingURL, tgBotEnv, logger) if err != nil { logger.Fatal("failed to start sub messaging service", zap.Error(err)) } }() go func() { - err := messaging.StartPairMessagingClient(ctx, cfg.nanomsgPairURL, pairRequest, pairResponse, logger) + topic := generalMessaging.TelegramBotRequestsTopic(cfg.scheme) + err := messaging.StartPairMessagingClient(ctx, cfg.natsMessagingURL, pairRequest, pairResponse, logger, topic) if err != nil { logger.Fatal("failed to start pair messaging service", zap.Error(err)) } diff --git a/cmd/nodemon/README.md b/cmd/nodemon/README.md index 67d1713b..b94cc15e 100644 --- a/cmd/nodemon/README.md +++ b/cmd/nodemon/README.md @@ -9,7 +9,7 @@ To do the same as environment variable form use _**UPPER_SNAKE_CASE**_ option na ### List of supported options in kebab-case form -- _-api-read-timeout_ (duration) — HTTP API read timeout used by the monitoring API server. +- _-api-read-timeout_ (duration) — HTTP API read timeout used by the monitoring API server. Default value is 30s. (default 30s) - _-base-target-threshold_ (int) — Base target threshold used for base target alerts. Must be specified. - _-bind_ (string) — Local network address to bind the HTTP API of the service on. Default value is ":8080". (default ": @@ -19,21 +19,29 @@ To do the same as environment variable form use _**UPPER_SNAKE_CASE**_ option na - _-development_ (bool) — Development mode. It is used for zap logger. - _-log-level_ (string) — Logging level. Supported levels: DEBUG, INFO, WARN, ERROR, FATAL. Default logging level INFO. (default "INFO") -- _-nano-msg-pair-discord-url_ (string) — Nanomsg IPC URL for pair socket. Used for communication with the discord bot. -- _-nano-msg-pair-telegram-url_ (string) — Nanomsg IPC URL for pair socket. Used for communication with the telegram bot. -- _-nano-msg-pubsub-url_ (string) — Nanomsg IPC URL for pubsub socket (default "ipc:///tmp/nano-msg-pubsub.ipc") +- _-nats-msg-url_ (string) — Nats URL for messaging (default "nats://127.0.0.1:4222"). + Used for communication with the discord bot, telegram bot and sending events for subscribers. +- _-nats-connection-timeout_ (string) — NATS connection to server timeout (default 5s). - _-nodes_ (string) — Initial list of Waves Blockchain nodes to monitor. Provide comma separated list of REST API URLs here. - _-retention_ (duration) — Events retention duration. Default value is 12h (default 12h0m0s) - _-storage_ (string) — Path to nodes storage. Will be **ignored** if _-vault-address_ is set. (default ".nodes.json") - _-timeout_ (duration) — Network timeout, seconds. Used by the poller to poll nodes with the timeout. Default value is 15 (default 15s) + +#### Optional parameters + - _-vault-address_ (string) — Vault server address. - _-vault-mount-path_ (string) — Vault mount path for nodemon nodes storage. (default "gonodemonitoring") - _-vault-password_ (string) — Vault user's password. - _-vault-secret-path_ (string) — Vault secret where nodemon nodes will be saved - _-vault-user_ (string) — Vault user. +- _-nats-server-enable_ (bool) — Enable NATS embedded server (default _false_) +- _-nats-server-address_ (string) — NATS embedded server address in form 'host:port' (default "127.0.0.1:4222") +- _-nats-server-max-payload_ (uint64) — NATS embedded server URL (default 1MB) +- _-nats-server-ready-timeout_ (duration) — NATS server 'ready for connections' timeout (default 10s) + ## Build requirements - `Make` utility diff --git a/cmd/nodemon/nodemon.go b/cmd/nodemon/nodemon.go index 9b7bd147..89305c71 100644 --- a/cmd/nodemon/nodemon.go +++ b/cmd/nodemon/nodemon.go @@ -12,15 +12,14 @@ import ( "syscall" "time" - "github.com/pkg/errors" - - "nodemon/internal" - "nodemon/pkg/analysis/l2" + "nodemon/pkg/messaging" "go.uber.org/zap" + "nodemon/internal" "nodemon/pkg/analysis" "nodemon/pkg/analysis/criteria" + "nodemon/pkg/analysis/l2" "nodemon/pkg/api" "nodemon/pkg/clients" "nodemon/pkg/entities" @@ -31,6 +30,8 @@ import ( "nodemon/pkg/storing/nodes" "nodemon/pkg/storing/specific" "nodemon/pkg/tools" + + "github.com/pkg/errors" ) const ( @@ -38,6 +39,9 @@ const ( defaultPollingInterval = 60 * time.Second defaultRetentionDuration = 12 * time.Hour defaultAPIReadTimeout = 30 * time.Second + + natsMaxPayloadSize int32 = 1024 * 1024 // 1 MB + natsConnectionsTimeoutDefault = 5 * time.Second ) var ( @@ -142,6 +146,13 @@ type nodemonVaultConfig struct { secretPath string } +type natsOptionalConfig struct { + enable bool + serverAddress string + maxPayload uint64 + readyForConnectionsTimeout time.Duration +} + func newNodemonVaultConfig() *nodemonVaultConfig { c := new(nodemonVaultConfig) tools.StringVarFlagWithEnv(&c.address, "vault-address", "", "Vault server address.") @@ -154,6 +165,18 @@ func newNodemonVaultConfig() *nodemonVaultConfig { return c } +func newNatsOptionalConfig() *natsOptionalConfig { + c := new(natsOptionalConfig) + tools.BoolVarFlagWithEnv(&c.enable, "nats-server-enable", false, "Enable NATS embedded server") + tools.StringVarFlagWithEnv(&c.serverAddress, "nats-server-address", + "127.0.0.1:4222", "NATS embedded server address in form 'host:port'") + tools.Uint64VarFlagWithEnv(&c.maxPayload, "nats-server-max-payload", uint64(natsMaxPayloadSize), + "Max server payload size in bytes") + tools.DurationVarFlagWithEnv(&c.readyForConnectionsTimeout, "nats-server-ready-timeout", + natsConnectionsTimeoutDefault, "NATS server 'ready for connections' timeout") + return c +} + func (n *nodemonVaultConfig) present() bool { return n.address != "" } @@ -182,23 +205,26 @@ func (n *nodemonVaultConfig) validate(logger *zap.Logger) error { } type nodemonConfig struct { - storage string - nodes string - L2nodeName string - L2nodeURL string - bindAddress string - interval time.Duration - timeout time.Duration - nanomsgPubSubURL string - nanomsgPairTelegramURL string - nanomsgPairDiscordURL string - retention time.Duration - apiReadTimeout time.Duration - baseTargetThreshold uint64 - logLevel string - development bool - vault *nodemonVaultConfig - l2 *nodemonL2Config + storage string + nodes string + L2nodeName string + L2nodeURL string + bindAddress string + interval time.Duration + timeout time.Duration + natsMessagingURL string + natsPairTelegram bool + natsPairDiscord bool + natsTimeout time.Duration + retention time.Duration + apiReadTimeout time.Duration + baseTargetThreshold uint64 + logLevel string + development bool + vault *nodemonVaultConfig + l2 *nodemonL2Config + scheme string + natsOptionalConfig *natsOptionalConfig } func newNodemonConfig() *nodemonConfig { @@ -215,21 +241,24 @@ func newNodemonConfig() *nodemonConfig { defaultNetworkTimeout, "Network timeout, seconds. Default value is 15") tools.Uint64VarFlagWithEnv(&c.baseTargetThreshold, "base-target-threshold", 0, "Base target threshold. Must be specified") - tools.StringVarFlagWithEnv(&c.nanomsgPubSubURL, "nano-msg-pubsub-url", - "ipc:///tmp/nano-msg-pubsub.ipc", "Nanomsg IPC URL for pubsub socket") - tools.StringVarFlagWithEnv(&c.nanomsgPairTelegramURL, "nano-msg-pair-telegram-url", - "", "Nanomsg IPC URL for pair socket") - tools.StringVarFlagWithEnv(&c.nanomsgPairDiscordURL, "nano-msg-pair-discord-url", - "", "Nanomsg IPC URL for pair socket") + tools.StringVarFlagWithEnv(&c.natsMessagingURL, "nats-msg-url", + "nats://127.0.0.1:4222", "Nats URL for messaging") + tools.DurationVarFlagWithEnv(&c.natsTimeout, "nats-connection-timeout", + natsConnectionsTimeoutDefault, "NATS connection to server timeout") tools.DurationVarFlagWithEnv(&c.retention, "retention", defaultRetentionDuration, "Events retention duration. Default value is 12h") tools.DurationVarFlagWithEnv(&c.apiReadTimeout, "api-read-timeout", defaultAPIReadTimeout, "HTTP API read timeout. Default value is 30s.") tools.BoolVarFlagWithEnv(&c.development, "development", false, "Development mode.") + tools.BoolVarFlagWithEnv(&c.natsPairDiscord, "bot-requests-discord", false, "Should let discord bot send commands?") + tools.BoolVarFlagWithEnv(&c.natsPairTelegram, "bot-requests-telegram", true, "Should let telegram bot send commands?") tools.StringVarFlagWithEnv(&c.logLevel, "log-level", "INFO", "Logging level. Supported levels: DEBUG, INFO, WARN, ERROR, FATAL. Default logging level INFO.") + tools.StringVarFlagWithEnv(&c.scheme, "scheme", + "", "Blockchain scheme i.e. mainnet, testnet, stagenet") c.vault = newNodemonVaultConfig() c.l2 = newNodemonL2Config() + c.natsOptionalConfig = newNatsOptionalConfig() return c } @@ -244,6 +273,10 @@ func (c *nodemonConfig) validate(logger *zap.Logger) error { logger.Error("Invalid polling interval", zap.Stringer("interval", c.interval)) return errInvalidParameters } + if c.scheme == "" { + logger.Error("Empty blockchain scheme", zap.String("scheme", c.scheme)) + return errInvalidParameters + } if c.timeout <= 0 { logger.Error("Invalid network timeout", zap.Stringer("timeout", c.timeout)) return errInvalidParameters @@ -259,19 +292,17 @@ func (c *nodemonConfig) validate(logger *zap.Logger) error { return stderrs.Join(c.vault.validate(logger), c.l2.validate(logger)) } -func (c *nodemonConfig) runDiscordPairServer() bool { return c.nanomsgPairDiscordURL != "" } +func (c *nodemonConfig) runDiscordPairServer() bool { return c.natsPairDiscord } -func (c *nodemonConfig) runTelegramPairServer() bool { return c.nanomsgPairTelegramURL != "" } +func (c *nodemonConfig) runTelegramPairServer() bool { return c.natsPairTelegram } func (c *nodemonConfig) runAnalyzers( ctx context.Context, cfg *nodemonConfig, es *events.Storage, - ns nodes.Storage, logger *zap.Logger, - pew specific.PrivateNodesEventsWriter, notifications <-chan entities.NodesGatheringNotification, -) { +) <-chan entities.Alert { alerts := runAnalyzer(cfg, es, logger, notifications) // L2 analyzer will only be run if the arguments are set if cfg.l2.present() { @@ -280,7 +311,7 @@ func (c *nodemonConfig) runAnalyzers( mergedAlerts := tools.FanIn(alerts, alertL2) alerts = mergedAlerts } - runMessagingServices(ctx, cfg, alerts, logger, ns, es, pew) + return alerts } func run() error { @@ -307,26 +338,11 @@ func run() error { ctx, done := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer done() - ns, err := createNodesStorage(ctx, cfg, logger) - if err != nil { - return err - } - defer func(cs nodes.Storage) { - if closeErr := cs.Close(); closeErr != nil { - logger.Error("failed to close nodes storage", zap.Error(closeErr)) - } - }(ns) - - es, err := events.NewStorage(cfg.retention, logger) + ns, es, err := initializeStorages(ctx, cfg, logger) if err != nil { - logger.Error("failed to initialize events storage", zap.Error(err)) return err } - defer func(es *events.Storage) { - if closeErr := es.Close(); closeErr != nil { - logger.Error("failed to close events storage", zap.Error(closeErr)) - } - }(es) + defer closeStorages(ns, es, logger) scraper, err := scraping.NewScraper(ns, es, cfg.interval, cfg.timeout, logger) if err != nil { @@ -340,26 +356,108 @@ func run() error { return err } + shutdownFn, serviceErr := startServices(ctx, cfg, ns, es, scraper, privateNodesHandler, atom, logger) + if serviceErr != nil { + return serviceErr + } + defer shutdownFn() + + <-ctx.Done() + logger.Info("Shutting down") + return nil +} + +func initializeStorages(ctx context.Context, cfg *nodemonConfig, + logger *zap.Logger) (nodes.Storage, *events.Storage, error) { + ns, err := createNodesStorage(ctx, cfg, logger) + if err != nil { + logger.Error("failed to initialize nodes storage", zap.Error(err)) + return nil, nil, err + } + + es, err := events.NewStorage(cfg.retention, logger) + if err != nil { + logger.Error("failed to initialize events storage", zap.Error(err)) + return nil, nil, err + } + + return ns, es, nil +} + +func closeStorages(ns nodes.Storage, es *events.Storage, logger *zap.Logger) { + if err := ns.Close(); err != nil { + logger.Error("failed to close nodes storage", zap.Error(err)) + } + if err := es.Close(); err != nil { + logger.Error("failed to close events storage", zap.Error(err)) + } +} + +type shutdownFunc func() + +func startServices( //nolint:nonamedreturns // needs in defer + ctx context.Context, + cfg *nodemonConfig, + ns nodes.Storage, + es *events.Storage, + scraper *scraping.Scraper, + privateNodesHandler *specific.PrivateNodesHandler, + atom *zap.AtomicLevel, + logger *zap.Logger, +) (_ shutdownFunc, runErr error) { notifications := scraper.Start(ctx) - notifications = privateNodesHandler.Run(notifications) // wraps scrapper's notification with private nodes handler - pew := privateNodesHandler.PrivateNodesEventsWriter() + notifications = privateNodesHandler.Run(notifications) // wraps scraper's notifications + pew := privateNodesHandler.PrivateNodesEventsWriter() a, err := api.NewAPI(cfg.bindAddress, ns, es, cfg.apiReadTimeout, logger, pew, atom, cfg.development) if err != nil { logger.Error("failed to initialize API", zap.Error(err)) - return err + return nil, err } + if apiErr := a.Start(); apiErr != nil { logger.Error("failed to start API", zap.Error(apiErr)) - return apiErr + return nil, apiErr + } + defer func() { + if runErr != nil { // stop API if any error occurred + a.Shutdown() + } + }() + + shutdownFn := a.Shutdown + if nCfg := cfg.natsOptionalConfig; nCfg.enable { + natsShutdown, nErr := messaging.RunNatsMessagingServer( + nCfg.serverAddress, + logger, + nCfg.maxPayload, + nCfg.readyForConnectionsTimeout, + ) + if nErr != nil { + logger.Error("failed to start NATS server", zap.Error(nErr)) + return nil, nErr + } + defer func() { + if runErr != nil { // stop NATS server if any error occurred + natsShutdown() + } + }() + shutdownFn = chainShutdownFuncs(shutdownFn, natsShutdown) // add NATS server shutdown to the chain } - cfg.runAnalyzers(ctx, cfg, es, ns, logger, pew, notifications) + alerts := cfg.runAnalyzers(ctx, cfg, es, logger, notifications) - <-ctx.Done() - a.Shutdown() - logger.Info("shutting down") - return nil + runMessagingServices(ctx, cfg, alerts, logger, ns, es, pew) + + return shutdownFn, err +} + +func chainShutdownFuncs(fns ...shutdownFunc) shutdownFunc { + return func() { + for _, fn := range fns { + fn() + } + } } func createNodesStorage(ctx context.Context, cfg *nodemonConfig, logger *zap.Logger) (nodes.Storage, error) { @@ -415,7 +513,7 @@ func runMessagingServices( pew specific.PrivateNodesEventsWriter, ) { go func() { - pubSubErr := pubsub.StartPubMessagingServer(ctx, cfg.nanomsgPubSubURL, alerts, logger) + pubSubErr := pubsub.StartPubMessagingServer(ctx, cfg.natsMessagingURL, alerts, logger, cfg.scheme) if pubSubErr != nil { logger.Fatal("failed to start pub messaging server", zap.Error(pubSubErr)) } @@ -423,7 +521,8 @@ func runMessagingServices( if cfg.runTelegramPairServer() { go func() { - pairErr := pair.StartPairMessagingServer(ctx, cfg.nanomsgPairTelegramURL, ns, es, pew, logger) + telegramTopic := messaging.TelegramBotRequestsTopic(cfg.scheme) + pairErr := pair.StartPairMessagingServer(ctx, cfg.natsMessagingURL, ns, es, pew, logger, telegramTopic) if pairErr != nil { logger.Fatal("failed to start pair messaging server", zap.Error(pairErr)) } @@ -432,7 +531,8 @@ func runMessagingServices( if cfg.runDiscordPairServer() { go func() { - pairErr := pair.StartPairMessagingServer(ctx, cfg.nanomsgPairDiscordURL, ns, es, pew, logger) + discordTopic := messaging.DiscordBotRequestsTopic(cfg.scheme) + pairErr := pair.StartPairMessagingServer(ctx, cfg.natsMessagingURL, ns, es, pew, logger, discordTopic) if pairErr != nil { logger.Fatal("failed to start pair messaging server", zap.Error(pairErr)) } diff --git a/go.mod b/go.mod index ca616090..0246422a 100644 --- a/go.mod +++ b/go.mod @@ -8,20 +8,20 @@ require ( github.com/go-chi/chi v4.1.2+incompatible github.com/hashicorp/vault/api v1.15.0 github.com/hashicorp/vault/api/auth/userpass v0.8.0 + github.com/nats-io/nats-server/v2 v2.10.22 + github.com/nats-io/nats.go v1.37.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.20.5 github.com/stoewer/go-strcase v1.3.0 github.com/stretchr/testify v1.10.0 github.com/tidwall/buntdb v1.3.2 github.com/wavesplatform/gowaves v0.10.7-0.20240927070807-c256c5d98bfa - go.nanomsg.org/mangos/v3 v3.4.2 go.uber.org/zap v1.27.0 gopkg.in/telebot.v3 v3.3.8 ) require ( filippo.io/edwards25519 v1.1.0 // indirect - github.com/Microsoft/go-winio v0.6.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.14.2 // indirect github.com/blang/semver/v4 v4.0.0 // indirect @@ -33,6 +33,7 @@ require ( github.com/consensys/gnark-crypto v0.14.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect + github.com/fatih/color v1.18.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-jose/go-jose/v4 v4.0.1 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -50,18 +51,23 @@ require ( github.com/ingonyama-zk/icicle v1.1.0 // indirect github.com/ingonyama-zk/iciclegnark v0.1.0 // indirect github.com/jinzhu/copier v0.4.0 // indirect - github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/minio/highwayhash v1.0.3 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mmcloughlin/addchain v0.4.0 // indirect github.com/mr-tron/base58 v1.2.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/nats-io/jwt/v2 v2.5.8 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect + github.com/rogpeppe/go-internal v1.13.1 // indirect github.com/ronanh/intcomp v1.1.0 // indirect github.com/rs/zerolog v1.33.0 // indirect github.com/ryanuber/go-glob v1.0.0 // indirect @@ -84,7 +90,7 @@ require ( golang.org/x/sync v0.10.0 // indirect golang.org/x/sys v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect - golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect + golang.org/x/time v0.7.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect google.golang.org/grpc v1.67.0 // indirect google.golang.org/protobuf v1.34.2 // indirect diff --git a/go.sum b/go.sum index d742cc3e..093f11e8 100644 --- a/go.sum +++ b/go.sum @@ -61,9 +61,6 @@ filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= -github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= -github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= -github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -145,13 +142,12 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= -github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= +github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= +github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= -github.com/gdamore/optopia v0.2.0/go.mod h1:YKYEwo5C1Pa617H7NlPcmQXl+vG6YnSSNB44n8dNL0Q= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-chi/chi v4.1.2+incompatible h1:fGFk2Gmi/YKXk0OmGfBh0WgmN3XB8lVnEyNz34tQRec= github.com/go-chi/chi v4.1.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= @@ -337,8 +333,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= @@ -379,6 +375,8 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= +github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= +github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXxx3xhI= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= @@ -405,6 +403,16 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= +github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= +github.com/nats-io/nats-server/v2 v2.10.22 h1:Yt63BGu2c3DdMoBZNcR6pjGQwk/asrKU7VX846ibxDA= +github.com/nats-io/nats-server/v2 v2.10.22/go.mod h1:X/m1ye9NYansUXYFrbcDwUi/blHkrgHh2rgCJaakonk= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= @@ -447,8 +455,8 @@ github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoG github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= -github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= -github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/ronanh/intcomp v1.1.0 h1:i54kxmpmSoOZFcWPMWryuakN0vLxLswASsGa07zkvLU= github.com/ronanh/intcomp v1.1.0/go.mod h1:7FOLy3P3Zj3er/kVrU/pl+Ql7JFZj7bwliMGketo0IU= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= @@ -463,7 +471,6 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= -github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.8.2/go.mod h1:CtAatgMJh6bJEIs48Ay/FOnkljP3WeGUG0MC1RfAqwo= github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU= @@ -533,8 +540,6 @@ go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dY go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/v2 v2.305.4/go.mod h1:Ud+VUwIi9/uQHOMA+4ekToJ12lTxlv0zB/+DHwTGEbU= go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY= -go.nanomsg.org/mangos/v3 v3.4.2 h1:gHlopxjWvJcVCcUilQIsRQk9jdj6/HB7wrTiUN8Ki7Q= -go.nanomsg.org/mangos/v3 v3.4.2/go.mod h1:8+hjBMQub6HvXmuGvIq6hf19uxGQIjCofmc62lbedLA= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -767,6 +772,7 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -785,8 +791,8 @@ golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= -golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= +golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/pkg/messaging/messaging_server.go b/pkg/messaging/messaging_server.go new file mode 100644 index 00000000..b10e2509 --- /dev/null +++ b/pkg/messaging/messaging_server.go @@ -0,0 +1,64 @@ +package messaging + +import ( + "fmt" + "math" + "net" + "strconv" + "time" + + "go.uber.org/zap" + + "github.com/nats-io/nats-server/v2/server" + "github.com/pkg/errors" +) + +func RunNatsMessagingServer( //nolint:nonamedreturns // needs in defer + serverAddress string, + logger *zap.Logger, + maxPayload uint64, + connectionTimeout time.Duration, +) (_ func(), runErr error) { + host, portString, err := net.SplitHostPort(serverAddress) + if err != nil { + return nil, errors.Errorf("failed to parse host and port: %v", err) + } + + port, err := strconv.Atoi(portString) + if err != nil { + return nil, errors.Errorf("failed to parse port from the URL: %v", err) + } + if port <= 0 || port > math.MaxUint16 { + return nil, errors.Errorf("invalid port number (%d)", port) + } + + if connectionTimeout <= 0 { + return nil, errors.Errorf("connection timeout must be positive") + } + + if maxPayload > math.MaxInt32 { + return nil, errors.Errorf("max payload is too big, must be in range of int32") + } + + opts := &server.Options{ + MaxPayload: int32(maxPayload), + Host: host, + Port: port, + NoSigs: true, + } + s, err := server.NewServer(opts) + if err != nil { + return nil, errors.Wrap(err, "failed to create NATS server") + } + s.Start() // this will not block + defer func() { + if runErr != nil { // if there was an error, we need to shut down the server + s.Shutdown() + } + }() + if !s.ReadyForConnections(connectionTimeout) { + return nil, errors.New("NATS server is not ready for connections") + } + logger.Info(fmt.Sprintf("NATS Server is running on %s:%d", host, port)) + return s.Shutdown, nil +} diff --git a/pkg/messaging/pair/server.go b/pkg/messaging/pair/server.go index 4d2dd828..66776a38 100644 --- a/pkg/messaging/pair/server.go +++ b/pkg/messaging/pair/server.go @@ -11,81 +11,62 @@ import ( "nodemon/pkg/storing/nodes" "nodemon/pkg/storing/specific" + "github.com/nats-io/nats.go" "github.com/pkg/errors" - "go.nanomsg.org/mangos/v3/protocol" - "go.nanomsg.org/mangos/v3/protocol/pair" "go.uber.org/zap" ) +const okMessage = "ok" + func StartPairMessagingServer( ctx context.Context, - nanomsgURL string, + natsPairURL string, ns nodes.Storage, es *events.Storage, pew specific.PrivateNodesEventsWriter, logger *zap.Logger, + botRequestsTopic string, ) error { - if len(nanomsgURL) == 0 || len(strings.Fields(nanomsgURL)) > 1 { - return errors.New("invalid nanomsg IPC URL for pair socket") - } - socket, sockErr := pair.NewSocket() - if sockErr != nil { - return sockErr - } - defer func(socketPair protocol.Socket) { - if err := socketPair.Close(); err != nil { - logger.Error("Failed to close pair socket", zap.Error(err)) - } - }(socket) - - if err := socket.Listen(nanomsgURL); err != nil { + nc, err := nats.Connect(natsPairURL) + if err != nil { + logger.Fatal("Failed to connect to nats server", zap.Error(err)) return err } + defer nc.Close() - loopErr := enterLoop(ctx, socket, logger, ns, es, pew) - if loopErr != nil && !errors.Is(loopErr, context.Canceled) { - return loopErr + if len(natsPairURL) == 0 { + return errors.New("invalid nats URL for pair messaging") } - return nil -} -func enterLoop( - ctx context.Context, - socket protocol.Socket, - logger *zap.Logger, - ns nodes.Storage, - es *events.Storage, - pew specific.PrivateNodesEventsWriter, -) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - rawMsg, recvErr := socket.Recv() - if recvErr != nil { - logger.Error("Failed to receive a message from pair socket", zap.Error(recvErr)) - return recvErr - } - err := handleMessage(rawMsg, ns, logger, socket, es, pew) - if err != nil { - return err - } + _, subErr := nc.Subscribe(botRequestsTopic, func(request *nats.Msg) { + response, handleErr := handleMessage(request.Data, ns, logger, es, pew) + if handleErr != nil { + logger.Error("failed to handle bot request", zap.Error(handleErr)) + return + } + respondErr := request.Respond(response) + if respondErr != nil { + logger.Error("failed to respond to bot request", zap.Error(respondErr)) + return } + }) + if subErr != nil { + return subErr } + <-ctx.Done() + return nil } func handleMessage( rawMsg []byte, ns nodes.Storage, logger *zap.Logger, - socket protocol.Socket, es *events.Storage, pew specific.PrivateNodesEventsWriter, -) error { +) ([]byte, error) { if len(rawMsg) == 0 { logger.Warn("empty raw message received from pair socket") - return nil + return nil, nil } var ( t = RequestPairType(rawMsg[0]) @@ -93,13 +74,17 @@ func handleMessage( ) switch t { case RequestNodeListType: - if err := handleNodesRequest(ns, false, logger, socket); err != nil { - return err + response, err := handleNodesRequest(ns, false, logger) + if err != nil { + return nil, err } + return response, nil case RequestSpecificNodeListType: - if err := handleNodesRequest(ns, true, logger, socket); err != nil { - return err + response, err := handleNodesRequest(ns, true, logger) + if err != nil { + return nil, err } + return response, nil case RequestInsertNewNodeType: insertRegularNodeIfNew(msg, ns, logger) case RequestInsertSpecificNewNodeType: @@ -109,11 +94,13 @@ func handleMessage( case RequestDeleteNodeType: handleDeleteNodeRequest(msg, ns, logger) case RequestNodesStatusType, RequestNodeStatementType: - handleNodesStatementsRequest(msg, es, logger, socket) + response := handleNodesStatementsRequest(msg, es, logger) + return response, nil default: logger.Error("Unknown request type", zap.Int("type", int(t)), zap.Binary("message", msg)) } - return nil + // nats considers a message delivered only if there was a not nil response + return []byte(okMessage), nil } func insertNodeIfNew(url string, ns nodes.Storage, specific bool, logger *zap.Logger) bool { @@ -160,27 +147,24 @@ func handleUpdateNodeRequest(msg []byte, logger *zap.Logger, ns nodes.Storage) { } } -func handleNodesRequest(ns nodes.Storage, specific bool, logger *zap.Logger, socketPair protocol.Socket) error { +func handleNodesRequest(ns nodes.Storage, specific bool, logger *zap.Logger) ([]byte, error) { nodesList, err := ns.Nodes(specific) if err != nil { logger.Error("Failed to get list of nodes from storage", zap.Error(err), zap.Bool("specific", specific), ) - return err + return nil, err } response := NodesListResponse{Nodes: nodesList} marshaledResponse, err := json.Marshal(response) if err != nil { logger.Error("Failed to marshal node list to json", zap.Error(err)) + return nil, errors.Wrapf(err, "Failed to marshal node list to json") } - err = socketPair.Send(marshaledResponse) - if err != nil { - logger.Error("Failed to send a node list to pair socket", zap.Error(err)) - } - return nil + return marshaledResponse, nil } -func handleNodesStatementsRequest(msg []byte, es *events.Storage, logger *zap.Logger, socketPair protocol.Socket) { +func handleNodesStatementsRequest(msg []byte, es *events.Storage, logger *zap.Logger) []byte { listOfNodes := strings.Split(string(msg), ",") var nodesStatusResp NodesStatementsResponse @@ -211,8 +195,5 @@ func handleNodesStatementsRequest(msg []byte, es *events.Storage, logger *zap.Lo if err != nil { logger.Error("Failed to marshal node status to json", zap.Error(err)) } - err = socketPair.Send(response) - if err != nil { - logger.Error("Failed to send a response from pair socket", zap.Error(err)) - } + return response } diff --git a/pkg/messaging/pubsub/server.go b/pkg/messaging/pubsub/server.go index 9179ed89..020adaa8 100644 --- a/pkg/messaging/pubsub/server.go +++ b/pkg/messaging/pubsub/server.go @@ -2,49 +2,37 @@ package pubsub import ( "context" - "strings" - - "nodemon/pkg/entities" - "nodemon/pkg/messaging" + "github.com/nats-io/nats.go" "github.com/pkg/errors" - "go.nanomsg.org/mangos/v3/protocol" - "go.nanomsg.org/mangos/v3/protocol/pub" - _ "go.nanomsg.org/mangos/v3/transport/all" // registers all transports + "go.uber.org/zap" + + "nodemon/pkg/entities" + "nodemon/pkg/messaging" ) func StartPubMessagingServer( ctx context.Context, - nanomsgURL string, + natsPubSubURL string, // expected nats://host:port alerts <-chan entities.Alert, logger *zap.Logger, + scheme string, ) error { - if len(nanomsgURL) == 0 || len(strings.Fields(nanomsgURL)) > 1 { - return errors.New("invalid nanomsg IPC URL for pub sub socket") - } - - socketPub, sockErr := pub.NewSocket() - if sockErr != nil { - return sockErr - } - defer func(socketPub protocol.Socket) { - if err := socketPub.Close(); err != nil { - logger.Error("Failed to close pub socket", zap.Error(err)) - } - }(socketPub) - - if err := socketPub.Listen(nanomsgURL); err != nil { + nc, err := nats.Connect(natsPubSubURL) + if err != nil { return err } - loopErr := enterLoop(ctx, alerts, logger, socketPub) + defer nc.Close() + loopErr := enterLoop(ctx, alerts, logger, nc, scheme) if loopErr != nil && !errors.Is(loopErr, context.Canceled) { return loopErr } return nil } -func enterLoop(ctx context.Context, alerts <-chan entities.Alert, logger *zap.Logger, socketPub protocol.Socket) error { +func enterLoop(ctx context.Context, alerts <-chan entities.Alert, + logger *zap.Logger, nc *nats.Conn, scheme string) error { for { select { case <-ctx.Done(): @@ -62,7 +50,8 @@ func enterLoop(ctx context.Context, alerts <-chan entities.Alert, logger *zap.Lo logger.Error("Failed to marshal binary alert message", zap.Error(err)) continue } - err = socketPub.Send(data) + topic := messaging.PubSubMsgTopic(scheme, alert.Type()) + err = nc.Publish(topic, data) if err != nil { logger.Error("Failed to send alert to socket", zap.Error(err)) } diff --git a/pkg/messaging/topics.go b/pkg/messaging/topics.go new file mode 100644 index 00000000..875d7390 --- /dev/null +++ b/pkg/messaging/topics.go @@ -0,0 +1,27 @@ +package messaging + +import ( + "fmt" + + "nodemon/pkg/entities" +) + +const pubSubTopicPrefix = "alerts" + +func PubSubMsgTopic(scheme string, alertType entities.AlertType) string { + alertName, ok := alertType.AlertName() + if !ok { + return fmt.Sprintf("%s_%s_alert-%d", pubSubTopicPrefix, scheme, uint(alertType)) + } + return fmt.Sprintf("%s_%s_%s", pubSubTopicPrefix, scheme, alertName.String()) +} + +const botRequestsTopicPrefix = "bot_requests" + +func TelegramBotRequestsTopic(scheme string) string { + return fmt.Sprintf("%s_%s_telegram", botRequestsTopicPrefix, scheme) +} + +func DiscordBotRequestsTopic(scheme string) string { + return fmt.Sprintf("%s_%s_discord", botRequestsTopicPrefix, scheme) +}