Skip to content

Commit

Permalink
Add executionClusterLabel
Browse files Browse the repository at this point in the history
Signed-off-by: Rafael Raposo <[email protected]>
  • Loading branch information
RRap0so committed May 20, 2024
1 parent 2f1f813 commit 5af6afb
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 11 deletions.
37 changes: 29 additions & 8 deletions flytectl/cmd/create/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,36 +79,56 @@ The generated spec file can be modified to change the envs values, as shown belo
task: core.control_flow.merge_sort.merge
version: "v2"
4. Run the execution by passing the generated YAML file.
4. [Optional] Update the TargetExecutionCluster, if needed.
The generated spec file can be modified to change the TargetExecutionCluster values, as shown below:
.. code-block:: yaml
iamRoleARN: ""
inputs:
sorted_list1:
- 0
sorted_list2:
- 0
envs:
foo: bar
kubeServiceAcct: ""
targetDomain: ""
targetProject: ""
targetExecutionCluster: ""
task: core.control_flow.merge_sort.merge
version: "v2"
5. Run the execution by passing the generated YAML file.
The file can then be passed through the command line.
It is worth noting that the source's and target's project and domain can be different.
::
flytectl create execution --execFile execution_spec.yaml -p flytesnacks -d staging --targetProject flytesnacks
5. To relaunch an execution, pass the current execution ID as follows:
6. To relaunch an execution, pass the current execution ID as follows:
::
flytectl create execution --relaunch ffb31066a0f8b4d52b77 -p flytesnacks -d development
6. To recover an execution, i.e., recreate it from the last known failure point for previously-run workflow execution, run:
7. To recover an execution, i.e., recreate it from the last known failure point for previously-run workflow execution, run:
::
flytectl create execution --recover ffb31066a0f8b4d52b77 -p flytesnacks -d development
See :ref:` + "`ref_flyteidl.admin.ExecutionRecoverRequest`" + ` for more details.
7. You can create executions idempotently by naming them. This is also a way to *name* an execution for discovery. Note,
8. You can create executions idempotently by naming them. This is also a way to *name* an execution for discovery. Note,
an execution id has to be unique within a project domain. So if the *name* matches an existing execution an already exists exceptioj
will be raised.
::
flytectl create execution --recover ffb31066a0f8b4d52b77 -p flytesnacks -d development custom_name
8. Generic/Struct/Dataclass/JSON types are supported for execution in a similar manner.
9. Generic/Struct/Dataclass/JSON types are supported for execution in a similar manner.
The following is an example of how generic data can be specified while creating the execution.
::
Expand All @@ -128,7 +148,7 @@ The generated file would look similar to this. Here, empty values have been dump
task: core.type_system.custom_objects.add
version: v3
9. Modified file with struct data populated for 'x' and 'y' parameters for the task "core.type_system.custom_objects.add":
10. Modified file with struct data populated for 'x' and 'y' parameters for the task "core.type_system.custom_objects.add":
::
Expand All @@ -152,7 +172,7 @@ The generated file would look similar to this. Here, empty values have been dump
task: core.type_system.custom_objects.add
version: v3
10. If you have configured a plugin that implements github.com/flyteorg/flyteadmin/pkg/workflowengine/interfaces/WorkflowExecutor
11. If you have configured a plugin that implements github.com/flyteorg/flyteadmin/pkg/workflowengine/interfaces/WorkflowExecutor
that supports cluster pools, then when creating a new execution, you can assign it to a specific cluster pool:
::
Expand All @@ -167,8 +187,9 @@ The generated file would look similar to this. Here, empty values have been dump
type ExecutionConfig struct {
// pflag section
ExecFile string `json:"execFile,omitempty" pflag:",file for the execution params. If not specified defaults to <<workflow/task>_name>.execution_spec.yaml"`
TargetDomain string `json:"targetDomain" pflag:",project where execution needs to be created. If not specified configured domain would be used."`
TargetDomain string `json:"targetDomain" pflag:",domain where execution needs to be created. If not specified configured domain would be used."`
TargetProject string `json:"targetProject" pflag:",project where execution needs to be created. If not specified configured project would be used."`
TargetExecutionCluster string `json:"targetExecutionCluster" pflag:",cluster where execution needs to be created. If not specific the default would be used."`
KubeServiceAcct string `json:"kubeServiceAcct" pflag:",kubernetes service account AuthRole for launching execution."`
IamRoleARN string `json:"iamRoleARN" pflag:",iam role ARN AuthRole for launching execution."`
Relaunch string `json:"relaunch" pflag:",execution id to be relaunched."`
Expand Down
11 changes: 8 additions & 3 deletions flytectl/cmd/create/execution_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func createExecutionRequestForWorkflow(ctx context.Context, workflowName, projec
}
}

return createExecutionRequest(lp.Id, inputs, envs, securityContext, authRole, targetExecName), nil
return createExecutionRequest(lp.Id, inputs, envs, securityContext, authRole, targetExecName, executionConfig.TargetExecutionCluster), nil
}

func createExecutionRequestForTask(ctx context.Context, taskName string, project string, domain string,
Expand Down Expand Up @@ -102,7 +102,7 @@ func createExecutionRequestForTask(ctx context.Context, taskName string, project
Version: task.Id.Version,
}

return createExecutionRequest(id, inputs, envs, securityContext, authRole, targetExecName), nil
return createExecutionRequest(id, inputs, envs, securityContext, authRole, targetExecName, executionConfig.TargetExecutionCluster), nil
}

func relaunchExecution(ctx context.Context, executionName string, project string, domain string,
Expand Down Expand Up @@ -148,7 +148,7 @@ func recoverExecution(ctx context.Context, executionName string, project string,
return nil
}

func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs *admin.Envs, securityContext *core.SecurityContext, authRole *admin.AuthRole, targetExecName string) *admin.ExecutionCreateRequest {
func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs *admin.Envs, securityContext *core.SecurityContext, authRole *admin.AuthRole, targetExecName string, targetExecutionCluster string) *admin.ExecutionCreateRequest {

if len(targetExecName) == 0 {
targetExecName = "f" + strings.ReplaceAll(uuid.New().String(), "-", "")[:19]
Expand All @@ -157,6 +157,10 @@ func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs *
if executionConfig.ClusterPool != "" {
clusterAssignment = &admin.ClusterAssignment{ClusterPoolName: executionConfig.ClusterPool}
}
var executionClusterLabel *admin.ExecutionClusterLabel
if targetExecutionCluster != "" {
executionClusterLabel = &admin.ExecutionClusterLabel{Value: targetExecutionCluster}
}
return &admin.ExecutionCreateRequest{
Project: executionConfig.TargetProject,
Domain: executionConfig.TargetDomain,
Expand All @@ -173,6 +177,7 @@ func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs *
ClusterAssignment: clusterAssignment,
OverwriteCache: executionConfig.OverwriteCache,
Envs: envs,
ExecutionClusterLabel: executionClusterLabel,
},
Inputs: inputs,
}
Expand Down
16 changes: 16 additions & 0 deletions flytectl/cmd/create/execution_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,22 @@ func TestCreateExecutionRequestForWorkflow(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
})
t.Run("successful with execution Cluster label and envs", func(t *testing.T) {
s := setup()
defer s.TearDown()

createExecutionUtilSetup()
launchPlan := &admin.LaunchPlan{}
s.FetcherExt.OnFetchLPVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(launchPlan, nil)
var executionConfigWithEnvs = &ExecutionConfig{
Envs: map[string]string{},
TargetExecutionCluster: "cluster",
}
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfigWithEnvs, "")
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
assert.Equal(t, "cluster", execCreateRequest.Spec.ExecutionClusterLabel.Value)
})
t.Run("failed literal conversion", func(t *testing.T) {
s := setup()
defer s.TearDown()
Expand Down

0 comments on commit 5af6afb

Please sign in to comment.