Skip to content

Commit

Permalink
Fixing cache overwrite metadata update (#4617)
Browse files Browse the repository at this point in the history
* add metadata message in idl

add metadata in updateartifact request in propeller

change in catalog

add test

Signed-off-by: Yue Shang <[email protected]>

* add tests

Signed-off-by: Yue Shang <[email protected]>

* address comments

Signed-off-by: Yue Shang <[email protected]>

* fixing tests and updating logging

Signed-off-by: Daniel Rammer <[email protected]>

* correctly calling Update for catalog cache overwrites

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit tests

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Yue Shang <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>
Co-authored-by: Yue Shang <[email protected]>
  • Loading branch information
hamersaw and ysysys3074 authored Dec 21, 2023
1 parent 5d199a8 commit 2aab954
Show file tree
Hide file tree
Showing 16 changed files with 850 additions and 379 deletions.
12 changes: 12 additions & 0 deletions datacatalog/pkg/manager/impl/artifact_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,16 @@ func (m *artifactManager) UpdateArtifact(ctx context.Context, request *datacatal
return nil, err
}

// artifactModel needs to be updated with new SerializedMetadata
serializedMetadata, err := transformers.SerializedMetadata(request.Metadata)
if err != nil {
logger.Errorf(ctx, "Error in transforming Metadata from request %+v, err %v", request.Metadata, err)
m.systemMetrics.transformerErrorCounter.Inc(ctx)
m.systemMetrics.updateFailureCounter.Inc(ctx)
return nil, err
}
artifactModel.SerializedMetadata = serializedMetadata

artifact, err := transformers.FromArtifactModel(artifactModel)
if err != nil {
logger.Errorf(ctx, "Error in transforming update artifact request %+v, err %v", artifactModel, err)
Expand Down Expand Up @@ -369,6 +379,8 @@ func (m *artifactManager) UpdateArtifact(ctx context.Context, request *datacatal

// update artifact in DB, also replaces/upserts associated artifact data
artifactModel.ArtifactData = artifactDataModels
logger.Debugf(ctx, "Updating ArtifactModel with %+v", artifactModel)

err = m.repo.ArtifactRepo().Update(ctx, artifactModel)
if err != nil {
if errors.IsDoesNotExistError(err) {
Expand Down
30 changes: 27 additions & 3 deletions datacatalog/pkg/manager/impl/artifact_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (
"os"
"testing"
"time"
"reflect"

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/flyteorg/flyte/datacatalog/pkg/repositories/transformers"
"github.com/flyteorg/flyte/datacatalog/pkg/common"
"github.com/flyteorg/flyte/datacatalog/pkg/errors"
repoErrors "github.com/flyteorg/flyte/datacatalog/pkg/repositories/errors"
Expand Down Expand Up @@ -639,16 +640,24 @@ func TestUpdateArtifact(t *testing.T) {
artifactKey.DatasetVersion == expectedArtifact.Dataset.Version
})).Return(mockArtifactModel, nil)

metaData := &datacatalog.Metadata{
KeyMap: map[string]string{"key2": "value2"},
}
serializedMetadata, err := transformers.SerializedMetadata(metaData)
assert.NoError(t, err)

dcRepo.MockArtifactRepo.On("Update",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(artifact models.Artifact) bool {
return artifact.ArtifactID == expectedArtifact.Id &&
artifact.ArtifactKey.DatasetProject == expectedArtifact.Dataset.Project &&
artifact.ArtifactKey.DatasetDomain == expectedArtifact.Dataset.Domain &&
artifact.ArtifactKey.DatasetName == expectedArtifact.Dataset.Name &&
artifact.ArtifactKey.DatasetVersion == expectedArtifact.Dataset.Version
artifact.ArtifactKey.DatasetVersion == expectedArtifact.Dataset.Version &&
reflect.DeepEqual(artifact.SerializedMetadata, serializedMetadata)
})).Return(nil)


request := &datacatalog.UpdateArtifactRequest{
Dataset: expectedDataset.Id,
QueryHandle: &datacatalog.UpdateArtifactRequest_ArtifactId{
Expand All @@ -664,13 +673,17 @@ func TestUpdateArtifact(t *testing.T) {
Value: getTestStringLiteralWithValue("value3"),
},
},
Metadata: &datacatalog.Metadata{
KeyMap: map[string]string{"key2": "value2"},
},
}

artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope())
artifactResponse, err := artifactManager.UpdateArtifact(ctx, request)
assert.NoError(t, err)
assert.NotNil(t, artifactResponse)
assert.Equal(t, expectedArtifact.Id, artifactResponse.GetArtifactId())
dcRepo.MockArtifactRepo.AssertExpectations(t)

// check that the datastore has the updated artifactData available
// data1 should contain updated value
Expand Down Expand Up @@ -701,6 +714,12 @@ func TestUpdateArtifact(t *testing.T) {
datastore := createInmemoryDataStore(t, mockScope.NewTestScope())
mockArtifactModel := getExpectedArtifactModel(ctx, t, datastore, expectedArtifact)

metaData := &datacatalog.Metadata{
KeyMap: map[string]string{"key2": "value2"},
}
serializedMetadata, err := transformers.SerializedMetadata(metaData)
assert.NoError(t, err)

dcRepo := newMockDataCatalogRepo()
dcRepo.MockArtifactRepo.On("Update",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
Expand All @@ -709,7 +728,8 @@ func TestUpdateArtifact(t *testing.T) {
artifact.ArtifactKey.DatasetProject == expectedArtifact.Dataset.Project &&
artifact.ArtifactKey.DatasetDomain == expectedArtifact.Dataset.Domain &&
artifact.ArtifactKey.DatasetName == expectedArtifact.Dataset.Name &&
artifact.ArtifactKey.DatasetVersion == expectedArtifact.Dataset.Version
artifact.ArtifactKey.DatasetVersion == expectedArtifact.Dataset.Version &&
reflect.DeepEqual(artifact.SerializedMetadata, serializedMetadata)
})).Return(nil)

dcRepo.MockTagRepo.On("Get", mock.Anything,
Expand Down Expand Up @@ -747,13 +767,17 @@ func TestUpdateArtifact(t *testing.T) {
Value: getTestStringLiteralWithValue("value3"),
},
},
Metadata: &datacatalog.Metadata{
KeyMap: map[string]string{"key2": "value2"},
},
}

artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope())
artifactResponse, err := artifactManager.UpdateArtifact(ctx, request)
assert.NoError(t, err)
assert.NotNil(t, artifactResponse)
assert.Equal(t, expectedArtifact.Id, artifactResponse.GetArtifactId())
dcRepo.MockArtifactRepo.AssertExpectations(t)

// check that the datastore has the updated artifactData available
// data1 should contain updated value
Expand Down
8 changes: 8 additions & 0 deletions datacatalog/pkg/repositories/transformers/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/datacatalog"
)

func SerializedMetadata(metadata *datacatalog.Metadata) ([]byte, error) {
serializedMetadata, err := marshalMetadata(metadata)
if err != nil {
return []byte{}, err
}
return serializedMetadata, nil
}

func CreateArtifactModel(request *datacatalog.CreateArtifactRequest, artifactData []models.ArtifactData, dataset models.Dataset) (models.Artifact, error) {
datasetID := request.Artifact.Dataset

Expand Down
6 changes: 6 additions & 0 deletions datacatalog/pkg/repositories/transformers/artifact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/datacatalog"
)

func TestSerializedMetadata(t *testing.T) {
serializedMetadata, err := SerializedMetadata(metadata)
assert.IsType(t, []byte{}, serializedMetadata)
assert.NoError(t, err)
}

func getTestArtifactData() []*datacatalog.ArtifactData {
testInteger := &core.Literal{
Value: &core.Literal_Scalar{
Expand Down
Loading

0 comments on commit 2aab954

Please sign in to comment.