diff --git a/pkg/services/processing_results_service.go b/pkg/services/processing_results_service.go index 085a912..40a853b 100644 --- a/pkg/services/processing_results_service.go +++ b/pkg/services/processing_results_service.go @@ -29,17 +29,16 @@ func NewProcessingResultsService(timeSvc TimeService, redisStore RedisStore, ret } func (s *processingResultsService) HandleFailed(ctx context.Context, sink *models.Sink, m *models.Message, processingErr error) (*models.QueuedInfo, error) { - now := s.timeSvc.Now() m.DeliveryAttempts = append(m.DeliveryAttempts, &models.DeliveryAttempt{ - At: now, + At: s.timeSvc.Now(), Status: models.DeliveryAttemptStatusFailed, Error: processingErr.Error(), }, ) nextAttemptInterval := s.retryCalculator.NextAttemptInterval(len(m.DeliveryAttempts), sink.RetryInterval, sink.RetryExpMultiplier) - m.DeliverAfter = now.Add(nextAttemptInterval) + m.DeliverAfter = s.timeSvc.Now().Add(nextAttemptInterval) var maxAttempts int if sink.MaxAttempts == nil { @@ -67,7 +66,7 @@ func (s *processingResultsService) HandleFailed(ctx context.Context, sink *model return &models.QueuedInfo{MessageID: m.ID, QueueStatus: models.QueueStatusDead, DeliverAfter: m.DeliverAfter}, nil } - queueStatus := getQueueStatus(m, now) + queueStatus := getQueueStatus(m, s.timeSvc.Now()) destQueueKey := queueKey(m.FlowID, m.SinkID, queueStatus) switch queueStatus { @@ -106,7 +105,7 @@ func (s *processingResultsService) HandleOK(ctx context.Context, m *models.Messa } // update message and move to done - err = s.redisStore.SetAndMove(ctx, mKey, b, sourceQueueKey, destQueueKey, m.ID) + err = s.redisStore.SetLRemZAdd(ctx, mKey, b, sourceQueueKey, destQueueKey, m.ID, float64(s.timeSvc.Now().Unix())) if err != nil { return errors.Wrapf(err, "failed to set and move to done") } diff --git a/pkg/services/processing_results_service_test.go b/pkg/services/processing_results_service_test.go index 3c916a7..47837f4 100644 --- a/pkg/services/processing_results_service_test.go +++ b/pkg/services/processing_results_service_test.go @@ -24,7 +24,7 @@ func TestProcessingResultsServiceHandleOK(t *testing.T) { retryCalculator := mocks.NewMockRetryCalculator(ctrl) now := time.Date(2023, 05, 5, 8, 9, 12, 0, time.UTC) - timeSvc.EXPECT().Now().Return(now) + timeSvc.EXPECT().Now().AnyTimes().Return(now) flowID := "flow-1" sinkID := "sink-1" @@ -56,7 +56,7 @@ func TestProcessingResultsServiceHandleOK(t *testing.T) { b, err := json.Marshal(&mUpdated) assert.NoError(t, err) - redisStore.EXPECT().SetAndMove(ctx, messageKey, b, sourceQueueKey, destQueueKey, mID).Return(nil) + redisStore.EXPECT().SetLRemZAdd(ctx, messageKey, b, sourceQueueKey, destQueueKey, mID, float64(now.Unix())).Return(nil) s := NewProcessingResultsService(timeSvc, redisStore, retryCalculator) err = s.HandleOK(ctx, m) @@ -74,7 +74,7 @@ func TestProcessingResultsServiceHandleFailed_Dead(t *testing.T) { retryCalculator := mocks.NewMockRetryCalculator(ctrl) now := time.Date(2023, 05, 5, 8, 9, 12, 0, time.UTC) - timeSvc.EXPECT().Now().Return(now) + timeSvc.EXPECT().Now().AnyTimes().Return(now) flowID := "flow-1" sinkID := "sink-1" @@ -146,7 +146,7 @@ func TestProcessingResultsServiceHandleFailed_Scheduled(t *testing.T) { retryCalculator := mocks.NewMockRetryCalculator(ctrl) now := time.Date(2023, 05, 5, 8, 9, 12, 0, time.UTC) - timeSvc.EXPECT().Now().Return(now) + timeSvc.EXPECT().Now().AnyTimes().Return(now) flowID := "flow-1" sinkID := "sink-1" @@ -217,7 +217,7 @@ func TestProcessingResultsServiceHandleFailed_Ready(t *testing.T) { retryCalculator := mocks.NewMockRetryCalculator(ctrl) now := time.Date(2023, 05, 5, 8, 9, 12, 0, time.UTC) - timeSvc.EXPECT().Now().Return(now) + timeSvc.EXPECT().Now().AnyTimes().Return(now) flowID := "flow-1" sinkID := "sink-1"