Skip to content

Commit

Permalink
Add CLI command for Reset activity API (#732)
Browse files Browse the repository at this point in the history
<!--- Note to EXTERNAL Contributors -->
<!-- Thanks for opening a PR! 
If it is a significant code change, please **make sure there is an open
issue** for this.
We work best with you when we have accepted the idea first before you
code. -->

<!--- For ALL Contributors 👇 -->

## What was changed
<!-- Describe what has changed in this PR -->
Add CLI support for Reset activity API

## Why?
<!-- Tell your future self why have you made these changes -->
Part of the activity API work.

---------

Co-authored-by: Chad Retz <[email protected]>
  • Loading branch information
ychebotarev and cretz authored Jan 8, 2025
1 parent e110570 commit 17ff0ac
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 8 deletions.
37 changes: 37 additions & 0 deletions temporalcli/commands.activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,40 @@ func (c *TemporalActivityUnpauseCommand) run(cctx *CommandContext, args []string

return nil
}

func (c *TemporalActivityResetCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

request := &workflowservice.ResetActivityByIdRequest{
Namespace: c.Parent.Namespace,
WorkflowId: c.WorkflowId,
RunId: c.RunId,
ActivityId: c.ActivityId,
Identity: c.Identity,
NoWait: c.NoWait,
ResetHeartbeat: c.ResetHeartbeats,
}

resp, err := cl.WorkflowService().ResetActivityById(cctx, request)
if err != nil {
return fmt.Errorf("unable to reset an Activity: %w", err)
}

resetResponse := struct {
NoWait bool `json:"noWait"`
ResetHeartbeats bool `json:"resetHeartbeats"`
ServerResponse bool `json:"-"`
}{
ServerResponse: resp != nil,
NoWait: c.NoWait,
ResetHeartbeats: c.ResetHeartbeats,
}

_ = cctx.Printer.PrintStructured(resetResponse, printer.StructuredOptions{})

return nil
}
38 changes: 38 additions & 0 deletions temporalcli/commands.activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"go.temporal.io/api/enums/v1"
"go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/sdk/client"
)

Expand Down Expand Up @@ -231,6 +232,43 @@ func (s *SharedServerSuite) TestActivityUnPause_Failed() {
s.Error(res.Err)
}

func (s *SharedServerSuite) TestActivityReset() {
run := s.waitActivityStarted()
wid := run.GetID()
aid := "dev-activity-id"
identity := "MyIdentity"

res := s.Execute(
"activity", "reset",
"--activity-id", aid,
"--workflow-id", wid,
"--run-id", run.GetRunID(),
"--identity", identity,
"--address", s.Address(),
)

s.NoError(res.Err)
// make sure we receive a server response
out := res.Stdout.String()
s.ContainsOnSameLine(out, "ServerResponse", "true")

// reset should fail because activity is not found

res = s.Execute(
"activity", "reset",
"--activity-id", "fake_id",
"--workflow-id", wid,
"--run-id", run.GetRunID(),
"--identity", identity,
"--address", s.Address(),
)

s.Error(res.Err)
// make sure we receive a NotFound error from the server`
var notFound *serviceerror.NotFound
s.ErrorAs(res.Err, &notFound)
}

// Test helpers

func (s *SharedServerSuite) waitActivityStarted() client.WorkflowRun {
Expand Down
43 changes: 40 additions & 3 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,16 +326,17 @@ func NewTemporalActivityCommand(cctx *CommandContext, parent *TemporalCommand) *
var s TemporalActivityCommand
s.Parent = parent
s.Command.Use = "activity"
s.Command.Short = "Complete, update, pause, unpause or fail an Activity"
s.Command.Short = "Complete, update, pause, unpause, reset or fail an Activity"
if hasHighlighting {
s.Command.Long = "Update an Activity's options or update an Activity's state to completed \nor failed.\n\nPause or unpause an Activity.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n\x1b[1mtemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\x1b[0m"
s.Command.Long = "Update an Activity's options, manage activity lifecycle or update \nan Activity's state to completed or failed.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n\x1b[1mtemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\x1b[0m"
} else {
s.Command.Long = "Update an Activity's options or update an Activity's state to completed \nor failed.\n\nPause or unpause an Activity.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n```\ntemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\n```"
s.Command.Long = "Update an Activity's options, manage activity lifecycle or update \nan Activity's state to completed or failed.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n```\ntemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\n```"
}
s.Command.Args = cobra.NoArgs
s.Command.AddCommand(&NewTemporalActivityCompleteCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalActivityFailCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalActivityPauseCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalActivityResetCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalActivityUnpauseCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalActivityUpdateOptionsCommand(cctx, &s).Command)
s.ClientOptions.buildFlags(cctx, s.Command.PersistentFlags())
Expand Down Expand Up @@ -445,6 +446,42 @@ func NewTemporalActivityPauseCommand(cctx *CommandContext, parent *TemporalActiv
return &s
}

type TemporalActivityResetCommand struct {
Parent *TemporalActivityCommand
Command cobra.Command
WorkflowReferenceOptions
ActivityId string
Identity string
NoWait bool
ResetHeartbeats bool
}

func NewTemporalActivityResetCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityResetCommand {
var s TemporalActivityResetCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "reset [flags]"
s.Command.Short = "Reset an Activity"
if hasHighlighting {
s.Command.Long = "Resetting an activity resets both the number of attempts and the activity \ntimeout. If activity is paused, it will be un-paused. \n\nIf the \x1b[1mno-wait\x1b[0m flag is provided, the activity will be rescheduled \nimmediately. Even if the activity is currently running.\nIf the \x1b[1mno-wait\x1b[0m flag is not provided, the activity will be scheduled \nafter the current instance completes, if needed. \nIf the 'reset_heartbeats' flag is set, the activity heartbeat timer and \nheartbeats will be reset.\n\nSpecify the Activity and Workflow IDs:\n\n\x1b[1mtemporal activity reset \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --no-wait\n --reset-heartbeats\x1b[0m"
} else {
s.Command.Long = "Resetting an activity resets both the number of attempts and the activity \ntimeout. If activity is paused, it will be un-paused. \n\nIf the `no-wait` flag is provided, the activity will be rescheduled \nimmediately. Even if the activity is currently running.\nIf the `no-wait` flag is not provided, the activity will be scheduled \nafter the current instance completes, if needed. \nIf the 'reset_heartbeats' flag is set, the activity heartbeat timer and \nheartbeats will be reset.\n\nSpecify the Activity and Workflow IDs:\n\n```\ntemporal activity reset \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --no-wait\n --reset-heartbeats\n```"
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "Activity ID to pause. Required.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "activity-id")
s.Command.Flags().StringVar(&s.Identity, "identity", "", "Identity of the user submitting this request.")
s.Command.Flags().BoolVar(&s.NoWait, "no-wait", false, "Schedule the Activity immediately, even if its retry timeout has not expired or the activity is currently running.")
s.Command.Flags().BoolVar(&s.ResetHeartbeats, "reset-heartbeats", false, "Reset the Activity's heartbeat.")
s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalActivityUnpauseCommand struct {
Parent *TemporalActivityCommand
Command cobra.Command
Expand Down
52 changes: 47 additions & 5 deletions temporalcli/commandsgen/commands.yml
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,10 @@ commands:
The command execution timeout. 0s means no timeout.
- name: temporal activity
summary: Complete, update, pause, unpause or fail an Activity
summary: Complete, update, pause, unpause, reset or fail an Activity
description: |
Update an Activity's options or update an Activity's state to completed
or failed.
Pause or unpause an Activity.
Update an Activity's options, manage activity lifecycle or update
an Activity's state to completed or failed.
Updating activity state marks an Activity as successfully finished or as
having encountered an error.
Expand All @@ -230,6 +228,7 @@ commands:
- activity update-options
- activity pause
- activity unpause
- activity reset
- activity execution
- activity fail
- cli reference
Expand Down Expand Up @@ -454,6 +453,49 @@ commands:
option-sets:
- workflow reference

- name: temporal activity reset
summary: Reset an Activity
description: |
Resetting an activity resets both the number of attempts and the activity
timeout. If activity is paused, it will be un-paused.
If the `no-wait` flag is provided, the activity will be rescheduled
immediately. Even if the activity is currently running.
If the `no-wait` flag is not provided, the activity will be scheduled
after the current instance completes, if needed.
If the 'reset_heartbeats' flag is set, the activity heartbeat timer and
heartbeats will be reset.
Specify the Activity and Workflow IDs:
```
temporal activity reset \
--activity-id YourActivityId \
--workflow-id YourWorkflowId
--no-wait
--reset-heartbeats
```
options:
- name: activity-id
short: a
type: string
description: Activity ID to pause.
required: true
- name: identity
type: string
description: Identity of the user submitting this request.
- name: no-wait
type: bool
description: |
Schedule the Activity immediately, even if its retry timeout has not
expired or the activity is currently running.
- name: reset-heartbeats
type: bool
description: Reset the Activity's heartbeat.
option-sets:
- workflow reference



- name: temporal batch
summary: Manage running batch jobs
Expand Down

0 comments on commit 17ff0ac

Please sign in to comment.