Skip to content

Commit

Permalink
feat(sync): local tmp store
Browse files Browse the repository at this point in the history
Signed-off-by: a <[email protected]>
  • Loading branch information
elee1766 committed Nov 11, 2023
1 parent 6a66a9b commit d700667
Show file tree
Hide file tree
Showing 9 changed files with 623 additions and 55 deletions.
6 changes: 0 additions & 6 deletions pkg/cli/server/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,6 @@ func validateConfiguration(config *config.Config, log zlog.Logger) error {
return zerr.ErrBadConfig
}

Check failure on line 383 in pkg/cli/server/root.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
// enforce filesystem storage in case sync feature is enabled
if config.Extensions != nil && config.Extensions.Sync != nil {
log.Error().Err(zerr.ErrBadConfig).Msg("sync supports only filesystem storage")

return zerr.ErrBadConfig
}
}

Check failure on line 384 in pkg/cli/server/root.go

View workflow job for this annotation

GitHub Actions / lint

block should not end with a whitespace (or comment) (wsl)

// enforce s3 driver on subpaths in case of using storage driver
Expand Down
1 change: 1 addition & 0 deletions pkg/extensions/config/sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Credentials struct {
type Config struct {
Enable *bool
CredentialsFile string
TmpDir string
Registries []RegistryConfig
}

Expand Down
39 changes: 24 additions & 15 deletions pkg/extensions/extension_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package extensions
import (
"net"
"net/url"
"os"
"strings"

zerr "zotregistry.io/zot/errors"
Expand Down Expand Up @@ -41,23 +42,31 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB,
isPeriodical := len(registryConfig.Content) != 0 && registryConfig.PollInterval != 0
isOnDemand := registryConfig.OnDemand

if isPeriodical || isOnDemand {
service, err := sync.New(registryConfig, config.Extensions.Sync.CredentialsFile,
storeController, metaDB, log)
if err != nil {
return nil, err
}
if !(isPeriodical || isOnDemand) {
continue
}

if isPeriodical {
// add to task scheduler periodic sync
gen := sync.NewTaskGenerator(service, log)
sch.SubmitGenerator(gen, registryConfig.PollInterval, scheduler.MediumPriority)
}
tmpDir := config.Extensions.Sync.TmpDir
if tmpDir == "" {
// use an os tmpdir as tmpdir if not set
tmpDir = os.TempDir()
}

if isOnDemand {
// onDemand services used in routes.go
onDemand.Add(service)
}
service, err := sync.New(registryConfig, config.Extensions.Sync.CredentialsFile, tmpDir,
storeController, metaDB, log)
if err != nil {
return nil, err
}

if isPeriodical {
// add to task scheduler periodic sync
gen := sync.NewTaskGenerator(service, log)
sch.SubmitGenerator(gen, registryConfig.PollInterval, scheduler.MediumPriority)
}

if isOnDemand {
// onDemand services used in routes.go
onDemand.Add(service)
}
}

Expand Down
29 changes: 19 additions & 10 deletions pkg/extensions/sync/local.go → pkg/extensions/sync/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,34 @@ import (
storageTypes "zotregistry.io/zot/pkg/storage/types"
)

type LocalRegistry struct {
type DestinationRegistry struct {
storeController storage.StoreController
tempStorage OciLayoutStorage
metaDB mTypes.MetaDB
log log.Logger
}

func NewLocalRegistry(storeController storage.StoreController, metaDB mTypes.MetaDB, log log.Logger) Local {
return &LocalRegistry{
func NewDestinationRegistry(
storeController storage.StoreController,
tmpStorage OciLayoutStorage,
metaDB mTypes.MetaDB,
log log.Logger,
) Destination {
if tmpStorage == nil {
// to allow passing nil we can do this, noting that it will only work for a local StoreController
tmpStorage = NewOciLayoutStorage(storeController)
}
return &DestinationRegistry{

Check failure on line 49 in pkg/extensions/sync/destination.go

View workflow job for this annotation

GitHub Actions / lint

return statements should not be cuddled if block has more than two lines (wsl)
storeController: storeController,
metaDB: metaDB,
// first we sync from remote (using containers/image copy from docker:// to oci:) to a temp imageStore
// then we copy the image from tempStorage to zot's storage using ImageStore APIs
tempStorage: NewOciLayoutStorage(storeController),
tempStorage: tmpStorage,
log: log,
}
}

func (registry *LocalRegistry) CanSkipImage(repo, tag string, imageDigest digest.Digest) (bool, error) {
func (registry *DestinationRegistry) CanSkipImage(repo, tag string, imageDigest digest.Digest) (bool, error) {
// check image already synced
imageStore := registry.storeController.GetImageStore(repo)

Expand Down Expand Up @@ -75,16 +84,16 @@ func (registry *LocalRegistry) CanSkipImage(repo, tag string, imageDigest digest
return true, nil
}

func (registry *LocalRegistry) GetContext() *types.SystemContext {
func (registry *DestinationRegistry) GetContext() *types.SystemContext {
return registry.tempStorage.GetContext()
}

func (registry *LocalRegistry) GetImageReference(repo, reference string) (types.ImageReference, error) {
func (registry *DestinationRegistry) GetImageReference(repo, reference string) (types.ImageReference, error) {
return registry.tempStorage.GetImageReference(repo, reference)
}

// finalize a syncing image.
func (registry *LocalRegistry) CommitImage(imageReference types.ImageReference, repo, reference string) error {
func (registry *DestinationRegistry) CommitImage(imageReference types.ImageReference, repo, reference string) error {
imageStore := registry.storeController.GetImageStore(repo)

tempImageStore := getImageStoreFromImageReference(imageReference, repo, reference)
Expand Down Expand Up @@ -180,7 +189,7 @@ func (registry *LocalRegistry) CommitImage(imageReference types.ImageReference,
return nil
}

func (registry *LocalRegistry) copyManifest(repo string, manifestContent []byte, reference string,
func (registry *DestinationRegistry) copyManifest(repo string, manifestContent []byte, reference string,
tempImageStore storageTypes.ImageStore,
) error {
imageStore := registry.storeController.GetImageStore(repo)
Expand Down Expand Up @@ -239,7 +248,7 @@ func (registry *LocalRegistry) copyManifest(repo string, manifestContent []byte,
}

// Copy a blob from one image store to another image store.
func (registry *LocalRegistry) copyBlob(repo string, blobDigest digest.Digest, blobMediaType string,
func (registry *DestinationRegistry) copyBlob(repo string, blobDigest digest.Digest, blobMediaType string,
tempImageStore storageTypes.ImageStore,
) error {
imageStore := registry.storeController.GetImageStore(repo)
Expand Down
47 changes: 30 additions & 17 deletions pkg/extensions/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (
"zotregistry.io/zot/pkg/log"
mTypes "zotregistry.io/zot/pkg/meta/types"
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/storage/local"
)

type BaseService struct {
config syncconf.RegistryConfig
credentials syncconf.CredentialsFile
remote Remote
local Local
destination Destination
retryOptions *retry.RetryOptions
contentManager ContentManager
storeController storage.StoreController
Expand All @@ -37,8 +38,13 @@ type BaseService struct {
log log.Logger
}

func New(opts syncconf.RegistryConfig, credentialsFilepath string,
storeController storage.StoreController, metadb mTypes.MetaDB, log log.Logger,
func New(
opts syncconf.RegistryConfig,
credentialsFilepath string,
tmpDir string,
storeController storage.StoreController,
metadb mTypes.MetaDB,
log log.Logger,
) (Service, error) {
service := &BaseService{}

Expand All @@ -60,7 +66,14 @@ func New(opts syncconf.RegistryConfig, credentialsFilepath string,
service.credentials = credentialsFile

service.contentManager = NewContentManager(opts.Content, log)
service.local = NewLocalRegistry(storeController, metadb, log)

tmpImageStore := local.NewImageStore(tmpDir,
false, false, log, nil, nil, nil,
)

tmpStorage := NewOciLayoutStorage(storage.StoreController{DefaultStore: tmpImageStore})

service.destination = NewDestinationRegistry(storeController, tmpStorage, metadb, log)

retryOptions := &retry.RetryOptions{}

Expand Down Expand Up @@ -289,7 +302,7 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
service.log.Info().Str("repo", repo).Msgf("sync: syncing tags %v", tags)

// apply content.destination rule
localRepo := service.contentManager.GetRepoDestination(repo)
destinationRepo := service.contentManager.GetRepoDestination(repo)

for _, tag := range tags {
select {
Expand All @@ -305,7 +318,7 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
var manifestDigest digest.Digest

if err = retry.RetryIfNecessary(ctx, func() error {
manifestDigest, err = service.syncTag(ctx, localRepo, repo, tag)
manifestDigest, err = service.syncTag(ctx, destinationRepo, repo, tag)

return err
}, service.retryOptions); err != nil {
Expand All @@ -322,7 +335,7 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {

if manifestDigest != "" {
if err = retry.RetryIfNecessary(ctx, func() error {
err = service.references.SyncAll(ctx, localRepo, repo, manifestDigest.String())
err = service.references.SyncAll(ctx, destinationRepo, repo, manifestDigest.String())
if errors.Is(err, zerr.ErrSyncReferrerNotFound) {
return nil
}
Expand All @@ -342,8 +355,8 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
return nil
}

func (service *BaseService) syncTag(ctx context.Context, localRepo, remoteRepo, tag string) (digest.Digest, error) {
copyOptions := getCopyOptions(service.remote.GetContext(), service.local.GetContext())
func (service *BaseService) syncTag(ctx context.Context, destinationRepo, remoteRepo, tag string) (digest.Digest, error) {

Check failure on line 358 in pkg/extensions/sync/service.go

View workflow job for this annotation

GitHub Actions / lint

line is 122 characters (lll)
copyOptions := getCopyOptions(service.remote.GetContext(), service.destination.GetContext())

policyContext, err := getPolicyContext(service.log)
if err != nil {
Expand Down Expand Up @@ -386,38 +399,38 @@ func (service *BaseService) syncTag(ctx context.Context, localRepo, remoteRepo,
}
}

skipImage, err := service.local.CanSkipImage(localRepo, tag, manifestDigest)
skipImage, err := service.destination.CanSkipImage(destinationRepo, tag, manifestDigest)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
Str("repo", localRepo).Str("reference", tag).
Str("repo", destinationRepo).Str("reference", tag).
Msg("couldn't check if the local image can be skipped")
}

if !skipImage {
localImageRef, err := service.local.GetImageReference(localRepo, tag)
localImageRef, err := service.destination.GetImageReference(destinationRepo, tag)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
Str("repo", localRepo).Str("reference", tag).Msg("couldn't get a local image reference")
Str("repo", destinationRepo).Str("reference", tag).Msg("couldn't get a local image reference")

return "", err
}

service.log.Info().Str("remote image", remoteImageRef.DockerReference().String()).
Str("local image", fmt.Sprintf("%s:%s", localRepo, tag)).Msg("syncing image")
Str("local image", fmt.Sprintf("%s:%s", destinationRepo, tag)).Msg("syncing image")

_, err = copy.Image(ctx, policyContext, localImageRef, remoteImageRef, &copyOptions)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
Str("remote image", remoteImageRef.DockerReference().String()).
Str("local image", fmt.Sprintf("%s:%s", localRepo, tag)).Msg("coulnd't sync image")
Str("local image", fmt.Sprintf("%s:%s", destinationRepo, tag)).Msg("coulnd't sync image")

return "", err
}

err = service.local.CommitImage(localImageRef, localRepo, tag)
err = service.destination.CommitImage(localImageRef, destinationRepo, tag)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
Str("repo", localRepo).Str("reference", tag).Msg("couldn't commit image to local image store")
Str("repo", destinationRepo).Str("reference", tag).Msg("couldn't commit image to local image store")

return "", err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/extensions/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Remote interface {
}

// Local registry.
type Local interface {
type Destination interface {
Registry
// Check if an image is already synced
CanSkipImage(repo, tag string, imageDigest digest.Digest) (bool, error)
Expand Down
12 changes: 6 additions & 6 deletions pkg/extensions/sync/sync_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,15 @@ func TestService(t *testing.T) {
URLs: []string{"http://localhost"},
}

service, err := New(conf, "", storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{})
service, err := New(conf, "", os.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{})
So(err, ShouldBeNil)

err = service.SyncRepo(context.Background(), "repo")
So(err, ShouldNotBeNil)
})
}

func TestLocalRegistry(t *testing.T) {
func TestDestinationRegistry(t *testing.T) {
Convey("make StoreController", t, func() {
dir := t.TempDir()

Expand All @@ -185,7 +185,7 @@ func TestLocalRegistry(t *testing.T) {
syncImgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver)
repoName := "repo"

registry := NewLocalRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, log)
registry := NewDestinationRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, nil, log)
imageReference, err := registry.GetImageReference(repoName, "1.0")
So(err, ShouldBeNil)
So(imageReference, ShouldNotBeNil)
Expand Down Expand Up @@ -302,7 +302,7 @@ func TestLocalRegistry(t *testing.T) {
syncImgStore := local.NewImageStore(dir, true, true, log, metrics, linter, cacheDriver)
repoName := "repo"

registry := NewLocalRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, log)
registry := NewDestinationRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, nil, log)

err = registry.CommitImage(imageReference, repoName, "1.0")
So(err, ShouldBeNil)
Expand Down Expand Up @@ -336,7 +336,7 @@ func TestLocalRegistry(t *testing.T) {
})

Convey("trigger metaDB error on index manifest in CommitImage()", func() {
registry := NewLocalRegistry(storage.StoreController{DefaultStore: syncImgStore}, mocks.MetaDBMock{
registry := NewDestinationRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, mocks.MetaDBMock{
SetRepoReferenceFn: func(ctx context.Context, repo string, reference string, imageMeta mTypes.ImageMeta) error {
if reference == "1.0" {
return zerr.ErrRepoMetaNotFound
Expand All @@ -351,7 +351,7 @@ func TestLocalRegistry(t *testing.T) {
})

Convey("trigger metaDB error on image manifest in CommitImage()", func() {
registry := NewLocalRegistry(storage.StoreController{DefaultStore: syncImgStore}, mocks.MetaDBMock{
registry := NewDestinationRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, mocks.MetaDBMock{
SetRepoReferenceFn: func(ctx context.Context, repo, reference string, imageMeta mTypes.ImageMeta) error {
return zerr.ErrRepoMetaNotFound
},
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/imagestore/imagestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,14 @@ func (is *ImageStore) GetNextRepository(repo string) (string, error) {

driverErr := &driver.Error{}

// some s3 implementations (eg, digitalocean spaces) will return pathnotfounderror for walk but not list
// therefore, we must also catch that error here.
if errors.As(err, &driver.PathNotFoundError{}) {
is.log.Debug().Msg("empty rootDir")

return "", nil
}

if errors.Is(err, io.EOF) ||
(errors.As(err, driverErr) && errors.Is(driverErr.Enclosed, io.EOF)) {
return store, nil
Expand Down
Loading

0 comments on commit d700667

Please sign in to comment.