From 3a1eb66a8c3d84cbb35433d2881cca7ce77a789a Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 17 Aug 2023 13:18:33 -0700 Subject: [PATCH 1/6] Create a FileOutput reader if the agent returns no output Signed-off-by: Kevin Su --- go/tasks/plugins/webapi/agent/plugin.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/go/tasks/plugins/webapi/agent/plugin.go b/go/tasks/plugins/webapi/agent/plugin.go index 128753b74..dc81bed22 100644 --- a/go/tasks/plugins/webapi/agent/plugin.go +++ b/go/tasks/plugins/webapi/agent/plugin.go @@ -5,6 +5,8 @@ import ( "crypto/x509" "encoding/gob" "fmt" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" + "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flytestdlib/config" @@ -169,11 +171,17 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase case admin.State_RETRYABLE_FAILURE: return core.PhaseInfoRetryableFailure(pluginErrors.TaskFailedWithError, "failed to run the job", taskInfo), nil case admin.State_SUCCEEDED: + var opReader io.OutputReader if resource.Outputs != nil { - err := taskCtx.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader(resource.Outputs, nil, nil)) - if err != nil { - return core.PhaseInfoUndefined, err - } + logger.Infof(ctx, "Agent returned an output") + opReader = ioutils.NewInMemoryOutputReader(resource.Outputs, nil, nil) + } else { + logger.Infof(ctx, "Agent didn't return any output, assuming file based outputs.") + opReader = ioutils.NewRemoteFileOutputReader(ctx, taskCtx.DataStore(), taskCtx.OutputWriter(), taskCtx.MaxDatasetSizeBytes()) + } + err := taskCtx.OutputWriter().Put(ctx, opReader) + if err != nil { + return core.PhaseInfoUndefined, err } return core.PhaseInfoSuccess(taskInfo), nil } From c63bd8be665ba45ad1eb8c2168fc111dc0d8b154 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 18 Aug 2023 23:29:09 -0700 Subject: [PATCH 2/6] Mark webapi task failure as retry limit exceeded Signed-off-by: Kevin Su --- .../pluginmachinery/internal/webapi/cache.go | 26 +++++++++++----- .../internal/webapi/cache_test.go | 30 +++++++++++++++++++ .../internal/webapi/monitor.go | 12 ++++++-- .../pluginmachinery/internal/webapi/state.go | 3 ++ 4 files changed, 61 insertions(+), 10 deletions(-) diff --git a/go/tasks/pluginmachinery/internal/webapi/cache.go b/go/tasks/pluginmachinery/internal/webapi/cache.go index 856e2104b..6241157fd 100644 --- a/go/tasks/pluginmachinery/internal/webapi/cache.go +++ b/go/tasks/pluginmachinery/internal/webapi/cache.go @@ -78,31 +78,41 @@ func (q *ResourceCache) SyncResource(ctx context.Context, batch cache.Batch) ( logger.Debugf(ctx, "Sync loop - processing resource with cache key [%s]", resource.GetID()) - if cacheItem.SyncFailureCount > q.cfg.MaxSystemFailures { - logger.Infof(ctx, "Sync loop - Item with key [%v] has failed to sync [%v] time(s). More than the allowed [%v] time(s). Marking as failure.", - cacheItem.SyncFailureCount, q.cfg.MaxSystemFailures) - cacheItem.State.Phase = PhaseSystemFailure - } - if cacheItem.State.Phase.IsTerminal() { - logger.Debugf(ctx, "Sync loop - resource cache key [%v] in terminal state [%s]", + logger.Infof(ctx, "Sync loop - resource cache key [%v] in terminal state [%s]", resource.GetID()) + logger.Infof(ctx, "phase [%v]", cacheItem.Phase) + logger.Infof(ctx, "phase [%v]", resource.GetItem().(CacheItem).Phase) resp = append(resp, cache.ItemSyncResponse{ ID: resource.GetID(), - Item: resource.GetItem(), + Item: cacheItem, Action: cache.Unchanged, }) continue } + if cacheItem.SyncFailureCount > q.cfg.MaxSystemFailures { + logger.Debugf(ctx, "Sync loop - Item with key [%v] has failed to sync [%v] time(s). More than the allowed [%v] time(s). Marking as failure.", + cacheItem.SyncFailureCount, q.cfg.MaxSystemFailures) + cacheItem.State.Phase = PhaseSystemFailure + resp = append(resp, cache.ItemSyncResponse{ + ID: resource.GetID(), + Item: cacheItem, + Action: cache.Update, + }) + + continue + } + // Get an updated status logger.Debugf(ctx, "Querying AsyncPlugin for %s", resource.GetID()) newResource, err := q.client.Get(ctx, newPluginContext(cacheItem.ResourceMeta, cacheItem.Resource, "", nil)) if err != nil { logger.Infof(ctx, "Error retrieving resource [%s]. Error: %v", resource.GetID(), err) cacheItem.SyncFailureCount++ + cacheItem.ErrorMessage = err.Error() // Make sure we don't return nil for the first argument, because that deletes it from the cache. resp = append(resp, cache.ItemSyncResponse{ diff --git a/go/tasks/pluginmachinery/internal/webapi/cache_test.go b/go/tasks/pluginmachinery/internal/webapi/cache_test.go index 49506b530..7d664baa7 100644 --- a/go/tasks/pluginmachinery/internal/webapi/cache_test.go +++ b/go/tasks/pluginmachinery/internal/webapi/cache_test.go @@ -67,6 +67,36 @@ func TestResourceCache_SyncResource(t *testing.T) { assert.Equal(t, cacheItem, newCacheItem[0].Item) }) + t.Run("Retry limit exceeded", func(t *testing.T) { + mockCache := &cacheMocks.AutoRefresh{} + mockClient := &mocks.Client{} + + q := ResourceCache{ + AutoRefresh: mockCache, + client: mockClient, + cfg: webapi.CachingConfig{ + MaxSystemFailures: 2, + }, + } + + cacheItem := CacheItem{ + State: State{ + SyncFailureCount: 5, + ErrorMessage: "some error", + }, + } + + iw := &cacheMocks.ItemWrapper{} + iw.OnGetItem().Return(cacheItem) + iw.OnGetID().Return("some-id") + + newCacheItem, err := q.SyncResource(ctx, []cache.ItemWrapper{iw}) + assert.NoError(t, err) + assert.Equal(t, cache.Update, newCacheItem[0].Action) + cacheItem.State.Phase = PhaseSystemFailure + assert.Equal(t, cacheItem, newCacheItem[0].Item) + }) + t.Run("move to success", func(t *testing.T) { mockCache := &cacheMocks.AutoRefresh{} mockClient := &mocks.Client{} diff --git a/go/tasks/pluginmachinery/internal/webapi/monitor.go b/go/tasks/pluginmachinery/internal/webapi/monitor.go index 6d3d4ac33..2dd03c11e 100644 --- a/go/tasks/pluginmachinery/internal/webapi/monitor.go +++ b/go/tasks/pluginmachinery/internal/webapi/monitor.go @@ -29,8 +29,16 @@ func monitor(ctx context.Context, tCtx core.TaskExecutionContext, p Client, cach errors.CacheFailed, "Failed to cast [%v]", cacheItem) } - // If the cache has not syncd yet, just return + // If the cache has not synced yet, just return if cacheItem.Resource == nil { + if cacheItem.Phase.IsTerminal() { + err = cache.DeleteDelayed(cacheItemID) + if err != nil { + logger.Errorf(ctx, "Failed to queue item for deletion in the cache with Item Id: [%v]. Error: %v", + cacheItemID, err) + } + return state, core.PhaseInfoFailure(errors.CacheFailed, cacheItem.ErrorMessage, nil), nil + } return state, core.PhaseInfoRunning(0, nil), nil } @@ -54,7 +62,7 @@ func monitor(ctx context.Context, tCtx core.TaskExecutionContext, p Client, cach // Queue item for deletion in the cache. err = cache.DeleteDelayed(cacheItemID) if err != nil { - logger.Warnf(ctx, "Failed to queue item for deletion in the cache with Item Id: [%v]. Error: %v", + logger.Errorf(ctx, "Failed to queue item for deletion in the cache with Item Id: [%v]. Error: %v", cacheItemID, err) } } diff --git a/go/tasks/pluginmachinery/internal/webapi/state.go b/go/tasks/pluginmachinery/internal/webapi/state.go index 130c1affc..efce83b30 100644 --- a/go/tasks/pluginmachinery/internal/webapi/state.go +++ b/go/tasks/pluginmachinery/internal/webapi/state.go @@ -56,4 +56,7 @@ type State struct { // The time the execution first requests for an allocation token AllocationTokenRequestStartTime time.Time `json:"allocationTokenRequestStartTime,omitempty"` + + // ErrorMessage generated during cache synchronization. + ErrorMessage string `json:"error_message,omitempty"` } From 33c51d53f1aecdbbf7bfdc5b17cb1ae491d79092 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 18 Aug 2023 23:46:09 -0700 Subject: [PATCH 3/6] nit Signed-off-by: Kevin Su --- go/tasks/pluginmachinery/internal/webapi/cache.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/go/tasks/pluginmachinery/internal/webapi/cache.go b/go/tasks/pluginmachinery/internal/webapi/cache.go index 6241157fd..f3d1df1bf 100644 --- a/go/tasks/pluginmachinery/internal/webapi/cache.go +++ b/go/tasks/pluginmachinery/internal/webapi/cache.go @@ -79,11 +79,8 @@ func (q *ResourceCache) SyncResource(ctx context.Context, batch cache.Batch) ( resource.GetID()) if cacheItem.State.Phase.IsTerminal() { - logger.Infof(ctx, "Sync loop - resource cache key [%v] in terminal state [%s]", + logger.Debugf(ctx, "Sync loop - resource cache key [%v] in terminal state [%s]", resource.GetID()) - - logger.Infof(ctx, "phase [%v]", cacheItem.Phase) - logger.Infof(ctx, "phase [%v]", resource.GetItem().(CacheItem).Phase) resp = append(resp, cache.ItemSyncResponse{ ID: resource.GetID(), Item: cacheItem, From 4306482312eb4f2d1a7230be2636fb0b5633151b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 19 Aug 2023 00:42:57 -0700 Subject: [PATCH 4/6] diff Signed-off-by: Kevin Su --- go/tasks/plugins/webapi/agent/plugin.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/tasks/plugins/webapi/agent/plugin.go b/go/tasks/plugins/webapi/agent/plugin.go index dc81bed22..6e926b2ef 100644 --- a/go/tasks/plugins/webapi/agent/plugin.go +++ b/go/tasks/plugins/webapi/agent/plugin.go @@ -5,11 +5,8 @@ import ( "crypto/x509" "encoding/gob" "fmt" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" - "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" - "github.com/flyteorg/flytestdlib/config" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -21,8 +18,11 @@ import ( "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/template" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/webapi" + "github.com/flyteorg/flytestdlib/config" + "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flytestdlib/promutils" "google.golang.org/grpc" ) From 4b2da50f404da6aaab868eed494ccc3c0653fec2 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 19 Aug 2023 00:44:12 -0700 Subject: [PATCH 5/6] lint Signed-off-by: Kevin Su --- go/tasks/plugins/webapi/agent/plugin.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/go/tasks/plugins/webapi/agent/plugin.go b/go/tasks/plugins/webapi/agent/plugin.go index 6e926b2ef..c2fca93db 100644 --- a/go/tasks/plugins/webapi/agent/plugin.go +++ b/go/tasks/plugins/webapi/agent/plugin.go @@ -6,12 +6,11 @@ import ( "encoding/gob" "fmt" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/grpclog" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" flyteIdl "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" pluginErrors "github.com/flyteorg/flyteplugins/go/tasks/errors" From cd67d9417d13adf29b85d8cad126d6037ba23171 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 12 Sep 2023 15:16:53 -0700 Subject: [PATCH 6/6] revert plugin.go Signed-off-by: Kevin Su --- go/tasks/plugins/webapi/agent/plugin.go | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/go/tasks/plugins/webapi/agent/plugin.go b/go/tasks/plugins/webapi/agent/plugin.go index c2fca93db..128753b74 100644 --- a/go/tasks/plugins/webapi/agent/plugin.go +++ b/go/tasks/plugins/webapi/agent/plugin.go @@ -6,22 +6,21 @@ import ( "encoding/gob" "fmt" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flytestdlib/config" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/grpclog" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" flyteIdl "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" pluginErrors "github.com/flyteorg/flyteplugins/go/tasks/errors" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/template" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/webapi" - "github.com/flyteorg/flytestdlib/config" - "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flytestdlib/promutils" "google.golang.org/grpc" ) @@ -170,17 +169,11 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase case admin.State_RETRYABLE_FAILURE: return core.PhaseInfoRetryableFailure(pluginErrors.TaskFailedWithError, "failed to run the job", taskInfo), nil case admin.State_SUCCEEDED: - var opReader io.OutputReader if resource.Outputs != nil { - logger.Infof(ctx, "Agent returned an output") - opReader = ioutils.NewInMemoryOutputReader(resource.Outputs, nil, nil) - } else { - logger.Infof(ctx, "Agent didn't return any output, assuming file based outputs.") - opReader = ioutils.NewRemoteFileOutputReader(ctx, taskCtx.DataStore(), taskCtx.OutputWriter(), taskCtx.MaxDatasetSizeBytes()) - } - err := taskCtx.OutputWriter().Put(ctx, opReader) - if err != nil { - return core.PhaseInfoUndefined, err + err := taskCtx.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader(resource.Outputs, nil, nil)) + if err != nil { + return core.PhaseInfoUndefined, err + } } return core.PhaseInfoSuccess(taskInfo), nil }