diff --git a/cmd/single/start.go b/cmd/single/start.go index 5bfad53228..47234658a2 100644 --- a/cmd/single/start.go +++ b/cmd/single/start.go @@ -85,8 +85,13 @@ func startArtifact(ctx context.Context, cfg Artifacts) error { g.Go(func() error { cfg := configuration.GetApplicationConfig() serverCfg := &cfg.ArtifactServerConfig - return sharedCmd.ServeGateway(childCtx, "artifacts", serverCfg, artifactsServer.GrpcRegistrationHook, + err := sharedCmd.ServeGateway(childCtx, "artifacts", serverCfg, artifactsServer.GrpcRegistrationHook, artifactsServer.HttpRegistrationHook) + if err != nil { + logger.Errorf(childCtx, "Failed to start Artifacts server. Error: %v", err) + return err + } + return nil }) return g.Wait() diff --git a/flyte-single-binary-local.yaml b/flyte-single-binary-local.yaml index 63db4580ba..7fca092002 100644 --- a/flyte-single-binary-local.yaml +++ b/flyte-single-binary-local.yaml @@ -17,7 +17,7 @@ cluster_resources: logger: show-source: true - level: 6 + level: 3 propeller: create-flyteworkflow-crd: true @@ -82,7 +82,7 @@ database: # options: "sslmode=disable" cloudEvents: enable: true - cloudEventVersion: 2 + cloudEventVersion: v2 type: sandbox # For admin to find artifacts service artifacts: @@ -90,9 +90,6 @@ artifacts: port: 50051 insecure: true # For artifact service itself -sharedServer: - port: 50051 - httpPort: 50050 artifactsServer: artifactBlobStoreConfig: type: stow @@ -105,6 +102,8 @@ artifactsServer: auth_type: accesskey access_key_id: minio secret_key: miniostorage +artifactsProcessor: + cloudProvider: Sandbox storage: type: stow stow: diff --git a/flyteadmin/pkg/async/cloudevent/factory.go b/flyteadmin/pkg/async/cloudevent/factory.go index bfa94689f4..423ff4e7eb 100644 --- a/flyteadmin/pkg/async/cloudevent/factory.go +++ b/flyteadmin/pkg/async/cloudevent/factory.go @@ -92,7 +92,7 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi case common.Sandbox: var publisher pubsub.Publisher - publisher = sandbox_utils.CloudEventsPublisher + publisher = sandbox_utils.NewCloudEventsPublisher() sender = &cloudEventImplementations.PubSubSender{ Pub: publisher, } diff --git a/flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go b/flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go index ae3fe7c3fb..cba75d4141 100644 --- a/flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go +++ b/flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go @@ -190,10 +190,8 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context // Get outputs from the workflow execution var outputs *core.LiteralMap if rawEvent.GetOutputData() != nil { - fmt.Printf("remove this - Got output data") outputs = rawEvent.GetOutputData() } else if len(rawEvent.GetOutputUri()) > 0 { - fmt.Printf("remove this - Got output URI") // GetInputs actually fetches the data, even though this is an output outputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig, c.storageClient, rawEvent.GetOutputUri()) if err != nil { @@ -282,10 +280,8 @@ func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Con var outputs *core.LiteralMap if rawEvent.GetOutputData() != nil { - fmt.Printf("remove this - task Got output data") outputs = rawEvent.GetOutputData() } else if len(rawEvent.GetOutputUri()) > 0 { - fmt.Printf("remove this - task Got output URI") // GetInputs actually fetches the data, even though this is an output outputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig, c.storageClient, rawEvent.GetOutputUri()) @@ -375,6 +371,7 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy executionID = msgType.ExecutionId.String() eventID = fmt.Sprintf("%v", executionID) eventTime = time.Now() + // CloudEventExecutionStart don't have a nested event finalMsg = msgType default: return fmt.Errorf("unsupported event types [%+v]", reflect.TypeOf(msg)) diff --git a/flyteadmin/pkg/manager/impl/task_execution_manager.go b/flyteadmin/pkg/manager/impl/task_execution_manager.go index 6c4fb72e16..0e6b527ccb 100644 --- a/flyteadmin/pkg/manager/impl/task_execution_manager.go +++ b/flyteadmin/pkg/manager/impl/task_execution_manager.go @@ -126,13 +126,18 @@ func (m *TaskExecutionManager) updateTaskExecutionModelState( func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, request admin.TaskExecutionEventRequest) ( *admin.TaskExecutionEventResponse, error) { + + logger.Warningf(ctx, "HERE!!!123") + if err := validation.ValidateTaskExecutionRequest(request, m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes); err != nil { return nil, err } + logger.Warningf(ctx, "HERE!!!123-1") if err := validation.ValidateClusterForExecutionID(ctx, m.db, request.Event.ParentNodeExecutionId.ExecutionId, request.Event.ProducerId); err != nil { return nil, err } + logger.Warningf(ctx, "HERE!!!123-2") // Get the parent node execution, if none found a MissingEntityError will be returned nodeExecutionID := request.Event.ParentNodeExecutionId @@ -204,10 +209,12 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req logger.Infof(ctx, "error publishing event [%+v] with err: [%v]", request.RequestId, err) } + logger.Warningf(ctx, "HERE!!!123-3") go func() { - ceCtx := context.TODO() + ceCtx := context.Background() + logger.Warningf(ctx, "HERE!!!123-4") if err := m.cloudEventsPublisher.Publish(ceCtx, proto.MessageName(&request), &request); err != nil { - logger.Infof(ctx, "error publishing cloud event [%+v] with err: [%v]", request.RequestId, err) + logger.Errorf(ctx, "error publishing cloud event [%+v] with err: [%v]", request.RequestId, err) } }() diff --git a/flyteadmin/pkg/runtime/interfaces/application_configuration.go b/flyteadmin/pkg/runtime/interfaces/application_configuration.go index cb351062d5..b24d831e6e 100644 --- a/flyteadmin/pkg/runtime/interfaces/application_configuration.go +++ b/flyteadmin/pkg/runtime/interfaces/application_configuration.go @@ -536,8 +536,8 @@ type ExternalEventsConfig struct { ReconnectDelaySeconds int `json:"reconnectDelaySeconds"` } -//go:generate enumer -type=CloudEventVersion -trimprefix=CloudEventVersion -type CloudEventVersion int +//go:generate enumer -type=CloudEventVersion -json -yaml -trimprefix=CloudEventVersion +type CloudEventVersion uint8 const ( // This is the initial version of the cloud events diff --git a/flyteadmin/pkg/runtime/interfaces/cloudeventversion_enumer.go b/flyteadmin/pkg/runtime/interfaces/cloudeventversion_enumer.go index 5a3403ee72..5ec7e3dc87 100644 --- a/flyteadmin/pkg/runtime/interfaces/cloudeventversion_enumer.go +++ b/flyteadmin/pkg/runtime/interfaces/cloudeventversion_enumer.go @@ -1,8 +1,9 @@ -// Code generated by "enumer -type=CloudEventVersion -trimprefix=CloudEventVersion"; DO NOT EDIT. +// Code generated by "enumer -type=CloudEventVersion -json -yaml -trimprefix=CloudEventVersion"; DO NOT EDIT. package interfaces import ( + "encoding/json" "fmt" ) @@ -11,7 +12,7 @@ const _CloudEventVersionName = "v1v2" var _CloudEventVersionIndex = [...]uint8{0, 2, 4} func (i CloudEventVersion) String() string { - if i < 0 || i >= CloudEventVersion(len(_CloudEventVersionIndex)-1) { + if i >= CloudEventVersion(len(_CloudEventVersionIndex)-1) { return fmt.Sprintf("CloudEventVersion(%d)", i) } return _CloudEventVersionName[_CloudEventVersionIndex[i]:_CloudEventVersionIndex[i+1]] @@ -47,3 +48,37 @@ func (i CloudEventVersion) IsACloudEventVersion() bool { } return false } + +// MarshalJSON implements the json.Marshaler interface for CloudEventVersion +func (i CloudEventVersion) MarshalJSON() ([]byte, error) { + return json.Marshal(i.String()) +} + +// UnmarshalJSON implements the json.Unmarshaler interface for CloudEventVersion +func (i *CloudEventVersion) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return fmt.Errorf("CloudEventVersion should be a string, got %s", data) + } + + var err error + *i, err = CloudEventVersionString(s) + return err +} + +// MarshalYAML implements a YAML Marshaler for CloudEventVersion +func (i CloudEventVersion) MarshalYAML() (interface{}, error) { + return i.String(), nil +} + +// UnmarshalYAML implements a YAML Unmarshaler for CloudEventVersion +func (i *CloudEventVersion) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + if err := unmarshal(&s); err != nil { + return err + } + + var err error + *i, err = CloudEventVersionString(s) + return err +} diff --git a/flyteartifacts/pkg/lib/constants.go b/flyteartifacts/pkg/lib/constants.go new file mode 100644 index 0000000000..845b570525 --- /dev/null +++ b/flyteartifacts/pkg/lib/constants.go @@ -0,0 +1,4 @@ +package lib + +// ArtifactKey - This is used to tag Literals as a tracking bit. +const ArtifactKey = "_ua" diff --git a/flyteartifacts/pkg/lib/string_converter.go b/flyteartifacts/pkg/lib/string_converter.go new file mode 100644 index 0000000000..dd3f2133d1 --- /dev/null +++ b/flyteartifacts/pkg/lib/string_converter.go @@ -0,0 +1,56 @@ +package lib + +import ( + "fmt" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" + "strings" + "time" +) + +func RenderLiteral(lit *core.Literal) (string, error) { + if lit == nil { + return "", fmt.Errorf("can't RenderLiteral, input is nil") + } + + switch lit.Value.(type) { + case *core.Literal_Scalar: + scalar := lit.GetScalar() + if scalar.GetPrimitive() == nil { + return "", fmt.Errorf("rendering only works for primitives, got [%v]", scalar) + } + // todo: figure out how to expose more formatting + // todo: maybe add a metric to each one of these, or this whole block. + switch scalar.GetPrimitive().GetValue().(type) { + case *core.Primitive_StringValue: + return scalar.GetPrimitive().GetStringValue(), nil + case *core.Primitive_Integer: + return fmt.Sprintf("%d", scalar.GetPrimitive().GetInteger()), nil + case *core.Primitive_FloatValue: + return fmt.Sprintf("%v", scalar.GetPrimitive().GetFloatValue()), nil + case *core.Primitive_Boolean: + if scalar.GetPrimitive().GetBoolean() { + return "true", nil + } + return "false", nil + case *core.Primitive_Datetime: + // just date for now, not sure if we should support time... + dt := scalar.GetPrimitive().GetDatetime().AsTime() + txt := dt.Format("2006-01-02") + return txt, nil + case *core.Primitive_Duration: + dur := scalar.GetPrimitive().GetDuration().AsDuration() + // Found somewhere as iso8601 representation of duration, but there's still lots of + // possibilities for formatting. + txt := "PT" + strings.ToUpper(dur.Truncate(time.Millisecond).String()) + return txt, nil + default: + return "", fmt.Errorf("unknown primitive type [%v]", scalar.GetPrimitive()) + } + case *core.Literal_Collection: + return "", fmt.Errorf("can't RenderLiteral for collections") + case *core.Literal_Map: + return "", fmt.Errorf("can't RenderLiteral for maps") + } + + return "", fmt.Errorf("unknown literal type [%v]", lit) +} diff --git a/flyteartifacts/pkg/lib/string_converter_test.go b/flyteartifacts/pkg/lib/string_converter_test.go new file mode 100644 index 0000000000..ec50b1f46d --- /dev/null +++ b/flyteartifacts/pkg/lib/string_converter_test.go @@ -0,0 +1,24 @@ +package lib + +import ( + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" + "github.com/golang/protobuf/ptypes/timestamp" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestRenderDate(t *testing.T) { + dt := time.Date(2020, 12, 8, 0, 0, 0, 0, time.UTC) + pt := timestamp.Timestamp{ + Seconds: dt.Unix(), + Nanos: 0, + } + lit := core.Literal{ + Value: &core.Literal_Scalar{Scalar: &core.Scalar{Value: &core.Scalar_Primitive{Primitive: &core.Primitive{Value: &core.Primitive_Datetime{Datetime: &pt}}}}}, + } + + txt, err := RenderLiteral(&lit) + assert.NoError(t, err) + assert.Equal(t, "2020-12-08", txt) +} diff --git a/flyteartifacts/pkg/server/processor/channel_processor.go b/flyteartifacts/pkg/server/processor/channel_processor.go index c445caf5ea..625e1701b1 100644 --- a/flyteartifacts/pkg/server/processor/channel_processor.go +++ b/flyteartifacts/pkg/server/processor/channel_processor.go @@ -1,11 +1,16 @@ package processor import ( + "bytes" "context" + "fmt" pbcloudevents "github.com/cloudevents/sdk-go/binding/format/protobuf/v2" "github.com/cloudevents/sdk-go/v2/event" + flyteEvents "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/sandbox_utils" + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" "time" ) @@ -27,31 +32,80 @@ func (p *SandboxCloudEventsReceiver) StartProcessing(ctx context.Context) { logger.Warning(context.Background(), "Sandbox cloud event processor has stopped because context cancelled") } +func (p *SandboxCloudEventsReceiver) handleMessage(ctx context.Context, sandboxMsg sandbox_utils.SandboxMessage) error { + ce := &event.Event{} + err := pbcloudevents.Protobuf.Unmarshal(sandboxMsg.Raw, ce) + if err != nil { + logger.Errorf(context.Background(), "error with unmarshalling message [%v]", err) + return err + } + logger.Debugf(ctx, "Cloud event received message [%+v]", ce) + // ce data should be a jsonpb Marshaled proto message, one of + // - event.CloudEventTaskExecution + // - event.CloudEventNodeExecution + // - event.CloudEventWorkflowExecution + // - event.CloudEventExecutionStart + ceData := bytes.NewReader(ce.Data()) + unmarshaler := jsonpb.Unmarshaler{} + + // Use the type to determine which proto message to unmarshal to. + var flyteEvent proto.Message + if ce.Type() == "com.flyte.resource.cloudevents.TaskExecution" { + flyteEvent = &flyteEvents.CloudEventTaskExecution{} + err = unmarshaler.Unmarshal(ceData, flyteEvent) + } else if ce.Type() == "cloudevents.WorkflowExecution" { + flyteEvent = &flyteEvents.CloudEventWorkflowExecution{} + err = unmarshaler.Unmarshal(ceData, flyteEvent) + } else if ce.Type() == "cloudevents.NodeExecution" { + flyteEvent = &flyteEvents.CloudEventNodeExecution{} + err = unmarshaler.Unmarshal(ceData, flyteEvent) + } else if ce.Type() == "cloudevents.ExecutionStart" { + flyteEvent = &flyteEvents.CloudEventExecutionStart{} + err = unmarshaler.Unmarshal(ceData, flyteEvent) + } else { + logger.Warningf(ctx, "Ignoring cloud event type [%s]", ce.Type()) + return nil + } + if err != nil { + logger.Errorf(ctx, "error unmarshalling message on topic [%s] [%v]", sandboxMsg.Topic, err) + return err + } + + err = p.Handler.HandleEvent(ctx, ce, flyteEvent) + if err != nil { + logger.Errorf(context.Background(), "error handling event on topic [%s] [%v]", sandboxMsg.Topic, err) + return err + } + return nil +} + func (p *SandboxCloudEventsReceiver) run(ctx context.Context) error { for { select { case <-ctx.Done(): - logger.Debug(context.Background(), "Context cancelled, stopping processing.") + logger.Warning(context.Background(), "Context cancelled, stopping processing.") return nil case sandboxMsg := <-p.subChan: + fmt.Println("HERE222!!!!!!!!!!-receive") logger.Debugf(ctx, "received message [%v]", sandboxMsg) if sandboxMsg.Raw != nil { - ce := &event.Event{} - err := pbcloudevents.Protobuf.Unmarshal(sandboxMsg.Raw, ce) + err := p.handleMessage(ctx, sandboxMsg) if err != nil { - logger.Errorf(context.Background(), "error with unmarshalling message [%v]", err) - return err + // Assuming that handle message will return a fair number of errors + // add metric + logger.Infof(ctx, "error processing sandbox cloud event [%v] with err [%v]", sandboxMsg, err) } - logger.Debugf(ctx, "Cloud event received message [%+v]", ce) + fmt.Println("HERE222!!!!!!!!!!-success") + } else { + logger.Infof(ctx, "sandbox receiver ignoring message [%v]", sandboxMsg) } - logger.Infof(ctx, "sandbox receiver ignoring message [%v]", sandboxMsg) } } } func (p *SandboxCloudEventsReceiver) StopProcessing() error { - logger.Debug(context.Background(), "call to sandbox stop processing.") + logger.Warning(context.Background(), "StopProcessing called on SandboxCloudEventsReceiver") return nil } diff --git a/flyteartifacts/pkg/server/processor/events_handler.go b/flyteartifacts/pkg/server/processor/events_handler.go index 68be5f6b4c..e9a1cd575c 100644 --- a/flyteartifacts/pkg/server/processor/events_handler.go +++ b/flyteartifacts/pkg/server/processor/events_handler.go @@ -2,9 +2,14 @@ package processor import ( "context" + "fmt" + event2 "github.com/cloudevents/sdk-go/v2/event" + "github.com/flyteorg/flyte/flyteartifacts/pkg/lib" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/golang/protobuf/proto" ) // ServiceCallHandler will take events and call the grpc endpoints directly. The service should most likely be local. @@ -12,24 +17,173 @@ type ServiceCallHandler struct { service artifact.ArtifactRegistryServer } -func (s *ServiceCallHandler) HandleEventExecStart(ctx context.Context, start *event.CloudEventExecutionStart) error { - //TODO implement me - panic("implement me") +func (s *ServiceCallHandler) HandleEvent(ctx context.Context, cloudEvent *event2.Event, msg proto.Message) error { + source := cloudEvent.Source() + + switch msgType := msg.(type) { + case *event.CloudEventExecutionStart: + logger.Debugf(ctx, "Handling CloudEventExecutionStart [%v]", msgType.ExecutionId) + return s.HandleEventExecStart(ctx, msgType) + case *event.CloudEventWorkflowExecution: + logger.Debugf(ctx, "Handling CloudEventWorkflowExecution [%v]", msgType.RawEvent.ExecutionId) + return s.HandleEventWorkflowExec(ctx, source, msgType) + case *event.CloudEventTaskExecution: + logger.Debugf(ctx, "Handling CloudEventTaskExecution [%v]", msgType.RawEvent.ParentNodeExecutionId) + return s.HandleEventTaskExec(ctx, source, msgType) + case *event.CloudEventNodeExecution: + logger.Debugf(ctx, "Handling CloudEventNodeExecution [%v]", msgType.RawEvent.Id) + return s.HandleEventNodeExec(ctx, msgType) + default: + return fmt.Errorf("HandleEvent found unknown message type [%T]", msgType) + } +} + +func (s *ServiceCallHandler) HandleEventExecStart(_ context.Context, _ *event.CloudEventExecutionStart) error { + return nil +} + +func (s *ServiceCallHandler) HandleEventWorkflowExec(ctx context.Context, source string, evt *event.CloudEventWorkflowExecution) error { + + if evt.RawEvent.Phase != core.WorkflowExecution_SUCCEEDED { + logger.Debug(ctx, "Skipping non-successful workflow execution event") + return nil + } + return nil } -func (s *ServiceCallHandler) HandleEventWorkflowExec(ctx context.Context, execution *event.CloudEventWorkflowExecution) error { - //TODO implement me - panic("implement me") +func getPartitionsAndTag(ctx context.Context, partialID core.ArtifactID, variable *core.Variable, inputData *core.LiteralMap) (map[string]string, string, error) { + if variable == nil || inputData == nil { + return nil, "", fmt.Errorf("variable or input data is nil") + } + + var partitions map[string]string + // todo: consider updating idl to make CreateArtifactRequest just take a full Partitions + // object rather than a mapstrstr @eapolinario @enghabu + if partialID.GetPartitions().GetValue() != nil && len(partialID.GetPartitions().GetValue()) > 0 { + partitions = make(map[string]string, len(partialID.GetPartitions().GetValue())) + for k, lv := range partialID.GetPartitions().GetValue() { + if lv.GetStaticValue() != "" { + partitions[k] = lv.GetStaticValue() + } else if lv.GetInputBinding() != nil { + if lit, ok := inputData.Literals[lv.GetInputBinding().GetVar()]; ok { + // todo: figure out formatting. Maybe we can add formatting directives to the input binding + // @enghabu @eapolinario + renderedStr, err := lib.RenderLiteral(lit) + if err != nil { + logger.Errorf(ctx, "failed to render literal for input [%s] partition [%s] with error: %v", lv.GetInputBinding().GetVar(), k, err) + return nil, "", err + } + partitions[k] = renderedStr + } else { + return nil, "", fmt.Errorf("input binding [%s] not found in input data", lv.GetInputBinding().GetVar()) + } + } else { + return nil, "", fmt.Errorf("unknown binding found in context of a materialized artifact") + } + } + } + + var tag = "" + var err error + if lv := variable.GetArtifactTag().GetValue(); lv != nil { + if lv.GetStaticValue() != "" { + tag = lv.GetStaticValue() + } else if lv.GetInputBinding() != nil { + tag, err = lib.RenderLiteral(inputData.Literals[lv.GetInputBinding().GetVar()]) + if err != nil { + logger.Errorf(ctx, "failed to render input [%s] for tag with error: %v", lv.GetInputBinding().GetVar(), err) + return nil, "", err + } + } else { + return nil, "", fmt.Errorf("triggered binding found in context of a materialized artifact when rendering tag") + } + } + + return partitions, tag, nil } -func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, execution *event.CloudEventTaskExecution) error { - //TODO implement me - panic("implement me") +func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, source string, evt *event.CloudEventTaskExecution) error { + fmt.Println("-------->> TASK EVENT 1") + + if evt.RawEvent.Phase != core.TaskExecution_SUCCEEDED { + logger.Debug(ctx, "Skipping non-successful task execution event") + return nil + } + fmt.Println("-------->> TASK EVENT 1 - start") + + execID := evt.RawEvent.ParentNodeExecutionId.ExecutionId + if evt.GetOutputData().GetLiterals() == nil || len(evt.OutputData.Literals) == 0 { + logger.Debugf(ctx, "No output data to process for task event from [%v]: %s", execID, evt.RawEvent.TaskId.Name) + } + // Iterate through the output interface. For any outputs that have an artifact ID specified, grab the + // output Literal and construct a Create request and call the service. + for varName, variable := range evt.OutputInterface.Outputs.Variables { + if variable.GetArtifactPartialId() != nil { + logger.Debugf(ctx, "Processing output for %s, artifact name %s, from %v", varName, variable.GetArtifactPartialId().ArtifactKey.Name, execID) + + output := evt.OutputData.Literals[varName] + + taskExecID := core.TaskExecutionIdentifier{ + TaskId: evt.RawEvent.TaskId, + NodeExecutionId: evt.RawEvent.ParentNodeExecutionId, + RetryAttempt: evt.RawEvent.RetryAttempt, + } + + // Add a tracking tag to the Literal before saving. + version := fmt.Sprintf("%s/%s", source, varName) + trackingTag := fmt.Sprintf("%s/%s/%s", execID.Project, execID.Domain, version) + if output.Metadata == nil { + output.Metadata = make(map[string]string, 1) + } + output.Metadata[lib.ArtifactKey] = trackingTag + + spec := artifact.ArtifactSpec{ + Value: output, + Type: evt.OutputInterface.Outputs.Variables[varName].Type, + TaskExecution: &taskExecID, + Execution: execID, + } + + partitions, tag, err := getPartitionsAndTag( + ctx, + *variable.GetArtifactPartialId(), + variable, + evt.InputData, + ) + if err != nil { + logger.Errorf(ctx, "failed processing [%s] variable [%v] with error: %v", varName, variable, err) + return err + } + ak := core.ArtifactKey{ + Project: execID.Project, + Domain: execID.Domain, + Name: variable.GetArtifactPartialId().ArtifactKey.Name, + } + + req := artifact.CreateArtifactRequest{ + ArtifactKey: &ak, + Version: version, + Spec: &spec, + Partitions: partitions, + Tag: tag, + } + + fmt.Println("-------->> TASK EVENT 2 - create") + resp, err := s.service.CreateArtifact(ctx, &req) + if err != nil { + logger.Errorf(ctx, "failed to create artifact for [%s] with error: %v", varName, err) + return err + } + fmt.Println("-------->> TASK EVENT 3 - end") + logger.Debugf(ctx, "Created artifact id [%+v] for key %s", resp.Artifact.ArtifactId, varName) + } + } + + return nil } -func (s *ServiceCallHandler) HandleEventNodeExec(ctx context.Context, execution *event.CloudEventNodeExecution) error { - //TODO implement me - panic("implement me") +func (s *ServiceCallHandler) HandleEventNodeExec(_ context.Context, _ *event.CloudEventNodeExecution) error { + return nil } func NewServiceCallHandler(ctx context.Context, svc artifact.ArtifactRegistryServer) EventsHandlerInterface { diff --git a/flyteartifacts/pkg/server/processor/events_handler_test.go b/flyteartifacts/pkg/server/processor/events_handler_test.go new file mode 100644 index 0000000000..61d668215b --- /dev/null +++ b/flyteartifacts/pkg/server/processor/events_handler_test.go @@ -0,0 +1,17 @@ +package processor + +import ( + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestMetaDataWrite(t *testing.T) { + lit := core.Literal{ + Value: &core.Literal_Scalar{Scalar: &core.Scalar{Value: &core.Scalar_Primitive{Primitive: &core.Primitive{Value: &core.Primitive_StringValue{StringValue: "test"}}}}}, + } + lit.Metadata = make(map[string]string) + + lit.Metadata["test"] = "test" + assert.Equal(t, "test", lit.Metadata["test"]) +} diff --git a/flyteartifacts/pkg/server/processor/interfaces.go b/flyteartifacts/pkg/server/processor/interfaces.go index cf6bfd8e42..ff61126adb 100644 --- a/flyteartifacts/pkg/server/processor/interfaces.go +++ b/flyteartifacts/pkg/server/processor/interfaces.go @@ -2,14 +2,14 @@ package processor import ( "context" - "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event" + "github.com/cloudevents/sdk-go/v2/event" + "github.com/golang/protobuf/proto" ) type EventsHandlerInterface interface { - HandleEventExecStart(context.Context, *event.CloudEventExecutionStart) error - HandleEventWorkflowExec(context.Context, *event.CloudEventWorkflowExecution) error - HandleEventTaskExec(context.Context, *event.CloudEventTaskExecution) error - HandleEventNodeExec(context.Context, *event.CloudEventNodeExecution) error + // HandleEvent The cloudEvent here is the original deserialized event and the proto msg is message + // that's been unmarshalled already from the cloudEvent.Data() field. + HandleEvent(ctx context.Context, cloudEvent *event.Event, msg proto.Message) error } // EventsProcessorInterface is a copy of the notifications processor in admin except that start takes a context diff --git a/flyteartifacts/pkg/server/server.go b/flyteartifacts/pkg/server/server.go index 37ca295ca3..e63fecb5a8 100644 --- a/flyteartifacts/pkg/server/server.go +++ b/flyteartifacts/pkg/server/server.go @@ -9,6 +9,7 @@ import ( "github.com/flyteorg/flyte/flyteartifacts/pkg/server/processor" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact" "github.com/flyteorg/flyte/flytestdlib/database" + "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/promutils" "github.com/go-gormigrate/gormigrate/v2" "github.com/grpc-ecosystem/grpc-gateway/runtime" @@ -35,7 +36,10 @@ func NewArtifactService(ctx context.Context, scope promutils.Scope) *ArtifactSer coreService := NewCoreService(storage, &blobStore, scope.NewSubScope("server")) eventsReceiverAndHandler := processor.NewBackgroundProcessor(ctx, *eventsCfg, &coreService, scope.NewSubScope("events")) if eventsReceiverAndHandler != nil { - eventsReceiverAndHandler.StartProcessing(ctx) + go func() { + logger.Info(ctx, "Starting Artifact service background processing...") + eventsReceiverAndHandler.StartProcessing(ctx) + }() } return &ArtifactService{ diff --git a/flytestdlib/sandbox_utils/processor.go b/flytestdlib/sandbox_utils/processor.go index 4594342627..a452d9c296 100644 --- a/flytestdlib/sandbox_utils/processor.go +++ b/flytestdlib/sandbox_utils/processor.go @@ -6,6 +6,7 @@ import ( "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/golang/protobuf/proto" "sync" + "time" ) var MsgChan chan SandboxMessage @@ -13,7 +14,7 @@ var once sync.Once func init() { once.Do(func() { - MsgChan = make(chan SandboxMessage) + MsgChan = make(chan SandboxMessage, 1000) }) } @@ -29,36 +30,39 @@ type CloudEventsSandboxOnlyPublisher struct { func (z *CloudEventsSandboxOnlyPublisher) Publish(ctx context.Context, topic string, msg proto.Message) error { logger.Debugf(ctx, "Sandbox cloud event publisher, sending to topic [%s]", topic) - - select { - case z.subChan <- SandboxMessage{ + sm := SandboxMessage{ Msg: msg, Topic: topic, - }: + } + select { + case z.subChan <- sm: return nil case <-ctx.Done(): return ctx.Err() - default: - return fmt.Errorf("channel has been closed") } } // PublishRaw will publish a raw byte array as a message with context. func (z *CloudEventsSandboxOnlyPublisher) PublishRaw(ctx context.Context, topic string, msgRaw []byte) error { - select { - case z.subChan <- SandboxMessage{ + logger.Debugf(ctx, "Sandbox cloud event publisher, sending raw to topic [%s]", topic) + sm := SandboxMessage{ Raw: msgRaw, Topic: topic, - }: + } + select { + case z.subChan <- sm: + fmt.Println("HERE222!!!!!!!!!!-send") + logger.Debugf(ctx, "Sandbox publisher sent message to %s", topic) return nil - case <-ctx.Done(): - return ctx.Err() - default: - return fmt.Errorf("channel has been closed, can't publish raw") + case <-time.After(10000 * time.Millisecond): + logger.Errorf(context.Background(), "CloudEventsSandboxOnlyPublisher PublishRaw timed out") + return fmt.Errorf("CloudEventsSandboxOnlyPublisher PublishRaw timed out") } } -var CloudEventsPublisher = &CloudEventsSandboxOnlyPublisher{ - subChan: MsgChan, +func NewCloudEventsPublisher() *CloudEventsSandboxOnlyPublisher { + return &CloudEventsSandboxOnlyPublisher{ + subChan: MsgChan, + } }