Skip to content

Commit

Permalink
Add executionClusterLabel (#5394)
Browse files Browse the repository at this point in the history
* wAdd executionClusterLabel

Signed-off-by: Rafael Raposo <[email protected]>

* Run make -c flytectl generate

Signed-off-by: Eduardo Apolinario <[email protected]>

---------

Signed-off-by: Rafael Raposo <[email protected]>
Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
RRap0so and eapolinario authored May 28, 2024
1 parent d04cf66 commit 1078e09
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 35 deletions.
57 changes: 39 additions & 18 deletions flytectl/cmd/create/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,36 +79,56 @@ The generated spec file can be modified to change the envs values, as shown belo
task: core.control_flow.merge_sort.merge
version: "v2"
4. Run the execution by passing the generated YAML file.
4. [Optional] Update the TargetExecutionCluster, if needed.
The generated spec file can be modified to change the TargetExecutionCluster values, as shown below:
.. code-block:: yaml
iamRoleARN: ""
inputs:
sorted_list1:
- 0
sorted_list2:
- 0
envs:
foo: bar
kubeServiceAcct: ""
targetDomain: ""
targetProject: ""
targetExecutionCluster: ""
task: core.control_flow.merge_sort.merge
version: "v2"
5. Run the execution by passing the generated YAML file.
The file can then be passed through the command line.
It is worth noting that the source's and target's project and domain can be different.
::
flytectl create execution --execFile execution_spec.yaml -p flytesnacks -d staging --targetProject flytesnacks
5. To relaunch an execution, pass the current execution ID as follows:
6. To relaunch an execution, pass the current execution ID as follows:
::
flytectl create execution --relaunch ffb31066a0f8b4d52b77 -p flytesnacks -d development
6. To recover an execution, i.e., recreate it from the last known failure point for previously-run workflow execution, run:
7. To recover an execution, i.e., recreate it from the last known failure point for previously-run workflow execution, run:
::
flytectl create execution --recover ffb31066a0f8b4d52b77 -p flytesnacks -d development
See :ref:` + "`ref_flyteidl.admin.ExecutionRecoverRequest`" + ` for more details.
7. You can create executions idempotently by naming them. This is also a way to *name* an execution for discovery. Note,
8. You can create executions idempotently by naming them. This is also a way to *name* an execution for discovery. Note,
an execution id has to be unique within a project domain. So if the *name* matches an existing execution an already exists exceptioj
will be raised.
::
flytectl create execution --recover ffb31066a0f8b4d52b77 -p flytesnacks -d development custom_name
8. Generic/Struct/Dataclass/JSON types are supported for execution in a similar manner.
9. Generic/Struct/Dataclass/JSON types are supported for execution in a similar manner.
The following is an example of how generic data can be specified while creating the execution.
::
Expand All @@ -128,7 +148,7 @@ The generated file would look similar to this. Here, empty values have been dump
task: core.type_system.custom_objects.add
version: v3
9. Modified file with struct data populated for 'x' and 'y' parameters for the task "core.type_system.custom_objects.add":
10. Modified file with struct data populated for 'x' and 'y' parameters for the task "core.type_system.custom_objects.add":
::
Expand All @@ -152,7 +172,7 @@ The generated file would look similar to this. Here, empty values have been dump
task: core.type_system.custom_objects.add
version: v3
10. If you have configured a plugin that implements github.com/flyteorg/flyteadmin/pkg/workflowengine/interfaces/WorkflowExecutor
11. If you have configured a plugin that implements github.com/flyteorg/flyteadmin/pkg/workflowengine/interfaces/WorkflowExecutor
that supports cluster pools, then when creating a new execution, you can assign it to a specific cluster pool:
::
Expand All @@ -166,17 +186,18 @@ The generated file would look similar to this. Here, empty values have been dump
// ExecutionConfig hold configuration for create execution flags and configuration of the actual task or workflow to be launched.
type ExecutionConfig struct {
// pflag section
ExecFile string `json:"execFile,omitempty" pflag:",file for the execution params. If not specified defaults to <<workflow/task>_name>.execution_spec.yaml"`
TargetDomain string `json:"targetDomain" pflag:",project where execution needs to be created. If not specified configured domain would be used."`
TargetProject string `json:"targetProject" pflag:",project where execution needs to be created. If not specified configured project would be used."`
KubeServiceAcct string `json:"kubeServiceAcct" pflag:",kubernetes service account AuthRole for launching execution."`
IamRoleARN string `json:"iamRoleARN" pflag:",iam role ARN AuthRole for launching execution."`
Relaunch string `json:"relaunch" pflag:",execution id to be relaunched."`
Recover string `json:"recover" pflag:",execution id to be recreated from the last known failure point."`
DryRun bool `json:"dryRun" pflag:",execute command without making any modifications."`
Version string `json:"version" pflag:",specify version of execution workflow/task."`
ClusterPool string `json:"clusterPool" pflag:",specify which cluster pool to assign execution to."`
OverwriteCache bool `json:"overwriteCache" pflag:",skip cached results when performing execution,causing all outputs to be re-calculated and stored data to be overwritten. Does not work for recovered executions."`
ExecFile string `json:"execFile,omitempty" pflag:",file for the execution params. If not specified defaults to <<workflow/task>_name>.execution_spec.yaml"`
TargetDomain string `json:"targetDomain" pflag:",domain where execution needs to be created. If not specified configured domain would be used."`
TargetProject string `json:"targetProject" pflag:",project where execution needs to be created. If not specified configured project would be used."`
TargetExecutionCluster string `json:"targetExecutionCluster" pflag:",cluster where execution needs to be created. If not specific the default would be used."`
KubeServiceAcct string `json:"kubeServiceAcct" pflag:",kubernetes service account AuthRole for launching execution."`
IamRoleARN string `json:"iamRoleARN" pflag:",iam role ARN AuthRole for launching execution."`
Relaunch string `json:"relaunch" pflag:",execution id to be relaunched."`
Recover string `json:"recover" pflag:",execution id to be recreated from the last known failure point."`
DryRun bool `json:"dryRun" pflag:",execute command without making any modifications."`
Version string `json:"version" pflag:",specify version of execution workflow/task."`
ClusterPool string `json:"clusterPool" pflag:",specify which cluster pool to assign execution to."`
OverwriteCache bool `json:"overwriteCache" pflag:",skip cached results when performing execution,causing all outputs to be re-calculated and stored data to be overwritten. Does not work for recovered executions."`
// Non plfag section is read from the execution config generated by get task/launch plan
Workflow string `json:"workflow,omitempty"`
Task string `json:"task,omitempty"`
Expand Down
21 changes: 13 additions & 8 deletions flytectl/cmd/create/execution_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func createExecutionRequestForWorkflow(ctx context.Context, workflowName, projec
}
}

return createExecutionRequest(lp.Id, inputs, envs, securityContext, authRole, targetExecName), nil
return createExecutionRequest(lp.Id, inputs, envs, securityContext, authRole, targetExecName, executionConfig.TargetExecutionCluster), nil
}

func createExecutionRequestForTask(ctx context.Context, taskName string, project string, domain string,
Expand Down Expand Up @@ -102,7 +102,7 @@ func createExecutionRequestForTask(ctx context.Context, taskName string, project
Version: task.Id.Version,
}

return createExecutionRequest(id, inputs, envs, securityContext, authRole, targetExecName), nil
return createExecutionRequest(id, inputs, envs, securityContext, authRole, targetExecName, executionConfig.TargetExecutionCluster), nil
}

func relaunchExecution(ctx context.Context, executionName string, project string, domain string,
Expand Down Expand Up @@ -148,7 +148,7 @@ func recoverExecution(ctx context.Context, executionName string, project string,
return nil
}

func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs *admin.Envs, securityContext *core.SecurityContext, authRole *admin.AuthRole, targetExecName string) *admin.ExecutionCreateRequest {
func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs *admin.Envs, securityContext *core.SecurityContext, authRole *admin.AuthRole, targetExecName string, targetExecutionCluster string) *admin.ExecutionCreateRequest {

if len(targetExecName) == 0 {
targetExecName = "f" + strings.ReplaceAll(uuid.New().String(), "-", "")[:19]
Expand All @@ -157,6 +157,10 @@ func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs *
if executionConfig.ClusterPool != "" {
clusterAssignment = &admin.ClusterAssignment{ClusterPoolName: executionConfig.ClusterPool}
}
var executionClusterLabel *admin.ExecutionClusterLabel
if targetExecutionCluster != "" {
executionClusterLabel = &admin.ExecutionClusterLabel{Value: targetExecutionCluster}
}
return &admin.ExecutionCreateRequest{
Project: executionConfig.TargetProject,
Domain: executionConfig.TargetDomain,
Expand All @@ -168,11 +172,12 @@ func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs *
Principal: "sdk",
Nesting: 0,
},
AuthRole: authRole,
SecurityContext: securityContext,
ClusterAssignment: clusterAssignment,
OverwriteCache: executionConfig.OverwriteCache,
Envs: envs,
AuthRole: authRole,
SecurityContext: securityContext,
ClusterAssignment: clusterAssignment,
OverwriteCache: executionConfig.OverwriteCache,
Envs: envs,
ExecutionClusterLabel: executionClusterLabel,
},
Inputs: inputs,
}
Expand Down
16 changes: 16 additions & 0 deletions flytectl/cmd/create/execution_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,22 @@ func TestCreateExecutionRequestForWorkflow(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
})
t.Run("successful with execution Cluster label and envs", func(t *testing.T) {
s := setup()
defer s.TearDown()

createExecutionUtilSetup()
launchPlan := &admin.LaunchPlan{}
s.FetcherExt.OnFetchLPVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(launchPlan, nil)
var executionConfigWithEnvs = &ExecutionConfig{
Envs: map[string]string{},
TargetExecutionCluster: "cluster",
}
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfigWithEnvs, "")
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
assert.Equal(t, "cluster", execCreateRequest.Spec.ExecutionClusterLabel.Value)
})
t.Run("failed literal conversion", func(t *testing.T) {
s := setup()
defer s.TearDown()
Expand Down
3 changes: 2 additions & 1 deletion flytectl/cmd/create/executionconfig_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions flytectl/cmd/create/executionconfig_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions flytectl/cmd/get/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestGetExecutionFuncWithIOData(t *testing.T) {
nodeExecList := &admin.NodeExecutionList{NodeExecutions: nodeExecutions}

inputs := map[string]*core.Literal{
"val1": &core.Literal{
"val1": {
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Expand All @@ -191,7 +191,7 @@ func TestGetExecutionFuncWithIOData(t *testing.T) {
},
}
outputs := map[string]*core.Literal{
"o2": &core.Literal{
"o2": {
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestGetExecutionFuncWithIOData(t *testing.T) {
nodeExecutions := []*admin.NodeExecution{nodeExec1}
nodeExecList := &admin.NodeExecutionList{NodeExecutions: nodeExecutions}
inputs := map[string]*core.Literal{
"val1": &core.Literal{
"val1": {
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Expand All @@ -303,7 +303,7 @@ func TestGetExecutionFuncWithIOData(t *testing.T) {
},
}
outputs := map[string]*core.Literal{
"o2": &core.Literal{
"o2": {
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Expand Down
8 changes: 4 additions & 4 deletions flytectl/cmd/get/node_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func TestGetExecutionDetails(t *testing.T) {
nodeExecList := &admin.NodeExecutionList{NodeExecutions: nodeExecutions}

inputs := map[string]*core.Literal{
"val1": &core.Literal{
"val1": {
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Expand All @@ -192,7 +192,7 @@ func TestGetExecutionDetails(t *testing.T) {
},
}
outputs := map[string]*core.Literal{
"o2": &core.Literal{
"o2": {
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Expand Down Expand Up @@ -243,7 +243,7 @@ func TestGetExecutionDetails(t *testing.T) {
nodeExecList := &admin.NodeExecutionList{NodeExecutions: nodeExecutions}

inputs := map[string]*core.Literal{
"val1": &core.Literal{
"val1": {
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Expand All @@ -258,7 +258,7 @@ func TestGetExecutionDetails(t *testing.T) {
},
}
outputs := map[string]*core.Literal{
"o2": &core.Literal{
"o2": {
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Expand Down

0 comments on commit 1078e09

Please sign in to comment.