Skip to content

Commit

Permalink
feat(startup): update logic for metadb update on startup, skip unmodi…
Browse files Browse the repository at this point in the history
…fied repos

- MetaDB stores the time of the last update of a repo
- During startup we check if the layout has been updated after the last recorded change in the db
- If this is the case, the repo is parsed and updated in the DB otherwise it's skipped

Signed-off-by: Laurentiu Niculae <[email protected]>
  • Loading branch information
laurentiuNiculae committed Nov 16, 2023
1 parent 8dd06c6 commit 19770a8
Show file tree
Hide file tree
Showing 21 changed files with 762 additions and 158 deletions.
8 changes: 4 additions & 4 deletions pkg/api/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,22 +264,22 @@ func TestCreateMetaDBDriver(t *testing.T) {

const perms = 0o600

boltDB, err := bbolt.Open(path.Join(dir, "repo.db"), perms, &bbolt.Options{Timeout: time.Second * 10})
boltDB, err := bbolt.Open(path.Join(dir, "meta.db"), perms, &bbolt.Options{Timeout: time.Second * 10})
So(err, ShouldBeNil)

err = boltDB.Close()
So(err, ShouldBeNil)

err = os.Chmod(path.Join(dir, "repo.db"), 0o200)
err = os.Chmod(path.Join(dir, "meta.db"), 0o200)
So(err, ShouldBeNil)

_, err = meta.New(conf.Storage.StorageConfig, log)
So(err, ShouldNotBeNil)

err = os.Chmod(path.Join(dir, "repo.db"), 0o600)
err = os.Chmod(path.Join(dir, "meta.db"), 0o600)
So(err, ShouldBeNil)

defer os.Remove(path.Join(dir, "repo.db"))
defer os.Remove(path.Join(dir, "meta.db"))
})
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/extensions/search/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import (

const (
graphqlQueryPrefix = constants.FullSearchPrefix
DBFileName = "repo.db"
DBFileName = "meta.db"
)

var (
Expand Down
139 changes: 135 additions & 4 deletions pkg/meta/boltdb/boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,12 @@ func New(boltDB *bbolt.DB, log log.Logger) (*BoltDB, error) {
return err
}

_, err = transaction.CreateBucketIfNotExists([]byte(RepoBlobsBuck))
repoBlobsBuck, err := transaction.CreateBucketIfNotExists([]byte(RepoBlobsBuck))
if err != nil {
return err
}

Check warning on line 71 in pkg/meta/boltdb/boltdb.go

View check run for this annotation

Codecov / codecov/patch

pkg/meta/boltdb/boltdb.go#L70-L71

Added lines #L70 - L71 were not covered by tests

_, err = repoBlobsBuck.CreateBucketIfNotExists([]byte(RepoLastUpdatedBuck))
if err != nil {
return err
}
Expand All @@ -84,6 +89,53 @@ func New(boltDB *bbolt.DB, log log.Logger) (*BoltDB, error) {
}, nil
}

func (bdw *BoltDB) GetAllRepoNames() ([]string, error) {
repoNames := []string{}

err := bdw.DB.View(func(tx *bbolt.Tx) error {
repoMetaBuck := tx.Bucket([]byte(RepoMetaBuck))

return repoMetaBuck.ForEach(func(repo, _ []byte) error {
repoNames = append(repoNames, string(repo))

return nil
})
})

return repoNames, err
}

func (bdw *BoltDB) GetRepoLastUpdated(repo string) time.Time {
lastUpdated := time.Time{}

err := bdw.DB.View(func(tx *bbolt.Tx) error {
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))

lastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))

lastUpdatedBlob := lastUpdatedBuck.Get([]byte(repo))
if len(lastUpdatedBlob) == 0 {
return zerr.ErrRepoMetaNotFound
}

protoTime := &timestamppb.Timestamp{}

err := proto.Unmarshal(lastUpdatedBlob, protoTime)
if err != nil {
return err
}

lastUpdated = *mConvert.GetTime(protoTime)

return nil
})
if err != nil {
return time.Time{}
}

return lastUpdated
}

func (bdw *BoltDB) SetImageMeta(digest godigest.Digest, imageMeta mTypes.ImageMeta) error {
err := bdw.DB.Update(func(tx *bbolt.Tx) error {
buck := tx.Bucket([]byte(ImageMetaBuck))
Expand Down Expand Up @@ -132,6 +184,7 @@ func (bdw *BoltDB) SetRepoReference(ctx context.Context, repo string, reference
err = bdw.DB.Update(func(tx *bbolt.Tx) error {
repoBuck := tx.Bucket([]byte(RepoMetaBuck))
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))
repoLastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))
imageBuck := tx.Bucket([]byte(ImageMetaBuck))

// 1. Add image data to db if needed
Expand Down Expand Up @@ -226,6 +279,11 @@ func (bdw *BoltDB) SetRepoReference(ctx context.Context, repo string, reference

protoRepoMeta, repoBlobs = common.AddImageMetaToRepoMeta(protoRepoMeta, repoBlobs, reference, imageMeta)

err = setRepoLastUpdated(repo, time.Now(), repoLastUpdatedBuck)
if err != nil {
return err
}

Check warning on line 285 in pkg/meta/boltdb/boltdb.go

View check run for this annotation

Codecov / codecov/patch

pkg/meta/boltdb/boltdb.go#L284-L285

Added lines #L284 - L285 were not covered by tests

err = setProtoRepoBlobs(repoBlobs, repoBlobsBuck)
if err != nil {
return err
Expand All @@ -237,6 +295,17 @@ func (bdw *BoltDB) SetRepoReference(ctx context.Context, repo string, reference
return err
}

func setRepoLastUpdated(repo string, lastUpdated time.Time, repoLastUpdatedBuck *bbolt.Bucket) error {
protoTime := timestamppb.New(lastUpdated)

protoTimeBlob, err := proto.Marshal(protoTime)
if err != nil {
return err
}

Check warning on line 304 in pkg/meta/boltdb/boltdb.go

View check run for this annotation

Codecov / codecov/patch

pkg/meta/boltdb/boltdb.go#L303-L304

Added lines #L303 - L304 were not covered by tests

return repoLastUpdatedBuck.Put([]byte(repo), protoTimeBlob)
}

func unmarshalProtoRepoBlobs(repo string, repoBlobsBytes []byte) (*proto_go.RepoBlobs, error) {
repoBlobs := &proto_go.RepoBlobs{
Name: repo,
Expand Down Expand Up @@ -1010,6 +1079,7 @@ func (bdw *BoltDB) DecrementRepoStars(repo string) error {
func (bdw *BoltDB) SetRepoMeta(repo string, repoMeta mTypes.RepoMeta) error {
err := bdw.DB.Update(func(tx *bbolt.Tx) error {
buck := tx.Bucket([]byte(RepoMetaBuck))
repoLastUpdatedBuck := tx.Bucket([]byte(RepoBlobsBuck)).Bucket([]byte(RepoLastUpdatedBuck))

repoMeta.Name = repo

Expand All @@ -1018,7 +1088,35 @@ func (bdw *BoltDB) SetRepoMeta(repo string, repoMeta mTypes.RepoMeta) error {
return err
}

return buck.Put([]byte(repo), repoMetaBlob)
err = buck.Put([]byte(repo), repoMetaBlob)
if err != nil {
return err
}

Check warning on line 1094 in pkg/meta/boltdb/boltdb.go

View check run for this annotation

Codecov / codecov/patch

pkg/meta/boltdb/boltdb.go#L1093-L1094

Added lines #L1093 - L1094 were not covered by tests

// The last update time is set to 0 in order to force an update in case of a next storage parsing
return setRepoLastUpdated(repo, time.Time{}, repoLastUpdatedBuck)
})

return err
}

func (bdw *BoltDB) DeleteRepoMeta(repo string) error {
err := bdw.DB.Update(func(tx *bbolt.Tx) error {
repoBuck := tx.Bucket([]byte(RepoMetaBuck))
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))
repoLastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))

err := repoBuck.Delete([]byte(repo))
if err != nil {
return err
}

Check warning on line 1112 in pkg/meta/boltdb/boltdb.go

View check run for this annotation

Codecov / codecov/patch

pkg/meta/boltdb/boltdb.go#L1111-L1112

Added lines #L1111 - L1112 were not covered by tests

err = repoBlobsBuck.Delete([]byte(repo))
if err != nil {
return err
}

Check warning on line 1117 in pkg/meta/boltdb/boltdb.go

View check run for this annotation

Codecov / codecov/patch

pkg/meta/boltdb/boltdb.go#L1116-L1117

Added lines #L1116 - L1117 were not covered by tests

return repoLastUpdatedBuck.Delete([]byte(repo))
})

return err
Expand Down Expand Up @@ -1212,6 +1310,7 @@ func (bdw *BoltDB) RemoveRepoReference(repo, reference string, manifestDigest go
repoMetaBuck := tx.Bucket([]byte(RepoMetaBuck))
imageMetaBuck := tx.Bucket([]byte(ImageMetaBuck))
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))
repoLastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))

protoRepoMeta, err := getProtoRepoMeta(repo, repoMetaBuck)
if err != nil {
Expand Down Expand Up @@ -1292,6 +1391,11 @@ func (bdw *BoltDB) RemoveRepoReference(repo, reference string, manifestDigest go
return err
}

err = setRepoLastUpdated(repo, time.Now(), repoLastUpdatedBuck)
if err != nil {
return err
}

Check warning on line 1397 in pkg/meta/boltdb/boltdb.go

View check run for this annotation

Codecov / codecov/patch

pkg/meta/boltdb/boltdb.go#L1396-L1397

Added lines #L1396 - L1397 were not covered by tests

protoRepoMeta, repoBlobs = common.RemoveImageFromRepoMeta(protoRepoMeta, repoBlobs, reference)

repoBlobsBytes, err = proto.Marshal(repoBlobs)
Expand Down Expand Up @@ -1934,12 +2038,39 @@ func (bdw *BoltDB) ResetDB() error {
}

func resetBucket(transaction *bbolt.Tx, bucketName string) error {
err := transaction.DeleteBucket([]byte(bucketName))
bucket := transaction.Bucket([]byte(bucketName))
if bucket == nil {
return nil
}

Check warning on line 2044 in pkg/meta/boltdb/boltdb.go

View check run for this annotation

Codecov / codecov/patch

pkg/meta/boltdb/boltdb.go#L2043-L2044

Added lines #L2043 - L2044 were not covered by tests

// we need to create the sub buckets if they exits, we'll presume the sub-buckets are not nested more than 1 layer
subBuckets := [][]byte{}

err := bucket.ForEachBucket(func(bucketName []byte) error {
subBuckets = append(subBuckets, bucketName)

return nil
})
if err != nil {
return err
}

_, err = transaction.CreateBucketIfNotExists([]byte(bucketName))
err = transaction.DeleteBucket([]byte(bucketName))
if err != nil {
return err
}

Check warning on line 2061 in pkg/meta/boltdb/boltdb.go

View check run for this annotation

Codecov / codecov/patch

pkg/meta/boltdb/boltdb.go#L2060-L2061

Added lines #L2060 - L2061 were not covered by tests

bucket, err = transaction.CreateBucketIfNotExists([]byte(bucketName))
if err != nil {
return err
}

Check warning on line 2066 in pkg/meta/boltdb/boltdb.go

View check run for this annotation

Codecov / codecov/patch

pkg/meta/boltdb/boltdb.go#L2065-L2066

Added lines #L2065 - L2066 were not covered by tests

for _, subBucket := range subBuckets {
_, err := bucket.CreateBucketIfNotExists(subBucket)
if err != nil {
return err
}

Check warning on line 2072 in pkg/meta/boltdb/boltdb.go

View check run for this annotation

Codecov / codecov/patch

pkg/meta/boltdb/boltdb.go#L2071-L2072

Added lines #L2071 - L2072 were not covered by tests
}

return err
}
67 changes: 15 additions & 52 deletions pkg/meta/boltdb/boltdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,58 +63,6 @@ func TestWrapperErrors(t *testing.T) {

ctx := userAc.DeriveContext(context.Background())

Convey("ResetDB", func() {
Convey("RepoMetaBuck", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.RepoMetaBuck))
})
So(err, ShouldBeNil)

err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})

Convey("ImageMetaBuck", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.ImageMetaBuck))
})
So(err, ShouldBeNil)

err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})

Convey("RepoBlobsBuck", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.RepoBlobsBuck))
})
So(err, ShouldBeNil)

err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})

Convey("UserAPIKeysBucket", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.UserAPIKeysBucket))
})
So(err, ShouldBeNil)

err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})

Convey("UserDataBucket", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.UserDataBucket))
})
So(err, ShouldBeNil)

err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})
})

Convey("RemoveRepoReference", func() {
Convey("getProtoRepoMeta errors", func() {
err := setRepoMeta("repo", badProtoBlob, boltdbWrapper.DB)
Expand Down Expand Up @@ -192,6 +140,21 @@ func TestWrapperErrors(t *testing.T) {
})
})

Convey("GetRepoLastUpdated", func() {
Convey("bad blob in db", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
repoBlobsBuck := tx.Bucket([]byte(boltdb.RepoBlobsBuck))
lastUpdatedBuck := repoBlobsBuck.Bucket([]byte(boltdb.RepoLastUpdatedBuck))

return lastUpdatedBuck.Put([]byte("repo"), []byte("bad-blob"))
})
So(err, ShouldBeNil)

lastUpdated := boltdbWrapper.GetRepoLastUpdated("repo")
So(lastUpdated, ShouldEqual, time.Time{})
})
})

Convey("UpdateStatsOnDownload", func() {
Convey("repo meta not found", func() {
err = boltdbWrapper.UpdateStatsOnDownload("repo", "ref")
Expand Down
6 changes: 5 additions & 1 deletion pkg/meta/boltdb/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ package boltdb
const (
ImageMetaBuck = "ImageMeta"
RepoMetaBuck = "RepoMeta"
RepoBlobsBuck = "RepoBlobsMeta"
UserDataBucket = "UserData"
VersionBucket = "Version"
UserAPIKeysBucket = "UserAPIKeys"
)

const (
RepoBlobsBuck = "RepoBlobsMeta"
RepoLastUpdatedBuck = "RepoLastUpdated" // Sub-bucket
)
2 changes: 1 addition & 1 deletion pkg/meta/boltdb/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type DBParameters struct {
func GetBoltDriver(params DBParameters) (*bolt.DB, error) {
const perms = 0o600

boltDB, err := bolt.Open(path.Join(params.RootDir, "repo.db"), perms, &bolt.Options{Timeout: time.Second * 10})
boltDB, err := bolt.Open(path.Join(params.RootDir, "meta.db"), perms, &bolt.Options{Timeout: time.Second * 10})
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 19770a8

Please sign in to comment.