Skip to content

Commit

Permalink
Fix schedule_to_close not triggering after retry issue
Browse files Browse the repository at this point in the history
  • Loading branch information
ychebotarev committed Sep 23, 2024
1 parent 8681705 commit a609af2
Show file tree
Hide file tree
Showing 11 changed files with 865 additions and 612 deletions.
1,217 changes: 615 additions & 602 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,8 @@ message ActivityInfo {
// workflows redirect_counter value when this activity started last time
int64 last_redirect_counter = 2;
}

google.protobuf.Timestamp first_scheduled_time = 39;
}

// timer_map column
Expand Down
3 changes: 2 additions & 1 deletion service/history/ndc/workflow_resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,9 @@ func (r *workflowResetterImpl) failInflightActivity(
switch ai.StartedEventId {
case common.EmptyEventID:
// activity not started, noop
// override the activity time to now
// override the scheduled activity time to now
ai.ScheduledTime = timestamppb.New(now)
ai.FirstScheduledTime = timestamppb.New(now)
if err := mutableState.UpdateActivity(ai); err != nil {
return err
}
Expand Down
19 changes: 11 additions & 8 deletions service/history/ndc/workflow_resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,15 +461,17 @@ func (s *workflowResetterSuite) TestFailInflightActivity() {
Version: 12,
ScheduledEventId: 123,
ScheduledTime: timestamppb.New(now.Add(-10 * time.Second)),
FirstScheduledTime: timestamppb.New(now.Add(-10 * time.Second)),
StartedEventId: 124,
LastHeartbeatDetails: payloads.EncodeString("some random activity 1 details"),
StartedIdentity: "some random activity 1 started identity",
}
activity2 := &persistencespb.ActivityInfo{
Version: 12,
ScheduledEventId: 456,
ScheduledTime: timestamppb.New(now.Add(-10 * time.Second)),
StartedEventId: common.EmptyEventID,
Version: 12,
ScheduledEventId: 456,
ScheduledTime: timestamppb.New(now.Add(-10 * time.Second)),
FirstScheduledTime: timestamppb.New(now.Add(-10 * time.Second)),
StartedEventId: common.EmptyEventID,
}
mutableState.EXPECT().GetPendingActivityInfos().Return(map[int64]*persistencespb.ActivityInfo{
activity1.ScheduledEventId: activity1,
Expand All @@ -486,10 +488,11 @@ func (s *workflowResetterSuite) TestFailInflightActivity() {
).Return(&historypb.HistoryEvent{}, nil)

mutableState.EXPECT().UpdateActivity(&persistencespb.ActivityInfo{
Version: activity2.Version,
ScheduledEventId: activity2.ScheduledEventId,
ScheduledTime: timestamppb.New(now),
StartedEventId: activity2.StartedEventId,
Version: activity2.Version,
ScheduledEventId: activity2.ScheduledEventId,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: activity2.StartedEventId,
}).Return(nil)

err := s.workflowResetter.failInflightActivity(now, mutableState, terminateReason)
Expand Down
6 changes: 6 additions & 0 deletions service/history/timer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package history
import (
"context"
"fmt"
"time"

"github.com/pborman/uuid"
commonpb "go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -215,6 +216,11 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask(
ctx, cancel := context.WithTimeout(ctx, taskTimeout)
defer cancel()

// QQQQQQQ
println(fmt.Sprintf("In executeActivityTimeoutTask - %v - %v",
task.TimeoutType.String(),
time.Now().In(time.UTC)))

weContext, release, err := getWorkflowExecutionContextForTask(ctx, t.shardContext, t.cache, task)
if err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions service/history/workflow/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,10 @@ func updateActivityInfoForRetries(
ai.TimerTaskStatus = TimerTaskStatusNone
ai.RetryLastWorkerIdentity = ai.StartedIdentity
ai.RetryLastFailure = failure

if ai.FirstScheduledTime == nil {
ai.FirstScheduledTime = ai.ScheduledTime
}

return ai
}
2 changes: 2 additions & 0 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1605,6 +1605,7 @@ func (ms *MutableStateImpl) UpdateActivityInfo(

ai.Version = incomingActivityInfo.GetVersion()
ai.ScheduledTime = incomingActivityInfo.GetScheduledTime()
ai.FirstScheduledTime = incomingActivityInfo.GetScheduledTime()
ai.StartedEventId = incomingActivityInfo.GetStartedEventId()
ai.LastHeartbeatUpdateTime = incomingActivityInfo.GetLastHeartbeatTime()
if ai.StartedEventId == common.EmptyEventID {
Expand Down Expand Up @@ -2863,6 +2864,7 @@ func (ms *MutableStateImpl) ApplyActivityTaskScheduledEvent(
ScheduledEventId: scheduledEventID,
ScheduledEventBatchId: firstEventID,
ScheduledTime: event.GetEventTime(),
FirstScheduledTime: event.GetEventTime(),
StartedEventId: common.EmptyEventID,
StartedTime: nil,
ActivityId: attributes.ActivityId,
Expand Down
3 changes: 2 additions & 1 deletion service/history/workflow/timer_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ func (t *timerSequenceImpl) getActivityScheduleToCloseTimeout(
return nil
}

timeoutTime := timestamp.TimeValue(activityInfo.ScheduledTime).Add(scheduleToCloseDuration)
timeoutTime := timestamp.TimeValue(activityInfo.FirstScheduledTime).Add(scheduleToCloseDuration)
//timeoutTime := timestamp.TimeValue(activityInfo.ScheduledTime).Add(scheduleToCloseDuration)

return &TimerSequenceID{
EventID: activityInfo.ScheduledEventId,
Expand Down
49 changes: 49 additions & 0 deletions service/history/workflow/timer_sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_NotCreated_AfterWorkflo
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: common.EmptyEventID,
StartedTime: nil,
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -353,6 +354,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_NotCreated_BeforeWorkfl
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: common.EmptyEventID,
StartedTime: nil,
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -394,6 +396,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_NotCreated_NoWorkflowEx
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: common.EmptyEventID,
StartedTime: nil,
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -435,6 +438,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_HeartbeatTimer_AfterWor
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: 345,
StartedTime: timestamppb.New(now.Add(200 * time.Millisecond)),
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -463,6 +467,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_HeartbeatTimer_BeforeWo
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: 345,
StartedTime: timestamppb.New(now.Add(200 * time.Millisecond)),
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -506,6 +511,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_HeartbeatTimer_NoWorkfl
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: 345,
StartedTime: timestamppb.New(now.Add(200 * time.Millisecond)),
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -653,6 +659,7 @@ func (s *timerSequenceSuite) TestLoadAndSortActivityTimers_One_Scheduled_NotStar
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: common.EmptyEventID,
StartedTime: nil,
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -692,6 +699,7 @@ func (s *timerSequenceSuite) TestLoadAndSortActivityTimers_One_Scheduled_Started
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: 345,
StartedTime: timestamppb.New(now.Add(200 * time.Millisecond)),
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -738,6 +746,7 @@ func (s *timerSequenceSuite) TestLoadAndSortActivityTimers_One_Scheduled_Started
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: 345,
StartedTime: timestamppb.New(now.Add(200 * time.Millisecond)),
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -777,6 +786,7 @@ func (s *timerSequenceSuite) TestLoadAndSortActivityTimers_One_Scheduled_Started
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: 345,
StartedTime: timestamppb.New(now.Add(200 * time.Millisecond)),
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -823,6 +833,7 @@ func (s *timerSequenceSuite) TestLoadAndSortActivityTimers_One_Scheduled_Started
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: 345,
StartedTime: timestamppb.New(now.Add(200 * time.Millisecond)),
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -862,6 +873,7 @@ func (s *timerSequenceSuite) TestLoadAndSortActivityTimers_Multiple() {
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: 345,
StartedTime: timestamppb.New(now.Add(200 * time.Millisecond)),
ActivityId: "some random activity ID",
Expand All @@ -877,6 +889,7 @@ func (s *timerSequenceSuite) TestLoadAndSortActivityTimers_Multiple() {
Version: 123,
ScheduledEventId: 2345,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: common.EmptyEventID,
StartedTime: nil,
ActivityId: "other random activity ID",
Expand Down Expand Up @@ -1140,6 +1153,7 @@ func (s *timerSequenceSuite) TestGetActivityScheduleToCloseTimeout_WithTimeout_S
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: common.EmptyEventID,
StartedTime: nil,
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -1570,3 +1584,38 @@ func (s *timerSequenceSuite) TestLess_CompareType() {
s.True(timerSequenceIDs.Less(0, 1))
s.False(timerSequenceIDs.Less(1, 0))
}

// TestLoadAndSortActivityTimers_FirstScheduledTime verifies which base time
// each loaded activity timer is derived from when FirstScheduledTime differs
// from ScheduledTime: the schedule-to-start deadline is expected to be offset
// from ScheduledTime, while the schedule-to-close deadline is expected to be
// offset from FirstScheduledTime.
func (s *timerSequenceSuite) TestLoadAndSortActivityTimers_FirstScheduledTime() {
	now := time.Now().UTC()
	scheduledAt := timestamppb.New(now)
	// Deliberately one second apart from ScheduledTime so the two base times
	// produce distinguishable deadlines.
	firstScheduledAt := timestamppb.New(now.Add(1 * time.Second))

	ai := &persistencespb.ActivityInfo{
		ScheduledEventId:       234,
		ScheduledTime:          scheduledAt,
		FirstScheduledTime:     firstScheduledAt,
		ScheduleToStartTimeout: timestamp.DurationFromSeconds(10),
		ScheduleToCloseTimeout: timestamp.DurationFromSeconds(1000),
		StartToCloseTimeout:    timestamp.DurationFromSeconds(100),
		HeartbeatTimeout:       timestamp.DurationFromSeconds(1),
		TimerTaskStatus:        TimerTaskStatusCreatedScheduleToClose | TimerTaskStatusCreatedScheduleToStart,
		Attempt:                12,
	}

	pending := map[int64]*persistencespb.ActivityInfo{
		ai.ScheduledEventId: ai,
	}
	s.mockMutableState.EXPECT().GetPendingActivityInfos().Return(pending)

	// Expected deadlines, each computed from its own base time.
	scheduleToStartDeadline := scheduledAt.AsTime().Add(ai.ScheduleToStartTimeout.AsDuration())
	scheduleToCloseDeadline := firstScheduledAt.AsTime().Add(ai.ScheduleToCloseTimeout.AsDuration())

	want := []TimerSequenceID{
		{
			EventID:      ai.ScheduledEventId,
			Timestamp:    scheduleToStartDeadline,
			TimerType:    enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START,
			TimerCreated: true,
			Attempt:      ai.Attempt,
		},
		{
			EventID:      ai.ScheduledEventId,
			Timestamp:    scheduleToCloseDeadline,
			TimerType:    enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
			TimerCreated: true,
			Attempt:      ai.Attempt,
		},
	}

	got := s.timerSequence.LoadAndSortActivityTimers()
	s.Equal(want, got)
}
Loading

0 comments on commit a609af2

Please sign in to comment.