Skip to content

Commit

Permalink
feat: support local tarball for nydusify copy
Browse files Browse the repository at this point in the history
Signed-off-by: BruceAko <[email protected]>
  • Loading branch information
BruceAko committed Sep 4, 2024
1 parent 114ec88 commit 0e0c898
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 19 deletions.
122 changes: 122 additions & 0 deletions contrib/nydusify/pkg/converter/provider/ported.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,39 @@ package provider

import (
"context"
"encoding/json"
"fmt"
"io"
"strings"

"github.com/containerd/containerd"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/images/archive"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/errdefs"
"github.com/opencontainers/go-digest"

// nolint:staticcheck
"github.com/containerd/containerd/remotes/docker/schema1"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/sync/semaphore"
)

type importOpts struct {
indexName string
imageRefT func(string) string
dgstRefT func(digest.Digest) string
skipDgstRef func(string) bool
platformMatcher platforms.MatchComparer
compress bool
discardLayers bool
skipMissing bool
imageLabels map[string]string
}

// Ported from containerd project, copyright The containerd Authors.
// github.com/containerd/containerd/blob/main/pull.go
func fetch(ctx context.Context, store content.Store, rCtx *containerd.RemoteContext, ref string, limit int) (images.Image, error) {
Expand Down Expand Up @@ -177,3 +194,108 @@ func push(ctx context.Context, store content.Store, pushCtx *containerd.RemoteCo

return remotes.PushContent(ctx, pusher, desc, store, limiter, pushCtx.PlatformMatcher, wrapper)
}

// Ported from containerd project, copyright The containerd Authors.
// github.com/containerd/containerd/blob/main/import.go
func load(ctx context.Context, reader io.Reader, store content.Store, iopts importOpts) ([]images.Image, error) {
var aio []archive.ImportOpt
if iopts.compress {
aio = append(aio, archive.WithImportCompression())
}

index, err := archive.ImportIndex(ctx, store, reader, aio...)
if err != nil {
return nil, err
}

var imgs []images.Image

if iopts.indexName != "" {
imgs = append(imgs, images.Image{
Name: iopts.indexName,
Target: index,
})
}
var platformMatcher = iopts.platformMatcher

var handler images.HandlerFunc = func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
// Only save images at top level
if desc.Digest != index.Digest {
// Don't set labels on missing content.
children, err := images.Children(ctx, store, desc)
if iopts.skipMissing && errdefs.IsNotFound(err) {
return nil, images.ErrSkipDesc
}
return children, err
}

var idx ocispec.Index
p, err := content.ReadBlob(ctx, store, desc)
if err != nil {
return nil, err
}
if err := json.Unmarshal(p, &idx); err != nil {
return nil, err
}

for _, m := range idx.Manifests {
name := imageName(m.Annotations, iopts.imageRefT)
if name != "" {
imgs = append(imgs, images.Image{
Name: name,
Target: m,
})
}
if iopts.skipDgstRef != nil {
if iopts.skipDgstRef(name) {
continue
}
}
if iopts.dgstRefT != nil {
ref := iopts.dgstRefT(m.Digest)
if ref != "" {
imgs = append(imgs, images.Image{
Name: ref,
Target: m,
})
}
}
}

return idx.Manifests, nil
}

handler = images.FilterPlatforms(handler, platformMatcher)
if iopts.discardLayers {
handler = images.SetChildrenMappedLabels(store, handler, images.ChildGCLabelsFilterLayers)
} else {
handler = images.SetChildrenLabels(store, handler)
}
if err := images.WalkNotEmpty(ctx, handler, index); err != nil {
return nil, err
}

for i := range imgs {
fieldsPath := []string{"target"}
if iopts.imageLabels != nil {
fieldsPath = append(fieldsPath, "labels")
imgs[i].Labels = iopts.imageLabels
}
}

return imgs, nil
}

func imageName(annotations map[string]string, ociCleanup func(string) string) string {
name := annotations[images.AnnotationImageName]
if name != "" {
return name
}
name = annotations[ocispec.AnnotationRefName]
if name != "" {
if ociCleanup != nil {
name = ociCleanup(name)
}
}
return name
}
34 changes: 34 additions & 0 deletions contrib/nydusify/pkg/converter/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package provider
import (
"context"
"crypto/tls"
"io"
"net"
"net/http"
"os"
Expand All @@ -17,13 +18,16 @@ import (
"github.com/containerd/containerd"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images/archive"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/goharbor/acceleration-service/pkg/cache"
accelcontent "github.com/goharbor/acceleration-service/pkg/content"
"github.com/goharbor/acceleration-service/pkg/remote"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)

var LayerConcurrentLimit = 5
Expand Down Expand Up @@ -152,6 +156,36 @@ func (pvd *Provider) Push(ctx context.Context, desc ocispec.Descriptor, ref stri
return push(ctx, pvd.store, rc, desc, ref)
}

func (pvd *Provider) Import(ctx context.Context, reader io.Reader) (string, error) {
iopts := importOpts{
dgstRefT: func(dgst digest.Digest) string {
return "nydus" + "@" + dgst.String()
},
skipDgstRef: func(name string) bool { return name != "" },
platformMatcher: pvd.platformMC,
}
images, err := load(ctx, reader, pvd.store, iopts)
if err != nil {
return "", err
}

if len(images) != 1 {
return "", errors.New("importing multiple images")
}
image := images[0]

pvd.mutex.Lock()
defer pvd.mutex.Unlock()
pvd.images[image.Name] = &image.Target

return image.Name, nil
}

func (pvd *Provider) Export(ctx context.Context, writer io.Writer, img *ocispec.Descriptor, name string) error {
opts := []archive.ExportOpt{archive.WithManifest(*img, name), archive.WithPlatform(pvd.platformMC)}
return archive.Export(ctx, pvd.store, writer, opts...)
}

func (pvd *Provider) Image(_ context.Context, ref string) (*ocispec.Descriptor, error) {
pvd.mutex.Lock()
defer pvd.mutex.Unlock()
Expand Down
76 changes: 57 additions & 19 deletions contrib/nydusify/pkg/copier/copier.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"path/filepath"
"strings"

"github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/content"
containerdErrdefs "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
Expand Down Expand Up @@ -285,41 +286,78 @@ func Copy(ctx context.Context, opt Opt) error {
}
defer os.RemoveAll(tmpDir)

sourceNamed, err := docker.ParseDockerRef(opt.Source)
if err != nil {
return errors.Wrap(err, "parse source reference")
}
targetNamed, err := docker.ParseDockerRef(opt.Target)
if err != nil {
return errors.Wrap(err, "parse target reference")
}
source := sourceNamed.String()
target := targetNamed.String()
var source string
if strings.HasPrefix(opt.Source, "file:///") {
inputPath := strings.TrimPrefix(opt.Source, "file:///")

logrus.Infof("importing source image from %s", inputPath)
f, err := os.Open(inputPath)
if err != nil {
return err
}
defer f.Close()
decompressor, err := compression.DecompressStream(f)
if err != nil {
return err
}
if source, err = pvd.Import(ctx, decompressor); err != nil {
return errors.Wrap(err, "import source image")
}
logrus.Infof("imported source image %s", source)
} else {
sourceNamed, err := docker.ParseDockerRef(opt.Source)
if err != nil {
return errors.Wrap(err, "parse source reference")
}
source = sourceNamed.String()

logrus.Infof("pulling source image %s", source)
if err := pvd.Pull(ctx, source); err != nil {
if errdefs.NeedsRetryWithHTTP(err) {
pvd.UsePlainHTTP()
if err := pvd.Pull(ctx, source); err != nil {
return errors.Wrap(err, "try to pull image")
logrus.Infof("pulling source image %s", source)
if err := pvd.Pull(ctx, source); err != nil {
if errdefs.NeedsRetryWithHTTP(err) {
pvd.UsePlainHTTP()
if err := pvd.Pull(ctx, source); err != nil {
return errors.Wrap(err, "try to pull image")
}
} else {
return errors.Wrap(err, "pull source image")
}
} else {
return errors.Wrap(err, "pull source image")
}
logrus.Infof("pulled source image %s", source)
}
logrus.Infof("pulled source image %s", source)

sourceImage, err := pvd.Image(ctx, source)
if err != nil {
return errors.Wrap(err, "find image from store")
}

if strings.HasPrefix(opt.Target, "file:///") {
outputPath := strings.TrimPrefix(opt.Target, "file:///")

logrus.Infof("exporting source image to %s", outputPath)
f, err := os.OpenFile(outputPath, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
defer f.Close()
if err := pvd.Export(ctx, f, sourceImage, source); err != nil {
return errors.Wrap(err, "export source image to target tar file")
}
logrus.Infof("exported image %s", source)
return nil
}

sourceDescs, err := utils.GetManifests(ctx, pvd.ContentStore(), *sourceImage, platformMC)
if err != nil {
return errors.Wrap(err, "get image manifests")
}
targetDescs := make([]ocispec.Descriptor, len(sourceDescs))

targetNamed, err := docker.ParseDockerRef(opt.Target)
if err != nil {
return errors.Wrap(err, "parse target reference")
}
target := targetNamed.String()

sem := semaphore.NewWeighted(1)
eg := errgroup.Group{}
for idx := range sourceDescs {
Expand Down

0 comments on commit 0e0c898

Please sign in to comment.