Skip to content

Commit

Permalink
feat(startup): update logic for metadb update on startup, skip unmodi…
Browse files Browse the repository at this point in the history
…fied repos

- MetaDB stores the time of the last update of a repo
- During startup we check if the layout has been updated after the last recorded change in the db
- If this is the case, the repo is parsed and updated in the DB otherwise it's skipped

Signed-off-by: Laurentiu Niculae <[email protected]>
  • Loading branch information
laurentiuNiculae committed Nov 15, 2023
1 parent dd079bf commit 98c2b47
Show file tree
Hide file tree
Showing 18 changed files with 748 additions and 146 deletions.
139 changes: 135 additions & 4 deletions pkg/meta/boltdb/boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,12 @@ func New(boltDB *bbolt.DB, log log.Logger) (*BoltDB, error) {
return err
}

_, err = transaction.CreateBucketIfNotExists([]byte(RepoBlobsBuck))
repoBlobsBuck, err := transaction.CreateBucketIfNotExists([]byte(RepoBlobsBuck))
if err != nil {
return err
}

_, err = repoBlobsBuck.CreateBucketIfNotExists([]byte(RepoLastUpdatedBuck))
if err != nil {
return err
}
Expand All @@ -84,6 +89,53 @@ func New(boltDB *bbolt.DB, log log.Logger) (*BoltDB, error) {
}, nil
}

func (bdw *BoltDB) GetAllRepoNames() ([]string, error) {
repoNames := []string{}

err := bdw.DB.View(func(tx *bbolt.Tx) error {
repoMetaBuck := tx.Bucket([]byte(RepoMetaBuck))

return repoMetaBuck.ForEach(func(repo, _ []byte) error {
repoNames = append(repoNames, string(repo))

return nil
})
})

return repoNames, err
}

func (bdw *BoltDB) GetRepoLastUpdated(repo string) time.Time {
lastUpdated := time.Time{}

err := bdw.DB.View(func(tx *bbolt.Tx) error {
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))

lastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))

lastUpdatedBlob := lastUpdatedBuck.Get([]byte(repo))
if len(lastUpdatedBlob) == 0 {
return zerr.ErrRepoMetaNotFound
}

protoTime := &timestamppb.Timestamp{}

err := proto.Unmarshal(lastUpdatedBlob, protoTime)
if err != nil {
return err
}

lastUpdated = *mConvert.GetTime(protoTime)

return nil
})
if err != nil {
return time.Time{}
}

return lastUpdated
}

func (bdw *BoltDB) SetImageMeta(digest godigest.Digest, imageMeta mTypes.ImageMeta) error {
err := bdw.DB.Update(func(tx *bbolt.Tx) error {
buck := tx.Bucket([]byte(ImageMetaBuck))
Expand Down Expand Up @@ -132,6 +184,7 @@ func (bdw *BoltDB) SetRepoReference(ctx context.Context, repo string, reference
err = bdw.DB.Update(func(tx *bbolt.Tx) error {
repoBuck := tx.Bucket([]byte(RepoMetaBuck))
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))
repoLastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))
imageBuck := tx.Bucket([]byte(ImageMetaBuck))

// 1. Add image data to db if needed
Expand Down Expand Up @@ -226,6 +279,11 @@ func (bdw *BoltDB) SetRepoReference(ctx context.Context, repo string, reference

protoRepoMeta, repoBlobs = common.AddImageMetaToRepoMeta(protoRepoMeta, repoBlobs, reference, imageMeta)

err = setRepoLastUpdated(repo, time.Now(), repoLastUpdatedBuck)
if err != nil {
return err
}

err = setProtoRepoBlobs(repoBlobs, repoBlobsBuck)
if err != nil {
return err
Expand All @@ -237,6 +295,17 @@ func (bdw *BoltDB) SetRepoReference(ctx context.Context, repo string, reference
return err
}

func setRepoLastUpdated(repo string, lastUpdated time.Time, repoLastUpdatedBuck *bbolt.Bucket) error {
protoTime := timestamppb.New(lastUpdated)

protoTimeBlob, err := proto.Marshal(protoTime)
if err != nil {
return err
}

return repoLastUpdatedBuck.Put([]byte(repo), protoTimeBlob)
}

func unmarshalProtoRepoBlobs(repo string, repoBlobsBytes []byte) (*proto_go.RepoBlobs, error) {
repoBlobs := &proto_go.RepoBlobs{
Name: repo,
Expand Down Expand Up @@ -1010,6 +1079,7 @@ func (bdw *BoltDB) DecrementRepoStars(repo string) error {
func (bdw *BoltDB) SetRepoMeta(repo string, repoMeta mTypes.RepoMeta) error {
err := bdw.DB.Update(func(tx *bbolt.Tx) error {
buck := tx.Bucket([]byte(RepoMetaBuck))
repoLastUpdatedBuck := tx.Bucket([]byte(RepoBlobsBuck)).Bucket([]byte(RepoLastUpdatedBuck))

repoMeta.Name = repo

Expand All @@ -1018,7 +1088,35 @@ func (bdw *BoltDB) SetRepoMeta(repo string, repoMeta mTypes.RepoMeta) error {
return err
}

return buck.Put([]byte(repo), repoMetaBlob)
err = buck.Put([]byte(repo), repoMetaBlob)
if err != nil {
return err
}

// The last update time is set to 0 in order to force an update in case of a next storage parsing
return setRepoLastUpdated(repo, time.Time{}, repoLastUpdatedBuck)
})

return err
}

func (bdw *BoltDB) DeleteRepoMeta(repo string) error {
err := bdw.DB.Update(func(tx *bbolt.Tx) error {
repoBuck := tx.Bucket([]byte(RepoMetaBuck))
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))
repoLastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))

err := repoBuck.Delete([]byte(repo))
if err != nil {
return err
}

err = repoBlobsBuck.Delete([]byte(repo))
if err != nil {
return err
}

return repoLastUpdatedBuck.Delete([]byte(repo))
})

return err
Expand Down Expand Up @@ -1212,6 +1310,7 @@ func (bdw *BoltDB) RemoveRepoReference(repo, reference string, manifestDigest go
repoMetaBuck := tx.Bucket([]byte(RepoMetaBuck))
imageMetaBuck := tx.Bucket([]byte(ImageMetaBuck))
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))
repoLastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))

protoRepoMeta, err := getProtoRepoMeta(repo, repoMetaBuck)
if err != nil {
Expand Down Expand Up @@ -1292,6 +1391,11 @@ func (bdw *BoltDB) RemoveRepoReference(repo, reference string, manifestDigest go
return err
}

err = setRepoLastUpdated(repo, time.Now(), repoLastUpdatedBuck)
if err != nil {
return err
}

protoRepoMeta, repoBlobs = common.RemoveImageFromRepoMeta(protoRepoMeta, repoBlobs, reference)

repoBlobsBytes, err = proto.Marshal(repoBlobs)
Expand Down Expand Up @@ -1934,12 +2038,39 @@ func (bdw *BoltDB) ResetDB() error {
}

func resetBucket(transaction *bbolt.Tx, bucketName string) error {
err := transaction.DeleteBucket([]byte(bucketName))
bucket := transaction.Bucket([]byte(bucketName))
if bucket == nil {
return nil
}

// we need to create the sub buckets if they exits, we'll presume the sub-buckets are not nested more than 1 layer
subBuckets := [][]byte{}

err := bucket.ForEachBucket(func(bucketName []byte) error {
subBuckets = append(subBuckets, bucketName)

return nil
})
if err != nil {
return err
}

_, err = transaction.CreateBucketIfNotExists([]byte(bucketName))
err = transaction.DeleteBucket([]byte(bucketName))
if err != nil {
return err
}

bucket, err = transaction.CreateBucketIfNotExists([]byte(bucketName))
if err != nil {
return err
}

for _, subBucket := range subBuckets {
_, err := bucket.CreateBucketIfNotExists(subBucket)
if err != nil {
return err
}
}

return err
}
67 changes: 15 additions & 52 deletions pkg/meta/boltdb/boltdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,58 +63,6 @@ func TestWrapperErrors(t *testing.T) {

ctx := userAc.DeriveContext(context.Background())

Convey("ResetDB", func() {
Convey("RepoMetaBuck", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.RepoMetaBuck))
})
So(err, ShouldBeNil)

err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})

Convey("ImageMetaBuck", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.ImageMetaBuck))
})
So(err, ShouldBeNil)

err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})

Convey("RepoBlobsBuck", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.RepoBlobsBuck))
})
So(err, ShouldBeNil)

err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})

Convey("UserAPIKeysBucket", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.UserAPIKeysBucket))
})
So(err, ShouldBeNil)

err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})

Convey("UserDataBucket", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.UserDataBucket))
})
So(err, ShouldBeNil)

err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})
})

Convey("RemoveRepoReference", func() {
Convey("getProtoRepoMeta errors", func() {
err := setRepoMeta("repo", badProtoBlob, boltdbWrapper.DB)
Expand Down Expand Up @@ -192,6 +140,21 @@ func TestWrapperErrors(t *testing.T) {
})
})

Convey("GetRepoLastUpdated", func() {
Convey("bad blob in db", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
repoBlobsBuck := tx.Bucket([]byte(boltdb.RepoBlobsBuck))
lastUpdatedBuck := repoBlobsBuck.Bucket([]byte(boltdb.RepoLastUpdatedBuck))

return lastUpdatedBuck.Put([]byte("repo"), []byte("bad-blob"))
})
So(err, ShouldBeNil)

lastUpdated := boltdbWrapper.GetRepoLastUpdated("repo")
So(lastUpdated, ShouldEqual, time.Time{})
})
})

Convey("UpdateStatsOnDownload", func() {
Convey("repo meta not found", func() {
err = boltdbWrapper.UpdateStatsOnDownload("repo", "ref")
Expand Down
6 changes: 5 additions & 1 deletion pkg/meta/boltdb/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ package boltdb
const (
ImageMetaBuck = "ImageMeta"
RepoMetaBuck = "RepoMeta"
RepoBlobsBuck = "RepoBlobsMeta"
UserDataBucket = "UserData"
VersionBucket = "Version"
UserAPIKeysBucket = "UserAPIKeys"
)

const (
RepoBlobsBuck = "RepoBlobsMeta"
RepoLastUpdatedBuck = "RepoLastUpdated" // Sub-bucket
)
Loading

0 comments on commit 98c2b47

Please sign in to comment.