From a739fa484ee9b0a0a4f358108f566da06f29e74e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nick=20M=C3=BCller?= Date: Fri, 4 Nov 2022 12:21:50 +0100 Subject: [PATCH] datacatalog client now handles NotFound errors gracefully while overwriting artifact MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nick Müller --- .../task/catalog/datacatalog/datacatalog.go | 16 ++- .../catalog/datacatalog/datacatalog_test.go | 97 ++++++++++++++++++- 2 files changed, 109 insertions(+), 4 deletions(-) diff --git a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go index c26dd2a06..b08649dae 100644 --- a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go +++ b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go @@ -326,7 +326,21 @@ func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.Outp // Overwrite existing artifact if overwrite { - return m.UpdateArtifact(ctx, key, datasetID, inputs, outputs, metadata) + catalogStatus, err := m.UpdateArtifact(ctx, key, datasetID, inputs, outputs, metadata) + if err != nil { + if status.Code(err) == codes.NotFound { + // No existing artifact found (e.g. initial execution of task with overwrite flag already set), + // silently ignore error and create artifact instead to make overwriting an idempotent operation. + logger.Debugf(ctx, "Artifact %+v for dataset %+v does not exist while updating, creating instead", key, datasetID) + return m.CreateArtifact(ctx, key, datasetID, inputs, outputs, metadata) + } + + logger.Errorf(ctx, "Failed to update artifact %+v for dataset %+v: %v", key, datasetID, err) + return catalog.Status{}, err + } + + logger.Debugf(ctx, "Successfully updated artifact %+v for dataset %+v", key, datasetID) + return catalogStatus, nil } // Artifact does not exist yet, create new one diff --git a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go index e4b628cf7..dc35b08e9 100644 --- a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go +++ b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go @@ -618,6 +618,97 @@ func TestCatalog_Put(t *testing.T) { client: mockClient, } + createDatasetCalled := false + mockClient.On("CreateDataset", + ctx, + mock.MatchedBy(func(o *datacatalog.CreateDatasetRequest) bool { + assert.True(t, proto.Equal(o.Dataset.Id, datasetID)) + createDatasetCalled = true + return true + }), + ).Return(&datacatalog.CreateDatasetResponse{}, nil) + + updateArtifactCalled := false + mockClient.On("UpdateArtifact", ctx, mock.Anything).Run(func(args mock.Arguments) { + updateArtifactCalled = true + }).Return(nil, status.New(codes.NotFound, "missing entity of type Artifact with identifier id").Err()) + + createArtifactCalled := false + mockClient.On("CreateArtifact", + ctx, + mock.MatchedBy(func(o *datacatalog.CreateArtifactRequest) bool { + _, parseErr := uuid.Parse(o.Artifact.Id) + assert.NoError(t, parseErr) + assert.True(t, proto.Equal(o.Artifact.Dataset, datasetID)) + createArtifactCalled = true + return true + }), + ).Return(&datacatalog.CreateArtifactResponse{}, nil) + + addTagCalled := false + mockClient.On("AddTag", + ctx, + mock.MatchedBy(func(o *datacatalog.AddTagRequest) bool { + assert.EqualValues(t, "flyte_cached-BE6CZsMk6N3ExR_4X9EuwBgj2Jh2UwasXK3a_pM9xlY", o.Tag.Name) + addTagCalled = true + return true + }), + ).Return(&datacatalog.AddTagResponse{}, nil) + + taskID := &core.TaskExecutionIdentifier{ + TaskId: &core.Identifier{ + ResourceType: core.ResourceType_TASK, + Name: sampleKey.Identifier.Name, + Project: sampleKey.Identifier.Project, + Domain: sampleKey.Identifier.Domain, + Version: "version", + }, + NodeExecutionId: &core.NodeExecutionIdentifier{ + ExecutionId: &core.WorkflowExecutionIdentifier{ + Name: "wf", + Project: "p1", + Domain: "d1", + }, + NodeId: "unknown", // not set in Put request below --> defaults to "unknown" + }, + RetryAttempt: 0, + } + + newKey := sampleKey + newKey.InputReader = ir + or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil) + s, err := discovery.Put(ctx, newKey, or, catalog.Metadata{ + WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{ + Name: taskID.NodeExecutionId.ExecutionId.Name, + Domain: taskID.NodeExecutionId.ExecutionId.Domain, + Project: taskID.NodeExecutionId.ExecutionId.Project, + }, + TaskExecutionIdentifier: &core.TaskExecutionIdentifier{ + TaskId: &sampleKey.Identifier, + NodeExecutionId: taskID.NodeExecutionId, + RetryAttempt: 0, + }, + }, true) + assert.NoError(t, err) + assert.Equal(t, core.CatalogCacheStatus_CACHE_POPULATED, s.GetCacheStatus()) + assert.NotNil(t, s.GetMetadata()) + assert.Equal(t, tagName, s.GetMetadata().ArtifactTag.Name) + assert.Nil(t, s.GetMetadata().GetSourceTaskExecution()) + assert.True(t, createDatasetCalled) + assert.True(t, updateArtifactCalled) + assert.True(t, createArtifactCalled) + assert.True(t, addTagCalled) + }) + + t.Run("Error while overwriting execution", func(t *testing.T) { + ir := &mocks2.InputReader{} + ir.On("Get", mock.Anything).Return(sampleParameters, nil, nil) + + mockClient := &mocks.DataCatalogClient{} + discovery := &CatalogClient{ + client: mockClient, + } + mockClient.On("CreateDataset", ctx, mock.MatchedBy(func(o *datacatalog.CreateDatasetRequest) bool { @@ -626,8 +717,8 @@ func TestCatalog_Put(t *testing.T) { }), ).Return(&datacatalog.CreateDatasetResponse{}, nil) - notFound := errors.New("not found") - mockClient.On("UpdateArtifact", ctx, mock.Anything).Return(nil, notFound) + genericErr := errors.New("generic error") + mockClient.On("UpdateArtifact", ctx, mock.Anything).Return(nil, genericErr) newKey := sampleKey newKey.InputReader = ir @@ -639,7 +730,7 @@ func TestCatalog_Put(t *testing.T) { TaskExecutionIdentifier: nil, }, true) assert.Error(t, err) - assert.Equal(t, notFound, err) + assert.Equal(t, genericErr, err) assert.Equal(t, core.CatalogCacheStatus_CACHE_DISABLED, s.GetCacheStatus()) assert.Nil(t, s.GetMetadata()) })