Skip to content

Commit

Permalink
feat(startup): update logic for metadb update on startup, skip unmodi…
Browse files Browse the repository at this point in the history
…fied repos

Signed-off-by: Laurentiu Niculae <[email protected]>
  • Loading branch information
laurentiuNiculae committed Nov 10, 2023
1 parent 4ed4661 commit b4621f5
Show file tree
Hide file tree
Showing 17 changed files with 602 additions and 79 deletions.
140 changes: 136 additions & 4 deletions pkg/meta/boltdb/boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,12 @@ func New(boltDB *bbolt.DB, log log.Logger) (*BoltDB, error) {
return err
}

_, err = transaction.CreateBucketIfNotExists([]byte(RepoBlobsBuck))
repoBlobsBuck, err := transaction.CreateBucketIfNotExists([]byte(RepoBlobsBuck))
if err != nil {
return err
}

_, err = repoBlobsBuck.CreateBucketIfNotExists([]byte(RepoLastUpdatedBuck))
if err != nil {
return err
}
Expand All @@ -84,6 +89,53 @@ func New(boltDB *bbolt.DB, log log.Logger) (*BoltDB, error) {
}, nil
}

func (bdw *BoltDB) GetAllRepoNames() ([]string, error) {
repoNames := []string{}

err := bdw.DB.View(func(tx *bbolt.Tx) error {
repoMetaBuck := tx.Bucket([]byte(RepoMetaBuck))

return repoMetaBuck.ForEach(func(repo, _ []byte) error {
repoNames = append(repoNames, string(repo))

return nil
})
})

return repoNames, err
}

func (bdw *BoltDB) GetRepoLastUpdated(repo string) time.Time {
lastUpdated := time.Time{}

err := bdw.DB.View(func(tx *bbolt.Tx) error {
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))

lastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))

lastUpdatedBlob := lastUpdatedBuck.Get([]byte(repo))
if len(lastUpdatedBlob) == 0 {
return zerr.ErrRepoMetaNotFound
}

protoTime := &timestamppb.Timestamp{}

err := proto.Unmarshal(lastUpdatedBlob, protoTime)
if err != nil {
return err
}

lastUpdated = *mConvert.GetTime(protoTime)

return nil
})
if err != nil {
return time.Time{}
}

return lastUpdated
}

func (bdw *BoltDB) SetImageMeta(digest godigest.Digest, imageMeta mTypes.ImageMeta) error {
err := bdw.DB.Update(func(tx *bbolt.Tx) error {
buck := tx.Bucket([]byte(ImageMetaBuck))
Expand Down Expand Up @@ -132,6 +184,7 @@ func (bdw *BoltDB) SetRepoReference(ctx context.Context, repo string, reference
err = bdw.DB.Update(func(tx *bbolt.Tx) error {
repoBuck := tx.Bucket([]byte(RepoMetaBuck))
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))
repoLastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))
imageBuck := tx.Bucket([]byte(ImageMetaBuck))

// 1. Add image data to db if needed
Expand Down Expand Up @@ -226,6 +279,11 @@ func (bdw *BoltDB) SetRepoReference(ctx context.Context, repo string, reference

protoRepoMeta, repoBlobs = common.AddImageMetaToRepoMeta(protoRepoMeta, repoBlobs, reference, imageMeta)

err = updatedRepoLastUpdated(repo, repoLastUpdatedBuck)
if err != nil {
return err
}

err = setProtoRepoBlobs(repoBlobs, repoBlobsBuck)
if err != nil {
return err
Expand All @@ -237,6 +295,17 @@ func (bdw *BoltDB) SetRepoReference(ctx context.Context, repo string, reference
return err
}

func updatedRepoLastUpdated(repo string, repoLastUpdatedBuck *bbolt.Bucket) error {
protoTime := timestamppb.Now()

protoTimeBlob, err := proto.Marshal(protoTime)
if err != nil {
return err
}

return repoLastUpdatedBuck.Put([]byte(repo), protoTimeBlob)
}

func unmarshalProtoRepoBlobs(repo string, repoBlobsBytes []byte) (*proto_go.RepoBlobs, error) {
repoBlobs := &proto_go.RepoBlobs{
Name: repo,
Expand Down Expand Up @@ -1010,6 +1079,7 @@ func (bdw *BoltDB) DecrementRepoStars(repo string) error {
func (bdw *BoltDB) SetRepoMeta(repo string, repoMeta mTypes.RepoMeta) error {
err := bdw.DB.Update(func(tx *bbolt.Tx) error {
buck := tx.Bucket([]byte(RepoMetaBuck))
repLastUpdatedBuck := tx.Bucket([]byte(RepoBlobsBuck)).Bucket([]byte(RepoLastUpdatedBuck))

repoMeta.Name = repo

Expand All @@ -1018,7 +1088,42 @@ func (bdw *BoltDB) SetRepoMeta(repo string, repoMeta mTypes.RepoMeta) error {
return err
}

return buck.Put([]byte(repo), repoMetaBlob)
err = buck.Put([]byte(repo), repoMetaBlob)
if err != nil {
return err
}

// The last update time is set to 0 in order to force an update in case of a next storage parsing
protoTime := timestamppb.New(time.Time{})

protoTimeBlob, err := proto.Marshal(protoTime)
if err != nil {
return err
}

return repLastUpdatedBuck.Put([]byte(repo), protoTimeBlob)
})

return err
}

func (bdw *BoltDB) DeleteRepoMeta(repo string) error {
err := bdw.DB.Update(func(tx *bbolt.Tx) error {
repoBuck := tx.Bucket([]byte(RepoMetaBuck))
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))
repoLastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))

err := repoBuck.Delete([]byte(repo))
if err != nil {
return err
}

err = repoBlobsBuck.Delete([]byte(repo))
if err != nil {
return err
}

return repoLastUpdatedBuck.Delete([]byte(repo))
})

return err
Expand Down Expand Up @@ -1934,12 +2039,39 @@ func (bdw *BoltDB) ResetDB() error {
}

func resetBucket(transaction *bbolt.Tx, bucketName string) error {
err := transaction.DeleteBucket([]byte(bucketName))
bucket := transaction.Bucket([]byte(bucketName))
if bucket == nil {
return nil
}

// we need to create the sub buckets if they exits, we'll presume the sub-buckets are not nested more than 1 layer
subBuckets := [][]byte{}

err := bucket.ForEachBucket(func(bucketName []byte) error {
subBuckets = append(subBuckets, bucketName)

return nil
})
if err != nil {
return err
}

_, err = transaction.CreateBucketIfNotExists([]byte(bucketName))
err = transaction.DeleteBucket([]byte(bucketName))
if err != nil {
return err
}

bucket, err = transaction.CreateBucketIfNotExists([]byte(bucketName))
if err != nil {
return err
}

for _, subBucket := range subBuckets {
_, err := bucket.CreateBucketIfNotExists(subBucket)
if err != nil {
return err
}
}

return err
}
6 changes: 5 additions & 1 deletion pkg/meta/boltdb/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ package boltdb
const (
ImageMetaBuck = "ImageMeta"
RepoMetaBuck = "RepoMeta"
RepoBlobsBuck = "RepoBlobsMeta"
UserDataBucket = "UserData"
VersionBucket = "Version"
UserAPIKeysBucket = "UserAPIKeys"
)

const (
RepoBlobsBuck = "RepoBlobsMeta"
RepoLastUpdatedBuck = "RepoLastUpdated" // Sub-bucket
)
Loading

0 comments on commit b4621f5

Please sign in to comment.