Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[caching] Fix issue where execution metadata is not written to the artifacts table when the cache is overwritten #4550

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions datacatalog/pkg/manager/impl/artifact_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,16 @@ func (m *artifactManager) UpdateArtifact(ctx context.Context, request *datacatal
return nil, err
}

// artifactModel needs to be updated with new SerializedMetadata
serializedMetadata, err := transformers.SerializedMetadata(request.Metadata)
if err != nil {
logger.Errorf(ctx, "Error in transforming Metadata from request %+v, err %v", request.Metadata, err)
m.systemMetrics.transformerErrorCounter.Inc(ctx)
m.systemMetrics.updateFailureCounter.Inc(ctx)
return nil, err
}
artifactModel.SerializedMetadata = serializedMetadata

artifact, err := transformers.FromArtifactModel(artifactModel)
if err != nil {
logger.Errorf(ctx, "Error in transforming update artifact request %+v, err %v", artifactModel, err)
Expand Down Expand Up @@ -369,6 +379,10 @@ func (m *artifactManager) UpdateArtifact(ctx context.Context, request *datacatal

// update artifact in DB, also replaces/upserts associated artifact data
artifactModel.ArtifactData = artifactDataModels
logger.Debugf(ctx, "ArtifactModel is %+v", artifactModel)
logger.Debugf(ctx, "ArtifactModel ArtifactData is %+v", artifactModel.ArtifactData)
logger.Debugf(ctx, "ArtifactModel SerializedMetadata is %+v", artifactModel.SerializedMetadata)
Comment on lines +382 to +384
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're going to leave this in, can it be a single log line rather than three? The first line already prints the entire artifactModel, and the next two print fields that are already exposed by the first.


err = m.repo.ArtifactRepo().Update(ctx, artifactModel)
if err != nil {
if errors.IsDoesNotExistError(err) {
Expand Down
28 changes: 25 additions & 3 deletions datacatalog/pkg/manager/impl/artifact_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (
"os"
"testing"
"time"
"reflect"

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/flyteorg/flyte/datacatalog/pkg/repositories/transformers"
"github.com/flyteorg/flyte/datacatalog/pkg/common"
"github.com/flyteorg/flyte/datacatalog/pkg/errors"
repoErrors "github.com/flyteorg/flyte/datacatalog/pkg/repositories/errors"
Expand Down Expand Up @@ -639,16 +640,23 @@ func TestUpdateArtifact(t *testing.T) {
artifactKey.DatasetVersion == expectedArtifact.Dataset.Version
})).Return(mockArtifactModel, nil)

metaData := &datacatalog.Metadata{
KeyMap: map[string]string{"key2": "value2"},
}
serializedMetadata, err := transformers.SerializedMetadata(metaData)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like there is a lint error here because err is not used. Maybe add assert.Nil(...) on the err here.


dcRepo.MockArtifactRepo.On("Update",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(artifact models.Artifact) bool {
return artifact.ArtifactID == expectedArtifact.Id &&
artifact.ArtifactKey.DatasetProject == expectedArtifact.Dataset.Project &&
artifact.ArtifactKey.DatasetDomain == expectedArtifact.Dataset.Domain &&
artifact.ArtifactKey.DatasetName == expectedArtifact.Dataset.Name &&
artifact.ArtifactKey.DatasetVersion == expectedArtifact.Dataset.Version
artifact.ArtifactKey.DatasetVersion == expectedArtifact.Dataset.Version &&
reflect.DeepEqual(artifact.SerializedMetadata, serializedMetadata)
})).Return(nil)


request := &datacatalog.UpdateArtifactRequest{
Dataset: expectedDataset.Id,
QueryHandle: &datacatalog.UpdateArtifactRequest_ArtifactId{
Expand All @@ -664,13 +672,17 @@ func TestUpdateArtifact(t *testing.T) {
Value: getTestStringLiteralWithValue("value3"),
},
},
Metadata: &datacatalog.Metadata{
KeyMap: map[string]string{"key2": "value2"},
},
}

artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope())
artifactResponse, err := artifactManager.UpdateArtifact(ctx, request)
assert.NoError(t, err)
assert.NotNil(t, artifactResponse)
assert.Equal(t, expectedArtifact.Id, artifactResponse.GetArtifactId())
dcRepo.MockArtifactRepo.AssertExpectations(t)

// check that the datastore has the updated artifactData available
// data1 should contain updated value
Expand Down Expand Up @@ -701,6 +713,11 @@ func TestUpdateArtifact(t *testing.T) {
datastore := createInmemoryDataStore(t, mockScope.NewTestScope())
mockArtifactModel := getExpectedArtifactModel(ctx, t, datastore, expectedArtifact)

metaData := &datacatalog.Metadata{
KeyMap: map[string]string{"key2": "value2"},
}
serializedMetadata, err := transformers.SerializedMetadata(metaData)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also assert on error here.


dcRepo := newMockDataCatalogRepo()
dcRepo.MockArtifactRepo.On("Update",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
Expand All @@ -709,7 +726,8 @@ func TestUpdateArtifact(t *testing.T) {
artifact.ArtifactKey.DatasetProject == expectedArtifact.Dataset.Project &&
artifact.ArtifactKey.DatasetDomain == expectedArtifact.Dataset.Domain &&
artifact.ArtifactKey.DatasetName == expectedArtifact.Dataset.Name &&
artifact.ArtifactKey.DatasetVersion == expectedArtifact.Dataset.Version
artifact.ArtifactKey.DatasetVersion == expectedArtifact.Dataset.Version &&
reflect.DeepEqual(artifact.SerializedMetadata, serializedMetadata)
})).Return(nil)

dcRepo.MockTagRepo.On("Get", mock.Anything,
Expand Down Expand Up @@ -747,13 +765,17 @@ func TestUpdateArtifact(t *testing.T) {
Value: getTestStringLiteralWithValue("value3"),
},
},
Metadata: &datacatalog.Metadata{
KeyMap: map[string]string{"key2": "value2"},
},
}

artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope())
artifactResponse, err := artifactManager.UpdateArtifact(ctx, request)
assert.NoError(t, err)
assert.NotNil(t, artifactResponse)
assert.Equal(t, expectedArtifact.Id, artifactResponse.GetArtifactId())
dcRepo.MockArtifactRepo.AssertExpectations(t)

// check that the datastore has the updated artifactData available
// data1 should contain updated value
Expand Down
8 changes: 8 additions & 0 deletions datacatalog/pkg/repositories/transformers/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/datacatalog"
)

// SerializedMetadata returns the byte form of metadata as produced by
// marshalMetadata. When marshalling fails, it returns an empty (non-nil)
// byte slice together with the underlying error.
func SerializedMetadata(metadata *datacatalog.Metadata) ([]byte, error) {
	encoded, marshalErr := marshalMetadata(metadata)
	if marshalErr == nil {
		return encoded, nil
	}
	// Callers receive an empty slice rather than whatever marshalMetadata
	// handed back alongside the error.
	return []byte{}, marshalErr
}

func CreateArtifactModel(request *datacatalog.CreateArtifactRequest, artifactData []models.ArtifactData, dataset models.Dataset) (models.Artifact, error) {
datasetID := request.Artifact.Dataset

Expand Down
6 changes: 6 additions & 0 deletions datacatalog/pkg/repositories/transformers/artifact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/datacatalog"
)

// TestSerializedMetadata verifies that SerializedMetadata yields a []byte and
// no error for the metadata fixture (declared outside this function).
func TestSerializedMetadata(t *testing.T) {
	encoded, serializeErr := SerializedMetadata(metadata)

	// The result must be a byte slice ...
	assert.IsType(t, []byte{}, encoded)
	// ... and serialization must succeed.
	assert.NoError(t, serializeErr)
}

func getTestArtifactData() []*datacatalog.ArtifactData {
testInteger := &core.Literal{
Value: &core.Literal_Scalar{
Expand Down
Loading
Loading