Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Artf/switch event #4428

Merged
merged 3 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 123 additions & 49 deletions flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
"context"
"fmt"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyte/flytestdlib/contextutils"

"reflect"
"time"

Expand Down Expand Up @@ -200,47 +203,89 @@
}

return &event.CloudEventWorkflowExecution{
RawEvent: rawEvent,
OutputData: outputs,
OutputInterface: &workflowInterface,
InputData: inputs,
ScheduledAt: spec.GetMetadata().GetScheduledAt(),
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ParentNodeExecution: spec.GetMetadata().GetParentNodeExecution(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
LaunchPlanId: spec.LaunchPlan,
RawEvent: rawEvent,
OutputData: outputs,
OutputInterface: &workflowInterface,
InputData: inputs,
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
LaunchPlanId: spec.LaunchPlan,

Check warning on line 212 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L206-L212

Added lines #L206 - L212 were not covered by tests
}, nil
}

func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Context, rawEvent *event.NodeExecutionEvent) (*event.CloudEventNodeExecution, error) {
return &event.CloudEventNodeExecution{
RawEvent: rawEvent,
}, nil
// getNodeExecutionContext returns a context derived from ctx that has been passed through
// contextutils.WithProjectDomain, contextutils.WithExecutionID and contextutils.WithNodeID,
// using the project, domain and name of identifier.ExecutionId and identifier.NodeId.
//
// identifier and identifier.ExecutionId are accessed without nil checks; callers are
// expected to pass a fully populated identifier.
func getNodeExecutionContext(ctx context.Context, identifier *core.NodeExecutionIdentifier) context.Context {
ctx = contextutils.WithProjectDomain(ctx, identifier.ExecutionId.Project, identifier.ExecutionId.Domain)
ctx = contextutils.WithExecutionID(ctx, identifier.ExecutionId.Name)
return contextutils.WithNodeID(ctx, identifier.NodeId)

Check warning on line 219 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L216-L219

Added lines #L216 - L219 were not covered by tests
}

func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Context, rawEvent *event.TaskExecutionEvent) (*event.CloudEventTaskExecution, error) {
// This is a rough copy of the ListTaskExecutions function in TaskExecutionManager. It can be deprecated once we move the processing out of Admin itself.
// Just return the highest retry attempt.
func (c *CloudEventWrappedPublisher) getLatestTaskExecutions(ctx context.Context, nodeExecutionID core.NodeExecutionIdentifier) (*admin.TaskExecution, error) {
ctx = getNodeExecutionContext(ctx, &nodeExecutionID)

Check warning on line 225 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L224-L225

Added lines #L224 - L225 were not covered by tests

if rawEvent == nil {
return nil, fmt.Errorf("nothing to publish, TaskExecution event is nil")
identifierFilters, err := util.GetNodeExecutionIdentifierFilters(ctx, nodeExecutionID)
if err != nil {
return nil, err

Check warning on line 229 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L227-L229

Added lines #L227 - L229 were not covered by tests
}

// For now, don't append any additional information unless succeeded
if rawEvent.Phase != core.TaskExecution_SUCCEEDED {
return &event.CloudEventTaskExecution{
RawEvent: rawEvent,
OutputData: nil,
OutputInterface: nil,
sort := admin.Sort{
Key: "retry_attempt",
Direction: 0,
}
sortParameter, err := common.NewSortParameter(&sort, models.TaskExecutionColumns)
if err != nil {
return nil, err
}

Check warning on line 239 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L232-L239

Added lines #L232 - L239 were not covered by tests

output, err := c.db.TaskExecutionRepo().List(ctx, repositoryInterfaces.ListResourceInput{
InlineFilters: identifierFilters,
Offset: 0,
Limit: 1,
SortParameter: sortParameter,
})
if err != nil {
return nil, err
}
if output.TaskExecutions == nil || len(output.TaskExecutions) == 0 {
logger.Debugf(ctx, "no task executions found for node exec id [%+v]", nodeExecutionID)
return nil, nil
}

Check warning on line 253 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L241-L253

Added lines #L241 - L253 were not covered by tests

taskExecutionList, err := transformers.FromTaskExecutionModels(output.TaskExecutions, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Debugf(ctx, "failed to transform task execution models for node exec id [%+v] with err: %v", nodeExecutionID, err)
return nil, err
}

Check warning on line 259 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L255-L259

Added lines #L255 - L259 were not covered by tests

return taskExecutionList[0], nil

Check warning on line 261 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L261

Added line #L261 was not covered by tests
}

func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Context, rawEvent *event.NodeExecutionEvent) (*event.CloudEventNodeExecution, error) {
if rawEvent == nil || rawEvent.Id == nil {
return nil, fmt.Errorf("nothing to publish, NodeExecution event or ID is nil")
}

Check warning on line 267 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L264-L267

Added lines #L264 - L267 were not covered by tests

// Skip nodes unless they're succeeded and not start nodes
if rawEvent.Phase != core.NodeExecution_SUCCEEDED {
return &event.CloudEventNodeExecution{
RawEvent: rawEvent,
}, nil
} else if rawEvent.Id.NodeId == "start-node" {
return &event.CloudEventNodeExecution{
RawEvent: rawEvent,

Check warning on line 276 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L270-L276

Added lines #L270 - L276 were not covered by tests
}, nil
}
// metric

// This gets the parent workflow execution metadata
executionModel, err := c.db.ExecutionRepo().Get(ctx, repositoryInterfaces.Identifier{
Project: rawEvent.ParentNodeExecutionId.ExecutionId.Project,
Domain: rawEvent.ParentNodeExecutionId.ExecutionId.Domain,
Name: rawEvent.ParentNodeExecutionId.ExecutionId.Name,
Project: rawEvent.Id.ExecutionId.Project,
Domain: rawEvent.Id.ExecutionId.Domain,
Name: rawEvent.Id.ExecutionId.Name,

Check warning on line 285 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L283-L285

Added lines #L283 - L285 were not covered by tests
})
if err != nil {
logger.Infof(ctx, "couldn't find execution [%+v] to save termination cause", rawEvent.ParentNodeExecutionId)
logger.Infof(ctx, "couldn't find execution [%+v] for cloud event processing", rawEvent.Id.ExecutionId)

Check warning on line 288 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L288

Added line #L288 was not covered by tests
return nil, err
}

Expand All @@ -250,19 +295,9 @@
fmt.Printf("there was an error with spec %v %v", err, executionModel.Spec)
}

taskModel, err := c.db.TaskRepo().Get(ctx, repositoryInterfaces.Identifier{
Project: rawEvent.TaskId.Project,
Domain: rawEvent.TaskId.Domain,
Name: rawEvent.TaskId.Name,
Version: rawEvent.TaskId.Version,
})
if err != nil {
// TODO: metric this
logger.Debugf(ctx, "Failed to get task with task id [%+v] with err %v", rawEvent.TaskId, err)
return nil, err
}
task, err := transformers.FromTaskModel(taskModel)

// Get inputs/outputs
// This will likely need to move to the artifact service side, given message size limits.
// Replace with call to GetNodeExecutionData
var inputs *core.LiteralMap
if rawEvent.GetInputData() != nil {
inputs = rawEvent.GetInputData()
Expand All @@ -273,9 +308,10 @@
fmt.Printf("Error fetching input literal map %v", rawEvent)
}
} else {
logger.Infof(ctx, "Task execution for node exec [%+v] has no input data", rawEvent.ParentNodeExecutionId)
logger.Infof(ctx, "Node execution for node exec [%+v] has no input data", rawEvent.Id)

Check warning on line 311 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L311

Added line #L311 was not covered by tests
}

// This will likely need to move to the artifact service side, given message size limits.
var outputs *core.LiteralMap
if rawEvent.GetOutputData() != nil {
outputs = rawEvent.GetOutputData()
Expand All @@ -289,16 +325,53 @@
}
}

// Fetch the latest task execution if any, and pull out the task interface, if applicable.
// These are optional fields... if the node execution doesn't have a task execution then these will be empty.
var taskExecID *core.TaskExecutionIdentifier
var typedInterface *core.TypedInterface

lte, err := c.getLatestTaskExecutions(ctx, *rawEvent.Id)
if err != nil {
logger.Errorf(ctx, "failed to get latest task execution for node exec id [%+v] with err: %v", rawEvent.Id, err)
return nil, err
}
if lte != nil {
taskModel, err := c.db.TaskRepo().Get(ctx, repositoryInterfaces.Identifier{
Project: lte.Id.TaskId.Project,
Domain: lte.Id.TaskId.Domain,
Name: lte.Id.TaskId.Name,
Version: lte.Id.TaskId.Version,
})
if err != nil {
// TODO: metric this
// metric
logger.Debugf(ctx, "Failed to get task with task id [%+v] with err %v", lte.Id.TaskId, err)
return nil, err
}
task, err := transformers.FromTaskModel(taskModel)
typedInterface = task.Closure.CompiledTask.Template.Interface
taskExecID = lte.Id

Check warning on line 353 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L330-L353

Added lines #L330 - L353 were not covered by tests
}

return &event.CloudEventNodeExecution{
RawEvent: rawEvent,
TaskExecId: taskExecID,
OutputData: outputs,
OutputInterface: typedInterface,
InputData: inputs,
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
LaunchPlanId: spec.LaunchPlan,
}, nil

Check warning on line 364 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L356-L364

Added lines #L356 - L364 were not covered by tests
}

func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Context, rawEvent *event.TaskExecutionEvent) (*event.CloudEventTaskExecution, error) {

if rawEvent == nil {
return nil, fmt.Errorf("nothing to publish, TaskExecution event is nil")
}

Check warning on line 371 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L367-L371

Added lines #L367 - L371 were not covered by tests

return &event.CloudEventTaskExecution{
RawEvent: rawEvent,
OutputData: outputs,
OutputInterface: task.Closure.CompiledTask.Template.Interface,
InputData: inputs,
ScheduledAt: spec.GetMetadata().GetScheduledAt(),
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ParentNodeExecution: spec.GetMetadata().GetParentNodeExecution(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
LaunchPlanId: spec.LaunchPlan,
RawEvent: rawEvent,

Check warning on line 374 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L374

Added line #L374 was not covered by tests
}, nil
}

Expand Down Expand Up @@ -359,6 +432,7 @@
phase = e.Phase.String()
eventTime = e.OccurredAt.AsTime()
eventID = fmt.Sprintf("%v.%v", executionID, phase)
eventSource = common.FlyteURLKeyFromNodeExecutionID(*msgType.Event.Id)

Check warning on line 435 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L435

Added line #L435 was not covered by tests
finalMsg, err = c.TransformNodeExecutionEvent(ctx, e)
case *event.CloudEventExecutionStart:
topic = "cloudevents.ExecutionStart"
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req
}

go func() {
ceCtx := context.Background()
ceCtx := context.TODO()
if err := m.cloudEventsPublisher.Publish(ceCtx, proto.MessageName(&request), &request); err != nil {
logger.Errorf(ctx, "error publishing cloud event [%+v] with err: [%v]", request.RequestId, err)
}
Expand Down
63 changes: 46 additions & 17 deletions flyteartifacts/pkg/server/processor/events_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ func (s *ServiceCallHandler) HandleEvent(ctx context.Context, cloudEvent *event2
return s.HandleEventTaskExec(ctx, source, msgType)
case *event.CloudEventNodeExecution:
logger.Debugf(ctx, "Handling CloudEventNodeExecution [%v]", msgType.RawEvent.Id)
return s.HandleEventNodeExec(ctx, msgType)
return s.HandleEventNodeExec(ctx, source, msgType)
default:
return fmt.Errorf("HandleEvent found unknown message type [%T]", msgType)
}
}

// HandleEventExecStart handles a CloudEventExecutionStart message. It is currently a
// no-op: both the context and the event are ignored and nil is always returned.
func (s *ServiceCallHandler) HandleEventExecStart(_ context.Context, _ *event.CloudEventExecutionStart) error {
// metric (placeholder; presumably a metric should be emitted here — TODO confirm)
return nil
}

Expand Down Expand Up @@ -171,18 +172,57 @@ func getPartitionsAndTag(ctx context.Context, partialID core.ArtifactID, variabl
return partitions, tag, nil
}

func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, source string, evt *event.CloudEventTaskExecution) error {
func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, _ string, evt *event.CloudEventTaskExecution) error {

if evt.RawEvent.Phase != core.TaskExecution_SUCCEEDED {
logger.Debug(ctx, "Skipping non-successful task execution event")
return nil
}
// metric

execID := evt.RawEvent.ParentNodeExecutionId.ExecutionId
return nil
}

func (s *ServiceCallHandler) HandleEventNodeExec(ctx context.Context, source string, evt *event.CloudEventNodeExecution) error {
if evt.RawEvent.Phase != core.NodeExecution_SUCCEEDED {
logger.Debug(ctx, "Skipping non-successful task execution event")
return nil
}
if evt.RawEvent.Id.NodeId == "end-node" {
logger.Debug(ctx, "Skipping end node for %s", evt.RawEvent.Id.ExecutionId.Name)
return nil
}
// metric

execID := evt.RawEvent.Id.ExecutionId
if evt.GetOutputData().GetLiterals() == nil || len(evt.OutputData.Literals) == 0 {
logger.Debugf(ctx, "No output data to process for task event from [%v]: %s", execID, evt.RawEvent.TaskId.Name)
logger.Debugf(ctx, "No output data to process for task event from [%s] node %s", execID, evt.RawEvent.Id.NodeId)
}

if evt.OutputInterface == nil {
if evt.GetOutputData() != nil {
// metric this as error
logger.Errorf(ctx, "No output interface to process for task event from [%s] node %s, but output data is not nil", execID, evt.RawEvent.Id.NodeId)
}
logger.Debugf(ctx, "No output interface to process for task event from [%s] node %s", execID, evt.RawEvent.Id.NodeId)
return nil
}

if evt.RawEvent.GetTaskNodeMetadata() != nil {
if evt.RawEvent.GetTaskNodeMetadata().CacheStatus == core.CatalogCacheStatus_CACHE_HIT {
logger.Debugf(ctx, "Skipping cache hit for %s", evt.RawEvent.Id)
return nil
}
}
var taskExecID *core.TaskExecutionIdentifier
if taskExecID = evt.GetTaskExecId(); taskExecID == nil {
logger.Debugf(ctx, "No task execution id to process for task event from [%s] node %s", execID, evt.RawEvent.Id.NodeId)
return nil
}

// See note on the cloudevent_publisher side, we'll have to call one of the get data endpoints to get the actual data
// rather than reading them here. But read here for now.

// Iterate through the output interface. For any outputs that have an artifact ID specified, grab the
// output Literal and construct a Create request and call the service.
for varName, variable := range evt.OutputInterface.Outputs.Variables {
Expand All @@ -191,14 +231,8 @@ func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, source str

output := evt.OutputData.Literals[varName]

taskExecID := core.TaskExecutionIdentifier{
TaskId: evt.RawEvent.TaskId,
NodeExecutionId: evt.RawEvent.ParentNodeExecutionId,
RetryAttempt: evt.RawEvent.RetryAttempt,
}

// Add a tracking tag to the Literal before saving.
version := fmt.Sprintf("%s/%s", source, varName)
version := fmt.Sprintf("%s/%d/%s", source, taskExecID.RetryAttempt, varName)
trackingTag := fmt.Sprintf("%s/%s/%s", execID.Project, execID.Domain, version)
if output.Metadata == nil {
output.Metadata = make(map[string]string, 1)
Expand All @@ -208,7 +242,7 @@ func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, source str
spec := artifact.ArtifactSpec{
Value: output,
Type: evt.OutputInterface.Outputs.Variables[varName].Type,
TaskExecution: &taskExecID,
TaskExecution: taskExecID,
Execution: execID,
}

Expand Down Expand Up @@ -253,11 +287,6 @@ func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, source str
logger.Debugf(ctx, "Created artifact id [%+v] for key %s", resp.Artifact.ArtifactId, varName)
}
}

return nil
}

func (s *ServiceCallHandler) HandleEventNodeExec(_ context.Context, _ *event.CloudEventNodeExecution) error {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions flyteidl/gen/pb-cpp/flyteidl/artifact/artifacts.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading