Skip to content

Commit

Permalink
Support layer deltas
Browse files Browse the repository at this point in the history
Deltas are a way to avoid downloading a full copy of a layer tar file
if you have a previous version of the layer available locally. In
testing these deltas have been shown to be around 10x to 100x smaller
than the .tar.gz files for typical Linux base images.

In the typical client-side case we have some previous version of the
image stored in container-storage somewhere, which means that we have
the uncompressed files available, but not the actual tarball
(compressed or not).

This means we can use github.com/containers/tar-diff which takes
two tar files and produces a delta file which when applied on the
untar:ed content of the first tarfile produces the (bitwise identical)
content of the uncompressed second tarfile. It just happens that the
uncompressed tarfile is exactly what we need to reproduce, because that
is how the layers are referred to in the image config (the DiffIDs).

How this works is that we use OCI artifacts to store, for each regular
image a manifest with information about the available deltas for the
image.  This image looks like a regular manifest, except each layer
contains a tar-diff (as a blob) and uses the existing annotations key
to record which DiffIDs the layer applies to.

For example, a manifest would look like this:

```
{
  "schemaVersion": 2,
  "config": {
    "mediaType": "application/vnd.oci.image.config.v1+json",
    "digest": "sha256:<config file hash>",
    "size": 3
  },
  "annotations": {
        "io.github.containers.delta.target": "sha256:<image_manifest_hash>"
  },
  "layers": [
    {
      "mediaType": "application/vnd.tar-diff",
      "digest": "sha256:<tar-diff delta file hash>",
      "size": 7059734,
      "annotations": {
          "io.github.containers.delta.from": "sha256:<old layer diffid>",
          "io.github.containers.delta.to": "sha256:<new layer diffid>"
      }
    }
  ]
}
```

The config blob is just a JSON file containing "{}". Ideally it
should not be of type application/vnd.oci.image.config.v1+json,
because that is reserved for docker-style images. However, docker hub
(and other registries) currently don't support any other type. For
registries that support OCI artifacts we should instead use some other
type so that tooling can know that this is not a regular image.

The way we attach the delta manifest to the image is that we store it
in the same repo and then we use a single tag named `_deltaindex`
pointing to an index with all the delta manifests in the repository,
with the digest of each target image in the
`io.github.containers.delta.target` annotation key.

The delta layers record which DiffID they apply to, which is what we
want to use to look up the pre-existing layers to use as delta source
material, and it is what the delta apply will generate. This means
however that using the deltas only works if we're allowed to
substitute blobs, but this doesn't seem to be an issue in the typical
case.

Signed-off-by: Alexander Larsson <[email protected]>
  • Loading branch information
alexlarsson committed May 26, 2020
1 parent a87ed9b commit ef12614
Show file tree
Hide file tree
Showing 10 changed files with 350 additions and 6 deletions.
144 changes: 139 additions & 5 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"os"
"reflect"
"sort"
"strings"
"sync"
"time"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/containers/image/v5/types"
"github.com/containers/ocicrypt"
encconfig "github.com/containers/ocicrypt/config"
"github.com/containers/tar-diff/pkg/tar-patch"
digest "github.com/opencontainers/go-digest"
imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
Expand Down Expand Up @@ -790,6 +792,11 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
srcInfosUpdated = true
}

deltaLayers, err := types.ImageDeltaLayers(ic.src, ctx)
if err != nil {
return err
}

type copyLayerData struct {
destInfo types.BlobInfo
diffID digest.Digest
Expand All @@ -809,7 +816,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
}

data := make([]copyLayerData, numLayers)
copyLayerHelper := func(index int, srcLayer types.BlobInfo, toEncrypt bool, pool *mpb.Progress) {
copyLayerHelper := func(index int, srcLayer types.BlobInfo, toEncrypt bool, pool *mpb.Progress, deltaLayers []types.BlobInfo) {
defer copySemaphore.Release(1)
defer copyGroup.Done()
cld := copyLayerData{}
Expand All @@ -824,7 +831,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name())
}
} else {
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, toEncrypt, pool)
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, index, srcLayer, toEncrypt, pool, deltaLayers)
}
data[index] = cld
}
Expand Down Expand Up @@ -861,7 +868,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
return errors.Wrapf(err, "Can't acquire semaphore")
}
copyGroup.Add(1)
go copyLayerHelper(i, srcLayer, encLayerBitmap[i], progressPool)
go copyLayerHelper(i, srcLayer, encLayerBitmap[i], progressPool, deltaLayers)
}

// A call to copyGroup.Wait() is done at this point by the defer above.
Expand Down Expand Up @@ -1043,9 +1050,83 @@ type diffIDResult struct {
err error
}

// getMatchingDeltaLayers returns the DiffID of the source layer at srcIndex, together with
// pointers to every entry of deltaLayers whose "io.github.containers.delta.to" annotation
// equals that DiffID (i.e. all deltas that produce this layer).
//
// It returns ("", nil) if no delta layers were supplied, if the OCI config of the source
// image can not be read, or if the config does not list a DiffID for srcIndex. None of these
// is reported as an error, because this function has no error return value.
//
// The returned pointers alias the elements of deltaLayers; they are not copies.
func (ic *imageCopier) getMatchingDeltaLayers(ctx context.Context, srcIndex int, deltaLayers []types.BlobInfo) (digest.Digest, []*types.BlobInfo) {
	if len(deltaLayers) == 0 {
		return "", nil
	}

	config, err := ic.src.OCIConfig(ctx)
	if err != nil {
		// Not fatal here: just report that no delta matches, but leave a trace of why.
		logrus.Debugf("Not using deltas for layer %d, failed to read image config: %v", srcIndex, err)
		return "", nil
	}
	// len() of a nil slice is 0, so this also covers a config without any DiffIDs.
	if config == nil || srcIndex < 0 || srcIndex >= len(config.RootFS.DiffIDs) {
		return "", nil
	}

	layerDiffID := config.RootFS.DiffIDs[srcIndex]
	wantedTo := layerDiffID.String()

	var matchingLayers []*types.BlobInfo
	for i := range deltaLayers {
		// Take the address of the slice element, not of a loop-variable copy.
		deltaLayer := &deltaLayers[i]
		if deltaLayer.Annotations["io.github.containers.delta.to"] == wantedTo {
			matchingLayers = append(matchingLayers, deltaLayer)
		}
	}

	return layerDiffID, matchingLayers
}

// resolveDeltaLayer looks at which of matchingDeltas have their "from" layer data available
// at the destination and opens the best (smallest) such delta.
//
// On success with a usable delta it returns the delta blob stream, the data source for the
// "from" layer, and the BlobInfo of the chosen delta; the caller is responsible for closing
// both the stream and the data source.
// If no delta is usable it returns (nil, nil, types.BlobInfo{}, nil).
// A non-nil error is returned if querying the destination or reading the delta blob fails.
//
// Note that matchingDeltas is sorted in place.
func (ic *imageCopier) resolveDeltaLayer(ctx context.Context, matchingDeltas []*types.BlobInfo) (io.ReadCloser, tar_patch.DataSource, types.BlobInfo, error) {
	// Sort by size so that we favour the smallest usable delta
	sort.Slice(matchingDeltas, func(i, j int) bool {
		return matchingDeltas[i].Size < matchingDeltas[j].Size
	})

	for _, matchingDelta := range matchingDeltas {
		from := matchingDelta.Annotations["io.github.containers.delta.from"]
		fromDigest, err := digest.Parse(from)
		if err != nil {
			continue // Silently ignore the delta if the server specified a weird format
		}

		dataSource, err := types.ImageDestinationGetLayerDeltaData(ic.c.dest, ctx, fromDigest)
		if err != nil {
			return nil, nil, types.BlobInfo{}, err // Internal error
		}
		if dataSource == nil {
			continue // The "from" layer doesn't exist at the destination
		}

		logrus.Debugf("Using delta %v for DiffID %v", matchingDelta.Digest, fromDigest)

		deltaStream, _, err := ic.c.rawSource.GetBlob(ctx, *matchingDelta, ic.c.blobInfoCache)
		if err != nil {
			// We are not handing the data source to the caller, so release it here
			// instead of leaking it.
			dataSource.Close()
			return nil, nil, types.BlobInfo{}, errors.Wrapf(err, "Error reading delta blob %s", matchingDelta.Digest)
		}
		return deltaStream, dataSource, *matchingDelta, nil
	}
	return nil, nil, types.BlobInfo{}, nil
}

// canUseDeltas reports whether a delta may be used in place of the layer described by srcInfo.
// If so, the second return value is the media type to use for the layer produced by applying
// the delta; otherwise it is "".
func (ic *imageCopier) canUseDeltas(srcInfo types.BlobInfo) (bool, string) {
	// Applying a delta makes the manifest refer to the uncompressed digest,
	// which is only possible if we are allowed to substitute blobs.
	if ic.canSubstituteBlobs {
		switch srcInfo.MediaType {
		case manifest.DockerV2Schema2LayerMediaType,
			manifest.DockerV2SchemaLayerMediaTypeUncompressed:
			return true, manifest.DockerV2SchemaLayerMediaTypeUncompressed
		case imgspecv1.MediaTypeImageLayer,
			imgspecv1.MediaTypeImageLayerGzip,
			imgspecv1.MediaTypeImageLayerZstd:
			return true, imgspecv1.MediaTypeImageLayer
		}
	}

	// Either substitution is forbidden, or this is not a layer type we know how to replace.
	return false, ""
}

// copyLayer copies a layer with srcInfo (with known Digest and Annotations and possibly known Size) in src to dest, perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, toEncrypt bool, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) {
func (ic *imageCopier) copyLayer(ctx context.Context, srcIndex int, srcInfo types.BlobInfo, toEncrypt bool, pool *mpb.Progress, deltaLayers []types.BlobInfo) (types.BlobInfo, digest.Digest, error) {
cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be ""
// Diffs are needed if we are encrypting an image or trying to decrypt an image
diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == "" || toEncrypt || (isOciEncrypted(srcInfo.MediaType) && ic.ociDecryptConfig != nil)
Expand All @@ -1064,6 +1145,52 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
}
}

// First look for a delta that matches this layer and substitute the result of that
if ok, deltaResultMediaType := ic.canUseDeltas(srcInfo); ok {
// Get deltas going TO this layer
deltaDiffID, matchingDeltas := ic.getMatchingDeltaLayers(ctx, srcIndex, deltaLayers)
// Get best possible FROM delta
deltaStream, deltaDataSource, matchingDelta, err := ic.resolveDeltaLayer(ctx, matchingDeltas)
if err != nil {
return types.BlobInfo{}, "", err
}
if deltaStream != nil {
bar := ic.c.createProgressBar(pool, matchingDelta, "delta", "done")

wrappedDeltaStream := bar.ProxyReader(deltaStream)

// Convert deltaStream to uncompressed tar layer stream
pr, pw := io.Pipe()
go func() {
if err := tar_patch.Apply(wrappedDeltaStream, deltaDataSource, pw); err != nil {
// We will notice this error when failing to verify the digest, so leave it be
logrus.Infof("Failed to apply layer delta: %v", err)
}
deltaDataSource.Close()
deltaStream.Close()
wrappedDeltaStream.Close()
pw.Close()
}()
defer pr.Close()

// Copy uncompressed tar layer to destination, verifying the diffID
blobInfo, err := ic.c.copyBlobFromStream(ctx, pr, types.BlobInfo{Digest: deltaDiffID, Size: -1, MediaType: deltaResultMediaType, Annotations: srcInfo.Annotations}, nil, ic.canModifyManifest, false, toEncrypt, nil)
if err != nil {
return types.BlobInfo{}, "", err
}

bar.SetTotal(matchingDelta.Size, true)

// We verified this when streaming the applied delta above
diffID := deltaDiffID

// Record the fact that this blob is uncompressed
ic.c.blobInfoCache.RecordDigestUncompressedPair(diffID, diffID)

return blobInfo, diffID, err
}
}

// Fallback: copy the layer, computing the diffID if we need to do so
srcStream, srcBlobSize, err := ic.c.rawSource.GetBlob(ctx, srcInfo, ic.c.blobInfoCache)
if err != nil {
Expand Down Expand Up @@ -1213,7 +1340,9 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
return types.BlobInfo{}, errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest)
}
isCompressed := decompressor != nil
destStream = bar.ProxyReader(destStream)
if bar != nil {
destStream = bar.ProxyReader(destStream)
}

// === Send a copy of the original, uncompressed, stream, to a separate path if necessary.
var originalLayerReader io.Reader // DO NOT USE this other than to drain the input if no other consumer in the pipeline has done so.
Expand All @@ -1232,6 +1361,11 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
logrus.Debugf("Using original blob without modification for encrypted blob")
compressionOperation = types.PreserveOriginal
inputInfo = srcInfo
} else if canModifyBlob && manifest.IsNoCompressType(srcInfo.MediaType) {
// This is a blob we should not repack, such as a delta
logrus.Debugf("Using original blob without modification for no-compress type")
compressionOperation = types.PreserveOriginal
inputInfo = srcInfo
} else if canModifyBlob && c.dest.DesiredLayerCompression() == types.Compress && !isCompressed {
logrus.Debugf("Compressing blob on the fly")
compressionOperation = types.Compress
Expand Down
10 changes: 10 additions & 0 deletions copy/manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/containers/image/v5/docker/reference"
"github.com/containers/image/v5/manifest"
"github.com/containers/image/v5/types"
digest "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/specs-go/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -77,6 +78,15 @@ func (f fakeImageSource) SupportsEncryption(ctx context.Context) bool {
func (f fakeImageSource) Size() (int64, error) {
panic("Unexpected call to a mock function")
}
// GetDeltaManifest is a mock stub; it panics because no test is expected to call it.
func (f fakeImageSource) GetDeltaManifest(_ context.Context, _ *digest.Digest) ([]byte, string, error) {
	panic("Unexpected call to a mock function")
}
// GetDeltaIndex is a mock stub; it panics because no test is expected to call it.
func (f fakeImageSource) GetDeltaIndex(_ context.Context) (types.ImageReference, error) {
	panic("Unexpected call to a mock function")
}
// DeltaLayers is a mock stub; it panics because no test is expected to call it.
func (f fakeImageSource) DeltaLayers(_ context.Context) ([]types.BlobInfo, error) {
	panic("Unexpected call to a mock function")
}

func TestDetermineManifestConversion(t *testing.T) {
supportS1S2OCI := []string{
Expand Down
38 changes: 38 additions & 0 deletions docker/docker_image_src.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,44 @@ func (s *dockerImageSource) fetchManifest(ctx context.Context, tagOrDigest strin
return manblob, simplifyContentType(res.Header.Get("Content-Type")), nil
}

// GetDeltaManifest returns the delta manifest (and its MIME type) for the image instance
// identified by instanceDigest, looked up through the "_deltaindex" tag.
//
// It returns (nil, "", nil) if no delta manifest is available. An error is returned if the
// digest of the image manifest can not be determined, if the delta index can not be parsed,
// or if fetching the selected delta manifest fails.
func (s *dockerImageSource) GetDeltaManifest(ctx context.Context, instanceDigest *digest.Digest) ([]byte, string, error) {
	// Get the real manifest digest
	srcManifestDigest, err := s.manifestDigest(ctx, instanceDigest)
	if err != nil {
		return nil, "", err
	}

	// Load the delta manifest index.
	// Deltas are an optional optimization, so a failure to fetch the index is treated as
	// "no deltas" rather than as an error.
	// NOTE(review): this swallows every fetch failure, not only "index does not exist";
	// distinguishing the two would need the error type returned by fetchManifest.
	indexBlob, _, err := s.fetchManifest(ctx, "_deltaindex")
	if err != nil {
		return nil, "", nil
	}

	index, err := manifest.OCI1IndexFromManifest(indexBlob)
	if err != nil {
		return nil, "", err
	}

	// Look up the delta manifest in the index by the real manifest digest
	target := srcManifestDigest.String()
	for _, desc := range index.Manifests {
		if desc.Annotations["io.github.containers.delta.target"] == target {
			return s.fetchManifest(ctx, desc.Digest.String())
		}
	}

	// No delta
	return nil, "", nil
}

// GetDeltaIndex returns a reference to the "_deltaindex" tag in the repository of this
// image source. It fails if the tagged reference can not be constructed.
func (s *dockerImageSource) GetDeltaIndex(ctx context.Context) (types.ImageReference, error) {
	indexRef, tagErr := reference.WithTag(s.logicalRef.ref, "_deltaindex")
	if tagErr != nil {
		return nil, tagErr
	}

	return newReference(indexRef)
}

// ensureManifestIsLoaded sets s.cachedManifest and s.cachedManifestMIMEType
//
// ImageSource implementations are not required or expected to do any caching,
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/containers/libtrust v0.0.0-20190913040956-14b96171aa3b
github.com/containers/ocicrypt v1.0.2
github.com/containers/storage v1.20.1
github.com/containers/tar-diff v0.1.2
github.com/docker/distribution v2.7.1+incompatible
github.com/docker/docker v1.4.2-0.20191219165747-a9416c67da9f
github.com/docker/docker-credential-helpers v0.6.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ github.com/containers/storage v1.19.2 h1:vhcUwEjDZiPJxaLPFsjvyavnEjFw6qQi9HAkVz1
github.com/containers/storage v1.19.2/go.mod h1:gYCp3jzgXkvubO0rI14QAjz5Mxm/qKJgLmHFyqayDnw=
github.com/containers/storage v1.20.1 h1:2XE4eRIqSa6YjhAZjNwIkIKE6+Miy+5WV8l1KzY2ZKk=
github.com/containers/storage v1.20.1/go.mod h1:RoKzO8KSDogCT6c06rEbanZTcKYxshorB33JikEGc3A=
github.com/containers/tar-diff v0.1.2 h1:6E04zGCdCsSJ8SoApFLiYkAxqkFllcKOaOATBUGydL4=
github.com/containers/tar-diff v0.1.2/go.mod h1:9/tnBUlqmoW1bz83CQAW9wC+EQCH+h1Wn8uq3VxLbMc=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
14 changes: 14 additions & 0 deletions image/unparsed.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ func (i *UnparsedImage) Manifest(ctx context.Context) ([]byte, string, error) {
return i.cachedManifest, i.cachedManifestMIMEType, nil
}

// DeltaLayers returns the layer infos of the delta manifest for this image.
// It returns (nil, nil) if the source reports no delta manifest.
func (i *UnparsedImage) DeltaLayers(ctx context.Context) ([]types.BlobInfo, error) {
	mb, mt, err := types.ImageSourceGetDeltaManifest(i.src, ctx, i.instanceDigest)
	if err != nil {
		// Check the error first, so that it is never dropped when a blob is returned as well.
		return nil, err
	}
	if mb == nil {
		// A nil manifest with a nil error is ok: it means that no deltas exist.
		return nil, nil
	}

	m, err := manifestInstanceFromBlob(ctx, nil, i.src, mb, mt)
	if err != nil {
		return nil, err
	}
	return m.LayerInfos(), nil
}

// expectedManifestDigest returns a the expected value of the manifest digest, and an indicator whether it is known.
// The bool return value seems redundant with digest != ""; it is used explicitly
// to refuse (unexpected) situations when the digest exists but is "".
Expand Down
11 changes: 10 additions & 1 deletion manifest/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ type OCI1 struct {
imgspecv1.Manifest
}

const (
	// MediaTypeTarDiff is the media type of a layer blob that contains a tar-diff delta.
	MediaTypeTarDiff = "application/vnd.tar-diff"
)

// SupportedOCI1MediaType checks if the specified string is a supported OCI1
// media type.
//
Expand All @@ -41,7 +46,7 @@ type OCI1 struct {
// useful for validation anyway.
func SupportedOCI1MediaType(m string) error {
switch m {
case imgspecv1.MediaTypeDescriptor, imgspecv1.MediaTypeImageConfig, imgspecv1.MediaTypeImageLayer, imgspecv1.MediaTypeImageLayerGzip, imgspecv1.MediaTypeImageLayerNonDistributable, imgspecv1.MediaTypeImageLayerNonDistributableGzip, imgspecv1.MediaTypeImageLayerNonDistributableZstd, imgspecv1.MediaTypeImageLayerZstd, imgspecv1.MediaTypeImageManifest, imgspecv1.MediaTypeLayoutHeader, ociencspec.MediaTypeLayerEnc, ociencspec.MediaTypeLayerGzipEnc:
case imgspecv1.MediaTypeDescriptor, imgspecv1.MediaTypeImageConfig, imgspecv1.MediaTypeImageLayer, imgspecv1.MediaTypeImageLayerGzip, imgspecv1.MediaTypeImageLayerNonDistributable, imgspecv1.MediaTypeImageLayerNonDistributableGzip, imgspecv1.MediaTypeImageLayerNonDistributableZstd, imgspecv1.MediaTypeImageLayerZstd, imgspecv1.MediaTypeImageManifest, imgspecv1.MediaTypeLayoutHeader, ociencspec.MediaTypeLayerEnc, ociencspec.MediaTypeLayerGzipEnc, MediaTypeTarDiff:
return nil
default:
return fmt.Errorf("unsupported OCIv1 media type: %q", m)
Expand Down Expand Up @@ -211,3 +216,7 @@ func getDecryptedMediaType(mediatype string) (string, error) {

return strings.TrimSuffix(mediatype, "+encrypted"), nil
}

// IsNoCompressType reports whether mediatype is MediaTypeTarDiff, i.e. a blob type
// that should not be repacked.
func IsNoCompressType(mediatype string) bool {
	switch mediatype {
	case MediaTypeTarDiff:
		return true
	default:
		return false
	}
}
15 changes: 15 additions & 0 deletions openshift/openshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,17 @@ func (s *openshiftImageSource) GetManifest(ctx context.Context, instanceDigest *
return s.docker.GetManifest(ctx, instanceDigest)
}

// GetDeltaManifest resolves the image if necessary, and then forwards the request for a
// delta manifest of instanceDigest to the underlying docker image source.
func (s *openshiftImageSource) GetDeltaManifest(ctx context.Context, instanceDigest *digest.Digest) ([]byte, string, error) {
	resolveErr := s.ensureImageIsResolved(ctx)
	if resolveErr != nil {
		return nil, "", resolveErr
	}
	return types.ImageSourceGetDeltaManifest(s.docker, ctx, instanceDigest)
}

// GetDeltaIndex resolves the image if necessary, and then forwards the request for the
// delta index reference to the underlying docker image source.
func (s *openshiftImageSource) GetDeltaIndex(ctx context.Context) (types.ImageReference, error) {
	// Like GetDeltaManifest, make sure the image is resolved before s.docker is used.
	if err := s.ensureImageIsResolved(ctx); err != nil {
		return nil, err
	}
	return types.ImageSourceGetDeltaIndex(s.docker, ctx)
}

// HasThreadSafeGetBlob indicates whether GetBlob can be executed concurrently.
func (s *openshiftImageSource) HasThreadSafeGetBlob() bool {
return false
Expand Down Expand Up @@ -511,6 +522,10 @@ func (d *openshiftImageDestination) Commit(ctx context.Context, unparsedToplevel
return d.docker.Commit(ctx, unparsedToplevel)
}

// GetLayerDeltaData forwards the request for the delta source data of the layer with the
// given diffID to the underlying docker image destination.
func (d *openshiftImageDestination) GetLayerDeltaData(ctx context.Context, diffID digest.Digest) (types.DeltaDataSource, error) {
	dataSource, err := types.ImageDestinationGetLayerDeltaData(d.docker, ctx, diffID)
	return dataSource, err
}

// These structs are subsets of github.com/openshift/origin/pkg/image/api/v1 and its dependencies.
type imageStream struct {
Status imageStreamStatus `json:"status,omitempty"`
Expand Down
Loading

0 comments on commit ef12614

Please sign in to comment.