Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
datacatalog client now handles NotFound errors gracefully while overw…
Browse files Browse the repository at this point in the history
…riting artifact

Signed-off-by: Nick Müller <[email protected]>
  • Loading branch information
Nick Müller committed Nov 4, 2022
1 parent 78f8710 commit a739fa4
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 4 deletions.
16 changes: 15 additions & 1 deletion pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,21 @@ func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.Outp

// Overwrite existing artifact
if overwrite {
return m.UpdateArtifact(ctx, key, datasetID, inputs, outputs, metadata)
catalogStatus, err := m.UpdateArtifact(ctx, key, datasetID, inputs, outputs, metadata)
if err != nil {
if status.Code(err) == codes.NotFound {
// No existing artifact found (e.g. initial execution of task with overwrite flag already set),
// silently ignore error and create artifact instead to make overwriting an idempotent operation.
logger.Debugf(ctx, "Artifact %+v for dataset %+v does not exist while updating, creating instead", key, datasetID)
return m.CreateArtifact(ctx, key, datasetID, inputs, outputs, metadata)
}

logger.Errorf(ctx, "Failed to update artifact %+v for dataset %+v: %v", key, datasetID, err)
return catalog.Status{}, err
}

logger.Debugf(ctx, "Successfully updated artifact %+v for dataset %+v", key, datasetID)
return catalogStatus, nil
}

// Artifact does not exist yet, create new one
Expand Down
97 changes: 94 additions & 3 deletions pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,97 @@ func TestCatalog_Put(t *testing.T) {
client: mockClient,
}

createDatasetCalled := false
mockClient.On("CreateDataset",
ctx,
mock.MatchedBy(func(o *datacatalog.CreateDatasetRequest) bool {
assert.True(t, proto.Equal(o.Dataset.Id, datasetID))
createDatasetCalled = true
return true
}),
).Return(&datacatalog.CreateDatasetResponse{}, nil)

updateArtifactCalled := false
mockClient.On("UpdateArtifact", ctx, mock.Anything).Run(func(args mock.Arguments) {
updateArtifactCalled = true
}).Return(nil, status.New(codes.NotFound, "missing entity of type Artifact with identifier id").Err())

createArtifactCalled := false
mockClient.On("CreateArtifact",
ctx,
mock.MatchedBy(func(o *datacatalog.CreateArtifactRequest) bool {
_, parseErr := uuid.Parse(o.Artifact.Id)
assert.NoError(t, parseErr)
assert.True(t, proto.Equal(o.Artifact.Dataset, datasetID))
createArtifactCalled = true
return true
}),
).Return(&datacatalog.CreateArtifactResponse{}, nil)

addTagCalled := false
mockClient.On("AddTag",
ctx,
mock.MatchedBy(func(o *datacatalog.AddTagRequest) bool {
assert.EqualValues(t, "flyte_cached-BE6CZsMk6N3ExR_4X9EuwBgj2Jh2UwasXK3a_pM9xlY", o.Tag.Name)
addTagCalled = true
return true
}),
).Return(&datacatalog.AddTagResponse{}, nil)

taskID := &core.TaskExecutionIdentifier{
TaskId: &core.Identifier{
ResourceType: core.ResourceType_TASK,
Name: sampleKey.Identifier.Name,
Project: sampleKey.Identifier.Project,
Domain: sampleKey.Identifier.Domain,
Version: "version",
},
NodeExecutionId: &core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Name: "wf",
Project: "p1",
Domain: "d1",
},
NodeId: "unknown", // not set in Put request below --> defaults to "unknown"
},
RetryAttempt: 0,
}

newKey := sampleKey
newKey.InputReader = ir
or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil)
s, err := discovery.Put(ctx, newKey, or, catalog.Metadata{
WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
Name: taskID.NodeExecutionId.ExecutionId.Name,
Domain: taskID.NodeExecutionId.ExecutionId.Domain,
Project: taskID.NodeExecutionId.ExecutionId.Project,
},
TaskExecutionIdentifier: &core.TaskExecutionIdentifier{
TaskId: &sampleKey.Identifier,
NodeExecutionId: taskID.NodeExecutionId,
RetryAttempt: 0,
},
}, true)
assert.NoError(t, err)
assert.Equal(t, core.CatalogCacheStatus_CACHE_POPULATED, s.GetCacheStatus())
assert.NotNil(t, s.GetMetadata())
assert.Equal(t, tagName, s.GetMetadata().ArtifactTag.Name)
assert.Nil(t, s.GetMetadata().GetSourceTaskExecution())
assert.True(t, createDatasetCalled)
assert.True(t, updateArtifactCalled)
assert.True(t, createArtifactCalled)
assert.True(t, addTagCalled)
})

t.Run("Error while overwriting execution", func(t *testing.T) {
ir := &mocks2.InputReader{}
ir.On("Get", mock.Anything).Return(sampleParameters, nil, nil)

mockClient := &mocks.DataCatalogClient{}
discovery := &CatalogClient{
client: mockClient,
}

mockClient.On("CreateDataset",
ctx,
mock.MatchedBy(func(o *datacatalog.CreateDatasetRequest) bool {
Expand All @@ -626,8 +717,8 @@ func TestCatalog_Put(t *testing.T) {
}),
).Return(&datacatalog.CreateDatasetResponse{}, nil)

notFound := errors.New("not found")
mockClient.On("UpdateArtifact", ctx, mock.Anything).Return(nil, notFound)
genericErr := errors.New("generic error")
mockClient.On("UpdateArtifact", ctx, mock.Anything).Return(nil, genericErr)

newKey := sampleKey
newKey.InputReader = ir
Expand All @@ -639,7 +730,7 @@ func TestCatalog_Put(t *testing.T) {
TaskExecutionIdentifier: nil,
}, true)
assert.Error(t, err)
assert.Equal(t, notFound, err)
assert.Equal(t, genericErr, err)
assert.Equal(t, core.CatalogCacheStatus_CACHE_DISABLED, s.GetCacheStatus())
assert.Nil(t, s.GetMetadata())
})
Expand Down

0 comments on commit a739fa4

Please sign in to comment.