Skip to content

Commit

Permalink
Batch operation rate limit (#366)
Browse files Browse the repository at this point in the history
## What was changed
<!-- Describe what has changed in this PR -->

Added a new flag `--rps` to the 4 batch operations.

Note that although `batch-reset` is a batch operation, it performs its
work locally instead of on the server - so the flag does not apply here.

## Why?
<!-- Tell your future self why have you made these changes -->

To fix temporalio/temporal#4926.

## Checklist
<!--- add/delete as needed --->

1. Closes OSS-1681

2. How was this tested:
<!--- Please describe how you tested your changes/how we can test them
-->

3. Any docs updates needed? see
temporalio/documentation#2413
  • Loading branch information
stephanos authored Oct 30, 2023
1 parent 6333773 commit 700234c
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 280 deletions.
8 changes: 8 additions & 0 deletions batch/batch_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ func ListBatchJobs(c *cli.Context) error {
// BatchTerminate terminate a list of workflows
func BatchTerminate(c *cli.Context) error {
operator := common.GetCurrentUserFromEnv()
rps := float32(c.Float64(common.FlagRPS))

req := workflowservice.StartBatchOperationRequest{
MaxOperationsPerSecond: rps,
Operation: &workflowservice.StartBatchOperationRequest_TerminationOperation{
TerminationOperation: &batch.BatchOperationTermination{
Identity: operator,
Expand All @@ -98,8 +100,10 @@ func BatchTerminate(c *cli.Context) error {
// BatchCancel cancel a list of workflows
func BatchCancel(c *cli.Context) error {
operator := common.GetCurrentUserFromEnv()
rps := float32(c.Float64(common.FlagRPS))

req := workflowservice.StartBatchOperationRequest{
MaxOperationsPerSecond: rps,
Operation: &workflowservice.StartBatchOperationRequest_CancellationOperation{
CancellationOperation: &batch.BatchOperationCancellation{
Identity: operator,
Expand All @@ -114,12 +118,14 @@ func BatchCancel(c *cli.Context) error {
func BatchSignal(c *cli.Context) error {
signalName := c.String(common.FlagName)
operator := common.GetCurrentUserFromEnv()
rps := float32(c.Float64(common.FlagRPS))
input, err := common.ProcessJSONInput(c)
if err != nil {
return err
}

req := workflowservice.StartBatchOperationRequest{
MaxOperationsPerSecond: rps,
Operation: &workflowservice.StartBatchOperationRequest_SignalOperation{
SignalOperation: &batch.BatchOperationSignal{
Signal: signalName,
Expand All @@ -135,8 +141,10 @@ func BatchSignal(c *cli.Context) error {
// BatchDelete delete a list of workflows
func BatchDelete(c *cli.Context) error {
operator := common.GetCurrentUserFromEnv()
rps := float32(c.Float64(common.FlagRPS))

req := workflowservice.StartBatchOperationRequest{
MaxOperationsPerSecond: rps,
Operation: &workflowservice.StartBatchOperationRequest_DeletionOperation{
DeletionOperation: &batch.BatchOperationDeletion{
Identity: operator,
Expand Down
5 changes: 3 additions & 2 deletions common/defs-flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ const (
FlagUpdateIDDefinition = "UpdateID to check the result of an update (either UpdateID or Update handler name should be passed)"
FlagCancelWorkflow = "Cancel Workflow Execution with given Workflow Id."
FlagWorkflowIDTerminate = "Terminate Workflow Execution with given Workflow Id."
FlagRPSDefinition = "Limit for batch operation's requests per second."
FlagQueryCancel = "Cancel Workflow Executions with given List Filter."
FlagQueryDelete = "Delete Workflow Executions with given List Filter."
FlagQuerySignal = "Signal Workflow Executions with given List List Filter."
FlagQuerySignal = "Signal Workflow Executions with given List Filter."
FlagQueryReset = "Reset Workflow Executions with given List Filter"
FlagQueryTerminate = "Terminate Workflow Executions with given List Filter."
FlagEventIDDefinition = "The Event Id for any Event after WorkflowTaskStarted you want to reset to (exclusive). It can be WorkflowTaskCompleted, WorkflowTaskFailed or others."
FlagQueryResetBatch = "Visibility Query of Search Attributes describing the Workflow Executions to reset. See https://docs.temporal.io/docs/tctl/workflow/list#--query."
FlagInputFileReset = "Input file that specifies Workflow Executions to reset. Each line contains one Workflow Id as the base Run and, optionally, a Run Id."
FlagExcludeFileDefinition = "Input file that specifies Workflow Executions to exclude from a reset."
FlagInputSeparatorDefinition = "Separator for the input file. The default is a tab (`\t`)."
Expand Down
31 changes: 15 additions & 16 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,24 @@ require (
github.com/temporalio/tctl-kit v0.0.0-20230328153839-577f95d16fa0
github.com/temporalio/ui-server/v2 v2.18.2
github.com/urfave/cli/v2 v2.25.7
go.temporal.io/api v1.24.0
go.temporal.io/api v1.25.0
go.temporal.io/sdk v1.24.0
go.temporal.io/sdk/contrib/tools/workflowcheck v0.1.0
go.temporal.io/server v1.22.0
go.uber.org/zap v1.25.0
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1
google.golang.org/grpc v1.57.0
google.golang.org/grpc v1.59.0
)

require github.com/google/s2a-go v0.1.4 // indirect

require github.com/cactus/go-statsd-client/v5 v5.0.0 // indirect

require (
cloud.google.com/go v0.110.7 // indirect
cloud.google.com/go/compute v1.23.0 // indirect
cloud.google.com/go v0.110.8 // indirect
cloud.google.com/go/compute v1.23.1 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.1 // indirect
cloud.google.com/go/iam v1.1.3 // indirect
cloud.google.com/go/storage v1.30.1 // indirect
github.com/apache/thrift v0.18.1 // indirect
github.com/aws/aws-sdk-go v1.44.289 // indirect
Expand All @@ -60,10 +60,9 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect
github.com/googleapis/gax-go/v2 v2.11.0 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/gorilla/securecookie v1.1.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
Expand Down Expand Up @@ -125,21 +124,21 @@ require (
go.uber.org/dig v1.17.0 // indirect
go.uber.org/fx v1.20.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/oauth2 v0.9.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.11.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.11.0
golang.org/x/text v0.12.0 // indirect
golang.org/x/sys v0.13.0
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.10.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.128.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230815205213-6bfd019c3878 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230815205213-6bfd019c3878 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230815205213-6bfd019c3878 // indirect
google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
Expand Down
Loading

0 comments on commit 700234c

Please sign in to comment.