Skip to content

Commit

Permalink
Merge pull request #26 from didil/queued-infos
Browse files Browse the repository at this point in the history
feat: extra logging info for queued messages
  • Loading branch information
didil authored May 14, 2023
2 parents 3f1c03f + bd7bb17 commit 3e0030a
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 46 deletions.
3 changes: 2 additions & 1 deletion pkg/models/requeued_info.go → pkg/models/queued_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package models

import "time"

type RequeuedInfo struct {
type QueuedInfo struct {
MessageID string
QueueStatus QueueStatus
DeliverAfter time.Time
}
12 changes: 11 additions & 1 deletion pkg/server/handlers/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"fmt"
"net/http"

"github.com/didil/inhooks/pkg/models"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

func (app *App) HandleIngest(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -36,13 +38,21 @@ func (app *App) HandleIngest(w http.ResponseWriter, r *http.Request) {
}

// enqueue messages
err = app.messageEnqueuer.Enqueue(ctx, messages)
queuedInfos, err := app.messageEnqueuer.Enqueue(ctx, messages)
if err != nil {
logger.Error("ingest request failed: unable to enqueue messages", zap.Error(err))
app.WriteJSONErr(w, http.StatusBadRequest, reqID, fmt.Errorf("unable to enqueue data"))
return
}

for _, queuedInfo := range queuedInfos {
fields := []zapcore.Field{zap.String("messageID", queuedInfo.MessageID), zap.String("queue", string(queuedInfo.QueueStatus))}
if queuedInfo.QueueStatus == models.QueueStatusScheduled {
fields = append(fields, zap.Time("nextAttemptAfter", queuedInfo.DeliverAfter))
}
logger.Info("message queued", fields...)
}

app.WriteJSONResponse(w, http.StatusOK, JSONOK{})
logger.Info("ingest request succeeded")
}
7 changes: 5 additions & 2 deletions pkg/server/handlers/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,11 @@ func TestUpdateLB(t *testing.T) {
}

messageBuilder.EXPECT().FromHttp(flow, gomock.AssignableToTypeOf(&http.Request{}), gomock.AssignableToTypeOf("")).Return(messages, nil)

messageEnqueuer.EXPECT().Enqueue(gomock.Any(), messages).Return(nil)
queuedInfos := []*models.QueuedInfo{
{MessageID: messages[0].ID, QueueStatus: models.QueueStatusReady},
{MessageID: messages[1].ID, QueueStatus: models.QueueStatusReady},
}
messageEnqueuer.EXPECT().Enqueue(gomock.Any(), messages).Return(queuedInfos, nil)

buf := bytes.NewBufferString(`{"id": "abc"}`)

Expand Down
55 changes: 34 additions & 21 deletions pkg/services/message_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type MessageEnqueuer interface {
Enqueue(ctx context.Context, messages []*models.Message) error
Enqueue(ctx context.Context, messages []*models.Message) ([]*models.QueuedInfo, error)
}

func NewMessageEnqueuer(redisStore RedisStore, timeSvc TimeService) MessageEnqueuer {
Expand All @@ -26,34 +26,47 @@ type messageEnqueuer struct {
timeSvc TimeService
}

func (e *messageEnqueuer) Enqueue(ctx context.Context, messages []*models.Message) error {
func (e *messageEnqueuer) Enqueue(ctx context.Context, messages []*models.Message) ([]*models.QueuedInfo, error) {
queuedInfos := []*models.QueuedInfo{}

for _, m := range messages {
queueStatus := getQueueStatus(m, e.timeSvc.Now())

b, err := json.Marshal(&m)
err := e.redisEnqueue(ctx, m, queueStatus)
if err != nil {
return errors.Wrapf(err, "failed to encode message for sink: %s", m.SinkID)
return nil, err
}

mKey := messageKey(m.FlowID, m.SinkID, m.ID)
qKey := queueKey(m.FlowID, m.SinkID, queueStatus)

switch queueStatus {
case models.QueueStatusReady:
err = e.redisStore.SetAndEnqueue(ctx, mKey, b, qKey, m.ID)
if err != nil {
return errors.Wrapf(err, "failed to set and enqueue message for sink: %s", m.SinkID)
}
case models.QueueStatusScheduled:
err = e.redisStore.SetAndZAdd(ctx, mKey, b, qKey, m.ID, float64(m.DeliverAfter.Unix()))
if err != nil {
return errors.Wrapf(err, "failed to set and enqueue message for sink: %s", m.SinkID)
}
default:
return fmt.Errorf("unexpected queue status %s", queueStatus)
}
queuedInfos = append(queuedInfos, &models.QueuedInfo{MessageID: m.ID, QueueStatus: queueStatus, DeliverAfter: m.DeliverAfter})
}

return queuedInfos, nil
}

func (e *messageEnqueuer) redisEnqueue(ctx context.Context, m *models.Message, queueStatus models.QueueStatus) error {
b, err := json.Marshal(&m)
if err != nil {
return errors.Wrapf(err, "failed to encode message for sink: %s", m.SinkID)
}

mKey := messageKey(m.FlowID, m.SinkID, m.ID)
qKey := queueKey(m.FlowID, m.SinkID, queueStatus)

switch queueStatus {
case models.QueueStatusReady:
err = e.redisStore.SetAndEnqueue(ctx, mKey, b, qKey, m.ID)
if err != nil {
return errors.Wrapf(err, "failed to set and enqueue message for sink: %s", m.SinkID)
}
case models.QueueStatusScheduled:
err = e.redisStore.SetAndZAdd(ctx, mKey, b, qKey, m.ID, float64(m.DeliverAfter.Unix()))
if err != nil {
return errors.Wrapf(err, "failed to set and enqueue message for sink: %s", m.SinkID)
}
default:
return fmt.Errorf("unexpected queue status %s", queueStatus)
}

return nil
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/services/message_enqueuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ func TestMessageEnqueuer(t *testing.T) {
Times(1).
Return(nil)

err = messageEnqueuer.Enqueue(ctx, []*models.Message{m1, m2})
queuedInfos, err := messageEnqueuer.Enqueue(ctx, []*models.Message{m1, m2})
assert.NoError(t, err)

expectedInfos := []*models.QueuedInfo{
{MessageID: m1ID, QueueStatus: models.QueueStatusReady, DeliverAfter: m1.DeliverAfter},
{MessageID: m2ID, QueueStatus: models.QueueStatusScheduled, DeliverAfter: m2.DeliverAfter},
}

assert.Equal(t, expectedInfos, queuedInfos)
}
8 changes: 4 additions & 4 deletions pkg/services/processing_results_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type ProcessingResultsService interface {
HandleFailed(ctx context.Context, sink *models.Sink, m *models.Message, processingErr error) (*models.RequeuedInfo, error)
HandleFailed(ctx context.Context, sink *models.Sink, m *models.Message, processingErr error) (*models.QueuedInfo, error)
HandleOK(ctx context.Context, m *models.Message) error
}

Expand All @@ -27,7 +27,7 @@ func NewProcessingResultsService(timeSvc TimeService, redisStore RedisStore) Pro
}
}

func (s *processingResultsService) HandleFailed(ctx context.Context, sink *models.Sink, m *models.Message, processingErr error) (*models.RequeuedInfo, error) {
func (s *processingResultsService) HandleFailed(ctx context.Context, sink *models.Sink, m *models.Message, processingErr error) (*models.QueuedInfo, error) {
now := s.timeSvc.Now()
m.DeliveryAttempts = append(m.DeliveryAttempts,
&models.DeliveryAttempt{
Expand Down Expand Up @@ -68,7 +68,7 @@ func (s *processingResultsService) HandleFailed(ctx context.Context, sink *model
return nil, errors.Wrapf(err, "failed to set and move to dead")
}

return &models.RequeuedInfo{QueueStatus: models.QueueStatusDead}, nil
return &models.QueuedInfo{MessageID: m.ID, QueueStatus: models.QueueStatusDead, DeliverAfter: m.DeliverAfter}, nil
}

queueStatus := getQueueStatus(m, now)
Expand All @@ -89,7 +89,7 @@ func (s *processingResultsService) HandleFailed(ctx context.Context, sink *model
return nil, fmt.Errorf("unexpected queue status %s", queueStatus)
}

return &models.RequeuedInfo{QueueStatus: queueStatus, DeliverAfter: m.DeliverAfter}, nil
return &models.QueuedInfo{MessageID: m.ID, QueueStatus: queueStatus, DeliverAfter: m.DeliverAfter}, nil
}

func (s *processingResultsService) HandleOK(ctx context.Context, m *models.Message) error {
Expand Down
19 changes: 11 additions & 8 deletions pkg/services/processing_results_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ func TestProcessingResultsServiceHandleFailed_Dead(t *testing.T) {
redisStore.EXPECT().SetAndMove(ctx, messageKey, b, sourceQueueKey, destQueueKey, mID).Return(nil)

s := NewProcessingResultsService(timeSvc, redisStore)
requeuedInfo, err := s.HandleFailed(ctx, sink, m, processingErr)
queuedInfo, err := s.HandleFailed(ctx, sink, m, processingErr)
assert.NoError(t, err)
assert.Equal(t, models.QueueStatusDead, requeuedInfo.QueueStatus)
assert.Equal(t, mID, queuedInfo.MessageID)
assert.Equal(t, models.QueueStatusDead, queuedInfo.QueueStatus)
}

func TestProcessingResultsServiceHandleFailed_Scheduled(t *testing.T) {
Expand Down Expand Up @@ -181,10 +182,11 @@ func TestProcessingResultsServiceHandleFailed_Scheduled(t *testing.T) {
redisStore.EXPECT().SetLRemZAdd(ctx, messageKey, b, sourceQueueKey, destQueueKey, mID, float64(mUpdated.DeliverAfter.Unix())).Return(nil)

s := NewProcessingResultsService(timeSvc, redisStore)
requeuedInfo, err := s.HandleFailed(ctx, sink, m, processingErr)
queuedInfo, err := s.HandleFailed(ctx, sink, m, processingErr)
assert.NoError(t, err)
assert.Equal(t, models.QueueStatusScheduled, requeuedInfo.QueueStatus)
assert.Equal(t, mUpdated.DeliverAfter, requeuedInfo.DeliverAfter)
assert.Equal(t, mID, queuedInfo.MessageID)
assert.Equal(t, models.QueueStatusScheduled, queuedInfo.QueueStatus)
assert.Equal(t, mUpdated.DeliverAfter, queuedInfo.DeliverAfter)
}

func TestProcessingResultsServiceHandleFailed_Ready(t *testing.T) {
Expand Down Expand Up @@ -244,8 +246,9 @@ func TestProcessingResultsServiceHandleFailed_Ready(t *testing.T) {
redisStore.EXPECT().SetAndMove(ctx, messageKey, b, sourceQueueKey, destQueueKey, mID).Return(nil)

s := NewProcessingResultsService(timeSvc, redisStore)
requeuedInfo, err := s.HandleFailed(ctx, sink, m, processingErr)
queuedInfo, err := s.HandleFailed(ctx, sink, m, processingErr)
assert.NoError(t, err)
assert.Equal(t, models.QueueStatusReady, requeuedInfo.QueueStatus)
assert.Equal(t, mUpdated.DeliverAfter, requeuedInfo.DeliverAfter)
assert.Equal(t, mID, queuedInfo.MessageID)
assert.Equal(t, models.QueueStatusReady, queuedInfo.QueueStatus)
assert.Equal(t, mUpdated.DeliverAfter, queuedInfo.DeliverAfter)
}
4 changes: 2 additions & 2 deletions pkg/supervisor/ready.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ func (s *Supervisor) FetchAndProcess(ctx context.Context, f *models.Flow, sink *
processingErr := s.messageProcessor.Process(ctx, sink, m)
if processingErr != nil {
logger.Info("message processing failed")
requeuedInfo, err := s.processingResultsSvc.HandleFailed(ctx, sink, m, processingErr)
queuedInfo, err := s.processingResultsSvc.HandleFailed(ctx, sink, m, processingErr)
if err != nil {
return errors.Wrapf(err, "could not handle failed processing")
}
logger.Info("message queued after failure", zap.String("queue", string(requeuedInfo.QueueStatus)), zap.Time("nextAttemptAfter", requeuedInfo.DeliverAfter))
logger.Info("message queued after failure", zap.String("queue", string(queuedInfo.QueueStatus)), zap.Time("nextAttemptAfter", queuedInfo.DeliverAfter))
} else {
logger.Info("message processed ok")
err := s.processingResultsSvc.HandleOK(ctx, m)
Expand Down
2 changes: 1 addition & 1 deletion pkg/supervisor/ready_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestSupervisorFetchAndProcess_Failed(t *testing.T) {

messageFetcher.EXPECT().GetMessageForProcessing(ctx, appConf.Supervisor.ReadyWaitTime, flowId1, sinkID1).Return(m, nil)
messageProcessor.EXPECT().Process(ctx, sink1, m).Return(processingErr)
processingResultsService.EXPECT().HandleFailed(ctx, sink1, m, processingErr).Return(&models.RequeuedInfo{QueueStatus: models.QueueStatusReady}, nil)
processingResultsService.EXPECT().HandleFailed(ctx, sink1, m, processingErr).Return(&models.QueuedInfo{QueueStatus: models.QueueStatusReady}, nil)

logger, err := zap.NewDevelopment()
assert.NoError(t, err)
Expand Down
7 changes: 4 additions & 3 deletions pkg/testsupport/mocks/mock_message_enqueuer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/testsupport/mocks/mock_processing_results_service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 3e0030a

Please sign in to comment.