From 5c74f2c1d2ee1ce1eaa54950d6ccaae2fdf5df13 Mon Sep 17 00:00:00 2001 From: Nikolay Eskov Date: Thu, 5 Oct 2023 01:59:05 +0300 Subject: [PATCH] Add more logs for the messaging services and HTTP APIs. --- cmd/bots/internal/common/api/api.go | 3 +- .../internal/common/messaging/pair_client.go | 25 ++++++++---- .../common/messaging/pubsub_client.go | 38 +++++++++++++++---- pkg/api/api.go | 3 +- pkg/messaging/pair/server.go | 13 +++++-- pkg/messaging/pubsub/server.go | 19 ++++++++-- 6 files changed, 78 insertions(+), 23 deletions(-) diff --git a/cmd/bots/internal/common/api/api.go b/cmd/bots/internal/common/api/api.go index 1721d7bf..bfe33ccc 100644 --- a/cmd/bots/internal/common/api/api.go +++ b/cmd/bots/internal/common/api/api.go @@ -88,6 +88,7 @@ func (a *BotAPI) Start() error { return errors.Errorf("Failed to start REST API at '%s': %v", a.srv.Addr, listenErr) } go func() { + a.zap.Info("HTTP API is ready to serve requests", zap.String("addr", a.srv.Addr)) err := a.srv.Serve(l) if err != nil && !errors.Is(err, http.ErrServerClosed) { a.zap.Fatal("Failed to serve REST API", zap.String("address", a.srv.Addr), zap.Error(err)) @@ -99,7 +100,7 @@ func (a *BotAPI) Start() error { func (a *BotAPI) Shutdown() { ctx, cancel := context.WithTimeout(context.Background(), botAPIShutdownTimeout) defer cancel() - + a.zap.Info("Shutting down HTTP API") if err := a.srv.Shutdown(ctx); err != nil { a.zap.Error("Failed to shutdown REST API", zap.Error(err)) } diff --git a/cmd/bots/internal/common/messaging/pair_client.go b/cmd/bots/internal/common/messaging/pair_client.go index 8c3ccf38..caf3624f 100644 --- a/cmd/bots/internal/common/messaging/pair_client.go +++ b/cmd/bots/internal/common/messaging/pair_client.go @@ -74,35 +74,44 @@ func StartPairMessagingClient( ) error { pairSocket, sockErr := pairProtocol.NewSocket() if sockErr != nil { - return errors.Wrap(sockErr, "failed to get new pair socket") + return errors.Wrap(sockErr, "failed to create new pair socket") } defer func() { _ = pairSocket.Close() // can be ignored, only possible error is protocol.ErrClosed }() + logger.Debug("Dialing the pair socket", zap.String("url", nanomsgURL)) if err := pairSocket.Dial(nanomsgURL); err != nil { return errors.Wrapf(err, "failed to dial '%s' on pair socket", nanomsgURL) } - done := runPairLoop(ctx, requestPair, sendRecvDeadlineSocketWrapper{pairSocket}, logger, responsePair) + logger.Info("Staring pair messaging service loop...") + done := runPairLoop(ctx, nanomsgURL, sendRecvDeadlineSocketWrapper{pairSocket}, requestPair, responsePair, logger) <-ctx.Done() - logger.Info("stopping pair messaging service...") + logger.Info("Stopping pair messaging service...") <-done - logger.Info("pair messaging service finished") + logger.Info("Sair messaging service finished") return nil } func runPairLoop( ctx context.Context, - requestPair <-chan pair.Request, + destinationURL string, pairSocket protocol.Socket, - logger *zap.Logger, + requestPair <-chan pair.Request, responsePair chan<- pair.Response, + logger *zap.Logger, ) <-chan struct{} { ch := make(chan struct{}) go func(done chan<- struct{}) { defer close(done) + if ctx.Err() != nil { + return + } + logger.Info("Pair messaging service is ready to receive and send messages to/from other side", + zap.String("destination-endpoint", destinationURL), + ) for { if ctx.Err() != nil { return @@ -113,7 +122,9 @@ func runPairLoop( case request := <-requestPair: message := &bytes.Buffer{} message.WriteByte(byte(request.RequestType())) - + logger.Debug("Pair service received request message", + zap.Int("request-type", int(request.RequestType())), + ) err := handlePairRequest(ctx, request, pairSocket, message, logger, responsePair) if err != nil { logger.Error("failed to handle pair request", diff --git a/cmd/bots/internal/common/messaging/pubsub_client.go b/cmd/bots/internal/common/messaging/pubsub_client.go index 6da4f53d..fdec6f96 100644 --- a/cmd/bots/internal/common/messaging/pubsub_client.go +++ b/cmd/bots/internal/common/messaging/pubsub_client.go @@ -15,7 +15,7 @@ import ( func StartSubMessagingClient(ctx context.Context, nanomsgURL string, bot Bot, logger *zap.Logger) error { subSocket, sockErr := sub.NewSocket() if sockErr != nil { - return sockErr + return errors.Wrap(sockErr, "failed to create new subscriber socket") } defer func() { _ = subSocket.Close() // can be ignored, only possible error is protocol.ErrClosed @@ -23,24 +23,33 @@ func StartSubMessagingClient(ctx context.Context, nanomsgURL string, bot Bot, lo bot.SetSubSocket(subSocket) + logger.Debug("Dialing the publisher socket", zap.String("url", nanomsgURL)) if dialErr := subSocket.Dial(nanomsgURL); dialErr != nil { return errors.Wrapf(dialErr, "failed to dial '%s on sub socket'", nanomsgURL) } + logger.Debug("Subscribing to all alert types") if err := bot.SubscribeToAllAlerts(); err != nil { return errors.Wrap(err, "failed to subscribe to all alerts") } - done := runSubLoop(ctx, subSocket, logger, bot) + logger.Info("Staring subscriber messaging service loop...") + done := runSubLoop(ctx, nanomsgURL, subSocket, logger, bot) <-ctx.Done() - logger.Info("stopping sub messaging service...") + logger.Info("Stopping subscriber messaging service...") <-done - logger.Info("sub messaging service finished") + logger.Info("Subscriber messaging service finished") return nil } -func runSubLoop(ctx context.Context, subSocket protocol.Socket, logger *zap.Logger, bot Bot) <-chan struct{} { +func runSubLoop( + ctx context.Context, + publisherURL string, + subSocket protocol.Socket, + logger *zap.Logger, + bot Bot, +) <-chan struct{} { sockCh := make(chan struct{}) go func() { // run socket closer goroutine defer close(sockCh) @@ -53,11 +62,17 @@ func runSubLoop(ctx context.Context, subSocket protocol.Socket, logger *zap.Logg <-closedSock close(done) }() + if ctx.Err() != nil { + return + } + logger.Info("Subscriber messaging service is ready to receive messages from publisher", + zap.String("publisher-url", publisherURL), + ) for { if ctx.Err() != nil { return } - if err := recvMessage(subSocket, bot); err != nil { + if err := recvMessage(subSocket, logger, bot); err != nil { if errors.Is(err, protocol.ErrClosed) { // socket is closed, this means that context is canceled return } @@ -68,15 +83,24 @@ func runSubLoop(ctx context.Context, subSocket protocol.Socket, logger *zap.Logg return ch } -func recvMessage(subSocket protocol.Socket, bot Bot) error { +func recvMessage(subSocket protocol.Socket, logger *zap.Logger, bot Bot) error { + logger.Debug("Subscriber service waiting for a new message...") msg, err := subSocket.Recv() // this operation is blocking, we have to close the socket to interrupt this block if err != nil { return errors.Wrap(err, "failed to receive message from sub socket") } + logger.Debug("Subscriber service received a new message") alertMsg, err := messaging.NewAlertMessageFromBytes(msg) if err != nil { return errors.Wrap(err, "failed to parse alert message from bytes") } + alertName, _ := alertMsg.AlertType().AlertName() + logger.Debug("Subscriber service received a new alert", + zap.Int("alert-type", int(alertMsg.AlertType())), + zap.Stringer("alert-name", alertName), + zap.Stringer("reference-id", alertMsg.ReferenceID()), + ) bot.SendAlertMessage(alertMsg) + logger.Debug("Subscriber service sent received message to the bot service") return nil } diff --git a/pkg/api/api.go b/pkg/api/api.go index 2e4da153..a2474b60 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -78,6 +78,7 @@ func (a *API) Start() error { return errors.Errorf("Failed to start REST API at '%s': %v", a.srv.Addr, listenErr) } go func() { + a.zap.Info("HTTP API is ready to serve requests", zap.String("addr", a.srv.Addr)) err := a.srv.Serve(l) if err != nil && !errors.Is(err, http.ErrServerClosed) { a.zap.Sugar().Fatalf("Failed to serve REST API at '%s': %v", a.srv.Addr, err) @@ -89,7 +90,7 @@ func (a *API) Start() error { func (a *API) Shutdown() { ctx, cancel := context.WithTimeout(context.Background(), apiShutdownTimeout) defer cancel() - + a.zap.Info("Shutting down HTTP API") if err := a.srv.Shutdown(ctx); err != nil { a.zap.Error("Failed to shutdown REST API", zap.Error(err)) } diff --git a/pkg/messaging/pair/server.go b/pkg/messaging/pair/server.go index ca65d081..07f1159e 100644 --- a/pkg/messaging/pair/server.go +++ b/pkg/messaging/pair/server.go @@ -27,7 +27,7 @@ func StartPairMessagingServer( } socket, sockErr := pair.NewSocket() if sockErr != nil { - return sockErr + return errors.Wrap(sockErr, "failed to create new pair socket") } defer func(socketPair protocol.Socket) { if err := socketPair.Close(); err != nil { @@ -35,23 +35,29 @@ func StartPairMessagingServer( } }(socket) + logger.Debug("Pair messaging service start listening", zap.String("listen", nanomsgURL)) if err := socket.Listen(nanomsgURL); err != nil { - return err + return errors.Wrapf(err, "pair socket failed to start listening on '%s'", nanomsgURL) } + logger.Info("Pair messaging service is ready to receive and send messages to/from other side", + zap.String("listen", nanomsgURL), + ) for { select { case <-ctx.Done(): return nil default: + logger.Debug("Pair service waiting for a new message...") rawMsg, recvErr := socket.Recv() if recvErr != nil { logger.Error("Failed to receive a message from pair socket", zap.Error(recvErr)) return recvErr } + logger.Debug("Pair service received a new message") err := handleMessage(rawMsg, ns, logger, socket, es) if err != nil { - return err + return errors.Wrap(err, "failed to handle message") } } } @@ -72,6 +78,7 @@ func handleMessage( t = RequestPairType(rawMsg[0]) msg = rawMsg[1:] // cut first byte, which is request type ) + logger.Debug("Pair service received request message", zap.Int("request-type", int(t))) switch t { case RequestNodeListType: if err := handleNodesRequest(ns, false, logger, socket); err != nil { diff --git a/pkg/messaging/pubsub/server.go b/pkg/messaging/pubsub/server.go index 0f55dc55..f63c2ba1 100644 --- a/pkg/messaging/pubsub/server.go +++ b/pkg/messaging/pubsub/server.go @@ -26,7 +26,7 @@ func StartPubMessagingServer( socketPub, sockErr := pub.NewSocket() if sockErr != nil { - return sockErr + return errors.Wrap(sockErr, "failed to create new publisher socket") } defer func(socketPub protocol.Socket) { if err := socketPub.Close(); err != nil { @@ -34,14 +34,25 @@ func StartPubMessagingServer( } }(socketPub) + logger.Debug("Publisher messaging service start listening", zap.String("listen", nanomsgURL)) if err := socketPub.Listen(nanomsgURL); err != nil { - return err + return errors.Wrapf(err, "publisher socket failed to start listening on '%s'", nanomsgURL) } - return enterLoop(ctx, alerts, logger, socketPub) + logger.Info("Staring publisher messaging service loop...") + return enterLoop(ctx, alerts, logger, nanomsgURL, socketPub) } -func enterLoop(ctx context.Context, alerts <-chan entities.Alert, logger *zap.Logger, socketPub protocol.Socket) error { +func enterLoop( + ctx context.Context, + alerts <-chan entities.Alert, + logger *zap.Logger, + listen string, + socketPub protocol.Socket, +) error { + logger.Info("Publisher messaging service is ready to send messages to subscribers", + zap.String("listen", listen), + ) for { select { case <-ctx.Done():