From 17ff0aca68deb95e377fec343f5ad8030d774f43 Mon Sep 17 00:00:00 2001 From: Yuri Date: Wed, 8 Jan 2025 11:55:11 -0800 Subject: [PATCH] Add CLI command for Reset activity API (#732) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What was changed Add CLI support for Reset activity API ## Why? Part of the activity API work. --------- Co-authored-by: Chad Retz --- temporalcli/commands.activity.go | 37 +++++++++++++++++++ temporalcli/commands.activity_test.go | 38 ++++++++++++++++++++ temporalcli/commands.gen.go | 43 ++++++++++++++++++++-- temporalcli/commandsgen/commands.yml | 52 ++++++++++++++++++++++++--- 4 files changed, 162 insertions(+), 8 deletions(-) diff --git a/temporalcli/commands.activity.go b/temporalcli/commands.activity.go index 0eb608c0..c0953f88 100644 --- a/temporalcli/commands.activity.go +++ b/temporalcli/commands.activity.go @@ -253,3 +253,40 @@ func (c *TemporalActivityUnpauseCommand) run(cctx *CommandContext, args []string return nil } + +func (c *TemporalActivityResetCommand) run(cctx *CommandContext, args []string) error { + cl, err := c.Parent.ClientOptions.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + request := &workflowservice.ResetActivityByIdRequest{ + Namespace: c.Parent.Namespace, + WorkflowId: c.WorkflowId, + RunId: c.RunId, + ActivityId: c.ActivityId, + Identity: c.Identity, + NoWait: c.NoWait, + ResetHeartbeat: c.ResetHeartbeats, + } + + resp, err := cl.WorkflowService().ResetActivityById(cctx, request) + if err != nil { + return fmt.Errorf("unable to reset an Activity: %w", err) + } + + resetResponse := struct { + NoWait bool `json:"noWait"` + ResetHeartbeats bool `json:"resetHeartbeats"` + ServerResponse bool `json:"-"` + }{ + ServerResponse: resp != nil, + NoWait: c.NoWait, + ResetHeartbeats: c.ResetHeartbeats, + } + + _ = cctx.Printer.PrintStructured(resetResponse, printer.StructuredOptions{}) + + return nil +} diff --git a/temporalcli/commands.activity_test.go b/temporalcli/commands.activity_test.go index cbb5c747..fc2f3e4e 100644 --- a/temporalcli/commands.activity_test.go +++ b/temporalcli/commands.activity_test.go @@ -6,6 +6,7 @@ import ( "go.temporal.io/api/enums/v1" "go.temporal.io/api/history/v1" + "go.temporal.io/api/serviceerror" "go.temporal.io/sdk/client" ) @@ -231,6 +232,43 @@ func (s *SharedServerSuite) TestActivityUnPause_Failed() { s.Error(res.Err) } +func (s *SharedServerSuite) TestActivityReset() { + run := s.waitActivityStarted() + wid := run.GetID() + aid := "dev-activity-id" + identity := "MyIdentity" + + res := s.Execute( + "activity", "reset", + "--activity-id", aid, + "--workflow-id", wid, + "--run-id", run.GetRunID(), + "--identity", identity, + "--address", s.Address(), + ) + + s.NoError(res.Err) + // make sure we receive a server response + out := res.Stdout.String() + s.ContainsOnSameLine(out, "ServerResponse", "true") + + // reset should fail because activity is not found + + res = s.Execute( + "activity", "reset", + "--activity-id", "fake_id", + "--workflow-id", wid, + "--run-id", run.GetRunID(), + "--identity", identity, + "--address", s.Address(), + ) + + s.Error(res.Err) + // make sure we receive a NotFound error from the server` + var notFound *serviceerror.NotFound + s.ErrorAs(res.Err, ¬Found) +} + // Test helpers func (s *SharedServerSuite) waitActivityStarted() client.WorkflowRun { diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index fb14d057..478d1178 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -326,16 +326,17 @@ func NewTemporalActivityCommand(cctx *CommandContext, parent *TemporalCommand) * var s TemporalActivityCommand s.Parent = parent s.Command.Use = "activity" - s.Command.Short = "Complete, update, pause, unpause or fail an Activity" + s.Command.Short = "Complete, update, pause, unpause, reset or fail an Activity" if hasHighlighting { - s.Command.Long = "Update an Activity's options or update an Activity's state to completed \nor failed.\n\nPause or unpause an Activity.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n\x1b[1mtemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\x1b[0m" + s.Command.Long = "Update an Activity's options, manage activity lifecycle or update \nan Activity's state to completed or failed.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n\x1b[1mtemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\x1b[0m" } else { - s.Command.Long = "Update an Activity's options or update an Activity's state to completed \nor failed.\n\nPause or unpause an Activity.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n```\ntemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\n```" + s.Command.Long = "Update an Activity's options, manage activity lifecycle or update \nan Activity's state to completed or failed.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n```\ntemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\n```" } s.Command.Args = cobra.NoArgs s.Command.AddCommand(&NewTemporalActivityCompleteCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalActivityFailCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalActivityPauseCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalActivityResetCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalActivityUnpauseCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalActivityUpdateOptionsCommand(cctx, &s).Command) s.ClientOptions.buildFlags(cctx, s.Command.PersistentFlags()) @@ -445,6 +446,42 @@ func NewTemporalActivityPauseCommand(cctx *CommandContext, parent *TemporalActiv return &s } +type TemporalActivityResetCommand struct { + Parent *TemporalActivityCommand + Command cobra.Command + WorkflowReferenceOptions + ActivityId string + Identity string + NoWait bool + ResetHeartbeats bool +} + +func NewTemporalActivityResetCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityResetCommand { + var s TemporalActivityResetCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "reset [flags]" + s.Command.Short = "Reset an Activity" + if hasHighlighting { + s.Command.Long = "Resetting an activity resets both the number of attempts and the activity \ntimeout. If activity is paused, it will be un-paused. \n\nIf the \x1b[1mno-wait\x1b[0m flag is provided, the activity will be rescheduled \nimmediately. Even if the activity is currently running.\nIf the \x1b[1mno-wait\x1b[0m flag is not provided, the activity will be scheduled \nafter the current instance completes, if needed. \nIf the 'reset_heartbeats' flag is set, the activity heartbeat timer and \nheartbeats will be reset.\n\nSpecify the Activity and Workflow IDs:\n\n\x1b[1mtemporal activity reset \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --no-wait\n --reset-heartbeats\x1b[0m" + } else { + s.Command.Long = "Resetting an activity resets both the number of attempts and the activity \ntimeout. If activity is paused, it will be un-paused. \n\nIf the `no-wait` flag is provided, the activity will be rescheduled \nimmediately. Even if the activity is currently running.\nIf the `no-wait` flag is not provided, the activity will be scheduled \nafter the current instance completes, if needed. \nIf the 'reset_heartbeats' flag is set, the activity heartbeat timer and \nheartbeats will be reset.\n\nSpecify the Activity and Workflow IDs:\n\n```\ntemporal activity reset \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --no-wait\n --reset-heartbeats\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "Activity ID to pause. Required.") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "activity-id") + s.Command.Flags().StringVar(&s.Identity, "identity", "", "Identity of the user submitting this request.") + s.Command.Flags().BoolVar(&s.NoWait, "no-wait", false, "Schedule the Activity immediately, even if its retry timeout has not expired or the activity is currently running.") + s.Command.Flags().BoolVar(&s.ResetHeartbeats, "reset-heartbeats", false, "Reset the Activity's heartbeat.") + s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + type TemporalActivityUnpauseCommand struct { Parent *TemporalActivityCommand Command cobra.Command diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index df220da6..dbc436d7 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -201,12 +201,10 @@ commands: The command execution timeout. 0s means no timeout. - name: temporal activity - summary: Complete, update, pause, unpause or fail an Activity + summary: Complete, update, pause, unpause, reset or fail an Activity description: | - Update an Activity's options or update an Activity's state to completed - or failed. - - Pause or unpause an Activity. + Update an Activity's options, manage activity lifecycle or update + an Activity's state to completed or failed. Updating activity state marks an Activity as successfully finished or as having encountered an error. @@ -230,6 +228,7 @@ commands: - activity update-options - activity pause - activity unpause + - activity reset - activity execution - activity fail - cli reference @@ -454,6 +453,49 @@ commands: option-sets: - workflow reference + - name: temporal activity reset + summary: Reset an Activity + description: | + Resetting an activity resets both the number of attempts and the activity + timeout. If activity is paused, it will be un-paused. + + If the `no-wait` flag is provided, the activity will be rescheduled + immediately. Even if the activity is currently running. + If the `no-wait` flag is not provided, the activity will be scheduled + after the current instance completes, if needed. + If the 'reset_heartbeats' flag is set, the activity heartbeat timer and + heartbeats will be reset. + + Specify the Activity and Workflow IDs: + + ``` + temporal activity reset \ + --activity-id YourActivityId \ + --workflow-id YourWorkflowId + --no-wait + --reset-heartbeats + ``` + options: + - name: activity-id + short: a + type: string + description: Activity ID to pause. + required: true + - name: identity + type: string + description: Identity of the user submitting this request. + - name: no-wait + type: bool + description: | + Schedule the Activity immediately, even if its retry timeout has not + expired or the activity is currently running. + - name: reset-heartbeats + type: bool + description: Reset the Activity's heartbeat. + option-sets: + - workflow reference + + - name: temporal batch summary: Manage running batch jobs