Skip to content

Commit

Permalink
working on comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ychebotarev committed Sep 24, 2024
1 parent 0e9b794 commit 2a7db79
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 22 deletions.
4 changes: 0 additions & 4 deletions service/history/workflow/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,5 @@ func updateActivityInfoForRetries(
ai.RetryLastWorkerIdentity = ai.StartedIdentity
ai.RetryLastFailure = failure

if ai.FirstScheduledTime == nil {
ai.FirstScheduledTime = ai.ScheduledTime
}

return ai
}
5 changes: 4 additions & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1605,7 +1605,7 @@ func (ms *MutableStateImpl) UpdateActivityInfo(

ai.Version = incomingActivityInfo.GetVersion()
ai.ScheduledTime = incomingActivityInfo.GetScheduledTime()
ai.FirstScheduledTime = incomingActivityInfo.GetScheduledTime()
// we don't need to update FirstScheduledTime
ai.StartedEventId = incomingActivityInfo.GetStartedEventId()
ai.LastHeartbeatUpdateTime = incomingActivityInfo.GetLastHeartbeatTime()
if ai.StartedEventId == common.EmptyEventID {
Expand Down Expand Up @@ -2854,6 +2854,9 @@ func (ms *MutableStateImpl) ApplyActivityTaskScheduledEvent(
firstEventID int64,
event *historypb.HistoryEvent,
) (*persistencespb.ActivityInfo, error) {
// QQQQQQQQ
println(fmt.Sprintf("ApplyActivityTaskScheduledEvent, %v", time.Now().UTC()))

attributes := event.GetActivityTaskScheduledEventAttributes()

scheduledEventID := event.GetEventId()
Expand Down
7 changes: 6 additions & 1 deletion service/history/workflow/timer_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,12 @@ func (t *timerSequenceImpl) getActivityScheduleToCloseTimeout(
return nil
}

timeoutTime := timestamp.TimeValue(activityInfo.FirstScheduledTime).Add(scheduleToCloseDuration)
var timeoutTime time.Time
if activityInfo.FirstScheduledTime != nil {
timeoutTime = timestamp.TimeValue(activityInfo.FirstScheduledTime).Add(scheduleToCloseDuration)
} else {
timeoutTime = timestamp.TimeValue(activityInfo.ScheduledTime).Add(scheduleToCloseDuration)
}

return &TimerSequenceID{
EventID: activityInfo.ScheduledEventId,
Expand Down
28 changes: 15 additions & 13 deletions tests/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"sync/atomic"
"time"
Expand Down Expand Up @@ -66,7 +65,6 @@ func (s *ActivityTestSuite) TestActivityScheduleToClose_FiredDuringBackoff() {
// activity will be scheduled twice. After second failure (that should happen at ~4.2 sec) next retry will not
// be scheduled because "schedule_to_close" will happen before retry happens
initialRetryInterval := time.Second * 2
workingInterval := time.Millisecond * 100
scheduleToCloseTimeout := 3 * time.Second
startToCloseTimeout := 1 * time.Second

Expand All @@ -79,7 +77,6 @@ func (s *ActivityTestSuite) TestActivityScheduleToClose_FiredDuringBackoff() {

var activityCompleted atomic.Int32
activityFunction := func() (string, error) {
time.Sleep(workingInterval) //nolint:forbidigo
activityErr := errors.New("bad-luck-please-retry") //nolint:goerr113
activityCompleted.Add(1)
return "", activityErr
Expand All @@ -93,8 +90,7 @@ func (s *ActivityTestSuite) TestActivityScheduleToClose_FiredDuringBackoff() {
ScheduleToCloseTimeout: scheduleToCloseTimeout,
RetryPolicy: activityRetryPolicy,
}), activityFunction).Get(ctx, &ret)
s.Error(err)
return "done!", nil
return "done!", err
}

s.worker.RegisterWorkflow(workflowFn)
Expand All @@ -113,8 +109,16 @@ func (s *ActivityTestSuite) TestActivityScheduleToClose_FiredDuringBackoff() {

var out string
err = workflowRun.Get(ctx, &out)
s.Equal(2, activityCompleted.Load())
s.NoError(err)

s.Error(err)
var wfExecutionError *temporal.WorkflowExecutionError
s.True(errors.As(err, &wfExecutionError))
var activityError *temporal.ActivityError
s.True(errors.As(wfExecutionError.Unwrap(), &activityError))
s.Equal(enumspb.RETRY_STATE_TIMEOUT, activityError.RetryState())

s.Equal(int32(2), activityCompleted.Load())

}

func (s *ActivityTestSuite) TestActivityScheduleToClose_FiredDuringActivityRun() {
Expand All @@ -138,7 +142,6 @@ func (s *ActivityTestSuite) TestActivityScheduleToClose_FiredDuringActivityRun()

var activityCompleted atomic.Int32
activityFunction := func() (string, error) {
println(fmt.Sprintf("Activity #%d", activityCompleted.Load()+1))
time.Sleep(workingInterval) //nolint:forbidigo
activityErr := errors.New("bad-luck-please-retry") //nolint:goerr113
lastActivityRun = time.Now().UTC()
Expand All @@ -163,17 +166,16 @@ func (s *ActivityTestSuite) TestActivityScheduleToClose_FiredDuringActivityRun()

workflowCompleteRun = time.Now().UTC()
// schedule to close timeout should fire while last activity is still running.
s.Equal(2, activityCompleted)
s.Equal(int32(2), activityCompleted.Load())
return "done!", nil
}

s.worker.RegisterWorkflow(workflowFn)
s.worker.RegisterActivity(activityFunction)

workflowOptions := sdkclient.StartWorkflowOptions{
ID: "functional-test-schedule_to_close_failed_while_running",
TaskQueue: s.taskQueue,
WorkflowRunTimeout: 10 * time.Second,
ID: s.T().Name(),
TaskQueue: s.taskQueue,
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expand All @@ -184,7 +186,7 @@ func (s *ActivityTestSuite) TestActivityScheduleToClose_FiredDuringActivityRun()
err = workflowRun.Get(ctx, &out)
workflowCompleteRun = time.Now().UTC()
// we expect activities to be executed 3 times
s.Eventually(func() bool { return activityCompleted.Load() == 3 }, time.Second*10, time.Millisecond*500)
s.Eventually(func() bool { return activityCompleted.Load() == int32(3) }, time.Second*10, time.Millisecond*500)
s.True(lastActivityRun.After(workflowCompleteRun))
s.NoError(err)
}
Expand Down
4 changes: 1 addition & 3 deletions tests/activity_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down

0 comments on commit 2a7db79

Please sign in to comment.