Skip to content

Commit

Permalink
Add more logs for the messaging services and HTTP APIs.
Browse files Browse the repository at this point in the history
  • Loading branch information
nickeskov committed Jan 3, 2024
1 parent 517803a commit 5c74f2c
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 23 deletions.
3 changes: 2 additions & 1 deletion cmd/bots/internal/common/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (a *BotAPI) Start() error {
return errors.Errorf("Failed to start REST API at '%s': %v", a.srv.Addr, listenErr)
}
go func() {
a.zap.Info("HTTP API is ready to serve requests", zap.String("addr", a.srv.Addr))
err := a.srv.Serve(l)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
a.zap.Fatal("Failed to serve REST API", zap.String("address", a.srv.Addr), zap.Error(err))
Expand All @@ -99,7 +100,7 @@ func (a *BotAPI) Start() error {
func (a *BotAPI) Shutdown() {
ctx, cancel := context.WithTimeout(context.Background(), botAPIShutdownTimeout)
defer cancel()

a.zap.Info("Shutting down HTTP API")
if err := a.srv.Shutdown(ctx); err != nil {
a.zap.Error("Failed to shutdown REST API", zap.Error(err))
}
Expand Down
25 changes: 18 additions & 7 deletions cmd/bots/internal/common/messaging/pair_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,35 +74,44 @@ func StartPairMessagingClient(
) error {
pairSocket, sockErr := pairProtocol.NewSocket()
if sockErr != nil {
return errors.Wrap(sockErr, "failed to get new pair socket")
return errors.Wrap(sockErr, "failed to create new pair socket")
}
defer func() {
_ = pairSocket.Close() // can be ignored, only possible error is protocol.ErrClosed
}()

logger.Debug("Dialing the pair socket", zap.String("url", nanomsgURL))
if err := pairSocket.Dial(nanomsgURL); err != nil {
return errors.Wrapf(err, "failed to dial '%s' on pair socket", nanomsgURL)
}

done := runPairLoop(ctx, requestPair, sendRecvDeadlineSocketWrapper{pairSocket}, logger, responsePair)
logger.Info("Staring pair messaging service loop...")
done := runPairLoop(ctx, nanomsgURL, sendRecvDeadlineSocketWrapper{pairSocket}, requestPair, responsePair, logger)

<-ctx.Done()
logger.Info("stopping pair messaging service...")
logger.Info("Stopping pair messaging service...")
<-done
logger.Info("pair messaging service finished")
logger.Info("Sair messaging service finished")
return nil
}

func runPairLoop(
ctx context.Context,
requestPair <-chan pair.Request,
destinationURL string,
pairSocket protocol.Socket,
logger *zap.Logger,
requestPair <-chan pair.Request,
responsePair chan<- pair.Response,
logger *zap.Logger,
) <-chan struct{} {
ch := make(chan struct{})
go func(done chan<- struct{}) {
defer close(done)
if ctx.Err() != nil {
return
}
logger.Info("Pair messaging service is ready to receive and send messages to/from other side",
zap.String("destination-endpoint", destinationURL),
)
for {
if ctx.Err() != nil {
return
Expand All @@ -113,7 +122,9 @@ func runPairLoop(
case request := <-requestPair:
message := &bytes.Buffer{}
message.WriteByte(byte(request.RequestType()))

logger.Debug("Pair service received request message",
zap.Int("request-type", int(request.RequestType())),
)
err := handlePairRequest(ctx, request, pairSocket, message, logger, responsePair)
if err != nil {
logger.Error("failed to handle pair request",
Expand Down
38 changes: 31 additions & 7 deletions cmd/bots/internal/common/messaging/pubsub_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,41 @@ import (
func StartSubMessagingClient(ctx context.Context, nanomsgURL string, bot Bot, logger *zap.Logger) error {
subSocket, sockErr := sub.NewSocket()
if sockErr != nil {
return sockErr
return errors.Wrap(sockErr, "failed to create new subscriber socket")
}
defer func() {
_ = subSocket.Close() // can be ignored, only possible error is protocol.ErrClosed
}()

bot.SetSubSocket(subSocket)

logger.Debug("Dialing the publisher socket", zap.String("url", nanomsgURL))
if dialErr := subSocket.Dial(nanomsgURL); dialErr != nil {
return errors.Wrapf(dialErr, "failed to dial '%s on sub socket'", nanomsgURL)
}

logger.Debug("Subscribing to all alert types")
if err := bot.SubscribeToAllAlerts(); err != nil {
return errors.Wrap(err, "failed to subscribe to all alerts")
}

done := runSubLoop(ctx, subSocket, logger, bot)
logger.Info("Staring subscriber messaging service loop...")
done := runSubLoop(ctx, nanomsgURL, subSocket, logger, bot)

<-ctx.Done()
logger.Info("stopping sub messaging service...")
logger.Info("Stopping subscriber messaging service...")
<-done
logger.Info("sub messaging service finished")
logger.Info("Subscriber messaging service finished")
return nil
}

func runSubLoop(ctx context.Context, subSocket protocol.Socket, logger *zap.Logger, bot Bot) <-chan struct{} {
func runSubLoop(
ctx context.Context,
publisherURL string,
subSocket protocol.Socket,
logger *zap.Logger,
bot Bot,
) <-chan struct{} {
sockCh := make(chan struct{})
go func() { // run socket closer goroutine
defer close(sockCh)
Expand All @@ -53,11 +62,17 @@ func runSubLoop(ctx context.Context, subSocket protocol.Socket, logger *zap.Logg
<-closedSock
close(done)
}()
if ctx.Err() != nil {
return
}
logger.Info("Subscriber messaging service is ready to receive messages from publisher",
zap.String("publisher-url", publisherURL),
)
for {
if ctx.Err() != nil {
return
}
if err := recvMessage(subSocket, bot); err != nil {
if err := recvMessage(subSocket, logger, bot); err != nil {
if errors.Is(err, protocol.ErrClosed) { // socket is closed, this means that context is canceled
return
}
Expand All @@ -68,15 +83,24 @@ func runSubLoop(ctx context.Context, subSocket protocol.Socket, logger *zap.Logg
return ch
}

func recvMessage(subSocket protocol.Socket, bot Bot) error {
func recvMessage(subSocket protocol.Socket, logger *zap.Logger, bot Bot) error {
logger.Debug("Subscriber service waiting for a new message...")
msg, err := subSocket.Recv() // this operation is blocking, we have to close the socket to interrupt this block
if err != nil {
return errors.Wrap(err, "failed to receive message from sub socket")
}
logger.Debug("Subscriber service received a new message")
alertMsg, err := messaging.NewAlertMessageFromBytes(msg)
if err != nil {
return errors.Wrap(err, "failed to parse alert message from bytes")
}
alertName, _ := alertMsg.AlertType().AlertName()
logger.Debug("Subscriber service received a new alert",
zap.Int("alert-type", int(alertMsg.AlertType())),
zap.Stringer("alert-name", alertName),
zap.Stringer("reference-id", alertMsg.ReferenceID()),
)
bot.SendAlertMessage(alertMsg)
logger.Debug("Subscriber service sent received message to the bot service")
return nil
}
3 changes: 2 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (a *API) Start() error {
return errors.Errorf("Failed to start REST API at '%s': %v", a.srv.Addr, listenErr)
}
go func() {
a.zap.Info("HTTP API is ready to serve requests", zap.String("addr", a.srv.Addr))
err := a.srv.Serve(l)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
a.zap.Sugar().Fatalf("Failed to serve REST API at '%s': %v", a.srv.Addr, err)
Expand All @@ -89,7 +90,7 @@ func (a *API) Start() error {
func (a *API) Shutdown() {
ctx, cancel := context.WithTimeout(context.Background(), apiShutdownTimeout)
defer cancel()

a.zap.Info("Shutting down HTTP API")
if err := a.srv.Shutdown(ctx); err != nil {
a.zap.Error("Failed to shutdown REST API", zap.Error(err))
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/messaging/pair/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,37 @@ func StartPairMessagingServer(
}
socket, sockErr := pair.NewSocket()
if sockErr != nil {
return sockErr
return errors.Wrap(sockErr, "failed to create new pair socket")
}
defer func(socketPair protocol.Socket) {
if err := socketPair.Close(); err != nil {
logger.Error("Failed to close pair socket", zap.Error(err))
}
}(socket)

logger.Debug("Pair messaging service start listening", zap.String("listen", nanomsgURL))
if err := socket.Listen(nanomsgURL); err != nil {
return err
return errors.Wrapf(err, "pair socket failed to start listening on '%s'", nanomsgURL)
}

logger.Info("Pair messaging service is ready to receive and send messages to/from other side",
zap.String("listen", nanomsgURL),
)
for {
select {
case <-ctx.Done():
return nil
default:
logger.Debug("Pair service waiting for a new message...")
rawMsg, recvErr := socket.Recv()
if recvErr != nil {
logger.Error("Failed to receive a message from pair socket", zap.Error(recvErr))
return recvErr
}
logger.Debug("Pair service received a new message")
err := handleMessage(rawMsg, ns, logger, socket, es)
if err != nil {
return err
return errors.Wrap(err, "failed to handle message")
}
}
}
Expand All @@ -72,6 +78,7 @@ func handleMessage(
t = RequestPairType(rawMsg[0])
msg = rawMsg[1:] // cut first byte, which is request type
)
logger.Debug("Pair service received request message", zap.Int("request-type", int(t)))
switch t {
case RequestNodeListType:
if err := handleNodesRequest(ns, false, logger, socket); err != nil {
Expand Down
19 changes: 15 additions & 4 deletions pkg/messaging/pubsub/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,33 @@ func StartPubMessagingServer(

socketPub, sockErr := pub.NewSocket()
if sockErr != nil {
return sockErr
return errors.Wrap(sockErr, "failed to create new publisher socket")
}
defer func(socketPub protocol.Socket) {
if err := socketPub.Close(); err != nil {
logger.Error("Failed to close pub socket", zap.Error(err))
}
}(socketPub)

logger.Debug("Publisher messaging service start listening", zap.String("listen", nanomsgURL))
if err := socketPub.Listen(nanomsgURL); err != nil {
return err
return errors.Wrapf(err, "publisher socket failed to start listening on '%s'", nanomsgURL)
}

return enterLoop(ctx, alerts, logger, socketPub)
logger.Info("Staring publisher messaging service loop...")
return enterLoop(ctx, alerts, logger, nanomsgURL, socketPub)
}

func enterLoop(ctx context.Context, alerts <-chan entities.Alert, logger *zap.Logger, socketPub protocol.Socket) error {
func enterLoop(
ctx context.Context,
alerts <-chan entities.Alert,
logger *zap.Logger,
listen string,
socketPub protocol.Socket,
) error {
logger.Info("Publisher messaging service is ready to send messages to subscribers",
zap.String("listen", listen),
)
for {
select {
case <-ctx.Done():
Expand Down

0 comments on commit 5c74f2c

Please sign in to comment.