Skip to content

Commit

Permalink
Merge branch 'next-server' into slog-integration
Browse files Browse the repository at this point in the history
  • Loading branch information
bergundy authored Dec 19, 2024
2 parents 4b7069c + 3185ff3 commit f9c562d
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 7 deletions.
66 changes: 65 additions & 1 deletion temporalcli/commands.activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"time"

"github.com/google/uuid"
"github.com/temporalio/cli/temporalcli/internal/printer"
activitypb "go.temporal.io/api/activity/v1"
"go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -165,7 +166,8 @@ func (c *TemporalActivityUpdateOptionsCommand) run(cctx *CommandContext, args []
Paths: updatePath,
},

Identity: c.Identity,
Identity: c.Identity,
RequestId: uuid.NewString(),
})
if err != nil {
return fmt.Errorf("unable to update Activity options: %w", err)
Expand All @@ -189,3 +191,65 @@ func (c *TemporalActivityUpdateOptionsCommand) run(cctx *CommandContext, args []

return nil
}

func (c *TemporalActivityPauseCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

_, err = cl.WorkflowService().PauseActivityById(cctx, &workflowservice.PauseActivityByIdRequest{
Namespace: c.Parent.Namespace,
WorkflowId: c.WorkflowId,
RunId: c.RunId,
ActivityId: c.ActivityId,
Identity: c.Identity,
RequestId: uuid.NewString(),
})
if err != nil {
return fmt.Errorf("unable to update Activity options: %w", err)
}

return nil
}

func (c *TemporalActivityUnpauseCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

request := &workflowservice.UnpauseActivityByIdRequest{
Namespace: c.Parent.Namespace,
WorkflowId: c.WorkflowId,
RunId: c.RunId,
ActivityId: c.ActivityId,
Identity: c.Identity,
}
if c.Reset {
request.Operation = &workflowservice.UnpauseActivityByIdRequest_Reset_{
Reset_: &workflowservice.UnpauseActivityByIdRequest_ResetOperation{
NoWait: c.NoWait,
ResetHeartbeat: c.ResetHeartbeats,
},
}
} else {
if c.ResetHeartbeats {
return fmt.Errorf("reset-heartbeats flag can only be used with reset flag")
}
request.Operation = &workflowservice.UnpauseActivityByIdRequest_Resume{
Resume: &workflowservice.UnpauseActivityByIdRequest_ResumeOperation{
NoWait: c.NoWait,
},
}
}

_, err = cl.WorkflowService().UnpauseActivityById(cctx, request)
if err != nil {
return fmt.Errorf("unable to uppause an Activity: %w", err)
}

return nil
}
50 changes: 50 additions & 0 deletions temporalcli/commands.activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,56 @@ func (s *SharedServerSuite) TestActivityOptionsUpdate_Partial() {
s.ContainsOnSameLine(out, "BackoffCoefficient", "2")
}

func (s *SharedServerSuite) TestActivityPauseUnpause() {
run := s.waitActivityStarted()
wid := run.GetID()
aid := "dev-activity-id"
identity := "MyIdentity"

res := s.Execute(
"activity", "pause",
"--activity-id", aid,
"--workflow-id", wid,
"--run-id", run.GetRunID(),
"--identity", identity,
"--address", s.Address(),
)

s.NoError(res.Err)

res = s.Execute(
"activity", "unpause",
"--activity-id", aid,
"--workflow-id", wid,
"--run-id", run.GetRunID(),
"--identity", identity,
"--address", s.Address(),
"--reset",
)

s.NoError(res.Err)
}

func (s *SharedServerSuite) TestActivityUnPause_Failed() {
run := s.waitActivityStarted()
wid := run.GetID()
aid := "dev-activity-id"
identity := "MyIdentity"

// should fail because --reset-heartbeat is provided, but --reset flag is missing
res := s.Execute(
"activity", "unpause",
"--activity-id", aid,
"--workflow-id", wid,
"--run-id", run.GetRunID(),
"--identity", identity,
"--address", s.Address(),
"--reset-heartbeats",
)

s.Error(res.Err)
}

// Test helpers

func (s *SharedServerSuite) waitActivityStarted() client.WorkflowRun {
Expand Down
78 changes: 75 additions & 3 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,15 +326,17 @@ func NewTemporalActivityCommand(cctx *CommandContext, parent *TemporalCommand) *
var s TemporalActivityCommand
s.Parent = parent
s.Command.Use = "activity"
s.Command.Short = "Complete, update or fail an Activity"
s.Command.Short = "Complete, update, pause, unpause or fail an Activity"
if hasHighlighting {
s.Command.Long = "Update an Activity's options or update an Activity's state to completed \nor failed.\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n\x1b[1mtemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\x1b[0m"
s.Command.Long = "Update an Activity's options or update an Activity's state to completed \nor failed.\n\nPause or unpause an Activity.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n\x1b[1mtemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\x1b[0m"
} else {
s.Command.Long = "Update an Activity's options or update an Activity's state to completed \nor failed.\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n```\ntemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\n```"
s.Command.Long = "Update an Activity's options or update an Activity's state to completed \nor failed.\n\nPause or unpause an Activity.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n```\ntemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\n```"
}
s.Command.Args = cobra.NoArgs
s.Command.AddCommand(&NewTemporalActivityCompleteCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalActivityFailCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalActivityPauseCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalActivityUnpauseCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalActivityUpdateOptionsCommand(cctx, &s).Command)
s.ClientOptions.buildFlags(cctx, s.Command.PersistentFlags())
return &s
Expand Down Expand Up @@ -411,6 +413,76 @@ func NewTemporalActivityFailCommand(cctx *CommandContext, parent *TemporalActivi
return &s
}

type TemporalActivityPauseCommand struct {
Parent *TemporalActivityCommand
Command cobra.Command
WorkflowReferenceOptions
ActivityId string
Identity string
}

func NewTemporalActivityPauseCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityPauseCommand {
var s TemporalActivityPauseCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "pause [flags]"
s.Command.Short = "Pause an Activity"
if hasHighlighting {
s.Command.Long = "Pause an Activity.\n\nIf the Activity is not currently running (e.g. because it previously\nfailed), it will not be run again until it is unpaused.\n\nHowever, if the Activity is currently running, it will run to completion.\nIf the Activity is on its last retry attempt and fails, the failure will\nbe returned to the caller, just as if the Activity had not been paused.\n\nSpecify the Activity and Workflow IDs:\n\n\x1b[1mtemporal activity pause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\x1b[0m"
} else {
s.Command.Long = "Pause an Activity.\n\nIf the Activity is not currently running (e.g. because it previously\nfailed), it will not be run again until it is unpaused.\n\nHowever, if the Activity is currently running, it will run to completion.\nIf the Activity is on its last retry attempt and fails, the failure will\nbe returned to the caller, just as if the Activity had not been paused.\n\nSpecify the Activity and Workflow IDs:\n\n```\ntemporal activity pause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n```"
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "Activity ID to pause. Required.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "activity-id")
s.Command.Flags().StringVar(&s.Identity, "identity", "", "Identity of the user submitting this request.")
s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalActivityUnpauseCommand struct {
Parent *TemporalActivityCommand
Command cobra.Command
WorkflowReferenceOptions
ActivityId string
Identity string
Reset bool
NoWait bool
ResetHeartbeats bool
}

func NewTemporalActivityUnpauseCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityUnpauseCommand {
var s TemporalActivityUnpauseCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "unpause [flags]"
s.Command.Short = "Unpause an Activity"
if hasHighlighting {
s.Command.Long = "Re-schedule a previously-paused Activity for execution.\n\nIf the Activity is not running and is past its retry timeout, it will be\nscheduled immediately. Otherwise, it will be scheduled after its retry\ntimeout expires. The Activity can be retried immediately with \x1b[1m--no-wait\x1b[0m.\n\nUse \x1b[1m--reset\x1b[0m to reset the number of previous run attempts to zero. For\nexample, if an Activity is near the maximum number of attempts N specified\nin its retry policy, \x1b[1m--reset\x1b[0m will allow the Activity to be retried\nanother N times after unpausing.\n\nSpecify the Activity and Workflow IDs:\n\n\x1b[1mtemporal activity unpause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --reset\n --no-wait\n --reset-heartbeats\x1b[0m"
} else {
s.Command.Long = "Re-schedule a previously-paused Activity for execution.\n\nIf the Activity is not running and is past its retry timeout, it will be\nscheduled immediately. Otherwise, it will be scheduled after its retry\ntimeout expires. The Activity can be retried immediately with `--no-wait`.\n\nUse `--reset` to reset the number of previous run attempts to zero. For\nexample, if an Activity is near the maximum number of attempts N specified\nin its retry policy, `--reset` will allow the Activity to be retried\nanother N times after unpausing.\n\nSpecify the Activity and Workflow IDs:\n\n```\ntemporal activity unpause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --reset\n --no-wait\n --reset-heartbeats\n```"
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "Activity ID to pause. Required.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "activity-id")
s.Command.Flags().StringVar(&s.Identity, "identity", "", "Identity of the user submitting this request.")
s.Command.Flags().BoolVar(&s.Reset, "reset", false, "Also reset the activity.")
s.Command.Flags().BoolVar(&s.NoWait, "no-wait", false, "Schedule the Activity immediately, even if its retry timeout has not expired.")
s.Command.Flags().BoolVar(&s.ResetHeartbeats, "reset-heartbeats", false, "Reset the Activity's heartbeat timeout. Only works with --reset.")
s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalActivityUpdateOptionsCommand struct {
Parent *TemporalActivityCommand
Command cobra.Command
Expand Down
87 changes: 84 additions & 3 deletions temporalcli/commandsgen/commands.yml
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,13 @@ commands:
The command execution timeout. 0s means no timeout.
- name: temporal activity
summary: Complete, update or fail an Activity
summary: Complete, update, pause, unpause or fail an Activity
description: |
Update an Activity's options or update an Activity's state to completed
or failed.
Pause or unpause an Activity.
Updating activity state marks an Activity as successfully finished or as
having encountered an error.
Expand All @@ -225,6 +228,8 @@ commands:
- activity
- activity complete
- activity update-options
- activity pause
- activity unpause
- activity execution
- activity fail
- cli reference
Expand Down Expand Up @@ -287,8 +292,6 @@ commands:
option-sets:
- workflow reference



- name: temporal activity update-options
summary: Update Activity options
description: |
Expand Down Expand Up @@ -374,6 +377,84 @@ commands:
option-sets:
- workflow reference

- name: temporal activity pause
summary: Pause an Activity
description: |
Pause an Activity.
If the Activity is not currently running (e.g. because it previously
failed), it will not be run again until it is unpaused.
However, if the Activity is currently running, it will run to completion.
If the Activity is on its last retry attempt and fails, the failure will
be returned to the caller, just as if the Activity had not been paused.
Specify the Activity and Workflow IDs:
```
temporal activity pause \
--activity-id YourActivityId \
--workflow-id YourWorkflowId
```
options:
- name: activity-id
short: a
type: string
description: Activity ID to pause.
required: true
- name: identity
type: string
description: Identity of the user submitting this request.
option-sets:
- workflow reference

- name: temporal activity unpause
summary: Unpause an Activity
description: |
Re-schedule a previously-paused Activity for execution.
If the Activity is not running and is past its retry timeout, it will be
scheduled immediately. Otherwise, it will be scheduled after its retry
timeout expires. The Activity can be retried immediately with `--no-wait`.
Use `--reset` to reset the number of previous run attempts to zero. For
example, if an Activity is near the maximum number of attempts N specified
in its retry policy, `--reset` will allow the Activity to be retried
another N times after unpausing.
Specify the Activity and Workflow IDs:
```
temporal activity unpause \
--activity-id YourActivityId \
--workflow-id YourWorkflowId
--reset
--no-wait
--reset-heartbeats
```
options:
- name: activity-id
short: a
type: string
description: Activity ID to pause.
required: true
- name: identity
type: string
description: Identity of the user submitting this request.
- name: reset
type: bool
description: Also reset the activity.
- name: no-wait
type: bool
description: |
Schedule the Activity immediately, even if its retry timeout has not expired.
- name: reset-heartbeats
type: bool
description: Reset the Activity's heartbeat timeout. Only works with --reset.
option-sets:
- workflow reference


- name: temporal batch
summary: Manage running batch jobs
description: |
Expand Down

0 comments on commit f9c562d

Please sign in to comment.