Skip to content

Commit

Permalink
update error handling in start.go, make the sandbox publisher create …
Browse files Browse the repository at this point in the history
…a function call, add a ton of debug printing that will be removed later today, update enumer for cloud version, add code to render literal, handle message on the consumer side, add task event handling, workflow to be added later, update the event handler interface to be less verbose, run processing in goroutine, update selects

Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor committed Oct 26, 2023
1 parent e6227ec commit e3df3c4
Show file tree
Hide file tree
Showing 16 changed files with 419 additions and 59 deletions.
7 changes: 6 additions & 1 deletion cmd/single/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,13 @@ func startArtifact(ctx context.Context, cfg Artifacts) error {
g.Go(func() error {
cfg := configuration.GetApplicationConfig()
serverCfg := &cfg.ArtifactServerConfig
return sharedCmd.ServeGateway(childCtx, "artifacts", serverCfg, artifactsServer.GrpcRegistrationHook,
err := sharedCmd.ServeGateway(childCtx, "artifacts", serverCfg, artifactsServer.GrpcRegistrationHook,
artifactsServer.HttpRegistrationHook)
if err != nil {
logger.Errorf(childCtx, "Failed to start Artifacts server. Error: %v", err)
return err
}
return nil
})

return g.Wait()
Expand Down
9 changes: 4 additions & 5 deletions flyte-single-binary-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ cluster_resources:

logger:
show-source: true
level: 6
level: 3

propeller:
create-flyteworkflow-crd: true
Expand Down Expand Up @@ -82,17 +82,14 @@ database:
# options: "sslmode=disable"
cloudEvents:
enable: true
cloudEventVersion: 2
cloudEventVersion: v2
type: sandbox
# For admin to find artifacts service
artifacts:
host: localhost
port: 50051
insecure: true
# For artifact service itself
sharedServer:
port: 50051
httpPort: 50050
artifactsServer:
artifactBlobStoreConfig:
type: stow
Expand All @@ -105,6 +102,8 @@ artifactsServer:
auth_type: accesskey
access_key_id: minio
secret_key: miniostorage
artifactsProcessor:
cloudProvider: Sandbox
storage:
type: stow
stow:
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/async/cloudevent/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi

case common.Sandbox:
var publisher pubsub.Publisher
publisher = sandbox_utils.CloudEventsPublisher
publisher = sandbox_utils.NewCloudEventsPublisher()
sender = &cloudEventImplementations.PubSubSender{
Pub: publisher,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,8 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
// Get outputs from the workflow execution
var outputs *core.LiteralMap
if rawEvent.GetOutputData() != nil {
fmt.Printf("remove this - Got output data")
outputs = rawEvent.GetOutputData()
} else if len(rawEvent.GetOutputUri()) > 0 {
fmt.Printf("remove this - Got output URI")
// GetInputs actually fetches the data, even though this is an output
outputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig, c.storageClient, rawEvent.GetOutputUri())
if err != nil {
Expand Down Expand Up @@ -282,10 +280,8 @@ func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Con

var outputs *core.LiteralMap
if rawEvent.GetOutputData() != nil {
fmt.Printf("remove this - task Got output data")
outputs = rawEvent.GetOutputData()
} else if len(rawEvent.GetOutputUri()) > 0 {
fmt.Printf("remove this - task Got output URI")
// GetInputs actually fetches the data, even though this is an output
outputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig,
c.storageClient, rawEvent.GetOutputUri())
Expand Down Expand Up @@ -375,6 +371,7 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy
executionID = msgType.ExecutionId.String()
eventID = fmt.Sprintf("%v", executionID)
eventTime = time.Now()
// CloudEventExecutionStart don't have a nested event
finalMsg = msgType
default:
return fmt.Errorf("unsupported event types [%+v]", reflect.TypeOf(msg))
Expand Down
11 changes: 9 additions & 2 deletions flyteadmin/pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,18 @@ func (m *TaskExecutionManager) updateTaskExecutionModelState(

func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, request admin.TaskExecutionEventRequest) (
*admin.TaskExecutionEventResponse, error) {

logger.Warningf(ctx, "HERE!!!123")

if err := validation.ValidateTaskExecutionRequest(request, m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes); err != nil {
return nil, err
}
logger.Warningf(ctx, "HERE!!!123-1")

if err := validation.ValidateClusterForExecutionID(ctx, m.db, request.Event.ParentNodeExecutionId.ExecutionId, request.Event.ProducerId); err != nil {
return nil, err
}
logger.Warningf(ctx, "HERE!!!123-2")

// Get the parent node execution, if none found a MissingEntityError will be returned
nodeExecutionID := request.Event.ParentNodeExecutionId
Expand Down Expand Up @@ -204,10 +209,12 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req
logger.Infof(ctx, "error publishing event [%+v] with err: [%v]", request.RequestId, err)
}

logger.Warningf(ctx, "HERE!!!123-3")
go func() {
ceCtx := context.TODO()
ceCtx := context.Background()
logger.Warningf(ctx, "HERE!!!123-4")
if err := m.cloudEventsPublisher.Publish(ceCtx, proto.MessageName(&request), &request); err != nil {
logger.Infof(ctx, "error publishing cloud event [%+v] with err: [%v]", request.RequestId, err)
logger.Errorf(ctx, "error publishing cloud event [%+v] with err: [%v]", request.RequestId, err)
}
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,8 +536,8 @@ type ExternalEventsConfig struct {
ReconnectDelaySeconds int `json:"reconnectDelaySeconds"`
}

//go:generate enumer -type=CloudEventVersion -trimprefix=CloudEventVersion
type CloudEventVersion int
//go:generate enumer -type=CloudEventVersion -json -yaml -trimprefix=CloudEventVersion
type CloudEventVersion uint8

const (
// This is the initial version of the cloud events
Expand Down
39 changes: 37 additions & 2 deletions flyteadmin/pkg/runtime/interfaces/cloudeventversion_enumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions flyteartifacts/pkg/lib/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package lib

// ArtifactKey - This is used to tag Literals as a tracking bit.
const ArtifactKey = "_ua"
56 changes: 56 additions & 0 deletions flyteartifacts/pkg/lib/string_converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package lib

import (
"fmt"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"strings"
"time"
)

func RenderLiteral(lit *core.Literal) (string, error) {
if lit == nil {
return "", fmt.Errorf("can't RenderLiteral, input is nil")
}

switch lit.Value.(type) {
case *core.Literal_Scalar:
scalar := lit.GetScalar()
if scalar.GetPrimitive() == nil {
return "", fmt.Errorf("rendering only works for primitives, got [%v]", scalar)
}
// todo: figure out how to expose more formatting
// todo: maybe add a metric to each one of these, or this whole block.
switch scalar.GetPrimitive().GetValue().(type) {
case *core.Primitive_StringValue:
return scalar.GetPrimitive().GetStringValue(), nil
case *core.Primitive_Integer:
return fmt.Sprintf("%d", scalar.GetPrimitive().GetInteger()), nil
case *core.Primitive_FloatValue:
return fmt.Sprintf("%v", scalar.GetPrimitive().GetFloatValue()), nil
case *core.Primitive_Boolean:
if scalar.GetPrimitive().GetBoolean() {
return "true", nil
}
return "false", nil
case *core.Primitive_Datetime:
// just date for now, not sure if we should support time...
dt := scalar.GetPrimitive().GetDatetime().AsTime()
txt := dt.Format("2006-01-02")
return txt, nil
case *core.Primitive_Duration:
dur := scalar.GetPrimitive().GetDuration().AsDuration()
// Found somewhere as iso8601 representation of duration, but there's still lots of
// possibilities for formatting.
txt := "PT" + strings.ToUpper(dur.Truncate(time.Millisecond).String())
return txt, nil
default:
return "", fmt.Errorf("unknown primitive type [%v]", scalar.GetPrimitive())
}
case *core.Literal_Collection:
return "", fmt.Errorf("can't RenderLiteral for collections")
case *core.Literal_Map:
return "", fmt.Errorf("can't RenderLiteral for maps")
}

return "", fmt.Errorf("unknown literal type [%v]", lit)
}
24 changes: 24 additions & 0 deletions flyteartifacts/pkg/lib/string_converter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package lib

import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestRenderDate(t *testing.T) {
dt := time.Date(2020, 12, 8, 0, 0, 0, 0, time.UTC)
pt := timestamp.Timestamp{
Seconds: dt.Unix(),
Nanos: 0,
}
lit := core.Literal{
Value: &core.Literal_Scalar{Scalar: &core.Scalar{Value: &core.Scalar_Primitive{Primitive: &core.Primitive{Value: &core.Primitive_Datetime{Datetime: &pt}}}}},
}

txt, err := RenderLiteral(&lit)
assert.NoError(t, err)
assert.Equal(t, "2020-12-08", txt)
}
70 changes: 62 additions & 8 deletions flyteartifacts/pkg/server/processor/channel_processor.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package processor

import (
"bytes"
"context"
"fmt"
pbcloudevents "github.com/cloudevents/sdk-go/binding/format/protobuf/v2"
"github.com/cloudevents/sdk-go/v2/event"
flyteEvents "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/sandbox_utils"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"time"
)

Expand All @@ -27,31 +32,80 @@ func (p *SandboxCloudEventsReceiver) StartProcessing(ctx context.Context) {
logger.Warning(context.Background(), "Sandbox cloud event processor has stopped because context cancelled")
}

func (p *SandboxCloudEventsReceiver) handleMessage(ctx context.Context, sandboxMsg sandbox_utils.SandboxMessage) error {
ce := &event.Event{}
err := pbcloudevents.Protobuf.Unmarshal(sandboxMsg.Raw, ce)
if err != nil {
logger.Errorf(context.Background(), "error with unmarshalling message [%v]", err)
return err
}
logger.Debugf(ctx, "Cloud event received message [%+v]", ce)
// ce data should be a jsonpb Marshaled proto message, one of
// - event.CloudEventTaskExecution
// - event.CloudEventNodeExecution
// - event.CloudEventWorkflowExecution
// - event.CloudEventExecutionStart
ceData := bytes.NewReader(ce.Data())
unmarshaler := jsonpb.Unmarshaler{}

// Use the type to determine which proto message to unmarshal to.
var flyteEvent proto.Message
if ce.Type() == "com.flyte.resource.cloudevents.TaskExecution" {
flyteEvent = &flyteEvents.CloudEventTaskExecution{}
err = unmarshaler.Unmarshal(ceData, flyteEvent)
} else if ce.Type() == "cloudevents.WorkflowExecution" {
flyteEvent = &flyteEvents.CloudEventWorkflowExecution{}
err = unmarshaler.Unmarshal(ceData, flyteEvent)
} else if ce.Type() == "cloudevents.NodeExecution" {
flyteEvent = &flyteEvents.CloudEventNodeExecution{}
err = unmarshaler.Unmarshal(ceData, flyteEvent)
} else if ce.Type() == "cloudevents.ExecutionStart" {
flyteEvent = &flyteEvents.CloudEventExecutionStart{}
err = unmarshaler.Unmarshal(ceData, flyteEvent)
} else {
logger.Warningf(ctx, "Ignoring cloud event type [%s]", ce.Type())
return nil
}
if err != nil {
logger.Errorf(ctx, "error unmarshalling message on topic [%s] [%v]", sandboxMsg.Topic, err)
return err
}

err = p.Handler.HandleEvent(ctx, ce, flyteEvent)
if err != nil {
logger.Errorf(context.Background(), "error handling event on topic [%s] [%v]", sandboxMsg.Topic, err)
return err
}
return nil
}

func (p *SandboxCloudEventsReceiver) run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
logger.Debug(context.Background(), "Context cancelled, stopping processing.")
logger.Warning(context.Background(), "Context cancelled, stopping processing.")
return nil

case sandboxMsg := <-p.subChan:
fmt.Println("HERE222!!!!!!!!!!-receive")
logger.Debugf(ctx, "received message [%v]", sandboxMsg)
if sandboxMsg.Raw != nil {
ce := &event.Event{}
err := pbcloudevents.Protobuf.Unmarshal(sandboxMsg.Raw, ce)
err := p.handleMessage(ctx, sandboxMsg)
if err != nil {
logger.Errorf(context.Background(), "error with unmarshalling message [%v]", err)
return err
// Assuming that handle message will return a fair number of errors
// add metric
logger.Infof(ctx, "error processing sandbox cloud event [%v] with err [%v]", sandboxMsg, err)
}
logger.Debugf(ctx, "Cloud event received message [%+v]", ce)
fmt.Println("HERE222!!!!!!!!!!-success")
} else {
logger.Infof(ctx, "sandbox receiver ignoring message [%v]", sandboxMsg)
}
logger.Infof(ctx, "sandbox receiver ignoring message [%v]", sandboxMsg)
}
}
}

func (p *SandboxCloudEventsReceiver) StopProcessing() error {
logger.Debug(context.Background(), "call to sandbox stop processing.")
logger.Warning(context.Background(), "StopProcessing called on SandboxCloudEventsReceiver")
return nil
}

Expand Down
Loading

0 comments on commit e3df3c4

Please sign in to comment.