diff --git a/pkg/models/requeued_info.go b/pkg/models/queued_info.go similarity index 64% rename from pkg/models/requeued_info.go rename to pkg/models/queued_info.go index 579e633..91e120b 100644 --- a/pkg/models/requeued_info.go +++ b/pkg/models/queued_info.go @@ -2,7 +2,8 @@ package models import "time" -type RequeuedInfo struct { +type QueuedInfo struct { + MessageID string QueueStatus QueueStatus DeliverAfter time.Time } diff --git a/pkg/server/handlers/ingest.go b/pkg/server/handlers/ingest.go index 7bdfbbc..9c9c8e6 100644 --- a/pkg/server/handlers/ingest.go +++ b/pkg/server/handlers/ingest.go @@ -4,9 +4,11 @@ import ( "fmt" "net/http" + "github.com/didil/inhooks/pkg/models" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) func (app *App) HandleIngest(w http.ResponseWriter, r *http.Request) { @@ -36,13 +38,21 @@ func (app *App) HandleIngest(w http.ResponseWriter, r *http.Request) { } // enqueue messages - err = app.messageEnqueuer.Enqueue(ctx, messages) + queuedInfos, err := app.messageEnqueuer.Enqueue(ctx, messages) if err != nil { logger.Error("ingest request failed: unable to enqueue messages", zap.Error(err)) app.WriteJSONErr(w, http.StatusBadRequest, reqID, fmt.Errorf("unable to enqueue data")) return } + for _, queuedInfo := range queuedInfos { + fields := []zapcore.Field{zap.String("messageID", queuedInfo.MessageID), zap.String("queue", string(queuedInfo.QueueStatus))} + if queuedInfo.QueueStatus == models.QueueStatusScheduled { + fields = append(fields, zap.Time("nextAttemptAfter", queuedInfo.DeliverAfter)) + } + logger.Info("message queued", fields...) + } + app.WriteJSONResponse(w, http.StatusOK, JSONOK{}) logger.Info("ingest request succeeded") } diff --git a/pkg/server/handlers/ingest_test.go b/pkg/server/handlers/ingest_test.go index 15544c6..15ec164 100644 --- a/pkg/server/handlers/ingest_test.go +++ b/pkg/server/handlers/ingest_test.go @@ -53,8 +53,11 @@ func TestUpdateLB(t *testing.T) { } messageBuilder.EXPECT().FromHttp(flow, gomock.AssignableToTypeOf(&http.Request{}), gomock.AssignableToTypeOf("")).Return(messages, nil) - - messageEnqueuer.EXPECT().Enqueue(gomock.Any(), messages).Return(nil) + queuedInfos := []*models.QueuedInfo{ + {MessageID: messages[0].ID, QueueStatus: models.QueueStatusReady}, + {MessageID: messages[1].ID, QueueStatus: models.QueueStatusReady}, + } + messageEnqueuer.EXPECT().Enqueue(gomock.Any(), messages).Return(queuedInfos, nil) buf := bytes.NewBufferString(`{"id": "abc"}`) diff --git a/pkg/services/message_enqueuer.go b/pkg/services/message_enqueuer.go index c5b0867..13f90cb 100644 --- a/pkg/services/message_enqueuer.go +++ b/pkg/services/message_enqueuer.go @@ -11,7 +11,7 @@ import ( ) type MessageEnqueuer interface { - Enqueue(ctx context.Context, messages []*models.Message) error + Enqueue(ctx context.Context, messages []*models.Message) ([]*models.QueuedInfo, error) } func NewMessageEnqueuer(redisStore RedisStore, timeSvc TimeService) MessageEnqueuer { @@ -26,34 +26,47 @@ type messageEnqueuer struct { timeSvc TimeService } -func (e *messageEnqueuer) Enqueue(ctx context.Context, messages []*models.Message) error { +func (e *messageEnqueuer) Enqueue(ctx context.Context, messages []*models.Message) ([]*models.QueuedInfo, error) { + queuedInfos := []*models.QueuedInfo{} + for _, m := range messages { queueStatus := getQueueStatus(m, e.timeSvc.Now()) - b, err := json.Marshal(&m) + err := e.redisEnqueue(ctx, m, queueStatus) if err != nil { - return errors.Wrapf(err, "failed to encode message for sink: %s", m.SinkID) + return nil, err } - mKey := messageKey(m.FlowID, m.SinkID, m.ID) - qKey := queueKey(m.FlowID, m.SinkID, queueStatus) - - switch queueStatus { - case models.QueueStatusReady: - err = e.redisStore.SetAndEnqueue(ctx, mKey, b, qKey, m.ID) - if err != nil { - return errors.Wrapf(err, "failed to set and enqueue message for sink: %s", m.SinkID) - } - case models.QueueStatusScheduled: - err = e.redisStore.SetAndZAdd(ctx, mKey, b, qKey, m.ID, float64(m.DeliverAfter.Unix())) - if err != nil { - return errors.Wrapf(err, "failed to set and enqueue message for sink: %s", m.SinkID) - } - default: - return fmt.Errorf("unexpected queue status %s", queueStatus) - } + queuedInfos = append(queuedInfos, &models.QueuedInfo{MessageID: m.ID, QueueStatus: queueStatus, DeliverAfter: m.DeliverAfter}) + } + return queuedInfos, nil +} + +func (e *messageEnqueuer) redisEnqueue(ctx context.Context, m *models.Message, queueStatus models.QueueStatus) error { + b, err := json.Marshal(&m) + if err != nil { + return errors.Wrapf(err, "failed to encode message for sink: %s", m.SinkID) } + + mKey := messageKey(m.FlowID, m.SinkID, m.ID) + qKey := queueKey(m.FlowID, m.SinkID, queueStatus) + + switch queueStatus { + case models.QueueStatusReady: + err = e.redisStore.SetAndEnqueue(ctx, mKey, b, qKey, m.ID) + if err != nil { + return errors.Wrapf(err, "failed to set and enqueue message for sink: %s", m.SinkID) + } + case models.QueueStatusScheduled: + err = e.redisStore.SetAndZAdd(ctx, mKey, b, qKey, m.ID, float64(m.DeliverAfter.Unix())) + if err != nil { + return errors.Wrapf(err, "failed to set and enqueue message for sink: %s", m.SinkID) + } + default: + return fmt.Errorf("unexpected queue status %s", queueStatus) + } + return nil } diff --git a/pkg/services/message_enqueuer_test.go b/pkg/services/message_enqueuer_test.go index 79d455c..890001f 100644 --- a/pkg/services/message_enqueuer_test.go +++ b/pkg/services/message_enqueuer_test.go @@ -68,6 +68,13 @@ func TestMessageEnqueuer(t *testing.T) { Times(1). Return(nil) - err = messageEnqueuer.Enqueue(ctx, []*models.Message{m1, m2}) + queuedInfos, err := messageEnqueuer.Enqueue(ctx, []*models.Message{m1, m2}) assert.NoError(t, err) + + expectedInfos := []*models.QueuedInfo{ + {MessageID: m1ID, QueueStatus: models.QueueStatusReady, DeliverAfter: m1.DeliverAfter}, + {MessageID: m2ID, QueueStatus: models.QueueStatusScheduled, DeliverAfter: m2.DeliverAfter}, + } + + assert.Equal(t, expectedInfos, queuedInfos) } diff --git a/pkg/services/processing_results_service.go b/pkg/services/processing_results_service.go index ee8e8b3..3563641 100644 --- a/pkg/services/processing_results_service.go +++ b/pkg/services/processing_results_service.go @@ -11,7 +11,7 @@ import ( ) type ProcessingResultsService interface { - HandleFailed(ctx context.Context, sink *models.Sink, m *models.Message, processingErr error) (*models.RequeuedInfo, error) + HandleFailed(ctx context.Context, sink *models.Sink, m *models.Message, processingErr error) (*models.QueuedInfo, error) HandleOK(ctx context.Context, m *models.Message) error } @@ -27,7 +27,7 @@ func NewProcessingResultsService(timeSvc TimeService, redisStore RedisStore) Pro } } -func (s *processingResultsService) HandleFailed(ctx context.Context, sink *models.Sink, m *models.Message, processingErr error) (*models.RequeuedInfo, error) { +func (s *processingResultsService) HandleFailed(ctx context.Context, sink *models.Sink, m *models.Message, processingErr error) (*models.QueuedInfo, error) { now := s.timeSvc.Now() m.DeliveryAttempts = append(m.DeliveryAttempts, &models.DeliveryAttempt{ @@ -68,7 +68,7 @@ func (s *processingResultsService) HandleFailed(ctx context.Context, sink *model return nil, errors.Wrapf(err, "failed to set and move to dead") } - return &models.RequeuedInfo{QueueStatus: models.QueueStatusDead}, nil + return &models.QueuedInfo{MessageID: m.ID, QueueStatus: models.QueueStatusDead, DeliverAfter: m.DeliverAfter}, nil } queueStatus := getQueueStatus(m, now) @@ -89,7 +89,7 @@ func (s *processingResultsService) HandleFailed(ctx context.Context, sink *model return nil, fmt.Errorf("unexpected queue status %s", queueStatus) } - return &models.RequeuedInfo{QueueStatus: queueStatus, DeliverAfter: m.DeliverAfter}, nil + return &models.QueuedInfo{MessageID: m.ID, QueueStatus: queueStatus, DeliverAfter: m.DeliverAfter}, nil } func (s *processingResultsService) HandleOK(ctx context.Context, m *models.Message) error { diff --git a/pkg/services/processing_results_service_test.go b/pkg/services/processing_results_service_test.go index 5ad9ecb..6c62a1e 100644 --- a/pkg/services/processing_results_service_test.go +++ b/pkg/services/processing_results_service_test.go @@ -119,9 +119,10 @@ func TestProcessingResultsServiceHandleFailed_Dead(t *testing.T) { redisStore.EXPECT().SetAndMove(ctx, messageKey, b, sourceQueueKey, destQueueKey, mID).Return(nil) s := NewProcessingResultsService(timeSvc, redisStore) - requeuedInfo, err := s.HandleFailed(ctx, sink, m, processingErr) + queuedInfo, err := s.HandleFailed(ctx, sink, m, processingErr) assert.NoError(t, err) - assert.Equal(t, models.QueueStatusDead, requeuedInfo.QueueStatus) + assert.Equal(t, mID, queuedInfo.MessageID) + assert.Equal(t, models.QueueStatusDead, queuedInfo.QueueStatus) } func TestProcessingResultsServiceHandleFailed_Scheduled(t *testing.T) { @@ -181,10 +182,11 @@ func TestProcessingResultsServiceHandleFailed_Scheduled(t *testing.T) { redisStore.EXPECT().SetLRemZAdd(ctx, messageKey, b, sourceQueueKey, destQueueKey, mID, float64(mUpdated.DeliverAfter.Unix())).Return(nil) s := NewProcessingResultsService(timeSvc, redisStore) - requeuedInfo, err := s.HandleFailed(ctx, sink, m, processingErr) + queuedInfo, err := s.HandleFailed(ctx, sink, m, processingErr) assert.NoError(t, err) - assert.Equal(t, models.QueueStatusScheduled, requeuedInfo.QueueStatus) - assert.Equal(t, mUpdated.DeliverAfter, requeuedInfo.DeliverAfter) + assert.Equal(t, mID, queuedInfo.MessageID) + assert.Equal(t, models.QueueStatusScheduled, queuedInfo.QueueStatus) + assert.Equal(t, mUpdated.DeliverAfter, queuedInfo.DeliverAfter) } func TestProcessingResultsServiceHandleFailed_Ready(t *testing.T) { @@ -244,8 +246,9 @@ func TestProcessingResultsServiceHandleFailed_Ready(t *testing.T) { redisStore.EXPECT().SetAndMove(ctx, messageKey, b, sourceQueueKey, destQueueKey, mID).Return(nil) s := NewProcessingResultsService(timeSvc, redisStore) - requeuedInfo, err := s.HandleFailed(ctx, sink, m, processingErr) + queuedInfo, err := s.HandleFailed(ctx, sink, m, processingErr) assert.NoError(t, err) - assert.Equal(t, models.QueueStatusReady, requeuedInfo.QueueStatus) - assert.Equal(t, mUpdated.DeliverAfter, requeuedInfo.DeliverAfter) + assert.Equal(t, mID, queuedInfo.MessageID) + assert.Equal(t, models.QueueStatusReady, queuedInfo.QueueStatus) + assert.Equal(t, mUpdated.DeliverAfter, queuedInfo.DeliverAfter) } diff --git a/pkg/supervisor/ready.go b/pkg/supervisor/ready.go index 996f2f2..0789c28 100644 --- a/pkg/supervisor/ready.go +++ b/pkg/supervisor/ready.go @@ -48,11 +48,11 @@ func (s *Supervisor) FetchAndProcess(ctx context.Context, f *models.Flow, sink * processingErr := s.messageProcessor.Process(ctx, sink, m) if processingErr != nil { logger.Info("message processing failed") - requeuedInfo, err := s.processingResultsSvc.HandleFailed(ctx, sink, m, processingErr) + queuedInfo, err := s.processingResultsSvc.HandleFailed(ctx, sink, m, processingErr) if err != nil { return errors.Wrapf(err, "could not handle failed processing") } - logger.Info("message queued after failure", zap.String("queue", string(requeuedInfo.QueueStatus)), zap.Time("nextAttemptAfter", requeuedInfo.DeliverAfter)) + logger.Info("message queued after failure", zap.String("queue", string(queuedInfo.QueueStatus)), zap.Time("nextAttemptAfter", queuedInfo.DeliverAfter)) } else { logger.Info("message processed ok") err := s.processingResultsSvc.HandleOK(ctx, m) diff --git a/pkg/supervisor/ready_test.go b/pkg/supervisor/ready_test.go index f07da84..b8e5bc9 100644 --- a/pkg/supervisor/ready_test.go +++ b/pkg/supervisor/ready_test.go @@ -102,7 +102,7 @@ func TestSupervisorFetchAndProcess_Failed(t *testing.T) { messageFetcher.EXPECT().GetMessageForProcessing(ctx, appConf.Supervisor.ReadyWaitTime, flowId1, sinkID1).Return(m, nil) messageProcessor.EXPECT().Process(ctx, sink1, m).Return(processingErr) - processingResultsService.EXPECT().HandleFailed(ctx, sink1, m, processingErr).Return(&models.RequeuedInfo{QueueStatus: models.QueueStatusReady}, nil) + processingResultsService.EXPECT().HandleFailed(ctx, sink1, m, processingErr).Return(&models.QueuedInfo{QueueStatus: models.QueueStatusReady}, nil) logger, err := zap.NewDevelopment() assert.NoError(t, err) diff --git a/pkg/testsupport/mocks/mock_message_enqueuer.go b/pkg/testsupport/mocks/mock_message_enqueuer.go index a2a68f2..7f607dc 100644 --- a/pkg/testsupport/mocks/mock_message_enqueuer.go +++ b/pkg/testsupport/mocks/mock_message_enqueuer.go @@ -36,11 +36,12 @@ func (m *MockMessageEnqueuer) EXPECT() *MockMessageEnqueuerMockRecorder { } // Enqueue mocks base method. -func (m *MockMessageEnqueuer) Enqueue(ctx context.Context, messages []*models.Message) error { +func (m *MockMessageEnqueuer) Enqueue(ctx context.Context, messages []*models.Message) ([]*models.QueuedInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Enqueue", ctx, messages) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].([]*models.QueuedInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 } // Enqueue indicates an expected call of Enqueue. diff --git a/pkg/testsupport/mocks/mock_processing_results_service.go b/pkg/testsupport/mocks/mock_processing_results_service.go index 4471cda..7428fbf 100644 --- a/pkg/testsupport/mocks/mock_processing_results_service.go +++ b/pkg/testsupport/mocks/mock_processing_results_service.go @@ -36,10 +36,10 @@ func (m *MockProcessingResultsService) EXPECT() *MockProcessingResultsServiceMoc } // HandleFailed mocks base method. -func (m_2 *MockProcessingResultsService) HandleFailed(ctx context.Context, sink *models.Sink, m *models.Message, processingErr error) (*models.RequeuedInfo, error) { +func (m_2 *MockProcessingResultsService) HandleFailed(ctx context.Context, sink *models.Sink, m *models.Message, processingErr error) (*models.QueuedInfo, error) { m_2.ctrl.T.Helper() ret := m_2.ctrl.Call(m_2, "HandleFailed", ctx, sink, m, processingErr) - ret0, _ := ret[0].(*models.RequeuedInfo) + ret0, _ := ret[0].(*models.QueuedInfo) ret1, _ := ret[1].(error) return ret0, ret1 }