diff --git a/flytectl/cmd/create/execution.go b/flytectl/cmd/create/execution.go index afef1db3f3..38c7e8a886 100644 --- a/flytectl/cmd/create/execution.go +++ b/flytectl/cmd/create/execution.go @@ -79,20 +79,40 @@ The generated spec file can be modified to change the envs values, as shown belo task: core.control_flow.merge_sort.merge version: "v2" -4. Run the execution by passing the generated YAML file. +4. [Optional] Update the TargetExecutionCluster, if needed. +The generated spec file can be modified to change the TargetExecutionCluster values, as shown below: + +.. code-block:: yaml + + iamRoleARN: "" + inputs: + sorted_list1: + - 0 + sorted_list2: + - 0 + envs: + foo: bar + kubeServiceAcct: "" + targetDomain: "" + targetProject: "" + targetExecutionCluster: "" + task: core.control_flow.merge_sort.merge + version: "v2" + +5. Run the execution by passing the generated YAML file. The file can then be passed through the command line. It is worth noting that the source's and target's project and domain can be different. :: flytectl create execution --execFile execution_spec.yaml -p flytesnacks -d staging --targetProject flytesnacks -5. To relaunch an execution, pass the current execution ID as follows: +6. To relaunch an execution, pass the current execution ID as follows: :: flytectl create execution --relaunch ffb31066a0f8b4d52b77 -p flytesnacks -d development -6. To recover an execution, i.e., recreate it from the last known failure point for previously-run workflow execution, run: +7. To recover an execution, i.e., recreate it from the last known failure point for previously-run workflow execution, run: :: @@ -100,7 +120,7 @@ It is worth noting that the source's and target's project and domain can be diff See :ref:` + "`ref_flyteidl.admin.ExecutionRecoverRequest`" + ` for more details. -7. You can create executions idempotently by naming them. This is also a way to *name* an execution for discovery. Note, +8. You can create executions idempotently by naming them. This is also a way to *name* an execution for discovery. Note, an execution id has to be unique within a project domain. So if the *name* matches an existing execution an already exists exceptioj will be raised. @@ -108,7 +128,7 @@ will be raised. flytectl create execution --recover ffb31066a0f8b4d52b77 -p flytesnacks -d development custom_name -8. Generic/Struct/Dataclass/JSON types are supported for execution in a similar manner. +9. Generic/Struct/Dataclass/JSON types are supported for execution in a similar manner. The following is an example of how generic data can be specified while creating the execution. :: @@ -128,7 +148,7 @@ The generated file would look similar to this. Here, empty values have been dump task: core.type_system.custom_objects.add version: v3 -9. Modified file with struct data populated for 'x' and 'y' parameters for the task "core.type_system.custom_objects.add": +10. Modified file with struct data populated for 'x' and 'y' parameters for the task "core.type_system.custom_objects.add": :: @@ -152,7 +172,7 @@ The generated file would look similar to this. Here, empty values have been dump task: core.type_system.custom_objects.add version: v3 -10. If you have configured a plugin that implements github.com/flyteorg/flyteadmin/pkg/workflowengine/interfaces/WorkflowExecutor +11. If you have configured a plugin that implements github.com/flyteorg/flyteadmin/pkg/workflowengine/interfaces/WorkflowExecutor that supports cluster pools, then when creating a new execution, you can assign it to a specific cluster pool: :: @@ -167,8 +187,9 @@ The generated file would look similar to this. Here, empty values have been dump type ExecutionConfig struct { // pflag section ExecFile string `json:"execFile,omitempty" pflag:",file for the execution params. If not specified defaults to <_name>.execution_spec.yaml"` - TargetDomain string `json:"targetDomain" pflag:",project where execution needs to be created. If not specified configured domain would be used."` + TargetDomain string `json:"targetDomain" pflag:",domain where execution needs to be created. If not specified configured domain would be used."` TargetProject string `json:"targetProject" pflag:",project where execution needs to be created. If not specified configured project would be used."` + TargetExecutionCluster string `json:"targetExecutionCluster" pflag:",cluster where execution needs to be created. If not specific the default would be used."` KubeServiceAcct string `json:"kubeServiceAcct" pflag:",kubernetes service account AuthRole for launching execution."` IamRoleARN string `json:"iamRoleARN" pflag:",iam role ARN AuthRole for launching execution."` Relaunch string `json:"relaunch" pflag:",execution id to be relaunched."` diff --git a/flytectl/cmd/create/execution_util.go b/flytectl/cmd/create/execution_util.go index 4961f4d9fc..d1e77c9233 100644 --- a/flytectl/cmd/create/execution_util.go +++ b/flytectl/cmd/create/execution_util.go @@ -54,7 +54,7 @@ func createExecutionRequestForWorkflow(ctx context.Context, workflowName, projec } } - return createExecutionRequest(lp.Id, inputs, envs, securityContext, authRole, targetExecName), nil + return createExecutionRequest(lp.Id, inputs, envs, securityContext, authRole, targetExecName, executionConfig.TargetExecutionCluster), nil } func createExecutionRequestForTask(ctx context.Context, taskName string, project string, domain string, @@ -102,7 +102,7 @@ func createExecutionRequestForTask(ctx context.Context, taskName string, project Version: task.Id.Version, } - return createExecutionRequest(id, inputs, envs, securityContext, authRole, targetExecName), nil + return createExecutionRequest(id, inputs, envs, securityContext, authRole, targetExecName, executionConfig.TargetExecutionCluster), nil } func relaunchExecution(ctx context.Context, executionName string, project string, domain string, @@ -148,7 +148,7 @@ func recoverExecution(ctx context.Context, executionName string, project string, return nil } -func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs *admin.Envs, securityContext *core.SecurityContext, authRole *admin.AuthRole, targetExecName string) *admin.ExecutionCreateRequest { +func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs *admin.Envs, securityContext *core.SecurityContext, authRole *admin.AuthRole, targetExecName string, targetExecutionCluster string) *admin.ExecutionCreateRequest { if len(targetExecName) == 0 { targetExecName = "f" + strings.ReplaceAll(uuid.New().String(), "-", "")[:19] @@ -157,6 +157,10 @@ func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs * if executionConfig.ClusterPool != "" { clusterAssignment = &admin.ClusterAssignment{ClusterPoolName: executionConfig.ClusterPool} } + var executionClusterLabel *admin.ExecutionClusterLabel + if targetExecutionCluster != "" { + executionClusterLabel = &admin.ExecutionClusterLabel{Value: targetExecutionCluster} + } return &admin.ExecutionCreateRequest{ Project: executionConfig.TargetProject, Domain: executionConfig.TargetDomain, @@ -173,6 +177,7 @@ func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs * ClusterAssignment: clusterAssignment, OverwriteCache: executionConfig.OverwriteCache, Envs: envs, + ExecutionClusterLabel: executionClusterLabel, }, Inputs: inputs, } diff --git a/flytectl/cmd/create/execution_util_test.go b/flytectl/cmd/create/execution_util_test.go index 526d863ca2..9d53b6c90f 100644 --- a/flytectl/cmd/create/execution_util_test.go +++ b/flytectl/cmd/create/execution_util_test.go @@ -129,6 +129,22 @@ func TestCreateExecutionRequestForWorkflow(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, execCreateRequest) }) + t.Run("successful with execution Cluster label and envs", func(t *testing.T) { + s := setup() + defer s.TearDown() + + createExecutionUtilSetup() + launchPlan := &admin.LaunchPlan{} + s.FetcherExt.OnFetchLPVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(launchPlan, nil) + var executionConfigWithEnvs = &ExecutionConfig{ + Envs: map[string]string{}, + TargetExecutionCluster: "cluster", + } + execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfigWithEnvs, "") + assert.Nil(t, err) + assert.NotNil(t, execCreateRequest) + assert.Equal(t, "cluster", execCreateRequest.Spec.ExecutionClusterLabel.Value) + }) t.Run("failed literal conversion", func(t *testing.T) { s := setup() defer s.TearDown()