diff --git a/Makefile b/Makefile index c0e51a9ad4..e9e0dd4953 100644 --- a/Makefile +++ b/Makefile @@ -430,6 +430,7 @@ run-blackbox-ci: check-blackbox-prerequisites binary binary-minimal cli $(BATS) $(BATS_FLAGS) test/blackbox/cve.bats && \ $(BATS) $(BATS_FLAGS) test/blackbox/sync.bats && \ $(BATS) $(BATS_FLAGS) test/blackbox/sync_docker.bats && \ + $(BATS) $(BATS_FLAGS) test/blackbox/sync_specific_images.bats && \ $(BATS) $(BATS_FLAGS) test/blackbox/sync_replica_cluster.bats && \ $(BATS) $(BATS_FLAGS) test/blackbox/scrub.bats && \ $(BATS) $(BATS_FLAGS) test/blackbox/garbage_collect.bats && \ diff --git a/pkg/extensions/config/sync/config.go b/pkg/extensions/config/sync/config.go index 96c1361d49..ff5619b825 100644 --- a/pkg/extensions/config/sync/config.go +++ b/pkg/extensions/config/sync/config.go @@ -31,6 +31,7 @@ type RegistryConfig struct { } type Content struct { + Images map[string][]string Prefix string Tags *Tags Destination string `mapstructure:",omitempty"` diff --git a/pkg/extensions/extension_sync.go b/pkg/extensions/extension_sync.go index 852112dc0d..3c6f272581 100644 --- a/pkg/extensions/extension_sync.go +++ b/pkg/extensions/extension_sync.go @@ -38,10 +38,11 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB, return nil, zerr.ErrSyncNoURLsLeft } - isPeriodical := len(registryConfig.Content) != 0 && registryConfig.PollInterval != 0 + isPeriodical := isPeriodical(registryConfig) isOnDemand := registryConfig.OnDemand + hasSpecificImagesToSync := hasSpecificImages(registryConfig) - if isPeriodical || isOnDemand { + if isPeriodical || isOnDemand || hasSpecificImagesToSync { service, err := sync.New(registryConfig, config.Extensions.Sync.CredentialsFile, storeController, metaDB, log) if err != nil { @@ -54,6 +55,12 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB, sch.SubmitGenerator(gen, registryConfig.PollInterval, scheduler.MediumPriority) } + if hasSpecificImagesToSync { + // add to task scheduler periodically sync specific images + gen := sync.NewSpecificImagesGenerator(service, log) + sch.SubmitGenerator(gen, registryConfig.PollInterval, scheduler.MediumPriority) + } + if isOnDemand { // onDemand services used in routes.go onDemand.Add(service) @@ -171,3 +178,27 @@ func removeSelfURLs(config *config.Config, registryConfig *syncconf.RegistryConf return nil } + +func hasSpecificImages(registryConfig syncconf.RegistryConfig) bool { + if registryConfig.PollInterval != 0 { + for _, content := range registryConfig.Content { + if len(content.Images) > 0 { + return true + } + } + } + + return false +} + +func isPeriodical(registryConfig syncconf.RegistryConfig) bool { + if registryConfig.PollInterval != 0 { + for _, content := range registryConfig.Content { + if content.Prefix != "" { + return true + } + } + } + + return false +} diff --git a/pkg/extensions/sync/content.go b/pkg/extensions/sync/content.go index 7ef2c9eaf2..7fddc95670 100644 --- a/pkg/extensions/sync/content.go +++ b/pkg/extensions/sync/content.go @@ -10,6 +10,7 @@ import ( "github.com/Masterminds/semver" glob "github.com/bmatcuk/doublestar/v4" + zerr "zotregistry.io/zot/errors" "zotregistry.io/zot/pkg/common" syncconf "zotregistry.io/zot/pkg/extensions/config/sync" "zotregistry.io/zot/pkg/log" @@ -75,13 +76,17 @@ func (cm ContentManager) FilterTags(repo string, tags []string) ([]string, error GetRepoDestination applies content destination config rule and returns the final repo namespace. - used by periodically sync. */ -func (cm ContentManager) GetRepoDestination(repo string) string { +func (cm ContentManager) GetRepoDestination(repo string) (string, error) { + if len(cm.contents) == 0 { + return repo, nil + } + content := cm.getContentByUpstreamRepo(repo) if content == nil { - return "" + return "", zerr.ErrSyncImageFilteredOut } - return getRepoDestination(repo, *content) + return getRepoDestination(repo, *content), nil } /* @@ -89,13 +94,17 @@ GetRepoSource is the inverse function of GetRepoDestination, needed in on demand the remote name of a repo given a local repo. - used by on demand sync. */ -func (cm ContentManager) GetRepoSource(repo string) string { +func (cm ContentManager) GetRepoSource(repo string) (string, error) { + if len(cm.contents) == 0 { + return repo, nil + } + content := cm.getContentByLocalRepo(repo) if content == nil { - return "" + return "", zerr.ErrSyncImageFilteredOut } - return getRepoSource(repo, *content) + return getRepoSource(repo, *content), nil } // utilies functions. @@ -109,6 +118,10 @@ func (cm ContentManager) getContentByUpstreamRepo(repo string) *syncconf.Content prefix = content.Prefix } + if _, ok := content.Images[repo]; ok { + return &content + } + matched, err := glob.Match(prefix, repo) if err != nil { cm.log.Error().Str("errorType", common.TypeOf(err)). diff --git a/pkg/extensions/sync/content_internal_test.go b/pkg/extensions/sync/content_internal_test.go index 74177a85aa..b8549f00b2 100644 --- a/pkg/extensions/sync/content_internal_test.go +++ b/pkg/extensions/sync/content_internal_test.go @@ -68,16 +68,23 @@ func TestContentManager(t *testing.T) { Convey("Test GetRepoDestination()", t, func() { for _, test := range testCases { cm := NewContentManager([]syncconf.Content{test.content}, log.Logger{}) - actualResult := cm.GetRepoDestination(test.expected) + actualResult, _ := cm.GetRepoDestination(test.expected) So(actualResult, ShouldEqual, test.repo) } }) + Convey("Test GetRepoDestination() with empty contents", t, func() { + cm := NewContentManager(nil, log.Logger{}) + repoName := "test" + actualResult, _ := cm.GetRepoDestination(repoName) + So(actualResult, ShouldEqual, repoName) + }) + // this is the inverse function of getRepoDestination() Convey("Test GetRepoSource()", t, func() { for _, test := range testCases { cm := NewContentManager([]syncconf.Content{test.content}, log.Logger{}) - actualResult := cm.GetRepoSource(test.repo) + actualResult, _ := cm.GetRepoSource(test.repo) So(actualResult, ShouldEqual, test.expected) } }) diff --git a/pkg/extensions/sync/on_demand.go b/pkg/extensions/sync/on_demand.go index 062128042d..8985e69bd3 100644 --- a/pkg/extensions/sync/on_demand.go +++ b/pkg/extensions/sync/on_demand.go @@ -83,7 +83,7 @@ func (onDemand *BaseOnDemand) SyncImage(ctx context.Context, repo, reference str return err } -func (onDemand *BaseOnDemand) SyncReference(ctx context.Context, repo string, +func (onDemand *BaseOnDemand) SyncReference(ctx context.Context, localRepo string, subjectDigestStr string, referenceType string, ) error { var err error @@ -94,7 +94,17 @@ func (onDemand *BaseOnDemand) SyncReference(ctx context.Context, repo string, return err } - err = service.SyncReference(ctx, repo, subjectDigestStr, referenceType) + remoteRepo, err := service.GetRemoteRepoName(localRepo) + if err != nil { + // filtered out + onDemand.log.Info().Str("local repository", localRepo). + Str("repository", localRepo).Str("subject", subjectDigestStr). + Msg("will not sync image, filtered out by content") + + continue + } + + err = service.SyncReference(ctx, localRepo, remoteRepo, subjectDigestStr, referenceType) if err != nil { continue } else { @@ -105,7 +115,7 @@ func (onDemand *BaseOnDemand) SyncReference(ctx context.Context, repo string, return err } -func (onDemand *BaseOnDemand) syncImage(ctx context.Context, repo, reference string, syncResult chan error) { +func (onDemand *BaseOnDemand) syncImage(ctx context.Context, localRepo, reference string, syncResult chan error) { var err error for serviceID, service := range onDemand.services { err = service.SetNextAvailableURL() @@ -115,16 +125,25 @@ func (onDemand *BaseOnDemand) syncImage(ctx context.Context, repo, reference str return } - err = service.SyncImage(ctx, repo, reference) + remoteRepo, err := service.GetRemoteRepoName(localRepo) + if err != nil { + // filtered out + onDemand.log.Info().Str("local repository", localRepo). + Str("remote repository", remoteRepo).Str("repository", localRepo).Str("reference", reference). + Msg("will not sync image, filtered out by content") + + continue + } + + err = service.SyncImage(ctx, localRepo, remoteRepo, reference) if err != nil { if errors.Is(err, zerr.ErrManifestNotFound) || - errors.Is(err, zerr.ErrSyncImageFilteredOut) || errors.Is(err, zerr.ErrSyncImageNotSigned) { continue } req := request{ - repo: repo, + repo: localRepo, reference: reference, serviceID: serviceID, isBackground: true, @@ -143,22 +162,25 @@ func (onDemand *BaseOnDemand) syncImage(ctx context.Context, repo, reference str // remove image after syncing defer func() { onDemand.requestStore.Delete(req) - onDemand.log.Info().Str("repo", repo).Str("reference", reference). + onDemand.log.Info().Str("local repository", localRepo). + Str("remote repository", remoteRepo).Str("reference", reference). Msg("sync routine for image exited") }() - onDemand.log.Info().Str("repo", repo).Str(reference, "reference").Str("err", err.Error()). + onDemand.log.Info().Str("local repository", localRepo). + Str("remote repository", remoteRepo).Str(reference, "reference").Str("err", err.Error()). Msg("sync routine: starting routine to copy image, because of error") time.Sleep(retryOptions.Delay) // retrying in background, can't use the same context which should be cancelled by now. if err = retry.RetryIfNecessary(context.Background(), func() error { - err := service.SyncImage(context.Background(), repo, reference) + err := service.SyncImage(context.Background(), localRepo, remoteRepo, reference) return err }, retryOptions); err != nil { - onDemand.log.Error().Str("errorType", common.TypeOf(err)).Str("repo", repo).Str("reference", reference). + onDemand.log.Error().Str("errorType", common.TypeOf(err)).Str("local repository", localRepo). + Str("remote repository", remoteRepo).Str("reference", reference). Err(err).Msg("sync routine: error while copying image") } }(service) diff --git a/pkg/extensions/sync/service.go b/pkg/extensions/sync/service.go index a121c7cef1..858dbfce37 100644 --- a/pkg/extensions/sync/service.go +++ b/pkg/extensions/sync/service.go @@ -174,6 +174,35 @@ func (service *BaseService) getNextRepoFromCatalog(lastRepo string) string { return nextRepo } +// get next repo with tags from specific image content config (content.Images). +func (service *BaseService) GetNextRepoTags(lastRepo string) (string, []string) { + var found bool + + var nextRepo string + + var repoTags []string + + for _, content := range service.config.Content { + for repo, tags := range content.Images { + if lastRepo == "" { + return repo, tags + } + + if repo == lastRepo { + found = true + + continue + } + + if found { + return repo, tags + } + } + } + + return nextRepo, repoTags +} + func (service *BaseService) GetNextRepo(lastRepo string) (string, error) { var err error @@ -204,55 +233,41 @@ func (service *BaseService) GetNextRepo(lastRepo string) (string, error) { return lastRepo, nil } +func (service *BaseService) GetLocalRepoName(remoteRepo string) (string, error) { + return service.contentManager.GetRepoDestination(remoteRepo) +} + +func (service *BaseService) GetRemoteRepoName(localRepo string) (string, error) { + return service.contentManager.GetRepoSource(localRepo) +} + // SyncReference on demand. -func (service *BaseService) SyncReference(ctx context.Context, repo string, - subjectDigestStr string, referenceType string, +func (service *BaseService) SyncReference(ctx context.Context, localRepo string, + remoteRepo string, subjectDigestStr string, referenceType string, ) error { - remoteRepo := repo - remoteURL := service.client.GetConfig().URL - if len(service.config.Content) > 0 { - remoteRepo = service.contentManager.GetRepoSource(repo) - if remoteRepo == "" { - service.log.Info().Str("remote", remoteURL).Str("repo", repo).Str("subject", subjectDigestStr). - Str("reference type", referenceType).Msg("will not sync reference for image, filtered out by content") - - return zerr.ErrSyncImageFilteredOut - } - } - - service.log.Info().Str("remote", remoteURL).Str("repo", repo).Str("subject", subjectDigestStr). + service.log.Info().Str("remote", remoteURL).Str("remote repository", remoteRepo). + Str("local repository", localRepo).Str("subject", subjectDigestStr). Str("reference type", referenceType).Msg("sync: syncing reference for image") - return service.references.SyncReference(ctx, repo, remoteRepo, subjectDigestStr, referenceType) + return service.references.SyncReference(ctx, localRepo, remoteRepo, subjectDigestStr, referenceType) } // SyncImage on demand. -func (service *BaseService) SyncImage(ctx context.Context, repo, reference string) error { - remoteRepo := repo - +func (service *BaseService) SyncImage(ctx context.Context, localRepo, remoteRepo, reference string) error { remoteURL := service.client.GetConfig().URL - if len(service.config.Content) > 0 { - remoteRepo = service.contentManager.GetRepoSource(repo) - if remoteRepo == "" { - service.log.Info().Str("remote", remoteURL).Str("repo", repo).Str("reference", reference). - Msg("will not sync image, filtered out by content") - - return zerr.ErrSyncImageFilteredOut - } - } - - service.log.Info().Str("remote", remoteURL).Str("repo", repo).Str("reference", reference). + service.log.Info().Str("remote", remoteURL).Str("remote repository", remoteRepo). + Str("local repository", localRepo).Str("reference", reference). Msg("sync: syncing image") - manifestDigest, err := service.syncTag(ctx, repo, remoteRepo, reference) + manifestDigest, err := service.syncTag(ctx, localRepo, remoteRepo, reference) if err != nil { return err } - err = service.references.SyncAll(ctx, repo, remoteRepo, manifestDigest.String()) + err = service.references.SyncAll(ctx, localRepo, remoteRepo, manifestDigest.String()) if err != nil && !errors.Is(err, zerr.ErrSyncReferrerNotFound) { return err } @@ -261,8 +276,9 @@ func (service *BaseService) SyncImage(ctx context.Context, repo, reference strin } // sync repo periodically. -func (service *BaseService) SyncRepo(ctx context.Context, repo string) error { - service.log.Info().Str("repo", repo).Str("registry", service.client.GetConfig().URL). +func (service *BaseService) SyncRepo(ctx context.Context, localRepo, remoteRepo string) error { + service.log.Info().Str("remote repository", remoteRepo). + Str("local repository", localRepo).Str("registry", service.client.GetConfig().URL). Msg("sync: syncing repo") var err error @@ -270,26 +286,26 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error { var tags []string if err = retry.RetryIfNecessary(ctx, func() error { - tags, err = service.remote.GetRepoTags(repo) + tags, err = service.remote.GetRepoTags(remoteRepo) return err }, service.retryOptions); err != nil { - service.log.Error().Str("errorType", common.TypeOf(err)).Str("repo", repo). + service.log.Error().Str("errorType", common.TypeOf(err)). + Str("remote repository", remoteRepo). + Str("local repository", localRepo). Err(err).Msg("error while getting tags for repo") return err } // filter tags - tags, err = service.contentManager.FilterTags(repo, tags) + tags, err = service.contentManager.FilterTags(localRepo, tags) if err != nil { return err } - service.log.Info().Str("repo", repo).Msgf("sync: syncing tags %v", tags) - - // apply content.destination rule - localRepo := service.contentManager.GetRepoDestination(repo) + service.log.Info().Str("remote repository", remoteRepo). + Str("local repository", localRepo).Msgf("sync: syncing tags %v", tags) for _, tag := range tags { select { @@ -305,7 +321,7 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error { var manifestDigest digest.Digest if err = retry.RetryIfNecessary(ctx, func() error { - manifestDigest, err = service.syncTag(ctx, localRepo, repo, tag) + manifestDigest, err = service.syncTag(ctx, localRepo, remoteRepo, tag) return err }, service.retryOptions); err != nil { @@ -314,7 +330,8 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error { continue } - service.log.Error().Str("errorType", common.TypeOf(err)).Str("repo", repo). + service.log.Error().Str("errorType", common.TypeOf(err)).Str("remote repository", remoteRepo). + Str("local repository", localRepo). Err(err).Msg("error while syncing tags for repo") return err @@ -322,22 +339,25 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error { if manifestDigest != "" { if err = retry.RetryIfNecessary(ctx, func() error { - err = service.references.SyncAll(ctx, localRepo, repo, manifestDigest.String()) + err = service.references.SyncAll(ctx, localRepo, remoteRepo, manifestDigest.String()) if errors.Is(err, zerr.ErrSyncReferrerNotFound) { return nil } return err }, service.retryOptions); err != nil { - service.log.Error().Str("errorType", common.TypeOf(err)).Str("repo", repo). - Err(err).Msg("error while syncing tags for repo") + service.log.Error().Str("errorType", common.TypeOf(err)).Str("remote repository", remoteRepo). + Str("local repository", localRepo).Str("subject", manifestDigest.String()). + Err(err).Msg("error while syncing references for image") - return err + // ignore refs errors for now, otherwise syncing specific images for docker would not work. will fix + continue } } } - service.log.Info().Str("repo", repo).Msg("sync: finished syncing repo") + service.log.Info().Str("remote repository", remoteRepo). + Str("local repository", localRepo).Msg("sync: finished syncing repo") return nil } @@ -357,14 +377,17 @@ func (service *BaseService) syncTag(ctx context.Context, localRepo, remoteRepo, remoteImageRef, err := service.remote.GetImageReference(remoteRepo, tag) if err != nil { service.log.Error().Err(err).Str("errortype", common.TypeOf(err)). - Str("repo", remoteRepo).Str("reference", tag).Msg("couldn't get a remote image reference") + Str("remote repository", remoteRepo). + Str("local repository", localRepo).Str("reference", tag). + Msg("couldn't get a remote image reference") return "", err } _, mediaType, manifestDigest, err := service.remote.GetManifestContent(remoteImageRef) if err != nil { - service.log.Error().Err(err).Str("repo", remoteRepo).Str("reference", tag). + service.log.Error().Err(err).Str("remote repository", remoteRepo). + Str("local repository", localRepo).Str("reference", tag). Msg("couldn't get upstream image manifest details") return "", err @@ -389,7 +412,7 @@ func (service *BaseService) syncTag(ctx context.Context, localRepo, remoteRepo, skipImage, err := service.local.CanSkipImage(localRepo, tag, manifestDigest) if err != nil { service.log.Error().Err(err).Str("errortype", common.TypeOf(err)). - Str("repo", localRepo).Str("reference", tag). + Str("local repository", localRepo).Str("reference", tag). Msg("couldn't check if the local image can be skipped") } @@ -397,7 +420,7 @@ func (service *BaseService) syncTag(ctx context.Context, localRepo, remoteRepo, localImageRef, err := service.local.GetImageReference(localRepo, tag) if err != nil { service.log.Error().Err(err).Str("errortype", common.TypeOf(err)). - Str("repo", localRepo).Str("reference", tag).Msg("couldn't get a local image reference") + Str("local repository", localRepo).Str("reference", tag).Msg("couldn't get a local image reference") return "", err } @@ -417,7 +440,7 @@ func (service *BaseService) syncTag(ctx context.Context, localRepo, remoteRepo, err = service.local.CommitImage(localImageRef, localRepo, tag) if err != nil { service.log.Error().Err(err).Str("errortype", common.TypeOf(err)). - Str("repo", localRepo).Str("reference", tag).Msg("couldn't commit image to local image store") + Str("local repository", localRepo).Str("reference", tag).Msg("couldn't commit image to local image store") return "", err } @@ -426,7 +449,7 @@ func (service *BaseService) syncTag(ctx context.Context, localRepo, remoteRepo, Msg("skipping image because it's already synced") } - service.log.Info().Str("image", remoteImageRef.DockerReference().String()).Msg("sync: finished syncing image") + service.log.Info().Str("remote image", remoteImageRef.DockerReference().String()).Msg("sync: finished syncing image") return manifestDigest, nil } diff --git a/pkg/extensions/sync/sync.go b/pkg/extensions/sync/sync.go index bb4dd0bf4e..1101f661bc 100644 --- a/pkg/extensions/sync/sync.go +++ b/pkg/extensions/sync/sync.go @@ -5,6 +5,7 @@ package sync import ( "context" + "sync" "github.com/containers/common/pkg/retry" "github.com/containers/image/v5/types" @@ -20,14 +21,21 @@ import ( // Sync general functionalities, one service per registry config. type Service interface { + // GetLocalRepoName applies sync config (destination && stripPrefix) on a remote repo to determine where to sync it. + GetLocalRepoName(remoteRepo string) (string, error) + /* GetRemoteRepoName applies sync config (destination && stripPrefix) on a local repo + to determine from where to sync it. */ + GetRemoteRepoName(localRepo string) (string, error) // Get next repo from remote /v2/_catalog, will return empty string when there is no repo left. GetNextRepo(lastRepo string) (string, error) // used by task scheduler + // Get next repo with its tags from specific images config (content.Images) + GetNextRepoTags(lastRepo string) (string, []string) // used by task scheduler // Sync a repo with all of its tags and references (signatures, artifacts, sboms) into ImageStore. - SyncRepo(ctx context.Context, repo string) error // used by periodically sync + SyncRepo(ctx context.Context, localRepo, remoteRepo string) error // used by periodically sync // Sync an image (repo:tag || repo:digest) into ImageStore. - SyncImage(ctx context.Context, repo, reference string) error // used by sync on demand + SyncImage(ctx context.Context, localRepo, remoteRepo, reference string) error // used by sync on demand // Sync a single reference for an image. - SyncReference(ctx context.Context, repo string, subjectDigestStr string, + SyncReference(ctx context.Context, localRepo, remoteRepo, subjectDigestStr string, referenceType string) error // used by sync on demand // Remove all internal catalog entries. ResetCatalog() // used by scheduler to empty out the catalog after a sync periodically roundtrip finishes @@ -96,7 +104,11 @@ func (gen *TaskGenerator) Next() (scheduler.Task, error) { repo, err := gen.Service.GetNextRepo(gen.lastRepo) if err != nil { - return nil, err + // unreachable registry task scheduler will retry at intervals + gen.done = true + + //nolint: nilerr + return nil, nil } if repo == "" { @@ -135,5 +147,90 @@ func newSyncRepoTask(repo string, service Service) *syncRepoTask { } func (srt *syncRepoTask) DoWork(ctx context.Context) error { - return srt.service.SyncRepo(ctx, srt.repo) + localRepo, err := srt.service.GetLocalRepoName(srt.repo) + // image filtered out by sync config content + if err != nil { + return nil //nolint: nilerr + } + + return srt.service.SyncRepo(ctx, localRepo, srt.repo) +} + +// periodically sync specific images generator. +type SpecificImagesGenerator struct { + Service Service + lastRepo string + done bool + log log.Logger + lock *sync.Mutex +} + +func NewSpecificImagesGenerator(service Service, log log.Logger) *SpecificImagesGenerator { + return &SpecificImagesGenerator{ + Service: service, + done: false, + lastRepo: "", + log: log, + lock: &sync.Mutex{}, + } +} + +func (gen *SpecificImagesGenerator) Next() (scheduler.Task, error) { + gen.lock.Lock() + defer gen.lock.Unlock() + + lastRepo, tags := gen.Service.GetNextRepoTags(gen.lastRepo) + if lastRepo == "" { + gen.log.Info().Msg("sync: finished syncing all repos from config") + gen.done = true + + return nil, nil + } + + gen.lastRepo = lastRepo + + return newSpecificImageTask(gen.lastRepo, tags, gen.Service, gen.log), nil +} + +func (gen *SpecificImagesGenerator) IsDone() bool { + return gen.done +} + +func (gen *SpecificImagesGenerator) IsReady() bool { + return true +} + +func (gen *SpecificImagesGenerator) Reset() { + gen.lastRepo = "" + gen.done = false +} + +type specificImageTask struct { + repo string + tags []string + service Service + log log.Logger +} + +func newSpecificImageTask(repo string, tags []string, service Service, log log.Logger) *specificImageTask { + return &specificImageTask{repo, tags, service, log} +} + +func (sit *specificImageTask) DoWork(ctx context.Context) error { + localRepo, _ := sit.service.GetLocalRepoName(sit.repo) + + // if no references provided, sync all repo's tags + if len(sit.tags) == 0 { + return sit.service.SyncRepo(ctx, localRepo, sit.repo) + } + + // otherwise sync only given references + for _, reference := range sit.tags { + _ = sit.service.SyncImage(ctx, localRepo, sit.repo, reference) + } + + sit.log.Info().Str("local repo", localRepo).Str("remote repo", sit.repo).Interface("tags", sit.tags). + Msg("sync: finished syncing specific images for repo") + + return nil } diff --git a/pkg/extensions/sync/sync_internal_test.go b/pkg/extensions/sync/sync_internal_test.go index 9b6c2968a8..339b4e1bbf 100644 --- a/pkg/extensions/sync/sync_internal_test.go +++ b/pkg/extensions/sync/sync_internal_test.go @@ -164,7 +164,7 @@ func TestService(t *testing.T) { service, err := New(conf, "", storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{}) So(err, ShouldBeNil) - err = service.SyncRepo(context.Background(), "repo") + err = service.SyncRepo(context.Background(), "repo", "repo") So(err, ShouldNotBeNil) }) } diff --git a/pkg/extensions/sync/sync_test.go b/pkg/extensions/sync/sync_test.go index b0519c5560..aa4690b6a3 100644 --- a/pkg/extensions/sync/sync_test.go +++ b/pkg/extensions/sync/sync_test.go @@ -2263,6 +2263,268 @@ func TestBadTLS(t *testing.T) { }) } +func TestSpecificImages(t *testing.T) { + Convey("Setup upstream", t, func() { + updateDuration, _ := time.ParseDuration("30m") + + sctlr, srcBaseURL, _, _, _ := makeUpstreamServer(t, false, false) + + scm := test.NewControllerManager(sctlr) + scm.StartAndWait(sctlr.Config.HTTP.Port) + defer scm.StopServer() + + repo1 := "repo1" + image1 := CreateRandomImage() + image2 := CreateRandomImage() + image3 := CreateRandomImage() + + err := UploadImage(image1, srcBaseURL, repo1, "v1") + So(err, ShouldBeNil) + + err = UploadImage(image2, srcBaseURL, repo1, "v2") + So(err, ShouldBeNil) + + err = UploadImage(image3, srcBaseURL, repo1, "v3") + So(err, ShouldBeNil) + + repo2 := "repo2" + + index1 := CreateRandomMultiarch() + index2 := CreateRandomMultiarch() + image4 := CreateRandomImage() + + err = UploadMultiarchImage(index1, srcBaseURL, repo2, "v1") + So(err, ShouldBeNil) + + err = UploadMultiarchImage(index2, srcBaseURL, repo2, "v2") + So(err, ShouldBeNil) + + err = UploadImage(image4, srcBaseURL, repo2, "v3") + So(err, ShouldBeNil) + + Convey("Verify sync specific images works", func() { + tlsVerify := false + + syncRegistryConfig := syncconf.RegistryConfig{ + Content: []syncconf.Content{ + { + Images: map[string][]string{ + "repo1": {"v1", "v2", "v3"}, // add some missing tags + "repo2": {"v1", "v2", "v3"}, + }, + }, + }, + URLs: []string{srcBaseURL}, + OnDemand: false, + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + } + + defaultVal := true + syncConfig := &syncconf.Config{ + Enable: &defaultVal, + Registries: []syncconf.RegistryConfig{syncRegistryConfig}, + } + + dctlr, destBaseURL, _, destClient := makeDownstreamServer(t, false, syncConfig) + dctlr.Config.Log.Output = path.Join(dctlr.Config.Storage.RootDirectory, "sync.log") + + dcm := test.NewControllerManager(dctlr) + dcm.StartAndWait(dctlr.Config.HTTP.Port) + defer dcm.StopServer() + + found, err := test.ReadLogFileAndCountStringOccurence(dctlr.Config.Log.Output, + "sync: finished syncing specific images for repo", 100*time.Second, 2) + if err != nil { + panic(err) + } + + if !found { + data, err := os.ReadFile(dctlr.Config.Log.Output) + So(err, ShouldBeNil) + + t.Logf("downstream log: %s", string(data)) + + panic(err) + } + + resp, _ := destClient.R().Get(destBaseURL + "/v2/" + repo1 + "/manifests/" + "v1") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + resp, _ = destClient.R().Get(destBaseURL + "/v2/" + repo1 + "/manifests/" + "v2") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + resp, _ = destClient.R().Get(destBaseURL + "/v2/" + repo1 + "/manifests/" + "v3") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + resp, _ = destClient.R().Get(destBaseURL + "/v2/" + repo2 + "/manifests/" + "v1") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + resp, _ = destClient.R().Get(destBaseURL + "/v2/" + repo2 + "/manifests/" + "v2") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + resp, _ = destClient.R().Get(destBaseURL + "/v2/" + repo2 + "/manifests/" + "v3") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + }) + + Convey("Verify sync specific images with tagRegex works", func() { + tlsVerify := false + regex := "v.*" + + syncRegistryConfig := syncconf.RegistryConfig{ + Content: []syncconf.Content{ + { + Images: map[string][]string{ + "repo1": {}, // add some missing tags + "repo2": {}, + }, + Tags: &syncconf.Tags{ + Regex: ®ex, + }, + }, + }, + URLs: []string{srcBaseURL}, + OnDemand: false, + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + } + + defaultVal := true + syncConfig := &syncconf.Config{ + Enable: &defaultVal, + Registries: []syncconf.RegistryConfig{syncRegistryConfig}, + } + + dctlr, destBaseURL, _, destClient := makeDownstreamServer(t, false, syncConfig) + dctlr.Config.Log.Output = path.Join(dctlr.Config.Storage.RootDirectory, "sync.log") + + dcm := test.NewControllerManager(dctlr) + dcm.StartAndWait(dctlr.Config.HTTP.Port) + defer dcm.StopServer() + + found, err := test.ReadLogFileAndCountStringOccurence(dctlr.Config.Log.Output, + "sync: finished syncing specific images for repo", 100*time.Second, 2) + if err != nil { + panic(err) + } + + time.Sleep(1 * time.Second) + + if !found { + data, err := os.ReadFile(dctlr.Config.Log.Output) + So(err, ShouldBeNil) + + t.Logf("downstream log: %s", string(data)) + + panic(err) + } + + resp, _ := destClient.R().Get(destBaseURL + "/v2/" + repo1 + "/manifests/" + "v1") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + resp, _ = destClient.R().Get(destBaseURL + "/v2/" + repo1 + "/manifests/" + "v2") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + resp, _ = destClient.R().Get(destBaseURL + "/v2/" + repo1 + "/manifests/" + "v3") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + resp, _ = destClient.R().Get(destBaseURL + "/v2/" + repo2 + "/manifests/" + "v1") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + resp, _ = destClient.R().Get(destBaseURL + "/v2/" + repo2 + "/manifests/" + "v2") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + resp, _ = destClient.R().Get(destBaseURL + "/v2/" + repo2 + "/manifests/" + "v3") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + }) + + Convey("Verify sync specific repo works", func() { + tlsVerify := false + + syncRegistryConfig := syncconf.RegistryConfig{ + Content: []syncconf.Content{ + { + Images: map[string][]string{ + "repo1": {}, + "repo2": {}, + }, + }, + }, + URLs: []string{srcBaseURL}, + OnDemand: false, + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + } + + defaultVal := true + syncConfig := &syncconf.Config{ + Enable: &defaultVal, + Registries: []syncconf.RegistryConfig{syncRegistryConfig}, + } + + dctlr, destBaseURL, _, destClient := makeDownstreamServer(t, false, syncConfig) + dctlr.Config.Log.Output = path.Join(dctlr.Config.Storage.RootDirectory, "sync.log") + + dcm := test.NewControllerManager(dctlr) + dcm.StartAndWait(dctlr.Config.HTTP.Port) + defer dcm.StopServer() + + found, err := test.ReadLogFileAndCountStringOccurence(dctlr.Config.Log.Output, + "sync: finished syncing specific images for repo", 160*time.Second, 2) + if err != nil { + panic(err) + } + + time.Sleep(1 * time.Second) + + if !found { + data, err := os.ReadFile(dctlr.Config.Log.Output) + So(err, ShouldBeNil) + + t.Logf("downstream log: %s", string(data)) + + panic(err) + } + + resp, _ := destClient.R().Get(destBaseURL + "/v2/" + repo1 + "/manifests/" + "v1") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + resp, _ = destClient.R().Get(destBaseURL + "/v2/" + repo1 + "/manifests/" + "v2") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + resp, _ = destClient.R().Get(destBaseURL + "/v2/" + repo1 + "/manifests/" + "v3") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + resp, _ = destClient.R().Get(destBaseURL + "/v2/" + repo2 + "/manifests/" + "v1") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + resp, _ = destClient.R().Get(destBaseURL + "/v2/" + repo2 + "/manifests/" + "v2") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + resp, _ = destClient.R().Get(destBaseURL + "/v2/" + repo2 + "/manifests/" + "v3") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + }) + }) +} + func TestTLS(t *testing.T) { Convey("Verify sync TLS feature", t, func() { updateDuration, _ := time.ParseDuration("1h") @@ -4150,7 +4412,7 @@ func TestSignatures(t *testing.T) { syncRegistryConfig := syncconf.RegistryConfig{ Content: []syncconf.Content{ { - Prefix: "**", + Prefix: repoName, Tags: &syncconf.Tags{ Regex: ®ex, Semver: &semver, @@ -4293,6 +4555,14 @@ func TestSignatures(t *testing.T) { So(len(refs.References), ShouldEqual, 1) So(refs.References[0].Digest, ShouldEqual, ORASRefManifestDigest) + // try to get referrers for a non existing image (repo should be filtered out by content) + // get oci references from downstream, should be synced + getOCIReferrersURL = destBaseURL + path.Join("/v2", "test", "referrers", digest.String()) + resp, err = resty.R().Get(getOCIReferrersURL) + So(err, ShouldBeNil) + So(resp, ShouldNotBeEmpty) + So(resp.StatusCode(), ShouldEqual, http.StatusNotFound) + // test negative cases (trigger errors) // test notary signatures errors @@ -6088,10 +6358,6 @@ func TestSyncWithDestination(t *testing.T) { expected: "zot-fold/zot-test", content: syncconf.Content{Prefix: "zot-fold/zot-test", Destination: "/", StripPrefix: false}, }, - { - expected: "zot-test", - content: syncconf.Content{Prefix: "zot-fold/zot-test", Destination: "/zot-test", StripPrefix: true}, - }, { expected: "zot-test", content: syncconf.Content{Prefix: "zot-fold/*", Destination: "/", StripPrefix: true}, @@ -6718,5 +6984,7 @@ func waitSyncFinish(logPath string) bool { panic(err) } + time.Sleep(1 * time.Second) + return found } diff --git a/test/blackbox/helpers_wait.bash b/test/blackbox/helpers_wait.bash index 12b95e19b6..715f1ea8ef 100644 --- a/test/blackbox/helpers_wait.bash +++ b/test/blackbox/helpers_wait.bash @@ -8,6 +8,26 @@ function wait_for_string() { wait_str "$filepath" "$search_term" "$wait_time" } +function wait_for_string_count() { + local search_term="$1" + local filepath="$2" + local wait_time="${3:-60}" + local count="$4" + + wait_file "$filepath" 60 || { echo "server log file missing: '$filepath'"; return 1; } + + timeout_func $wait_time wait_string_count "$search_term" $filepath $count +} + +function wait_string_count() { + local search_term="$1" + local filepath="$2" + local count="$3" +while ! [[ $(grep -c "$search_term" "$filepath") -ge $count ]]; do + sleep 1 +done +} + function wait_str() { local filepath="$1" local search_term="$2" @@ -26,3 +46,15 @@ function wait_file() { until test $((wait_seconds--)) -eq 0 -o -f "$file" ; do sleep 1; done } + +function timeout_func() { + local cmd_pid sleep_pid retval + (shift; "$@") & # shift out sleep value and run rest as command in background job + cmd_pid=$! + (sleep "$1"; kill "$cmd_pid" 2>/dev/null) & + sleep_pid=$! + wait "$cmd_pid" + retval=$? + kill "$sleep_pid" 2>/dev/null + return "$retval" +} diff --git a/test/blackbox/sync_specific_images.bats b/test/blackbox/sync_specific_images.bats new file mode 100644 index 0000000000..42142c48e7 --- /dev/null +++ b/test/blackbox/sync_specific_images.bats @@ -0,0 +1,307 @@ +# Note: Intended to be run as "make run-blackbox-tests" or "make run-blackbox-ci" +# Makefile target installs & checks all necessary tooling +# Extra tools that are not covered in Makefile target needs to be added in verify_prerequisites() + +load helpers_zot +load helpers_wait + +function verify_prerequisites() { + if [ ! $(command -v curl) ]; then + echo "you need to install curl as a prerequisite to running the tests" >&3 + return 1 + fi + + if [ ! $(command -v jq) ]; then + echo "you need to install jq as a prerequisite to running the tests" >&3 + return 1 + fi + + if [ ! $(command -v docker) ]; then + echo "you need to install docker as a prerequisite to running the tests" >&3 + return 1 + fi + + return 0 +} + +function setup_file() { + # Verify prerequisites are available + if ! $(verify_prerequisites); then + exit 1 + fi + + # Setup zot server + local zot_root_dir=${BATS_FILE_TMPDIR}/zot + local zot_sync_config_file=${BATS_FILE_TMPDIR}/zot_sync_config.json + local ZOT_LOG_FILE=${BATS_FILE_TMPDIR}/zot.log + + mkdir -p ${zot_root_dir} + + cat >${zot_sync_config_file} <&3 + + start=`date +%s` + echo "waiting for sync to finish" >&3 + + wait_for_string_count "sync: finished syncing specific images" ${ZOT_LOG_FILE} 600 11 // repo:tag entries + + end=`date +%s` + + runtime=$((end-start)) + echo "sync finished in $runtime sec" >&3 +} + +# sync image +@test "check docker image list was synced" { + run skopeo --insecure-policy copy --multi-arch=all --src-tls-verify=false \ + docker://127.0.0.1:8090/registry \ + oci:${TEST_DATA_DIR} + [ "$status" -eq 0 ] + + run curl http://127.0.0.1:8090/v2/registry/tags/list + [ "$status" -eq 0 ] + [ $(echo "${lines[-1]}" | jq '.tags[]') = '"latest"' ] +} + +@test "check docker image was synced" { + run skopeo --insecure-policy copy --src-tls-verify=false \ + docker://127.0.0.1:8090/archlinux \ + oci:${TEST_DATA_DIR} + [ "$status" -eq 0 ] + + run curl http://127.0.0.1:8090/v2/archlinux/tags/list + [ "$status" -eq 0 ] + [ $(echo "${lines[-1]}" | jq '.tags[]') = '"latest"' ] +} + +@test "check k8s image list was synced" { + run skopeo --insecure-policy copy --multi-arch=all --src-tls-verify=false \ + docker://127.0.0.1:8090/kube-apiserver:v1.26.0 \ + oci:${TEST_DATA_DIR} + [ "$status" -eq 0 ] + + run curl http://127.0.0.1:8090/v2/kube-apiserver/tags/list + [ "$status" -eq 0 ] + [ $(echo "${lines[-1]}" | jq '.tags[]') = '"v1.26.0"' ] +} + +@test "check k8s image was synced" { + run skopeo --insecure-policy copy --src-tls-verify=false \ + docker://127.0.0.1:8090/pause \ + oci:${TEST_DATA_DIR} + [ "$status" -eq 0 ] + + run curl http://127.0.0.1:8090/v2/pause/tags/list + [ "$status" -eq 0 ] + [ $(echo "${lines[-1]}" | jq '.tags[]') = '"latest"' ] +} + +@test "check registry.k8s.io image was synced" { + run skopeo copy docker://127.0.0.1:8090/kube-apiserver-amd64:v1.10.0 oci:${TEST_DATA_DIR} --src-tls-verify=false + [ "$status" -eq 0 ] + + run curl http://127.0.0.1:8090/v2/kube-apiserver-amd64/tags/list + [ "$status" -eq 0 ] + [ $(echo "${lines[-1]}" | jq '.tags[]') = '"v1.10.0"' ] +} + +@test "check aws.amazon.com/ecr images was synced" { + run skopeo copy docker://127.0.0.1:8090/amazonlinux/amazonlinux:latest oci:${TEST_DATA_DIR} --src-tls-verify=false + [ "$status" -eq 0 ] + + run curl http://127.0.0.1:8090/v2/amazonlinux/amazonlinux/tags/list + [ "$status" -eq 0 ] + [ $(echo "${lines[-1]}" | jq '.tags[]') = '"latest"' ] +} + +@test "check gcr.io image was synced" { + run skopeo copy docker://127.0.0.1:8090/google-containers/kube-proxy-amd64:v1.17.9 oci:${TEST_DATA_DIR} --src-tls-verify=false + [ "$status" -eq 0 ] + + run curl http://127.0.0.1:8090/v2/google-containers/kube-proxy-amd64/tags/list + [ "$status" -eq 0 ] + [ $(echo "${lines[-1]}" | jq '.tags[]') = '"v1.17.9"' ] +} + +@test "check mcr.microsoft.com image was synced" { + run skopeo copy docker://127.0.0.1:8090/azure-cognitive-services/vision/spatial-analysis/diagnostics:latest oci:${TEST_DATA_DIR} --src-tls-verify=false + [ "$status" -eq 0 ] + + run curl http://127.0.0.1:8090/v2/azure-cognitive-services/vision/spatial-analysis/diagnostics/tags/list + [ "$status" -eq 0 ] + [ $(echo "${lines[-1]}" | jq '.tags[]') = '"latest"' ] +} + +@test "check registry.gitlab.com image was synced" { + run skopeo copy docker://127.0.0.1:8090/gitlab-org/public-image-archive/gitlab-ee:15.11.6-ee.0 oci:${TEST_DATA_DIR} --src-tls-verify=false + [ "$status" -eq 0 ] + + run curl http://127.0.0.1:8090/v2/gitlab-org/public-image-archive/gitlab-ee/tags/list + [ "$status" -eq 0 ] + [ $(echo "${lines[-1]}" | jq '.tags[]') = '"15.11.6-ee.0"' ] +} + +@test "check quay.io image was synced" { + run skopeo copy docker://127.0.0.1:8090/coreos/etcd:v3.4.26 oci:${TEST_DATA_DIR} --src-tls-verify=false + [ "$status" -eq 0 ] + + run curl http://127.0.0.1:8090/v2/coreos/etcd/tags/list + [ "$status" -eq 0 ] + [ $(echo "${lines[-1]}" | jq '.tags[]') = '"v3.4.26"' ] +} + +@test "check ghcr.io image was synced" { + run skopeo copy docker://127.0.0.1:8090/project-zot/zot-linux-amd64:v2.0.0-rc5 oci:${TEST_DATA_DIR} --src-tls-verify=false + [ "$status" -eq 0 ] + + run curl http://127.0.0.1:8090/v2/project-zot/zot-linux-amd64/tags/list + [ "$status" -eq 0 ] + [ $(echo "${lines[-1]}" | jq '.tags[]') = '"v2.0.0-rc5"' ] +}