Skip to content

Commit

Permalink
feat(sync): specific images
Browse files Browse the repository at this point in the history
added the option to sync specific images(docker/oci)
specific repo:tags can be set in sync config

Signed-off-by: Petu Eusebiu <[email protected]>
  • Loading branch information
eusebiu-constantin-petu-dbk committed Oct 30, 2023
1 parent 4cb7a6c commit ac152fc
Show file tree
Hide file tree
Showing 12 changed files with 887 additions and 85 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ run-blackbox-ci: check-blackbox-prerequisites binary binary-minimal cli
$(BATS) $(BATS_FLAGS) test/blackbox/cve.bats && \
$(BATS) $(BATS_FLAGS) test/blackbox/sync.bats && \
$(BATS) $(BATS_FLAGS) test/blackbox/sync_docker.bats && \
$(BATS) $(BATS_FLAGS) test/blackbox/sync_specific_images.bats && \
$(BATS) $(BATS_FLAGS) test/blackbox/sync_replica_cluster.bats && \
$(BATS) $(BATS_FLAGS) test/blackbox/scrub.bats && \
$(BATS) $(BATS_FLAGS) test/blackbox/garbage_collect.bats && \
Expand Down
1 change: 1 addition & 0 deletions pkg/extensions/config/sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type RegistryConfig struct {
}

type Content struct {
Images map[string][]string
Prefix string
Tags *Tags
Destination string `mapstructure:",omitempty"`
Expand Down
35 changes: 33 additions & 2 deletions pkg/extensions/extension_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB,
return nil, zerr.ErrSyncNoURLsLeft
}

isPeriodical := len(registryConfig.Content) != 0 && registryConfig.PollInterval != 0
isPeriodical := isPeriodical(registryConfig)
isOnDemand := registryConfig.OnDemand
hasSpecificImagesToSync := hasSpecificImages(registryConfig)

if isPeriodical || isOnDemand {
if isPeriodical || isOnDemand || hasSpecificImagesToSync {
service, err := sync.New(registryConfig, config.Extensions.Sync.CredentialsFile,
storeController, metaDB, log)
if err != nil {
Expand All @@ -54,6 +55,12 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB,
sch.SubmitGenerator(gen, registryConfig.PollInterval, scheduler.MediumPriority)
}

if hasSpecificImagesToSync {
// add to task scheduler periodically sync specific images
gen := sync.NewSpecificImagesGenerator(service, log)
sch.SubmitGenerator(gen, registryConfig.PollInterval, scheduler.MediumPriority)
}

if isOnDemand {
// onDemand services used in routes.go
onDemand.Add(service)
Expand Down Expand Up @@ -171,3 +178,27 @@ func removeSelfURLs(config *config.Config, registryConfig *syncconf.RegistryConf

return nil
}

func hasSpecificImages(registryConfig syncconf.RegistryConfig) bool {
if registryConfig.PollInterval != 0 {
for _, content := range registryConfig.Content {
if len(content.Images) > 0 {
return true
}
}
}

return false
}

func isPeriodical(registryConfig syncconf.RegistryConfig) bool {
if registryConfig.PollInterval != 0 {
for _, content := range registryConfig.Content {
if content.Prefix != "" {
return true
}
}
}

return false
}
25 changes: 19 additions & 6 deletions pkg/extensions/sync/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/Masterminds/semver"
glob "github.com/bmatcuk/doublestar/v4"

zerr "zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/common"
syncconf "zotregistry.io/zot/pkg/extensions/config/sync"
"zotregistry.io/zot/pkg/log"
Expand Down Expand Up @@ -75,27 +76,35 @@ func (cm ContentManager) FilterTags(repo string, tags []string) ([]string, error
GetRepoDestination applies content destination config rule and returns the final repo namespace.
- used by periodically sync.
*/
func (cm ContentManager) GetRepoDestination(repo string) string {
func (cm ContentManager) GetRepoDestination(repo string) (string, error) {
if len(cm.contents) == 0 {
return repo, nil
}

content := cm.getContentByUpstreamRepo(repo)
if content == nil {
return ""
return "", zerr.ErrSyncImageFilteredOut
}

return getRepoDestination(repo, *content)
return getRepoDestination(repo, *content), nil
}

/*
GetRepoSource is the inverse function of GetRepoDestination, needed in on demand to find out
the remote name of a repo given a local repo.
- used by on demand sync.
*/
func (cm ContentManager) GetRepoSource(repo string) string {
func (cm ContentManager) GetRepoSource(repo string) (string, error) {
if len(cm.contents) == 0 {
return repo, nil
}

content := cm.getContentByLocalRepo(repo)
if content == nil {
return ""
return "", zerr.ErrSyncImageFilteredOut
}

return getRepoSource(repo, *content)
return getRepoSource(repo, *content), nil
}

// utilies functions.
Expand All @@ -109,6 +118,10 @@ func (cm ContentManager) getContentByUpstreamRepo(repo string) *syncconf.Content
prefix = content.Prefix
}

if _, ok := content.Images[repo]; ok {
return &content
}

matched, err := glob.Match(prefix, repo)
if err != nil {
cm.log.Error().Str("errorType", common.TypeOf(err)).
Expand Down
11 changes: 9 additions & 2 deletions pkg/extensions/sync/content_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,23 @@ func TestContentManager(t *testing.T) {
Convey("Test GetRepoDestination()", t, func() {
for _, test := range testCases {
cm := NewContentManager([]syncconf.Content{test.content}, log.Logger{})
actualResult := cm.GetRepoDestination(test.expected)
actualResult, _ := cm.GetRepoDestination(test.expected)
So(actualResult, ShouldEqual, test.repo)
}
})

Convey("Test GetRepoDestination() with empty contents", t, func() {
cm := NewContentManager(nil, log.Logger{})
repoName := "test"
actualResult, _ := cm.GetRepoDestination(repoName)
So(actualResult, ShouldEqual, repoName)
})

// this is the inverse function of getRepoDestination()
Convey("Test GetRepoSource()", t, func() {
for _, test := range testCases {
cm := NewContentManager([]syncconf.Content{test.content}, log.Logger{})
actualResult := cm.GetRepoSource(test.repo)
actualResult, _ := cm.GetRepoSource(test.repo)
So(actualResult, ShouldEqual, test.expected)
}
})
Expand Down
42 changes: 32 additions & 10 deletions pkg/extensions/sync/on_demand.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (onDemand *BaseOnDemand) SyncImage(ctx context.Context, repo, reference str
return err
}

func (onDemand *BaseOnDemand) SyncReference(ctx context.Context, repo string,
func (onDemand *BaseOnDemand) SyncReference(ctx context.Context, localRepo string,
subjectDigestStr string, referenceType string,
) error {
var err error
Expand All @@ -94,7 +94,17 @@ func (onDemand *BaseOnDemand) SyncReference(ctx context.Context, repo string,
return err
}

err = service.SyncReference(ctx, repo, subjectDigestStr, referenceType)
remoteRepo, err := service.GetRemoteRepoName(localRepo)
if err != nil {
// filtered out
onDemand.log.Info().Str("local repository", localRepo).
Str("repository", localRepo).Str("subject", subjectDigestStr).
Msg("will not sync image, filtered out by content")

continue
}

err = service.SyncReference(ctx, localRepo, remoteRepo, subjectDigestStr, referenceType)
if err != nil {
continue
} else {
Expand All @@ -105,7 +115,7 @@ func (onDemand *BaseOnDemand) SyncReference(ctx context.Context, repo string,
return err
}

func (onDemand *BaseOnDemand) syncImage(ctx context.Context, repo, reference string, syncResult chan error) {
func (onDemand *BaseOnDemand) syncImage(ctx context.Context, localRepo, reference string, syncResult chan error) {
var err error
for serviceID, service := range onDemand.services {
err = service.SetNextAvailableURL()
Expand All @@ -115,16 +125,25 @@ func (onDemand *BaseOnDemand) syncImage(ctx context.Context, repo, reference str
return
}

err = service.SyncImage(ctx, repo, reference)
remoteRepo, err := service.GetRemoteRepoName(localRepo)
if err != nil {
// filtered out
onDemand.log.Info().Str("local repository", localRepo).
Str("remote repository", remoteRepo).Str("repository", localRepo).Str("reference", reference).
Msg("will not sync image, filtered out by content")

continue
}

err = service.SyncImage(ctx, localRepo, remoteRepo, reference)
if err != nil {
if errors.Is(err, zerr.ErrManifestNotFound) ||
errors.Is(err, zerr.ErrSyncImageFilteredOut) ||
errors.Is(err, zerr.ErrSyncImageNotSigned) {
continue
}

req := request{
repo: repo,
repo: localRepo,
reference: reference,
serviceID: serviceID,
isBackground: true,
Expand All @@ -143,22 +162,25 @@ func (onDemand *BaseOnDemand) syncImage(ctx context.Context, repo, reference str
// remove image after syncing
defer func() {
onDemand.requestStore.Delete(req)
onDemand.log.Info().Str("repo", repo).Str("reference", reference).
onDemand.log.Info().Str("local repository", localRepo).
Str("remote repository", remoteRepo).Str("reference", reference).
Msg("sync routine for image exited")
}()

onDemand.log.Info().Str("repo", repo).Str(reference, "reference").Str("err", err.Error()).
onDemand.log.Info().Str("local repository", localRepo).
Str("remote repository", remoteRepo).Str(reference, "reference").Str("err", err.Error()).
Msg("sync routine: starting routine to copy image, because of error")

time.Sleep(retryOptions.Delay)

// retrying in background, can't use the same context which should be cancelled by now.
if err = retry.RetryIfNecessary(context.Background(), func() error {
err := service.SyncImage(context.Background(), repo, reference)
err := service.SyncImage(context.Background(), localRepo, remoteRepo, reference)

return err
}, retryOptions); err != nil {
onDemand.log.Error().Str("errorType", common.TypeOf(err)).Str("repo", repo).Str("reference", reference).
onDemand.log.Error().Str("errorType", common.TypeOf(err)).Str("local repository", localRepo).
Str("remote repository", remoteRepo).Str("reference", reference).
Err(err).Msg("sync routine: error while copying image")
}
}(service)
Expand Down
Loading

0 comments on commit ac152fc

Please sign in to comment.