Skip to content

Commit

Permalink
[UserMetadata] Pass the user metadata to new executions from schedule…
Browse files Browse the repository at this point in the history
… actions (#6462)

When user metadata is provided in the
[ScheduleAction](https://github.com/temporalio/api/blob/4a1ab98a84b94b59f82aab4b35ea505cff968804/temporal/api/schedule/v1/message.proto#L252-L259),
we were not passing it through to the new executions that were started
by the scheduler. This fixes it
(#6413).

To be feature complete. Every schedule action that contains user
metadata should pass it on to the newly created executions.

Unit tests.

N/A

N/A

No

---------

Co-authored-by: David Reiss <[email protected]>
  • Loading branch information
2 people authored and lina-temporal committed Aug 30, 2024
1 parent d09ee3b commit e0c72f1
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
1 change: 1 addition & 0 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1339,6 +1339,7 @@ func (s *scheduler) startWorkflow(
Header: newWorkflow.Header,
LastCompletionResult: lastCompletionResult,
ContinuedFailure: continuedFailure,
UserMetadata: newWorkflow.UserMetadata,
},
}
for {
Expand Down
26 changes: 22 additions & 4 deletions service/worker/scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,11 @@ import (
"github.com/google/uuid"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
schedpb "go.temporal.io/api/schedule/v1"
sdkpb "go.temporal.io/api/sdk/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/sdk/testsuite"
Expand All @@ -50,6 +48,9 @@ import (
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/payloads"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/testing/protoassert"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

type (
Expand Down Expand Up @@ -113,7 +114,9 @@ func (s *workflowSuite) run(sched *schedpb.Schedule, iterations int) {
s.env.SetStartTime(baseStartTime)

// fill this in so callers don't need to
sched.Action = s.defaultAction("myid")
if sched.Action == nil {
sched.Action = s.defaultAction("myid")
}

s.env.ExecuteWorkflow(SchedulerWorkflow, &schedspb.StartScheduleArgs{
Schedule: sched,
Expand Down Expand Up @@ -336,6 +339,19 @@ func (s *workflowSuite) runAcrossContinue(
func (s *workflowSuite) TestStart() {
// written using low-level mocks so we can test all fields in the start request

userMetadata := &sdkpb.UserMetadata{
Summary: &commonpb.Payload{
Metadata: map[string][]byte{"test_key": []byte(`test_val`)},
Data: []byte(`Test summary Data`),
},
Details: &commonpb.Payload{
Metadata: map[string][]byte{"test_key": []byte(`test_val`)},
Data: []byte(`Test Details Data`),
},
}
action := s.defaultAction("myid")
action.Action.(*schedpb.ScheduleAction_StartWorkflow).StartWorkflow.UserMetadata = userMetadata

s.expectStart(func(req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) {
s.True(time.Date(2022, 6, 1, 0, 15, 0, 0, time.UTC).Equal(s.now()))
s.Nil(req.Request.LastCompletionResult)
Expand All @@ -347,6 +363,7 @@ func (s *workflowSuite) TestStart() {
s.Equal(`"value"`, payload.ToString(req.Request.SearchAttributes.IndexedFields["myfield"]))
s.Equal(`"myschedule"`, payload.ToString(req.Request.SearchAttributes.IndexedFields[searchattribute.TemporalScheduledById]))
s.Equal(`"2022-06-01T00:15:00Z"`, payload.ToString(req.Request.SearchAttributes.IndexedFields[searchattribute.TemporalScheduledStartTime]))
protoassert.ProtoEqual(s.T(), userMetadata, req.Request.GetUserMetadata())

return nil, nil
})
Expand All @@ -357,6 +374,7 @@ func (s *workflowSuite) TestStart() {
Interval: durationpb.New(55 * time.Minute),
}},
},
Action: action,
}, 2)
// two iterations to start one workflow: first will sleep, second will start and then sleep again
s.True(s.env.IsWorkflowCompleted())
Expand Down

0 comments on commit e0c72f1

Please sign in to comment.