Skip to content

Commit

Permalink
Artf/switch event (#4428)
Browse files Browse the repository at this point in the history
Most of the changes here are generated proto changes. Actual code changes are:
IDL
* Remove the supplemental fields from the CloudEventTaskExecution object and move them to the CloudEventNodeExecution object.
* Remove some fields that the artifact service ended up not using (parent_node_execution and scheduled_at)

In the cloudevent publisher, the code that fills in the aforementioned supplemental information now runs for node execution events instead of task execution events.
* Remove the deleted fields.

On the event handling side, move the processing logic from the task event handler to the node event handler.
Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor authored Nov 15, 2023
1 parent 63b577e commit 484e144
Show file tree
Hide file tree
Showing 15 changed files with 3,511 additions and 4,717 deletions.
172 changes: 123 additions & 49 deletions flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"context"
"fmt"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyte/flytestdlib/contextutils"

"reflect"
"time"

Expand Down Expand Up @@ -200,47 +203,89 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
}

return &event.CloudEventWorkflowExecution{
RawEvent: rawEvent,
OutputData: outputs,
OutputInterface: &workflowInterface,
InputData: inputs,
ScheduledAt: spec.GetMetadata().GetScheduledAt(),
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ParentNodeExecution: spec.GetMetadata().GetParentNodeExecution(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
LaunchPlanId: spec.LaunchPlan,
RawEvent: rawEvent,
OutputData: outputs,
OutputInterface: &workflowInterface,
InputData: inputs,
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
LaunchPlanId: spec.LaunchPlan,
}, nil

Check warning on line 213 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L205-L213

Added lines #L205 - L213 were not covered by tests
}

func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Context, rawEvent *event.NodeExecutionEvent) (*event.CloudEventNodeExecution, error) {
return &event.CloudEventNodeExecution{
RawEvent: rawEvent,
}, nil
// getNodeExecutionContext returns a copy of ctx enriched, via contextutils,
// with the project, domain, execution name and node ID taken from identifier.
//
// identifier and identifier.ExecutionId are dereferenced without a nil check,
// so callers must pass a fully populated identifier or this will panic.
func getNodeExecutionContext(ctx context.Context, identifier *core.NodeExecutionIdentifier) context.Context {
// Attach the owning workflow execution's project/domain, then its name.
ctx = contextutils.WithProjectDomain(ctx, identifier.ExecutionId.Project, identifier.ExecutionId.Domain)
ctx = contextutils.WithExecutionID(ctx, identifier.ExecutionId.Name)
// Finally attach the node ID and hand back the derived context.
return contextutils.WithNodeID(ctx, identifier.NodeId)

Check warning on line 219 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L216-L219

Added lines #L216 - L219 were not covered by tests
}

func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Context, rawEvent *event.TaskExecutionEvent) (*event.CloudEventTaskExecution, error) {
// This is a rough copy of the ListTaskExecutions function in TaskExecutionManager. It can be deprecated once we move the processing out of Admin itself.
// Just return the highest retry attempt.
func (c *CloudEventWrappedPublisher) getLatestTaskExecutions(ctx context.Context, nodeExecutionID core.NodeExecutionIdentifier) (*admin.TaskExecution, error) {
ctx = getNodeExecutionContext(ctx, &nodeExecutionID)

if rawEvent == nil {
return nil, fmt.Errorf("nothing to publish, TaskExecution event is nil")
identifierFilters, err := util.GetNodeExecutionIdentifierFilters(ctx, nodeExecutionID)
if err != nil {
return nil, err
}

Check warning on line 230 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L224-L230

Added lines #L224 - L230 were not covered by tests

// For now, don't append any additional information unless succeeded
if rawEvent.Phase != core.TaskExecution_SUCCEEDED {
return &event.CloudEventTaskExecution{
RawEvent: rawEvent,
OutputData: nil,
OutputInterface: nil,
sort := admin.Sort{
Key: "retry_attempt",
Direction: 0,
}
sortParameter, err := common.NewSortParameter(&sort, models.TaskExecutionColumns)
if err != nil {
return nil, err
}

Check warning on line 239 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L232-L239

Added lines #L232 - L239 were not covered by tests

output, err := c.db.TaskExecutionRepo().List(ctx, repositoryInterfaces.ListResourceInput{
InlineFilters: identifierFilters,
Offset: 0,
Limit: 1,
SortParameter: sortParameter,
})
if err != nil {
return nil, err
}
if output.TaskExecutions == nil || len(output.TaskExecutions) == 0 {
logger.Debugf(ctx, "no task executions found for node exec id [%+v]", nodeExecutionID)
return nil, nil
}

Check warning on line 253 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L241-L253

Added lines #L241 - L253 were not covered by tests

taskExecutionList, err := transformers.FromTaskExecutionModels(output.TaskExecutions, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Debugf(ctx, "failed to transform task execution models for node exec id [%+v] with err: %v", nodeExecutionID, err)
return nil, err
}

Check warning on line 259 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L255-L259

Added lines #L255 - L259 were not covered by tests

return taskExecutionList[0], nil

Check warning on line 261 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L261

Added line #L261 was not covered by tests
}

func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Context, rawEvent *event.NodeExecutionEvent) (*event.CloudEventNodeExecution, error) {
if rawEvent == nil || rawEvent.Id == nil {
return nil, fmt.Errorf("nothing to publish, NodeExecution event or ID is nil")
}

Check warning on line 267 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L264-L267

Added lines #L264 - L267 were not covered by tests

// Skip nodes unless they're succeeded and not start nodes
if rawEvent.Phase != core.NodeExecution_SUCCEEDED {
return &event.CloudEventNodeExecution{
RawEvent: rawEvent,
}, nil
} else if rawEvent.Id.NodeId == "start-node" {
return &event.CloudEventNodeExecution{
RawEvent: rawEvent,
}, nil
}

Check warning on line 278 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L270-L278

Added lines #L270 - L278 were not covered by tests
// metric

// This gets the parent workflow execution metadata
executionModel, err := c.db.ExecutionRepo().Get(ctx, repositoryInterfaces.Identifier{
Project: rawEvent.ParentNodeExecutionId.ExecutionId.Project,
Domain: rawEvent.ParentNodeExecutionId.ExecutionId.Domain,
Name: rawEvent.ParentNodeExecutionId.ExecutionId.Name,
Project: rawEvent.Id.ExecutionId.Project,
Domain: rawEvent.Id.ExecutionId.Domain,
Name: rawEvent.Id.ExecutionId.Name,
})
if err != nil {
logger.Infof(ctx, "couldn't find execution [%+v] to save termination cause", rawEvent.ParentNodeExecutionId)
logger.Infof(ctx, "couldn't find execution [%+v] for cloud event processing", rawEvent.Id.ExecutionId)
return nil, err
}

Check warning on line 290 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L282-L290

Added lines #L282 - L290 were not covered by tests

Expand All @@ -250,19 +295,9 @@ func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Con
fmt.Printf("there was an error with spec %v %v", err, executionModel.Spec)
}

Check warning on line 296 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L292-L296

Added lines #L292 - L296 were not covered by tests

taskModel, err := c.db.TaskRepo().Get(ctx, repositoryInterfaces.Identifier{
Project: rawEvent.TaskId.Project,
Domain: rawEvent.TaskId.Domain,
Name: rawEvent.TaskId.Name,
Version: rawEvent.TaskId.Version,
})
if err != nil {
// TODO: metric this
logger.Debugf(ctx, "Failed to get task with task id [%+v] with err %v", rawEvent.TaskId, err)
return nil, err
}
task, err := transformers.FromTaskModel(taskModel)

// Get inputs/outputs
// This will likely need to move to the artifact service side, given message size limits.
// Replace with call to GetNodeExecutionData
var inputs *core.LiteralMap
if rawEvent.GetInputData() != nil {
inputs = rawEvent.GetInputData()
Expand All @@ -273,9 +308,10 @@ func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Con
fmt.Printf("Error fetching input literal map %v", rawEvent)
}
} else {
logger.Infof(ctx, "Task execution for node exec [%+v] has no input data", rawEvent.ParentNodeExecutionId)
logger.Infof(ctx, "Node execution for node exec [%+v] has no input data", rawEvent.Id)
}

Check warning on line 312 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L301-L312

Added lines #L301 - L312 were not covered by tests

// This will likely need to move to the artifact service side, given message size limits.
var outputs *core.LiteralMap
if rawEvent.GetOutputData() != nil {
outputs = rawEvent.GetOutputData()
Expand All @@ -289,16 +325,53 @@ func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Con
}

Check warning on line 325 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L315-L325

Added lines #L315 - L325 were not covered by tests
}

// Fetch the latest task execution if any, and pull out the task interface, if applicable.
// These are optional fields... if the node execution doesn't have a task execution then these will be empty.
var taskExecID *core.TaskExecutionIdentifier
var typedInterface *core.TypedInterface

lte, err := c.getLatestTaskExecutions(ctx, *rawEvent.Id)
if err != nil {
logger.Errorf(ctx, "failed to get latest task execution for node exec id [%+v] with err: %v", rawEvent.Id, err)
return nil, err
}
if lte != nil {
taskModel, err := c.db.TaskRepo().Get(ctx, repositoryInterfaces.Identifier{
Project: lte.Id.TaskId.Project,
Domain: lte.Id.TaskId.Domain,
Name: lte.Id.TaskId.Name,
Version: lte.Id.TaskId.Version,
})
if err != nil {
// TODO: metric this
// metric
logger.Debugf(ctx, "Failed to get task with task id [%+v] with err %v", lte.Id.TaskId, err)
return nil, err
}
task, err := transformers.FromTaskModel(taskModel)
typedInterface = task.Closure.CompiledTask.Template.Interface
taskExecID = lte.Id

Check warning on line 353 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L330-L353

Added lines #L330 - L353 were not covered by tests
}

return &event.CloudEventNodeExecution{
RawEvent: rawEvent,
TaskExecId: taskExecID,
OutputData: outputs,
OutputInterface: typedInterface,
InputData: inputs,
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
LaunchPlanId: spec.LaunchPlan,
}, nil

Check warning on line 364 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L356-L364

Added lines #L356 - L364 were not covered by tests
}

func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Context, rawEvent *event.TaskExecutionEvent) (*event.CloudEventTaskExecution, error) {

if rawEvent == nil {
return nil, fmt.Errorf("nothing to publish, TaskExecution event is nil")
}

Check warning on line 371 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L367-L371

Added lines #L367 - L371 were not covered by tests

return &event.CloudEventTaskExecution{
RawEvent: rawEvent,
OutputData: outputs,
OutputInterface: task.Closure.CompiledTask.Template.Interface,
InputData: inputs,
ScheduledAt: spec.GetMetadata().GetScheduledAt(),
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ParentNodeExecution: spec.GetMetadata().GetParentNodeExecution(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
LaunchPlanId: spec.LaunchPlan,
RawEvent: rawEvent,
}, nil

Check warning on line 375 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L373-L375

Added lines #L373 - L375 were not covered by tests
}

Expand Down Expand Up @@ -359,6 +432,7 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy
phase = e.Phase.String()
eventTime = e.OccurredAt.AsTime()
eventID = fmt.Sprintf("%v.%v", executionID, phase)
eventSource = common.FlyteURLKeyFromNodeExecutionID(*msgType.Event.Id)
finalMsg, err = c.TransformNodeExecutionEvent(ctx, e)
case *event.CloudEventExecutionStart:
topic = "cloudevents.ExecutionStart"
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req
}

go func() {
ceCtx := context.Background()
ceCtx := context.TODO()
if err := m.cloudEventsPublisher.Publish(ceCtx, proto.MessageName(&request), &request); err != nil {
logger.Errorf(ctx, "error publishing cloud event [%+v] with err: [%v]", request.RequestId, err)
}
Expand Down
63 changes: 46 additions & 17 deletions flyteartifacts/pkg/server/processor/events_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ func (s *ServiceCallHandler) HandleEvent(ctx context.Context, cloudEvent *event2
return s.HandleEventTaskExec(ctx, source, msgType)
case *event.CloudEventNodeExecution:
logger.Debugf(ctx, "Handling CloudEventNodeExecution [%v]", msgType.RawEvent.Id)
return s.HandleEventNodeExec(ctx, msgType)
return s.HandleEventNodeExec(ctx, source, msgType)
default:
return fmt.Errorf("HandleEvent found unknown message type [%T]", msgType)
}
}

// HandleEventExecStart handles a CloudEventExecutionStart event. It is
// currently a no-op: both arguments are ignored and nil is always returned.
func (s *ServiceCallHandler) HandleEventExecStart(_ context.Context, _ *event.CloudEventExecutionStart) error {
// metric
// NOTE(review): the marker above presumably flags a spot where a metric
// should be emitted — confirm with the author.
return nil
}

Expand Down Expand Up @@ -171,18 +172,57 @@ func getPartitionsAndTag(ctx context.Context, partialID core.ArtifactID, variabl
return partitions, tag, nil
}

func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, source string, evt *event.CloudEventTaskExecution) error {
func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, _ string, evt *event.CloudEventTaskExecution) error {

if evt.RawEvent.Phase != core.TaskExecution_SUCCEEDED {
logger.Debug(ctx, "Skipping non-successful task execution event")
return nil
}
// metric

execID := evt.RawEvent.ParentNodeExecutionId.ExecutionId
return nil
}

func (s *ServiceCallHandler) HandleEventNodeExec(ctx context.Context, source string, evt *event.CloudEventNodeExecution) error {
if evt.RawEvent.Phase != core.NodeExecution_SUCCEEDED {
logger.Debug(ctx, "Skipping non-successful task execution event")
return nil
}
if evt.RawEvent.Id.NodeId == "end-node" {
logger.Debug(ctx, "Skipping end node for %s", evt.RawEvent.Id.ExecutionId.Name)
return nil
}
// metric

execID := evt.RawEvent.Id.ExecutionId
if evt.GetOutputData().GetLiterals() == nil || len(evt.OutputData.Literals) == 0 {
logger.Debugf(ctx, "No output data to process for task event from [%v]: %s", execID, evt.RawEvent.TaskId.Name)
logger.Debugf(ctx, "No output data to process for task event from [%s] node %s", execID, evt.RawEvent.Id.NodeId)
}

if evt.OutputInterface == nil {
if evt.GetOutputData() != nil {
// metric this as error
logger.Errorf(ctx, "No output interface to process for task event from [%s] node %s, but output data is not nil", execID, evt.RawEvent.Id.NodeId)
}
logger.Debugf(ctx, "No output interface to process for task event from [%s] node %s", execID, evt.RawEvent.Id.NodeId)
return nil
}

if evt.RawEvent.GetTaskNodeMetadata() != nil {
if evt.RawEvent.GetTaskNodeMetadata().CacheStatus == core.CatalogCacheStatus_CACHE_HIT {
logger.Debugf(ctx, "Skipping cache hit for %s", evt.RawEvent.Id)
return nil
}
}
var taskExecID *core.TaskExecutionIdentifier
if taskExecID = evt.GetTaskExecId(); taskExecID == nil {
logger.Debugf(ctx, "No task execution id to process for task event from [%s] node %s", execID, evt.RawEvent.Id.NodeId)
return nil
}

// See note on the cloudevent_publisher side, we'll have to call one of the get data endpoints to get the actual data
// rather than reading them here. But read here for now.

// Iterate through the output interface. For any outputs that have an artifact ID specified, grab the
// output Literal and construct a Create request and call the service.
for varName, variable := range evt.OutputInterface.Outputs.Variables {
Expand All @@ -191,14 +231,8 @@ func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, source str

output := evt.OutputData.Literals[varName]

taskExecID := core.TaskExecutionIdentifier{
TaskId: evt.RawEvent.TaskId,
NodeExecutionId: evt.RawEvent.ParentNodeExecutionId,
RetryAttempt: evt.RawEvent.RetryAttempt,
}

// Add a tracking tag to the Literal before saving.
version := fmt.Sprintf("%s/%s", source, varName)
version := fmt.Sprintf("%s/%d/%s", source, taskExecID.RetryAttempt, varName)
trackingTag := fmt.Sprintf("%s/%s/%s", execID.Project, execID.Domain, version)
if output.Metadata == nil {
output.Metadata = make(map[string]string, 1)
Expand All @@ -208,7 +242,7 @@ func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, source str
spec := artifact.ArtifactSpec{
Value: output,
Type: evt.OutputInterface.Outputs.Variables[varName].Type,
TaskExecution: &taskExecID,
TaskExecution: taskExecID,
Execution: execID,
}

Expand Down Expand Up @@ -253,11 +287,6 @@ func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, source str
logger.Debugf(ctx, "Created artifact id [%+v] for key %s", resp.Artifact.ArtifactId, varName)
}
}

return nil
}

func (s *ServiceCallHandler) HandleEventNodeExec(_ context.Context, _ *event.CloudEventNodeExecution) error {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions flyteidl/gen/pb-cpp/flyteidl/artifact/artifacts.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 484e144

Please sign in to comment.