Skip to content

Commit

Permalink
fix(sync): various fixes for s3+remote storage feature
Browse files Browse the repository at this point in the history
Signed-off-by: Petu Eusebiu <[email protected]>
  • Loading branch information
eusebiu-constantin-petu-dbk committed Nov 22, 2023
1 parent 5242f18 commit 56d4ef0
Show file tree
Hide file tree
Showing 9 changed files with 799 additions and 62 deletions.
131 changes: 131 additions & 0 deletions pkg/cli/server/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1637,3 +1637,134 @@ func TestOverlappingSyncRetentionConfig(t *testing.T) {
So(string(data), ShouldContainSubstring, "overlapping sync content\":{\"Prefix\":\"prod/*")
})
}

func TestSyncWithRemoteStorageConfig(t *testing.T) {
oldArgs := os.Args

defer func() { os.Args = oldArgs }()

Convey("Test verify sync with remote storage works if sync.tmpdir is provided", t, func(c C) {
tmpfile, err := os.CreateTemp("", "zot-test*.json")
So(err, ShouldBeNil)
defer os.Remove(tmpfile.Name()) // clean up

content := `{
"distSpecVersion": "1.1.0-dev",
"storage": {
"rootDirectory": "%s",
"dedupe": false,
"remoteCache": false,
"storageDriver": {
"name": "s3",
"rootdirectory": "/zot",
"region": "us-east-2",
"regionendpoint": "localhost:4566",
"bucket": "zot-storage",
"secure": false,
"skipverify": false
}
},
"http": {
"address": "0.0.0.0",
"port": "%s"
},
"log": {
"level": "debug",
"output": "%s"
},
"extensions": {
"sync": {
"tmpDir": "/tmp/sync",
"registries": [
{
"urls": [
"http://localhost:9000"
],
"onDemand": true,
"tlsVerify": false,
"content": [
{
"prefix": "**"
}
]
}
]
}
}
}`

logPath, err := runCLIWithConfig(t.TempDir(), content)
So(err, ShouldBeNil)

data, err := os.ReadFile(logPath)
So(err, ShouldBeNil)
defer os.Remove(logPath) // clean up
So(string(data), ShouldNotContainSubstring,
"using both sync and remote storage features needs config.Extensions.Sync.TmpDir to be specified")
})

Convey("Test verify sync with remote storage panics if sync.tmpdir is not provided", t, func(c C) {
port := GetFreePort()
logFile, err := os.CreateTemp("", "zot-log*.txt")
So(err, ShouldBeNil)
defer os.Remove(logFile.Name()) // clean up

tmpfile, err := os.CreateTemp("", "zot-test*.json")
So(err, ShouldBeNil)
defer os.Remove(tmpfile.Name()) // clean up
content := fmt.Sprintf(`{
"distSpecVersion": "1.1.0-dev",
"storage": {
"rootDirectory": "%s",
"dedupe": false,
"remoteCache": false,
"storageDriver": {
"name": "s3",
"rootdirectory": "/zot",
"region": "us-east-2",
"regionendpoint": "localhost:4566",
"bucket": "zot-storage",
"secure": false,
"skipverify": false
}
},
"http": {
"address": "0.0.0.0",
"port": "%s"
},
"log": {
"level": "debug",
"output": "%s"
},
"extensions": {
"sync": {
"registries": [
{
"urls": [
"http://localhost:9000"
],
"onDemand": true,
"tlsVerify": false,
"content": [
{
"prefix": "**"
}
]
}
]
}
}
}`, t.TempDir(), port, logFile.Name())

err = os.WriteFile(tmpfile.Name(), []byte(content), 0o0600)
So(err, ShouldBeNil)

So(func() { _ = cli.NewServerRootCmd().Execute() }, ShouldPanic)

data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
defer os.Remove(logFile.Name()) // clean up
So(string(data), ShouldContainSubstring,
"using both sync and remote storage features needs config.Extensions.Sync.TmpDir to be specified")
})
}
15 changes: 15 additions & 0 deletions pkg/cli/server/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,13 @@ func validateConfiguration(config *config.Config, log zlog.Logger) error {
return zerr.ErrBadConfig
}

// enforce tmpDir in case sync + s3
if config.Extensions != nil && config.Extensions.Sync != nil && config.Extensions.Sync.TmpDir == "" {
log.Error().Err(zerr.ErrBadConfig).
Msg("using both sync and remote storage features needs config.Extensions.Sync.TmpDir to be specified")

return zerr.ErrBadConfig
}
}

// enforce s3 driver on subpaths in case of using storage driver
Expand All @@ -396,6 +403,14 @@ func validateConfiguration(config *config.Config, log zlog.Logger) error {

return zerr.ErrBadConfig
}

// enforce tmpDir in case sync + s3
if config.Extensions != nil && config.Extensions.Sync != nil && config.Extensions.Sync.TmpDir == "" {
log.Error().Err(zerr.ErrBadConfig).
Msg("using both sync and remote storage features needs config.Extensions.Sync.TmpDir to be specified")

return zerr.ErrBadConfig
}
}
}
}
Expand Down
9 changes: 2 additions & 7 deletions pkg/extensions/extension_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package extensions
import (
"net"
"net/url"
"os"
"strings"

zerr "zotregistry.io/zot/errors"
Expand Down Expand Up @@ -47,13 +46,9 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB,
}

tmpDir := config.Extensions.Sync.TmpDir
if tmpDir == "" {
// use an os tmpdir as tmpdir if not set
tmpDir = os.TempDir()
}
credsPath := config.Extensions.Sync.CredentialsFile

service, err := sync.New(registryConfig, config.Extensions.Sync.CredentialsFile, tmpDir,
storeController, metaDB, log)
service, err := sync.New(registryConfig, credsPath, tmpDir, storeController, metaDB, log)
if err != nil {
return nil, err
}
Expand Down
20 changes: 9 additions & 11 deletions pkg/extensions/sync/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,18 @@ type DestinationRegistry struct {
}

func NewDestinationRegistry(
storeController storage.StoreController,
tmpStorage OciLayoutStorage,
storeController storage.StoreController, // local store controller
tempStoreController storage.StoreController, // temp store controller
metaDB mTypes.MetaDB,
log log.Logger,
) Destination {
if tmpStorage == nil {
// to allow passing nil we can do this, noting that it will only work for a local StoreController
tmpStorage = NewOciLayoutStorage(storeController)
}
return &DestinationRegistry{
storeController: storeController,
tempStorage: NewOciLayoutStorage(tempStoreController),
metaDB: metaDB,
// first we sync from remote (using containers/image copy from docker:// to oci:) to a temp imageStore
// then we copy the image from tempStorage to zot's storage using ImageStore APIs
tempStorage: tmpStorage,
log: log,
log: log,
}
}

Expand Down Expand Up @@ -288,9 +284,11 @@ func getImageStoreFromImageReference(imageReference types.ImageReference, repo,
tempRootDir = strings.ReplaceAll(imageReference.StringWithinTransport(), fmt.Sprintf("%s:", repo), "")
}

metrics := monitoring.NewMetricsServer(false, log.Logger{})
return getImageStore(tempRootDir)
}

tempImageStore := local.NewImageStore(tempRootDir, false, false, log.Logger{}, metrics, nil, nil)
func getImageStore(rootDir string) storageTypes.ImageStore {
metrics := monitoring.NewMetricsServer(false, log.Logger{})

return tempImageStore
return local.NewImageStore(rootDir, false, false, log.Logger{}, metrics, nil, nil)
}
27 changes: 17 additions & 10 deletions pkg/extensions/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"zotregistry.io/zot/pkg/log"
mTypes "zotregistry.io/zot/pkg/meta/types"
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/storage/local"
)

type BaseService struct {
Expand Down Expand Up @@ -67,13 +66,20 @@ func New(

service.contentManager = NewContentManager(opts.Content, log)

tmpImageStore := local.NewImageStore(tmpDir,
false, false, log, nil, nil, nil,
)

tmpStorage := NewOciLayoutStorage(storage.StoreController{DefaultStore: tmpImageStore})

service.destination = NewDestinationRegistry(storeController, tmpStorage, metadb, log)
if len(tmpDir) == 0 {
// first it will sync in tmpDir then it will move everything into local ImageStore
service.destination = NewDestinationRegistry(storeController, storeController, metadb, log)
} else {
// first it will sync under /rootDir/reponame/.sync/ then it will move everything into local ImageStore
service.destination = NewDestinationRegistry(
storeController,
storage.StoreController{
DefaultStore: getImageStore(tmpDir),
},
metadb,
log,
)
}

retryOptions := &retry.RetryOptions{}

Expand Down Expand Up @@ -140,7 +146,7 @@ func (service *BaseService) SetNextAvailableClient() error {
if err != nil {
service.log.Error().Err(err).Str("url", url).Msg("sync: failed to initialize http client")

continue
return err
}

if !service.client.Ping() {
Expand Down Expand Up @@ -355,7 +361,8 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
return nil
}

func (service *BaseService) syncTag(ctx context.Context, destinationRepo, remoteRepo, tag string) (digest.Digest, error) {
func (service *BaseService) syncTag(ctx context.Context, destinationRepo, remoteRepo, tag string,
) (digest.Digest, error) {
copyOptions := getCopyOptions(service.remote.GetContext(), service.destination.GetContext())

policyContext, err := getPolicyContext(service.log)
Expand Down
12 changes: 8 additions & 4 deletions pkg/extensions/sync/sync_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ func TestDestinationRegistry(t *testing.T) {
syncImgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver)
repoName := "repo"

registry := NewDestinationRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, nil, log)
storeController := storage.StoreController{DefaultStore: syncImgStore}
registry := NewDestinationRegistry(storeController, storeController, nil, log)
imageReference, err := registry.GetImageReference(repoName, "1.0")
So(err, ShouldBeNil)
So(imageReference, ShouldNotBeNil)
Expand Down Expand Up @@ -302,7 +303,8 @@ func TestDestinationRegistry(t *testing.T) {
syncImgStore := local.NewImageStore(dir, true, true, log, metrics, linter, cacheDriver)
repoName := "repo"

registry := NewDestinationRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, nil, log)
storeController := storage.StoreController{DefaultStore: syncImgStore}
registry := NewDestinationRegistry(storeController, storeController, nil, log)

err = registry.CommitImage(imageReference, repoName, "1.0")
So(err, ShouldBeNil)
Expand Down Expand Up @@ -336,7 +338,8 @@ func TestDestinationRegistry(t *testing.T) {
})

Convey("trigger metaDB error on index manifest in CommitImage()", func() {
registry := NewDestinationRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, mocks.MetaDBMock{
storeController := storage.StoreController{DefaultStore: syncImgStore}
registry := NewDestinationRegistry(storeController, storeController, mocks.MetaDBMock{
SetRepoReferenceFn: func(ctx context.Context, repo string, reference string, imageMeta mTypes.ImageMeta) error {
if reference == "1.0" {
return zerr.ErrRepoMetaNotFound
Expand All @@ -351,7 +354,8 @@ func TestDestinationRegistry(t *testing.T) {
})

Convey("trigger metaDB error on image manifest in CommitImage()", func() {
registry := NewDestinationRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, mocks.MetaDBMock{
storeController := storage.StoreController{DefaultStore: syncImgStore}
registry := NewDestinationRegistry(storeController, storeController, mocks.MetaDBMock{
SetRepoReferenceFn: func(ctx context.Context, repo, reference string, imageMeta mTypes.ImageMeta) error {
return zerr.ErrRepoMetaNotFound
},
Expand Down
2 changes: 1 addition & 1 deletion test/blackbox/ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ PATH=$PATH:${SCRIPTPATH}/../../hack/tools/bin

tests=("pushpull" "pushpull_authn" "delete_images" "referrers" "metadata" "anonymous_policy"
"annotations" "detect_manifest_collision" "cve" "sync" "sync_docker" "sync_replica_cluster"
"scrub" "garbage_collect" "metrics" "metrics_minimal")
"sync_remote" "scrub" "garbage_collect" "metrics" "metrics_minimal")

for test in ${tests[*]}; do
${BATS} ${BATS_FLAGS} ${SCRIPTPATH}/${test}.bats > ${test}.log & pids+=($!)
Expand Down
Loading

0 comments on commit 56d4ef0

Please sign in to comment.