diff --git a/copy/compression.go b/copy/compression.go index 081c49312f..fb5e1b174e 100644 --- a/copy/compression.go +++ b/copy/compression.go @@ -52,6 +52,16 @@ func blobPipelineDetectCompressionStep(stream *sourceStream, srcInfo types.BlobI } stream.reader = reader + if decompressor != nil && format.Name() == compressiontypes.ZstdAlgorithmName { + tocDigest, err := chunkedToc.GetTOCDigest(srcInfo.Annotations) + if err != nil { + return bpDetectCompressionStepData{}, err + } + if tocDigest != nil { + format = compression.ZstdChunked + } + + } res := bpDetectCompressionStepData{ isCompressed: decompressor != nil, format: format, @@ -71,13 +81,14 @@ func blobPipelineDetectCompressionStep(stream *sourceStream, srcInfo types.BlobI // bpCompressionStepData contains data that the copy pipeline needs about the compression step. type bpCompressionStepData struct { - operation bpcOperation // What we are actually doing - uploadedOperation types.LayerCompression // Operation to use for updating the blob metadata (matching the end state, not necessarily what we do) - uploadedAlgorithm *compressiontypes.Algorithm // An algorithm parameter for the compressionOperation edits. - uploadedAnnotations map[string]string // Compression-related annotations that should be set on the uploaded blob. WARNING: This is only set after the srcStream.reader is fully consumed. - srcCompressorBaseVariantName string // Compressor base variant name to record in the blob info cache for the source blob. - uploadedCompressorName string // Compressor name to record in the blob info cache for the uploaded blob. - closers []io.Closer // Objects to close after the upload is done, if any. + operation bpcOperation // What we are actually doing + uploadedOperation types.LayerCompression // Operation to use for updating the blob metadata (matching the end state, not necessarily what we do) + uploadedAlgorithm *compressiontypes.Algorithm // An algorithm parameter for the compressionOperation edits. + uploadedAnnotations map[string]string // Compression-related annotations that should be set on the uploaded blob. WARNING: This is only set after the srcStream.reader is fully consumed. + srcCompressorBaseVariantName string // Compressor base variant name to record in the blob info cache for the source blob. + uploadedCompressorBaseVariantName string // Compressor base variant name to record in the blob info cache for the uploaded blob. + uploadedCompressorSpecificVariantName string // Compressor specific variant name to record in the blob info cache for the uploaded blob. + closers []io.Closer // Objects to close after the upload is done, if any. } type bpcOperation int @@ -129,11 +140,12 @@ func (ic *imageCopier) bpcPreserveEncrypted(stream *sourceStream, _ bpDetectComp // We can’t do anything with an encrypted blob unless decrypted. 
logrus.Debugf("Using original blob without modification for encrypted blob") return &bpCompressionStepData{ - operation: bpcOpPreserveOpaque, - uploadedOperation: types.PreserveOriginal, - uploadedAlgorithm: nil, - srcCompressorBaseVariantName: internalblobinfocache.UnknownCompression, - uploadedCompressorName: internalblobinfocache.UnknownCompression, + operation: bpcOpPreserveOpaque, + uploadedOperation: types.PreserveOriginal, + uploadedAlgorithm: nil, + srcCompressorBaseVariantName: internalblobinfocache.UnknownCompression, + uploadedCompressorBaseVariantName: internalblobinfocache.UnknownCompression, + uploadedCompressorSpecificVariantName: internalblobinfocache.UnknownCompression, }, nil } return nil, nil @@ -157,14 +169,19 @@ func (ic *imageCopier) bpcCompressUncompressed(stream *sourceStream, detected bp Digest: "", Size: -1, } + specificVariantName := uploadedAlgorithm.Name() + if specificVariantName == uploadedAlgorithm.BaseVariantName() { + specificVariantName = internalblobinfocache.UnknownCompression + } return &bpCompressionStepData{ - operation: bpcOpCompressUncompressed, - uploadedOperation: types.Compress, - uploadedAlgorithm: uploadedAlgorithm, - uploadedAnnotations: annotations, - srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName, - uploadedCompressorName: uploadedAlgorithm.Name(), - closers: []io.Closer{reader}, + operation: bpcOpCompressUncompressed, + uploadedOperation: types.Compress, + uploadedAlgorithm: uploadedAlgorithm, + uploadedAnnotations: annotations, + srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName, + uploadedCompressorBaseVariantName: uploadedAlgorithm.BaseVariantName(), + uploadedCompressorSpecificVariantName: specificVariantName, + closers: []io.Closer{reader}, }, nil } return nil, nil @@ -197,15 +214,20 @@ func (ic *imageCopier) bpcRecompressCompressed(stream *sourceStream, detected bp Digest: "", Size: -1, } + specificVariantName := ic.compressionFormat.Name() + if specificVariantName == ic.compressionFormat.BaseVariantName() { + specificVariantName = internalblobinfocache.UnknownCompression + } succeeded = true return &bpCompressionStepData{ - operation: bpcOpRecompressCompressed, - uploadedOperation: types.PreserveOriginal, - uploadedAlgorithm: ic.compressionFormat, - uploadedAnnotations: annotations, - srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName, - uploadedCompressorName: ic.compressionFormat.Name(), - closers: []io.Closer{decompressed, recompressed}, + operation: bpcOpRecompressCompressed, + uploadedOperation: types.PreserveOriginal, + uploadedAlgorithm: ic.compressionFormat, + uploadedAnnotations: annotations, + srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName, + uploadedCompressorBaseVariantName: ic.compressionFormat.BaseVariantName(), + uploadedCompressorSpecificVariantName: specificVariantName, + closers: []io.Closer{decompressed, recompressed}, }, nil } return nil, nil @@ -226,12 +248,13 @@ func (ic *imageCopier) bpcDecompressCompressed(stream *sourceStream, detected bp Size: -1, } return &bpCompressionStepData{ - operation: bpcOpDecompressCompressed, - uploadedOperation: types.Decompress, - uploadedAlgorithm: nil, - srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName, - uploadedCompressorName: internalblobinfocache.Uncompressed, - closers: []io.Closer{s}, + operation: bpcOpDecompressCompressed, + uploadedOperation: types.Decompress, + uploadedAlgorithm: nil, + srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName, + 
uploadedCompressorBaseVariantName: internalblobinfocache.Uncompressed, + uploadedCompressorSpecificVariantName: internalblobinfocache.UnknownCompression, + closers: []io.Closer{s}, }, nil } return nil, nil @@ -276,7 +299,8 @@ func (ic *imageCopier) bpcPreserveOriginal(_ *sourceStream, detected bpDetectCom // We only record the base variant of the format on upload; we didn’t do anything with // the TOC, we don’t know whether it matches the blob digest, so we don’t want to trigger // reuse of any kind between the blob digest and the TOC digest. - uploadedCompressorName: detected.srcCompressorBaseVariantName, + uploadedCompressorBaseVariantName: detected.srcCompressorBaseVariantName, + uploadedCompressorSpecificVariantName: internalblobinfocache.UnknownCompression, } } @@ -336,24 +360,16 @@ func (d *bpCompressionStepData) recordValidatedDigestData(c *copier, uploadedInf return fmt.Errorf("Internal error: Unexpected d.operation value %#v", d.operation) } } - if d.srcCompressorBaseVariantName == "" || d.uploadedCompressorName == "" { - return fmt.Errorf("internal error: missing compressor names (src base: %q, uploaded: %q)", - d.srcCompressorBaseVariantName, d.uploadedCompressorName) + if d.srcCompressorBaseVariantName == "" || d.uploadedCompressorBaseVariantName == "" || d.uploadedCompressorSpecificVariantName == "" { + return fmt.Errorf("internal error: missing compressor names (src base: %q, uploaded base: %q, uploaded specific: %q)", + d.srcCompressorBaseVariantName, d.uploadedCompressorBaseVariantName, d.uploadedCompressorSpecificVariantName) } - if d.uploadedCompressorName != internalblobinfocache.UnknownCompression { - if d.uploadedCompressorName != compressiontypes.ZstdChunkedAlgorithmName { - // HACK: Don’t record zstd:chunked algorithms. - // There is already a similar hack in internal/imagedestination/impl/helpers.CandidateMatchesTryReusingBlobOptions, - // and that one prevents reusing zstd:chunked blobs, so recording the algorithm here would be mostly harmless. - // - // We skip that here anyway to work around the inability of blobPipelineDetectCompressionStep to differentiate - // between zstd and zstd:chunked; so we could, in varying situations over time, call RecordDigestCompressorName - // with the same digest and both ZstdAlgorithmName and ZstdChunkedAlgorithmName , which causes warnings about - // inconsistent data to be logged. - c.blobInfoCache.RecordDigestCompressorData(uploadedInfo.Digest, internalblobinfocache.DigestCompressorData{ - BaseVariantCompressor: d.uploadedCompressorName, - }) - } + if d.uploadedCompressorBaseVariantName != internalblobinfocache.UnknownCompression { + c.blobInfoCache.RecordDigestCompressorData(uploadedInfo.Digest, internalblobinfocache.DigestCompressorData{ + BaseVariantCompressor: d.uploadedCompressorBaseVariantName, + SpecificVariantCompressor: d.uploadedCompressorSpecificVariantName, + SpecificVariantAnnotations: d.uploadedAnnotations, + }) } if srcInfo.Digest != "" && srcInfo.Digest != uploadedInfo.Digest && d.srcCompressorBaseVariantName != internalblobinfocache.UnknownCompression { @@ -361,7 +377,9 @@ func (d *bpCompressionStepData) recordValidatedDigestData(c *copier, uploadedInf // blob as is, or perhaps decompressed it; either way we don’t trust the TOC digest, // so record neither the variant name, nor the TOC digest. 
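// For example (an illustrative trace, not from the diff): recompressing a gzip source to zstd:chunked records
// uploadedInfo.Digest above with base variant "zstd", specific variant "zstd:chunked", and the TOC annotations,
// while srcInfo.Digest here gets only BaseVariantCompressor = "gzip".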
c.blobInfoCache.RecordDigestCompressorData(srcInfo.Digest, internalblobinfocache.DigestCompressorData{ - BaseVariantCompressor: d.srcCompressorBaseVariantName, + BaseVariantCompressor: d.srcCompressorBaseVariantName, + SpecificVariantCompressor: internalblobinfocache.UnknownCompression, + SpecificVariantAnnotations: nil, }) } return nil diff --git a/copy/single.go b/copy/single.go index 714dc81368..ba414a22d6 100644 --- a/copy/single.go +++ b/copy/single.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "maps" "reflect" "slices" "strings" @@ -162,7 +163,7 @@ func (c *copier) copySingleImage(ctx context.Context, unparsedImage *image.Unpar if format == nil { format = defaultCompressionFormat } - if format.Name() == compression.ZstdChunked.Name() { + if format.Name() == compressiontypes.ZstdChunkedAlgorithmName { if ic.requireCompressionFormatMatch { return copySingleImageResult{}, errors.New("explicitly requested to combine zstd:chunked with encryption, which is not beneficial; use plain zstd instead") } @@ -888,21 +889,33 @@ func updatedBlobInfoFromReuse(inputInfo types.BlobInfo, reusedBlob private.Reuse // Handling of compression, encryption, and the related MIME types and the like are all the responsibility // of the generic code in this package. res := types.BlobInfo{ - Digest: reusedBlob.Digest, - Size: reusedBlob.Size, - URLs: nil, // This _must_ be cleared if Digest changes; clear it in other cases as well, to preserve previous behavior. - Annotations: inputInfo.Annotations, // FIXME: This should remove zstd:chunked annotations (but those annotations being left with incorrect values should not break pulls) - MediaType: inputInfo.MediaType, // Mostly irrelevant, MediaType is updated based on Compression*/CryptoOperation. + Digest: reusedBlob.Digest, + Size: reusedBlob.Size, + URLs: nil, // This _must_ be cleared if Digest changes; clear it in other cases as well, to preserve previous behavior. + // FIXME: This should remove zstd:chunked annotations IF the original was chunked and the new one isn’t + // (but those annotations being left with incorrect values should not break pulls). + Annotations: maps.Clone(inputInfo.Annotations), + MediaType: inputInfo.MediaType, // Mostly irrelevant, MediaType is updated based on Compression*/CryptoOperation. CompressionOperation: reusedBlob.CompressionOperation, CompressionAlgorithm: reusedBlob.CompressionAlgorithm, CryptoOperation: inputInfo.CryptoOperation, // Expected to be unset anyway. } // The transport is only expected to fill CompressionOperation and CompressionAlgorithm - // if the blob was substituted; otherwise, fill it in based + // if the blob was substituted; otherwise, it is optional, and if not set, fill it in based // on what we know from the srcInfos we were given. 
if reusedBlob.Digest == inputInfo.Digest { - res.CompressionOperation = inputInfo.CompressionOperation - res.CompressionAlgorithm = inputInfo.CompressionAlgorithm + if res.CompressionOperation == types.PreserveOriginal { + res.CompressionOperation = inputInfo.CompressionOperation + } + if res.CompressionAlgorithm == nil { + res.CompressionAlgorithm = inputInfo.CompressionAlgorithm + } + } + if len(reusedBlob.CompressionAnnotations) != 0 { + if res.Annotations == nil { + res.Annotations = map[string]string{} + } + maps.Copy(res.Annotations, reusedBlob.CompressionAnnotations) } return res } diff --git a/copy/single_test.go b/copy/single_test.go index 144b5ed2aa..890a63bce8 100644 --- a/copy/single_test.go +++ b/copy/single_test.go @@ -55,22 +55,42 @@ func TestUpdatedBlobInfoFromReuse(t *testing.T) { }, { // Reuse with substitution reused: private.ReusedBlob{ - Digest: "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", - Size: 513543640, - CompressionOperation: types.Decompress, - CompressionAlgorithm: nil, + Digest: "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + Size: 513543640, + CompressionOperation: types.Decompress, + CompressionAlgorithm: nil, + CompressionAnnotations: map[string]string{"decompressed": "value"}, }, expected: types.BlobInfo{ Digest: "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", Size: 513543640, URLs: nil, - Annotations: map[string]string{"test-annotation-2": "two"}, + Annotations: map[string]string{"test-annotation-2": "two", "decompressed": "value"}, MediaType: imgspecv1.MediaTypeImageLayerGzip, CompressionOperation: types.Decompress, CompressionAlgorithm: nil, // CryptoOperation is set to the zero value }, }, + { // Reuse turning zstd into zstd:chunked + reused: private.ReusedBlob{ + Digest: "sha256:6a5a5368e0c2d3e5909184fa28ddfd56072e7ff3ee9a945876f7eee5896ef5bb", + Size: 51354364, + CompressionOperation: types.Compress, + CompressionAlgorithm: &compression.ZstdChunked, + CompressionAnnotations: map[string]string{"zstd-toc": "value"}, + }, + expected: types.BlobInfo{ + Digest: "sha256:6a5a5368e0c2d3e5909184fa28ddfd56072e7ff3ee9a945876f7eee5896ef5bb", + Size: 51354364, + URLs: nil, + Annotations: map[string]string{"test-annotation-2": "two", "zstd-toc": "value"}, + MediaType: imgspecv1.MediaTypeImageLayerGzip, + CompressionOperation: types.Compress, + CompressionAlgorithm: &compression.ZstdChunked, + // CryptoOperation is set to the zero value + }, + }, } { res := updatedBlobInfoFromReuse(srcInfo, c.reused) assert.Equal(t, c.expected, res, fmt.Sprintf("%#v", c.reused)) diff --git a/docker/docker_image_dest.go b/docker/docker_image_dest.go index 7f7a74bd37..ed3d4a2c0b 100644 --- a/docker/docker_image_dest.go +++ b/docker/docker_image_dest.go @@ -332,6 +332,7 @@ func (d *dockerImageDestination) TryReusingBlobWithOptions(ctx context.Context, return false, private.ReusedBlob{}, errors.New("Can not check for a blob with unknown digest") } + originalCandidateKnownToBeMissing := false if impl.OriginalCandidateMatchesTryReusingBlobOptions(options) { // First, check whether the blob happens to already exist at the destination. 
haveBlob, reusedInfo, err := d.tryReusingExactBlob(ctx, info, options.Cache) @@ -341,9 +342,17 @@ func (d *dockerImageDestination) TryReusingBlobWithOptions(ctx context.Context, if haveBlob { return true, reusedInfo, nil } + originalCandidateKnownToBeMissing = true } else { logrus.Debugf("Ignoring exact blob match, compression %s does not match required %s or MIME types %#v", optionalCompressionName(options.OriginalCompression), optionalCompressionName(options.RequiredCompression), options.PossibleManifestFormats) + // We can get here with a blob detected to be zstd when the user wants a zstd:chunked. + // In that case we keep originalCandidateKnownToBeMissing = false, so that if we find + // a BIC entry for this blob, we do use that entry and return a zstd:chunked entry + // with the BIC’s annotations. + // This is not quite correct: it only works if the BIC also contains an acceptable _location_. + // Ideally, we could look up just the compression algorithm/annotations for info.Digest, + // and use it even if no location candidate exists and the original candidate is present. } // Then try reusing blobs from other locations. @@ -387,7 +396,8 @@ func (d *dockerImageDestination) TryReusingBlobWithOptions(ctx context.Context, // for it in the current repo. candidateRepo = reference.TrimNamed(d.ref.ref) } - if candidateRepo.Name() == d.ref.ref.Name() && candidate.Digest == info.Digest { + if originalCandidateKnownToBeMissing && + candidateRepo.Name() == d.ref.ref.Name() && candidate.Digest == info.Digest { logrus.Debug("... Already tried the primary destination") continue } @@ -427,10 +437,12 @@ func (d *dockerImageDestination) TryReusingBlobWithOptions(ctx context.Context, options.Cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), candidate.Digest, newBICLocationReference(d.ref)) return true, private.ReusedBlob{ - Digest: candidate.Digest, - Size: size, - CompressionOperation: candidate.CompressionOperation, - CompressionAlgorithm: candidate.CompressionAlgorithm}, nil + Digest: candidate.Digest, + Size: size, + CompressionOperation: candidate.CompressionOperation, + CompressionAlgorithm: candidate.CompressionAlgorithm, + CompressionAnnotations: candidate.CompressionAnnotations, + }, nil } return false, private.ReusedBlob{}, nil diff --git a/internal/blobinfocache/types.go b/internal/blobinfocache/types.go index 276c8073e3..acf82ee639 100644 --- a/internal/blobinfocache/types.go +++ b/internal/blobinfocache/types.go @@ -37,8 +37,11 @@ type BlobInfoCache2 interface { // RecordDigestCompressorData records data for the blob with the specified digest. // WARNING: Only call this with LOCALLY VERIFIED data: - // - don’t record a compressor for a digest just because some remote author claims so - // (e.g. because a manifest says so); + // - don’t record a compressor for a digest just because some remote author claims so + // (e.g. because a manifest says so); + // - don’t record the non-base variant or annotations if we are not _sure_ that the base variant + // and the blob’s digest match the non-base variant’s annotations (e.g. because we saw them + // in a manifest) // otherwise the cache could be poisoned and cause us to make incorrect edits to type // information in a manifest. RecordDigestCompressorData(anyDigest digest.Digest, data DigestCompressorData) @@ -52,6 +55,9 @@ type BlobInfoCache2 interface { // (This is worded generically, but basically targeted at the zstd / zstd:chunked situation.)
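// For example (values illustrative, mirroring the tests in this patch): a blob locally verified to be zstd:chunked
// would be recorded as DigestCompressorData{BaseVariantCompressor: "zstd", SpecificVariantCompressor: "zstd:chunked",
// SpecificVariantAnnotations: tocAnnotations}, while a plain zstd blob would use SpecificVariantCompressor =
// UnknownCompression and nil annotations.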
type DigestCompressorData struct { BaseVariantCompressor string // A compressor’s base variant name, or Uncompressed or UnknownCompression. + // The following fields are only valid if the base variant is neither Uncompressed nor UnknownCompression: + SpecificVariantCompressor string // A non-base variant compressor (or UnknownCompression if the true format is just the base variant) + SpecificVariantAnnotations map[string]string // Annotations required to benefit from the specific variant. } // CandidateLocations2Options are used in CandidateLocations2. @@ -66,9 +72,10 @@ type CandidateLocations2Options struct { // BICReplacementCandidate2 is an item returned by BlobInfoCache2.CandidateLocations2. type BICReplacementCandidate2 struct { - Digest digest.Digest - CompressionOperation types.LayerCompression // Either types.Decompress for uncompressed, or types.Compress for compressed - CompressionAlgorithm *compressiontypes.Algorithm // An algorithm when the candidate is compressed, or nil when it is uncompressed - UnknownLocation bool // is true when `Location` for this blob is not set - Location types.BICLocationReference // not set if UnknownLocation is set to `true` + Digest digest.Digest + CompressionOperation types.LayerCompression // Either types.Decompress for uncompressed, or types.Compress for compressed + CompressionAlgorithm *compressiontypes.Algorithm // An algorithm when the candidate is compressed, or nil when it is uncompressed + CompressionAnnotations map[string]string // If necessary, annotations necessary to use CompressionAlgorithm + UnknownLocation bool // is true when `Location` for this blob is not set + Location types.BICLocationReference // not set if UnknownLocation is set to `true` } diff --git a/internal/imagedestination/wrapper.go b/internal/imagedestination/wrapper.go index cdd3c5e5d0..f5a38541ae 100644 --- a/internal/imagedestination/wrapper.go +++ b/internal/imagedestination/wrapper.go @@ -76,6 +76,9 @@ func (w *wrapped) TryReusingBlobWithOptions(ctx context.Context, info types.Blob Size: blob.Size, CompressionOperation: blob.CompressionOperation, CompressionAlgorithm: blob.CompressionAlgorithm, + // CompressionAnnotations could be set to blob.Annotations, but that may contain unrelated + // annotations, and we didn’t use the blob.Annotations field previously, so we’ll + // continue not using it. }, nil } diff --git a/internal/manifest/manifest.go b/internal/manifest/manifest.go index ee0ddc772a..3fb52104a6 100644 --- a/internal/manifest/manifest.go +++ b/internal/manifest/manifest.go @@ -205,11 +205,6 @@ type ReuseConditions struct { // (which can be nil to represent uncompressed or unknown) matches reuseConditions. func CandidateCompressionMatchesReuseConditions(c ReuseConditions, candidateCompression *compressiontypes.Algorithm) bool { if c.RequiredCompression != nil { - if c.RequiredCompression.Name() == compressiontypes.ZstdChunkedAlgorithmName { - // HACK: Never match when the caller asks for zstd:chunked, because we don’t record the annotations required to use the chunked blobs. - // The caller must re-compress to build those annotations.
- return false if candidateCompression == nil || (c.RequiredCompression.Name() != candidateCompression.Name() && c.RequiredCompression.Name() != candidateCompression.BaseVariantName()) { return false diff --git a/internal/manifest/manifest_test.go b/internal/manifest/manifest_test.go index ae2cc32ca0..ebb3ef5593 100644 --- a/internal/manifest/manifest_test.go +++ b/internal/manifest/manifest_test.go @@ -192,6 +192,9 @@ func TestCandidateCompressionMatchesReuseConditions(t *testing.T) { }{ // RequiredCompression restrictions {&compression.Zstd, nil, &compression.Zstd, true}, + {&compression.Zstd, nil, &compression.ZstdChunked, true}, + {&compression.ZstdChunked, nil, &compression.Zstd, false}, + {&compression.ZstdChunked, nil, &compression.ZstdChunked, true}, {&compression.Gzip, nil, &compression.Zstd, false}, {&compression.Zstd, nil, nil, false}, {nil, nil, &compression.Zstd, true}, diff --git a/internal/private/private.go b/internal/private/private.go index 63fb9326de..d81ea6703e 100644 --- a/internal/private/private.go +++ b/internal/private/private.go @@ -134,9 +134,14 @@ type ReusedBlob struct { Size int64 // Must be provided // The following compression fields should be set when the reuse substitutes // a differently-compressed blob. + // They may be set also to change from a base variant to a specific variant of an algorithm. CompressionOperation types.LayerCompression // Compress/Decompress, matching the reused blob; PreserveOriginal if N/A CompressionAlgorithm *compression.Algorithm // Algorithm if compressed, nil if decompressed or N/A + // Annotations that should be added, for CompressionAlgorithm. Note that they might need to be + // added even if the digest doesn’t change (if we found the annotations in a cache). + CompressionAnnotations map[string]string + MatchedByTOCDigest bool // Whether the layer was reused/matched by TOC digest. Used only for UI purposes. } diff --git a/pkg/blobinfocache/boltdb/boltdb.go b/pkg/blobinfocache/boltdb/boltdb.go index 036d2c0149..2d4137ffd2 100644 --- a/pkg/blobinfocache/boltdb/boltdb.go +++ b/pkg/blobinfocache/boltdb/boltdb.go @@ -2,6 +2,8 @@ package boltdb import ( + "bytes" + "encoding/json" "errors" "fmt" "io/fs" @@ -30,6 +32,10 @@ var ( // digestCompressorBucket stores a mapping from any digest to a compressor, or blobinfocache.Uncompressed (not blobinfocache.UnknownCompression). // It may not exist in caches created by older versions, even if uncompressedDigestBucket is present. digestCompressorBucket = []byte("digestCompressor") + // digestSpecificVariantCompressorBucket stores a mapping from any digest to a (compressor, NUL byte, annotations as JSON), valid + // only if digestCompressorBucket contains a value. The compressor is not `UnknownCompression`. + // It may not exist in caches created by older versions, even if digestCompressorBucket is present. + digestSpecificVariantCompressorBucket = []byte("digestSpecificVariantCompressor") // digestByUncompressedBucket stores a bucket per uncompressed digest, with the bucket containing a set of digests for that uncompressed digest // (as a set of key=digest, value="" pairs) digestByUncompressedBucket = []byte("digestByUncompressed") @@ -299,25 +305,68 @@ func (bdc *cache) RecordTOCUncompressedPair(tocDigest digest.Digest, uncompresse // WARNING: Only call this with LOCALLY VERIFIED data: // - don’t record a compressor for a digest just because some remote author claims so // (e.g.
because a manifest says so); +// - don’t record the non-base variant or annotations if we are not _sure_ that the base variant +// and the blob’s digest match the non-base variant’s annotations (e.g. because we saw them +// in a manifest) // // otherwise the cache could be poisoned and cause us to make incorrect edits to type // information in a manifest. func (bdc *cache) RecordDigestCompressorData(anyDigest digest.Digest, data blobinfocache.DigestCompressorData) { _ = bdc.update(func(tx *bolt.Tx) error { + key := []byte(anyDigest.String()) + b, err := tx.CreateBucketIfNotExists(digestCompressorBucket) if err != nil { return err } - key := []byte(anyDigest.String()) + warned := false if previousBytes := b.Get(key); previousBytes != nil { if string(previousBytes) != data.BaseVariantCompressor { logrus.Warnf("Compressor for blob with digest %s previously recorded as %s, now %s", anyDigest, string(previousBytes), data.BaseVariantCompressor) + warned = true } } if data.BaseVariantCompressor == blobinfocache.UnknownCompression { - return b.Delete(key) + if err := b.Delete(key); err != nil { + return err + } + if b := tx.Bucket(digestSpecificVariantCompressorBucket); b != nil { + if err := b.Delete(key); err != nil { + return err + } + } + return nil } - return b.Put(key, []byte(data.BaseVariantCompressor)) + if err := b.Put(key, []byte(data.BaseVariantCompressor)); err != nil { + return err + } + + if data.SpecificVariantCompressor != blobinfocache.UnknownCompression { + b, err := tx.CreateBucketIfNotExists(digestSpecificVariantCompressorBucket) + if err != nil { + return err + } + if !warned { // Don’t warn twice about the same digest + if previousBytes := b.Get(key); previousBytes != nil { + if prevSVCBytes, _, ok := bytes.Cut(previousBytes, []byte{0}); ok { + prevSVC := string(prevSVCBytes) + if data.SpecificVariantCompressor != prevSVC { + logrus.Warnf("Specific compressor for blob with digest %s previously recorded as %s, now %s", anyDigest, prevSVC, data.SpecificVariantCompressor) + } + } + } + } + annotations, err := json.Marshal(data.SpecificVariantAnnotations) + if err != nil { + return err + } + data := bytes.Clone([]byte(data.SpecificVariantCompressor)) + data = append(data, 0) + data = append(data, annotations...) + if err := b.Put(key, data); err != nil { + return err + } + } + return nil }) // FIXME? Log error (but throttle the log volume on repeated accesses)? } @@ -354,24 +403,36 @@ func (bdc *cache) RecordKnownLocation(transport types.ImageTransport, scope type // appendReplacementCandidates creates prioritize.CandidateWithTime values for digest in scopeBucket // (which might be nil) with corresponding compression -// info from compressionBucket (which might be nil), and returns the result of appending them +// info from compressionBucket and specificVariantCompressionBucket (which might be nil), and returns the result of appending them // to candidates. // v2Options is not nil if the caller is CandidateLocations2: this allows including candidates with unknown location, and filters out candidates // with unknown compression.
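// The value read from digestSpecificVariantCompressorBucket is the specific compressor name, a NUL byte,
// then the annotations as JSON (illustratively, "zstd:chunked" + "\x00" + `{"key":"value"}`);
// a standalone sketch of this encoding appears at the end of this patch.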
-func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, scopeBucket, compressionBucket *bolt.Bucket, digest digest.Digest, - v2Options *blobinfocache.CandidateLocations2Options) []prioritize.CandidateWithTime { +func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, scopeBucket, compressionBucket, specificVariantCompressionBucket *bolt.Bucket, + digest digest.Digest, v2Options *blobinfocache.CandidateLocations2Options) []prioritize.CandidateWithTime { digestKey := []byte(digest.String()) - compressorName := blobinfocache.UnknownCompression + compressionData := blobinfocache.DigestCompressorData{ + BaseVariantCompressor: blobinfocache.UnknownCompression, + SpecificVariantCompressor: blobinfocache.UnknownCompression, + SpecificVariantAnnotations: nil, + } if compressionBucket != nil { // the bucket won't exist if the cache was created by a v1 implementation and // hasn't yet been updated by a v2 implementation if compressorNameValue := compressionBucket.Get(digestKey); len(compressorNameValue) > 0 { - compressorName = string(compressorNameValue) + compressionData.BaseVariantCompressor = string(compressorNameValue) + } + if specificVariantCompressionBucket != nil { + if svcData := specificVariantCompressionBucket.Get(digestKey); svcData != nil { + if compressorBytes, annotationBytes, ok := bytes.Cut(svcData, []byte{0}); ok { + compressionData.SpecificVariantCompressor = string(compressorBytes) + if err := json.Unmarshal(annotationBytes, &compressionData.SpecificVariantAnnotations); err != nil { + return candidates // FIXME? Log error (but throttle the log volume on repeated accesses)? + } + } + } } } - template := prioritize.CandidateTemplateWithCompression(v2Options, digest, blobinfocache.DigestCompressorData{ - BaseVariantCompressor: compressorName, - }) + template := prioritize.CandidateTemplateWithCompression(v2Options, digest, compressionData) if template == nil { return candidates } @@ -416,11 +477,12 @@ func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types if scopeBucket != nil { scopeBucket = scopeBucket.Bucket([]byte(scope.Opaque)) } - // compressionBucket won't have been created if previous writers never recorded info about compression, + // compressionBucket and specificVariantCompressionBucket won't have been created if previous writers never recorded info about compression, // and we don't want to fail just because of that compressionBucket := tx.Bucket(digestCompressorBucket) + specificVariantCompressionBucket := tx.Bucket(digestSpecificVariantCompressorBucket) - res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, primaryDigest, v2Options) + res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, specificVariantCompressionBucket, primaryDigest, v2Options) if canSubstitute { if uncompressedDigestValue = bdc.uncompressedDigest(tx, primaryDigest); uncompressedDigestValue != "" { b := tx.Bucket(digestByUncompressedBucket) @@ -433,7 +495,7 @@ func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types return err } if d != primaryDigest && d != uncompressedDigestValue { - res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, d, v2Options) + res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, specificVariantCompressionBucket, d, v2Options) } return nil }); err != nil { @@ -442,7 +504,7 @@ func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types } } if
uncompressedDigestValue != primaryDigest { - res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, uncompressedDigestValue, v2Options) + res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, specificVariantCompressionBucket, uncompressedDigestValue, v2Options) } } } diff --git a/pkg/blobinfocache/internal/prioritize/prioritize.go b/pkg/blobinfocache/internal/prioritize/prioritize.go index 03548209f9..40ed09bed5 100644 --- a/pkg/blobinfocache/internal/prioritize/prioritize.go +++ b/pkg/blobinfocache/internal/prioritize/prioritize.go @@ -28,9 +28,10 @@ const replacementUnknownLocationAttempts = 2 // CandidateTemplate is a subset of BICReplacementCandidate2 with data related to a specific digest, // which can be later combined with information about a location. type CandidateTemplate struct { - digest digest.Digest - compressionOperation types.LayerCompression // Either types.Decompress for uncompressed, or types.Compress for compressed - compressionAlgorithm *compression.Algorithm // An algorithm when the candidate is compressed, or nil when it is uncompressed + digest digest.Digest + compressionOperation types.LayerCompression // Either types.Decompress for uncompressed, or types.Compress for compressed + compressionAlgorithm *compression.Algorithm // An algorithm when the candidate is compressed, or nil when it is uncompressed + compressionAnnotations map[string]string // If necessary, annotations necessary to use compressionAlgorithm } // CandidateTemplateWithCompression returns a CandidateTemplate if a blob with data is acceptable @@ -40,7 +41,7 @@ type CandidateTemplate struct { // if not nil, the call is assumed to be CandidateLocations2. func CandidateTemplateWithCompression(v2Options *blobinfocache.CandidateLocations2Options, digest digest.Digest, data blobinfocache.DigestCompressorData) *CandidateTemplate { if v2Options == nil { - return &CandidateTemplate{ // Anything goes. The compressionOperation, compressionAlgorithm values are not used. + return &CandidateTemplate{ // Anything goes. The compressionOperation, compressionAlgorithm and compressionAnnotations values are not used. digest: digest, } } @@ -60,14 +61,40 @@ func CandidateTemplateWithCompression(v2Options *blobinfocache.CandidateLocation return nil } return &CandidateTemplate{ - digest: digest, - compressionOperation: types.Decompress, - compressionAlgorithm: nil, + digest: digest, + compressionOperation: types.Decompress, + compressionAlgorithm: nil, + compressionAnnotations: nil, } case blobinfocache.UnknownCompression: logrus.Debugf("Ignoring BlobInfoCache record of digest %q with unknown compression", digest.String()) return nil // Not allowed with CandidateLocations2 default: + // See if we can use the specific variant, first. 
+ if data.SpecificVariantCompressor != blobinfocache.UnknownCompression { + algo, err := compression.AlgorithmByName(data.SpecificVariantCompressor) + if err != nil { + logrus.Debugf("Not considering unrecognized specific compression variant %q for BlobInfoCache record of digest %q: %v", + data.SpecificVariantCompressor, digest.String(), err) + } else { + if !manifest.CandidateCompressionMatchesReuseConditions(manifest.ReuseConditions{ + PossibleManifestFormats: v2Options.PossibleManifestFormats, + RequiredCompression: v2Options.RequiredCompression, + }, &algo) { + logrus.Debugf("Ignoring specific compression variant %q for BlobInfoCache record of digest %q, it does not match required %s or MIME types %#v", + data.SpecificVariantCompressor, digest.String(), requiredCompression, v2Options.PossibleManifestFormats) + } else { + return &CandidateTemplate{ + digest: digest, + compressionOperation: types.Compress, + compressionAlgorithm: &algo, + compressionAnnotations: data.SpecificVariantAnnotations, + } + } + } + } + + // Try the base variant. algo, err := compression.AlgorithmByName(data.BaseVariantCompressor) if err != nil { logrus.Debugf("Ignoring BlobInfoCache record of digest %q with unrecognized compression %q: %v", @@ -83,9 +110,10 @@ func CandidateTemplateWithCompression(v2Options *blobinfocache.CandidateLocation return nil } return &CandidateTemplate{ - digest: digest, - compressionOperation: types.Compress, - compressionAlgorithm: &algo, + digest: digest, + compressionOperation: types.Compress, + compressionAlgorithm: &algo, + compressionAnnotations: nil, } } } @@ -100,11 +128,12 @@ type CandidateWithTime struct { func (template CandidateTemplate) CandidateWithLocation(location types.BICLocationReference, lastSeen time.Time) CandidateWithTime { return CandidateWithTime{ candidate: blobinfocache.BICReplacementCandidate2{ - Digest: template.digest, - CompressionOperation: template.compressionOperation, - CompressionAlgorithm: template.compressionAlgorithm, - UnknownLocation: false, - Location: location, + Digest: template.digest, + CompressionOperation: template.compressionOperation, + CompressionAlgorithm: template.compressionAlgorithm, + CompressionAnnotations: template.compressionAnnotations, + UnknownLocation: false, + Location: location, }, lastSeen: lastSeen, } @@ -114,11 +143,12 @@ func (template CandidateTemplate) CandidateWithLocation(location types.BICLocati func (template CandidateTemplate) CandidateWithUnknownLocation() CandidateWithTime { return CandidateWithTime{ candidate: blobinfocache.BICReplacementCandidate2{ - Digest: template.digest, - CompressionOperation: template.compressionOperation, - CompressionAlgorithm: template.compressionAlgorithm, - UnknownLocation: true, - Location: types.BICLocationReference{Opaque: ""}, + Digest: template.digest, + CompressionOperation: template.compressionOperation, + CompressionAlgorithm: template.compressionAlgorithm, + CompressionAnnotations: template.compressionAnnotations, + UnknownLocation: true, + Location: types.BICLocationReference{Opaque: ""}, }, lastSeen: time.Time{}, } diff --git a/pkg/blobinfocache/internal/prioritize/prioritize_test.go b/pkg/blobinfocache/internal/prioritize/prioritize_test.go index 0d9fe12c1d..a344d7e271 100644 --- a/pkg/blobinfocache/internal/prioritize/prioritize_test.go +++ b/pkg/blobinfocache/internal/prioritize/prioritize_test.go @@ -56,14 +56,26 @@ var ( ) func TestCandidateTemplateWithCompression(t *testing.T) { + chunkedAnnotations := map[string]string{"a": "b"} uncompressedData := 
blobinfocache.DigestCompressorData{ - BaseVariantCompressor: blobinfocache.Uncompressed, + BaseVariantCompressor: blobinfocache.Uncompressed, + SpecificVariantCompressor: blobinfocache.UnknownCompression, + SpecificVariantAnnotations: nil, } gzipData := blobinfocache.DigestCompressorData{ - BaseVariantCompressor: compressiontypes.GzipAlgorithmName, + BaseVariantCompressor: compressiontypes.GzipAlgorithmName, + SpecificVariantCompressor: blobinfocache.UnknownCompression, + SpecificVariantAnnotations: nil, } zstdData := blobinfocache.DigestCompressorData{ - BaseVariantCompressor: compressiontypes.ZstdAlgorithmName, + BaseVariantCompressor: compressiontypes.ZstdAlgorithmName, + SpecificVariantCompressor: blobinfocache.UnknownCompression, + SpecificVariantAnnotations: nil, + } + zstdChunkedData := blobinfocache.DigestCompressorData{ + BaseVariantCompressor: compressiontypes.ZstdAlgorithmName, + SpecificVariantCompressor: compressiontypes.ZstdChunkedAlgorithmName, + SpecificVariantAnnotations: chunkedAnnotations, } for _, c := range []struct { @@ -72,14 +84,17 @@ func TestCandidateTemplateWithCompression(t *testing.T) { data blobinfocache.DigestCompressorData v2Matches bool // if v2Matches: - v2Op types.LayerCompression - v2Algo string + v2Op types.LayerCompression + v2Algo string + v2Annotations map[string]string }{ { name: "unknown", requiredCompression: nil, data: blobinfocache.DigestCompressorData{ - BaseVariantCompressor: blobinfocache.UnknownCompression, + BaseVariantCompressor: blobinfocache.UnknownCompression, + SpecificVariantCompressor: blobinfocache.UnknownCompression, + SpecificVariantAnnotations: nil, }, v2Matches: false, }, @@ -90,6 +105,7 @@ func TestCandidateTemplateWithCompression(t *testing.T) { v2Matches: true, v2Op: types.Decompress, v2Algo: "", + v2Annotations: nil, }, { name: "uncompressed, want gzip", @@ -104,6 +120,7 @@ func TestCandidateTemplateWithCompression(t *testing.T) { v2Matches: true, v2Op: types.Compress, v2Algo: compressiontypes.GzipAlgorithmName, + v2Annotations: nil, }, { name: "gzip, want zstd", @@ -115,7 +132,9 @@ func TestCandidateTemplateWithCompression(t *testing.T) { name: "unknown base", requiredCompression: nil, data: blobinfocache.DigestCompressorData{ - BaseVariantCompressor: "this value is unknown", + BaseVariantCompressor: "this value is unknown", + SpecificVariantCompressor: blobinfocache.UnknownCompression, + SpecificVariantAnnotations: nil, }, v2Matches: false, }, @@ -126,6 +145,7 @@ func TestCandidateTemplateWithCompression(t *testing.T) { v2Matches: true, v2Op: types.Compress, v2Algo: compressiontypes.ZstdAlgorithmName, + v2Annotations: nil, }, { name: "zstd, want gzip", @@ -140,6 +160,7 @@ func TestCandidateTemplateWithCompression(t *testing.T) { v2Matches: true, v2Op: types.Compress, v2Algo: compressiontypes.ZstdAlgorithmName, + v2Annotations: nil, }, { name: "zstd, want zstd:chunked", @@ -147,12 +168,59 @@ func TestCandidateTemplateWithCompression(t *testing.T) { data: zstdData, v2Matches: false, }, + { + name: "zstd:chunked", + requiredCompression: nil, + data: zstdChunkedData, + v2Matches: true, + v2Op: types.Compress, + v2Algo: compressiontypes.ZstdChunkedAlgorithmName, + v2Annotations: chunkedAnnotations, + }, + { + name: "zstd:chunked, want gzip", + requiredCompression: &compression.Gzip, + data: zstdChunkedData, + v2Matches: false, + }, + { + name: "zstd:chunked, want zstd", // Note that we return the full chunked data in this case. 
+ requiredCompression: &compression.Zstd, + data: zstdChunkedData, + v2Matches: true, + v2Op: types.Compress, + v2Algo: compressiontypes.ZstdChunkedAlgorithmName, + v2Annotations: chunkedAnnotations, + }, + { + name: "zstd:chunked, want zstd:chunked", + requiredCompression: &compression.ZstdChunked, + data: zstdChunkedData, + v2Matches: true, + v2Op: types.Compress, + v2Algo: compressiontypes.ZstdChunkedAlgorithmName, + v2Annotations: chunkedAnnotations, + }, + { + name: "zstd:unknown", + requiredCompression: nil, + data: blobinfocache.DigestCompressorData{ + BaseVariantCompressor: compressiontypes.ZstdAlgorithmName, + SpecificVariantCompressor: "this value is unknown", + SpecificVariantAnnotations: chunkedAnnotations, + }, + v2Matches: true, + v2Op: types.Compress, + v2Algo: compressiontypes.ZstdAlgorithmName, + v2Annotations: nil, + }, } { res := CandidateTemplateWithCompression(nil, digestCompressedPrimary, c.data) assert.Equal(t, &CandidateTemplate{ - digest: digestCompressedPrimary, - compressionOperation: types.PreserveOriginal, - compressionAlgorithm: nil, + digest: digestCompressedPrimary, + compressionOperation: types.PreserveOriginal, + compressionAlgorithm: nil, + compressionAnnotations: nil, }, res, c.name) // These tests only use RequiredCompression in CandidateLocations2Options for clarity; @@ -172,13 +240,16 @@ func TestCandidateTemplateWithCompression(t *testing.T) { require.NotNil(t, res.compressionAlgorithm, c.name) assert.Equal(t, c.v2Algo, res.compressionAlgorithm.Name()) } + assert.Equal(t, c.v2Annotations, res.compressionAnnotations, c.name) } } } func TestCandidateWithLocation(t *testing.T) { template := CandidateTemplateWithCompression(&blobinfocache.CandidateLocations2Options{}, digestCompressedPrimary, blobinfocache.DigestCompressorData{ - BaseVariantCompressor: compressiontypes.ZstdAlgorithmName, + BaseVariantCompressor: compressiontypes.ZstdAlgorithmName, + SpecificVariantCompressor: compressiontypes.ZstdChunkedAlgorithmName, + SpecificVariantAnnotations: map[string]string{"a": "b"}, }) require.NotNil(t, template) loc := types.BICLocationReference{Opaque: "opaque"} @@ -186,7 +257,8 @@ func TestCandidateWithLocation(t *testing.T) { res := template.CandidateWithLocation(loc, time) assert.Equal(t, digestCompressedPrimary, res.candidate.Digest) assert.Equal(t, types.Compress, res.candidate.CompressionOperation) - assert.Equal(t, compressiontypes.ZstdAlgorithmName, res.candidate.CompressionAlgorithm.Name()) + assert.Equal(t, compressiontypes.ZstdChunkedAlgorithmName, res.candidate.CompressionAlgorithm.Name()) + assert.Equal(t, map[string]string{"a": "b"}, res.candidate.CompressionAnnotations) assert.Equal(t, false, res.candidate.UnknownLocation) assert.Equal(t, loc, res.candidate.Location) assert.Equal(t, time, res.lastSeen) @@ -194,13 +266,16 @@ func TestCandidateWithLocation(t *testing.T) { func TestCandidateWithUnknownLocation(t *testing.T) { template := CandidateTemplateWithCompression(&blobinfocache.CandidateLocations2Options{}, digestCompressedPrimary, blobinfocache.DigestCompressorData{ - BaseVariantCompressor: compressiontypes.ZstdAlgorithmName, + BaseVariantCompressor: compressiontypes.ZstdAlgorithmName, + SpecificVariantCompressor: compressiontypes.ZstdChunkedAlgorithmName, + SpecificVariantAnnotations: map[string]string{"a": "b"}, }) require.NotNil(t, template) res := template.CandidateWithUnknownLocation() assert.Equal(t, digestCompressedPrimary, res.candidate.Digest) assert.Equal(t, types.Compress, res.candidate.CompressionOperation) - assert.Equal(t, 
compressiontypes.ZstdAlgorithmName, res.candidate.CompressionAlgorithm.Name()) + assert.Equal(t, compressiontypes.ZstdChunkedAlgorithmName, res.candidate.CompressionAlgorithm.Name()) + assert.Equal(t, map[string]string{"a": "b"}, res.candidate.CompressionAnnotations) assert.Equal(t, true, res.candidate.UnknownLocation) } diff --git a/pkg/blobinfocache/internal/test/test.go b/pkg/blobinfocache/internal/test/test.go index 30b1ab4b12..cb4a4e6b01 100644 --- a/pkg/blobinfocache/internal/test/test.go +++ b/pkg/blobinfocache/internal/test/test.go @@ -25,7 +25,7 @@ const ( compressorNameU = blobinfocache.Uncompressed compressorNameA = compressiontypes.GzipAlgorithmName compressorNameB = compressiontypes.ZstdAlgorithmName - compressorNameCU = compressiontypes.ZstdChunkedAlgorithmName + compressorNameCU = compressiontypes.XzAlgorithmName digestUnknownLocation = digest.Digest("sha256:7777777777777777777777777777777777777777777777777777777777777777") digestFilteringUncompressed = digest.Digest("sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") @@ -150,6 +150,7 @@ func testGenericRecordKnownLocations(t *testing.T, cache blobinfocache.BlobInfoC type candidate struct { d digest.Digest cn string + ca map[string]string lr string } @@ -194,11 +195,12 @@ func assertCandidatesMatch2(t *testing.T, scopeName string, expected []candidate algo = &algo_ } e[i] = blobinfocache.BICReplacementCandidate2{ - Digest: ev.d, - CompressionOperation: op, - CompressionAlgorithm: algo, - UnknownLocation: false, - Location: types.BICLocationReference{Opaque: scopeName + ev.lr}, + Digest: ev.d, + CompressionOperation: op, + CompressionAlgorithm: algo, + CompressionAnnotations: ev.ca, + UnknownLocation: false, + Location: types.BICLocationReference{Opaque: scopeName + ev.lr}, } } assertCandidatesMatch2Native(t, e, actual) @@ -283,14 +285,16 @@ func testGenericCandidateLocations2(t *testing.T, cache blobinfocache.BlobInfoCa {"B", digestCompressedB, compressorNameB}, {"CU", digestCompressedUnrelated, compressorNameCU}, } + chunkedAnnotations := map[string]string{"a": "b"} digestNameSetFiltering := []struct { // Used primarily to test filtering in CandidateLocations2Options - n string - d digest.Digest - m string + n string + d digest.Digest + base, specific string + annotations map[string]string }{ - {"gzip", digestGzip, compressiontypes.GzipAlgorithmName}, - {"zstd", digestZstd, compressiontypes.ZstdAlgorithmName}, - {"zstdChunked", digestZstdChunked, compressiontypes.ZstdChunkedAlgorithmName}, + {"gzip", digestGzip, compressiontypes.GzipAlgorithmName, blobinfocache.UnknownCompression, nil}, + {"zstd", digestZstd, compressiontypes.ZstdAlgorithmName, blobinfocache.UnknownCompression, nil}, + {"zstdChunked", digestZstdChunked, compressiontypes.ZstdAlgorithmName, compressiontypes.ZstdChunkedAlgorithmName, chunkedAnnotations}, } for _, e := range digestNameSetFiltering { // digestFilteringUncompressed exists only to allow the three entries to be considered as candidates cache.RecordDigestUncompressedPair(e.d, digestFilteringUncompressed) @@ -315,7 +319,9 @@ func testGenericCandidateLocations2(t *testing.T, cache blobinfocache.BlobInfoCa // If a record exists with compression without Location then // then return a record without location and with `UnknownLocation: true` cache.RecordDigestCompressorData(digestUnknownLocation, blobinfocache.DigestCompressorData{ - BaseVariantCompressor: compressiontypes.Bzip2AlgorithmName, + BaseVariantCompressor: compressiontypes.Bzip2AlgorithmName, + SpecificVariantCompressor: 
blobinfocache.UnknownCompression, + SpecificVariantAnnotations: nil, }) res = cache.CandidateLocations2(transport, scope, digestUnknownLocation, blobinfocache.CandidateLocations2Options{ CanSubstitute: true, @@ -358,7 +364,9 @@ func testGenericCandidateLocations2(t *testing.T, cache blobinfocache.BlobInfoCa if scopeIndex != 0 { for _, e := range digestNameSetPrioritization { cache.RecordDigestCompressorData(e.d, blobinfocache.DigestCompressorData{ - BaseVariantCompressor: blobinfocache.UnknownCompression, + BaseVariantCompressor: blobinfocache.UnknownCompression, + SpecificVariantCompressor: blobinfocache.UnknownCompression, + SpecificVariantAnnotations: nil, }) } } @@ -422,13 +430,15 @@ func testGenericCandidateLocations2(t *testing.T, cache blobinfocache.BlobInfoCa }) assertCandidatesMatch2(t, scopeName, []candidate{}, res) - // Tests of lookups / prioritization when compression is unknown + // Tests of lookups / prioritization when compression is known // ------------------------------------------------------------- // Set the "known" compression values for _, e := range digestNameSetPrioritization { cache.RecordDigestCompressorData(e.d, blobinfocache.DigestCompressorData{ - BaseVariantCompressor: e.m, + BaseVariantCompressor: e.m, + SpecificVariantCompressor: blobinfocache.UnknownCompression, + SpecificVariantAnnotations: nil, }) } @@ -512,7 +522,9 @@ func testGenericCandidateLocations2(t *testing.T, cache blobinfocache.BlobInfoCa } for _, e := range digestNameSetFiltering { cache.RecordDigestCompressorData(e.d, blobinfocache.DigestCompressorData{ - BaseVariantCompressor: e.m, + BaseVariantCompressor: e.base, + SpecificVariantCompressor: e.specific, + SpecificVariantAnnotations: e.annotations, }) } @@ -521,9 +533,9 @@ func testGenericCandidateLocations2(t *testing.T, cache blobinfocache.BlobInfoCa CanSubstitute: true, }) assertCandidatesMatch2(t, scopeName, []candidate{ // Sorted in the reverse of digestNameSetFiltering order - {d: digestZstdChunked, cn: compressiontypes.ZstdChunkedAlgorithmName, lr: "zstdChunked"}, - {d: digestZstd, cn: compressiontypes.ZstdAlgorithmName, lr: "zstd"}, - {d: digestGzip, cn: compressiontypes.GzipAlgorithmName, lr: "gzip"}, + {d: digestZstdChunked, cn: compressiontypes.ZstdChunkedAlgorithmName, ca: chunkedAnnotations, lr: "zstdChunked"}, + {d: digestZstd, cn: compressiontypes.ZstdAlgorithmName, ca: nil, lr: "zstd"}, + {d: digestGzip, cn: compressiontypes.GzipAlgorithmName, ca: nil, lr: "gzip"}, }, res) // Manifest format filters @@ -532,16 +544,16 @@ func testGenericCandidateLocations2(t *testing.T, cache blobinfocache.BlobInfoCa PossibleManifestFormats: []string{manifest.DockerV2Schema2MediaType}, }) assertCandidatesMatch2(t, scopeName, []candidate{ - {d: digestGzip, cn: compressiontypes.GzipAlgorithmName, lr: "gzip"}, + {d: digestGzip, cn: compressiontypes.GzipAlgorithmName, ca: nil, lr: "gzip"}, }, res) res = cache.CandidateLocations2(transport, scope, digestFilteringUncompressed, blobinfocache.CandidateLocations2Options{ CanSubstitute: true, PossibleManifestFormats: []string{imgspecv1.MediaTypeImageManifest}, }) assertCandidatesMatch2(t, scopeName, []candidate{ // Sorted in the reverse of digestNameSetFiltering order - {d: digestZstdChunked, cn: compressiontypes.ZstdChunkedAlgorithmName, lr: "zstdChunked"}, - {d: digestZstd, cn: compressiontypes.ZstdAlgorithmName, lr: "zstd"}, - {d: digestGzip, cn: compressiontypes.GzipAlgorithmName, lr: "gzip"}, + {d: digestZstdChunked, cn: compressiontypes.ZstdChunkedAlgorithmName, ca: chunkedAnnotations, lr: 
"zstdChunked"}, + {d: digestZstd, cn: compressiontypes.ZstdAlgorithmName, ca: nil, lr: "zstd"}, + {d: digestGzip, cn: compressiontypes.GzipAlgorithmName, ca: nil, lr: "gzip"}, }, res) // Compression algorithm filters @@ -550,21 +562,37 @@ func testGenericCandidateLocations2(t *testing.T, cache blobinfocache.BlobInfoCa RequiredCompression: &compression.Gzip, }) assertCandidatesMatch2(t, scopeName, []candidate{ - {d: digestGzip, cn: compressiontypes.GzipAlgorithmName, lr: "gzip"}, + {d: digestGzip, cn: compressiontypes.GzipAlgorithmName, ca: nil, lr: "gzip"}, }, res) res = cache.CandidateLocations2(transport, scope, digestFilteringUncompressed, blobinfocache.CandidateLocations2Options{ CanSubstitute: true, RequiredCompression: &compression.ZstdChunked, }) - // Right now, zstd:chunked requests never match a candidate, see CandidateCompressionMatchesReuseConditions(). - assertCandidatesMatch2(t, scopeName, []candidate{}, res) + assertCandidatesMatch2(t, scopeName, []candidate{ + {d: digestZstdChunked, cn: compressiontypes.ZstdChunkedAlgorithmName, ca: chunkedAnnotations, lr: "zstdChunked"}, + }, res) res = cache.CandidateLocations2(transport, scope, digestFilteringUncompressed, blobinfocache.CandidateLocations2Options{ CanSubstitute: true, RequiredCompression: &compression.Zstd, }) - assertCandidatesMatch2(t, scopeName, []candidate{ // When the user asks for zstd, zstd:chunked candidates are also acceptable. - {d: digestZstdChunked, cn: compressiontypes.ZstdChunkedAlgorithmName, lr: "zstdChunked"}, - {d: digestZstd, cn: compressiontypes.ZstdAlgorithmName, lr: "zstd"}, + assertCandidatesMatch2(t, scopeName, []candidate{ // When the user asks for zstd, zstd:chunked candidates are also acceptable, and include the chunked information. + {d: digestZstdChunked, cn: compressiontypes.ZstdChunkedAlgorithmName, ca: chunkedAnnotations, lr: "zstdChunked"}, + {d: digestZstd, cn: compressiontypes.ZstdAlgorithmName, ca: nil, lr: "zstd"}, + }, res) + + // After RecordDigestCompressorData with zstd:chunked details, a later call with zstd-only does not drop the chunked details. 
+ cache.RecordDigestCompressorData(digestZstdChunked, blobinfocache.DigestCompressorData{ + BaseVariantCompressor: compressiontypes.ZstdAlgorithmName, + SpecificVariantCompressor: blobinfocache.UnknownCompression, + SpecificVariantAnnotations: nil, + }) + res = cache.CandidateLocations2(transport, scope, digestFilteringUncompressed, blobinfocache.CandidateLocations2Options{ + CanSubstitute: true, + }) + assertCandidatesMatch2(t, scopeName, []candidate{ // Sorted in the reverse of digestNameSetFiltering order + {d: digestZstdChunked, cn: compressiontypes.ZstdChunkedAlgorithmName, ca: chunkedAnnotations, lr: "zstdChunked"}, + {d: digestZstd, cn: compressiontypes.ZstdAlgorithmName, ca: nil, lr: "zstd"}, + {d: digestGzip, cn: compressiontypes.GzipAlgorithmName, ca: nil, lr: "gzip"}, }, res) } } diff --git a/pkg/blobinfocache/memory/memory.go b/pkg/blobinfocache/memory/memory.go index 067c6b7e11..9d4125d664 100644 --- a/pkg/blobinfocache/memory/memory.go +++ b/pkg/blobinfocache/memory/memory.go @@ -28,7 +28,7 @@ type cache struct { uncompressedDigestsByTOC map[digest.Digest]digest.Digest digestsByUncompressed map[digest.Digest]*set.Set[digest.Digest] // stores a set of digests for each uncompressed digest knownLocations map[locationKey]map[types.BICLocationReference]time.Time // stores last known existence time for each location reference - compressors map[digest.Digest]string // stores a compressor name, or blobinfocache.Uncompressed (not blobinfocache.UnknownCompression), for each digest + compressors map[digest.Digest]blobinfocache.DigestCompressorData // stores compression data for each digest; BaseVariantCompressor != UnknownCompression } // New returns a BlobInfoCache implementation which is in-memory only. @@ -49,7 +49,7 @@ func new2() *cache { uncompressedDigestsByTOC: map[digest.Digest]digest.Digest{}, digestsByUncompressed: map[digest.Digest]*set.Set[digest.Digest]{}, knownLocations: map[locationKey]map[types.BICLocationReference]time.Time{}, - compressors: map[digest.Digest]string{}, + compressors: map[digest.Digest]blobinfocache.DigestCompressorData{}, } } @@ -148,20 +148,36 @@ func (mem *cache) RecordKnownLocation(transport types.ImageTransport, scope type // WARNING: Only call this with LOCALLY VERIFIED data: // - don’t record a compressor for a digest just because some remote author claims so // (e.g. because a manifest says so); +// - don’t record the non-base variant or annotations if we are not _sure_ that the base variant +// and the blob’s digest match the non-base variant’s annotations (e.g. because we saw them +// in a manifest) // // otherwise the cache could be poisoned and cause us to make incorrect edits to type // information in a manifest. 
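// Illustrative sequence (mirroring the test above): after recording {base: zstd, specific: zstd:chunked, annotations},
// a later record of {base: zstd, specific: UnknownCompression} keeps the zstd:chunked details; recording
// UnknownCompression as the base variant deletes the entry entirely.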
 func (mem *cache) RecordDigestCompressorData(anyDigest digest.Digest, data blobinfocache.DigestCompressorData) {
 	mem.mutex.Lock()
 	defer mem.mutex.Unlock()
-	if previous, ok := mem.compressors[anyDigest]; ok && previous != data.BaseVariantCompressor {
-		logrus.Warnf("Compressor for blob with digest %s previously recorded as %s, now %s", anyDigest, previous, data.BaseVariantCompressor)
+	if previous, ok := mem.compressors[anyDigest]; ok {
+		if previous.BaseVariantCompressor != data.BaseVariantCompressor {
+			logrus.Warnf("Base compressor for blob with digest %s previously recorded as %s, now %s", anyDigest, previous.BaseVariantCompressor, data.BaseVariantCompressor)
+		} else if previous.SpecificVariantCompressor != blobinfocache.UnknownCompression && data.SpecificVariantCompressor != blobinfocache.UnknownCompression &&
+			previous.SpecificVariantCompressor != data.SpecificVariantCompressor {
+			logrus.Warnf("Specific compressor for blob with digest %s previously recorded as %s, now %s", anyDigest, previous.SpecificVariantCompressor, data.SpecificVariantCompressor)
+		}
+		// We don’t check SpecificVariantAnnotations for equality, it’s possible that their generation is not deterministic.
+
+		// Preserve specific variant information if the incoming data does not have it.
+		if data.BaseVariantCompressor != blobinfocache.UnknownCompression && data.SpecificVariantCompressor == blobinfocache.UnknownCompression &&
+			previous.SpecificVariantCompressor != blobinfocache.UnknownCompression {
+			data.SpecificVariantCompressor = previous.SpecificVariantCompressor
+			data.SpecificVariantAnnotations = previous.SpecificVariantAnnotations
+		}
 	}
 	if data.BaseVariantCompressor == blobinfocache.UnknownCompression {
 		delete(mem.compressors, anyDigest)
 		return
 	}
-	mem.compressors[anyDigest] = data.BaseVariantCompressor
+	mem.compressors[anyDigest] = data
 }
 
 // appendReplacementCandidates creates prioritize.CandidateWithTime values for digest in memory
@@ -171,13 +187,15 @@ func (mem *cache) RecordDigestCompressorData(anyDigest digest.Digest, data blobi
 // with unknown compression.
 func (mem *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, v2Options *blobinfocache.CandidateLocations2Options) []prioritize.CandidateWithTime {
-	compressorName := blobinfocache.UnknownCompression
+	compressionData := blobinfocache.DigestCompressorData{
+		BaseVariantCompressor:      blobinfocache.UnknownCompression,
+		SpecificVariantCompressor:  blobinfocache.UnknownCompression,
+		SpecificVariantAnnotations: nil,
+	}
 	if v, ok := mem.compressors[digest]; ok {
-		compressorName = v
+		compressionData = v
 	}
-	template := prioritize.CandidateTemplateWithCompression(v2Options, digest, blobinfocache.DigestCompressorData{
-		BaseVariantCompressor: compressorName,
-	})
+	template := prioritize.CandidateTemplateWithCompression(v2Options, digest, compressionData)
 	if template == nil {
 		return candidates
 	}
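[Editor's note: the merge logic above is easy to misread, so here is a worked sequence under the semantics the code implements; `mem`, `d`, and `annotations` are assumed to exist, and the comments describe the expected outcome of the code shown above, not additional API guarantees.]

```go
// 1. Record full zstd:chunked details for a blob.
mem.RecordDigestCompressorData(d, blobinfocache.DigestCompressorData{
	BaseVariantCompressor:      compressiontypes.ZstdAlgorithmName,
	SpecificVariantCompressor:  compressiontypes.ZstdChunkedAlgorithmName,
	SpecificVariantAnnotations: annotations,
})
// 2. Later, a code path that only detected plain zstd records the same blob.
mem.RecordDigestCompressorData(d, blobinfocache.DigestCompressorData{
	BaseVariantCompressor:     compressiontypes.ZstdAlgorithmName,
	SpecificVariantCompressor: blobinfocache.UnknownCompression,
})
// Outcome: the stored entry still says zstd:chunked with the original
// annotations; step 2 only confirmed the base variant. Recording
// UnknownCompression as the *base* variant would instead delete the entry.
```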
diff --git a/pkg/blobinfocache/sqlite/sqlite.go b/pkg/blobinfocache/sqlite/sqlite.go
index 8d2bf72898..1a79310239 100644
--- a/pkg/blobinfocache/sqlite/sqlite.go
+++ b/pkg/blobinfocache/sqlite/sqlite.go
@@ -3,6 +3,7 @@ package sqlite
 
 import (
 	"database/sql"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"sync"
@@ -303,6 +304,16 @@ func ensureDBHasCurrentSchema(db *sql.DB) error {
 				`uncompressedDigest	TEXT NOT NULL
 			)`,
 		},
+		{
+			"DigestSpecificVariantCompressors", // If changing the schema incompatibly, merge this with DigestCompressors.
+			`CREATE TABLE IF NOT EXISTS DigestSpecificVariantCompressors(` +
+				// index implied by PRIMARY KEY
+				`digest	TEXT PRIMARY KEY NOT NULL,` +
+				// The compressor is not `UnknownCompression`.
+				`specificVariantCompressor	TEXT NOT NULL,
+				specificVariantAnnotations	BLOB NOT NULL
+			)`,
+		},
 	}
 
 	_, err := dbTransaction(db, func(tx *sql.Tx) (void, error) {
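[Editor's note: `specificVariantAnnotations` holds an opaque JSON-encoded `map[string]string`; nothing in the schema interprets it. A self-contained sketch of the round trip the code below performs, using a purely hypothetical annotation key (real keys and values are produced by the compressor).]

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical annotation; real keys/values come from the zstd:chunked compressor.
	in := map[string]string{"example.org/toc-digest": "sha256:0123"}

	blob, err := json.Marshal(in) // what RecordDigestCompressorData stores in the BLOB column
	if err != nil {
		panic(err)
	}

	var out map[string]string
	if err := json.Unmarshal(blob, &out); err != nil { // what appendReplacementCandidates decodes
		panic(err)
	}
	fmt.Println(out["example.org/toc-digest"]) // sha256:0123
}
```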
@@ -461,6 +472,9 @@ func (sqc *cache) RecordKnownLocation(transport types.ImageTransport, scope type
 // WARNING: Only call this with LOCALLY VERIFIED data:
 // - don’t record a compressor for a digest just because some remote author claims so
 //   (e.g. because a manifest says so);
+// - don’t record the non-base variant or annotations if we are not _sure_ that the base variant
+//   and the blob’s digest match the non-base variant’s annotations (e.g. because we saw them
+//   in a manifest)
 //
 // otherwise the cache could be poisoned and cause us to make incorrect edits to type
 // information in a manifest.
@@ -468,21 +482,46 @@ func (sqc *cache) RecordDigestCompressorData(anyDigest digest.Digest, data blobi
 	_, _ = transaction(sqc, func(tx *sql.Tx) (void, error) {
 		previous, gotPrevious, err := querySingleValue[string](tx, "SELECT compressor FROM DigestCompressors WHERE digest = ?", anyDigest.String())
 		if err != nil {
-			return void{}, fmt.Errorf("looking for compressor of for %q", anyDigest)
+			return void{}, fmt.Errorf("looking for compressor of %q", anyDigest)
 		}
+		warned := false
 		if gotPrevious && previous != data.BaseVariantCompressor {
 			logrus.Warnf("Compressor for blob with digest %s previously recorded as %s, now %s", anyDigest, previous, data.BaseVariantCompressor)
+			warned = true
 		}
 		if data.BaseVariantCompressor == blobinfocache.UnknownCompression {
 			if _, err := tx.Exec("DELETE FROM DigestCompressors WHERE digest = ?", anyDigest.String()); err != nil {
 				return void{}, fmt.Errorf("deleting compressor for digest %q: %w", anyDigest, err)
 			}
+			if _, err := tx.Exec("DELETE FROM DigestSpecificVariantCompressors WHERE digest = ?", anyDigest.String()); err != nil {
+				return void{}, fmt.Errorf("deleting specific variant compressor for digest %q: %w", anyDigest, err)
+			}
 		} else {
 			if _, err := tx.Exec("INSERT OR REPLACE INTO DigestCompressors(digest, compressor) VALUES (?, ?)", anyDigest.String(), data.BaseVariantCompressor); err != nil {
 				return void{}, fmt.Errorf("recording compressor %q for %q: %w", data.BaseVariantCompressor, anyDigest, err)
 			}
 		}
+
+		if data.SpecificVariantCompressor != blobinfocache.UnknownCompression {
+			if !warned { // Don’t warn twice about the same digest
+				prevSVC, found, err := querySingleValue[string](tx, "SELECT specificVariantCompressor FROM DigestSpecificVariantCompressors WHERE digest = ?", anyDigest.String())
+				if err != nil {
+					return void{}, fmt.Errorf("looking for specific variant compressor of %q", anyDigest)
+				}
+				if found && data.SpecificVariantCompressor != prevSVC {
+					logrus.Warnf("Specific compressor for blob with digest %s previously recorded as %s, now %s", anyDigest, prevSVC, data.SpecificVariantCompressor)
+				}
+			}
+			annotations, err := json.Marshal(data.SpecificVariantAnnotations)
+			if err != nil {
+				return void{}, err
+			}
+			if _, err := tx.Exec("INSERT OR REPLACE INTO DigestSpecificVariantCompressors(digest, specificVariantCompressor, specificVariantAnnotations) VALUES (?, ?, ?)",
+				anyDigest.String(), data.SpecificVariantCompressor, annotations); err != nil {
+				return void{}, fmt.Errorf("recording specific variant compressor %q/%q for %q: %w", data.SpecificVariantCompressor, annotations, anyDigest, err)
+			}
+		}
+
 		return void{}, nil
 	}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
 }
@@ -493,19 +532,32 @@ func (sqc *cache) RecordDigestCompressorData(anyDigest digest.Digest, data blobi
 // with unknown compression.
 func (sqc *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, tx *sql.Tx, transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, v2Options *blobinfocache.CandidateLocations2Options) ([]prioritize.CandidateWithTime, error) {
-	compressorName := blobinfocache.UnknownCompression
+	compressionData := blobinfocache.DigestCompressorData{
+		BaseVariantCompressor:      blobinfocache.UnknownCompression,
+		SpecificVariantCompressor:  blobinfocache.UnknownCompression,
+		SpecificVariantAnnotations: nil,
+	}
 	if v2Options != nil {
-		compressor, found, err := querySingleValue[string](tx, "SELECT compressor FROM DigestCompressors WHERE digest = ?", digest.String())
-		if err != nil {
-			return nil, fmt.Errorf("scanning compressorName: %w", err)
-		}
-		if found {
-			compressorName = compressor
+		var baseVariantCompressor string
+		var specificVariantCompressor sql.NullString
+		var annotationBytes []byte
+		switch err := tx.QueryRow("SELECT compressor, specificVariantCompressor, specificVariantAnnotations "+
+			"FROM DigestCompressors LEFT JOIN DigestSpecificVariantCompressors USING (digest) WHERE digest = ?", digest.String()).
+			Scan(&baseVariantCompressor, &specificVariantCompressor, &annotationBytes); {
+		case errors.Is(err, sql.ErrNoRows): // Do nothing
+		case err != nil:
+			return nil, fmt.Errorf("scanning compressor data: %w", err)
+		default:
+			compressionData.BaseVariantCompressor = baseVariantCompressor
+			if specificVariantCompressor.Valid && annotationBytes != nil {
+				compressionData.SpecificVariantCompressor = specificVariantCompressor.String
+				if err := json.Unmarshal(annotationBytes, &compressionData.SpecificVariantAnnotations); err != nil {
+					return nil, err
+				}
+			}
 		}
 	}
-	template := prioritize.CandidateTemplateWithCompression(v2Options, digest, blobinfocache.DigestCompressorData{
-		BaseVariantCompressor: compressorName,
-	})
+	template := prioritize.CandidateTemplateWithCompression(v2Options, digest, compressionData)
 	if template == nil {
 		return candidates, nil
 	}
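[Editor's note: the `LEFT JOIN` above relies on `DigestSpecificVariantCompressors` holding at most one optional row per digest. When that row is absent, `specificVariantCompressor` scans as an invalid `sql.NullString` and the annotations as a nil `[]byte`, which is exactly the "no specific variant known" state. A compilable sketch of the same pattern, assuming the two tables from this change and the obvious `database/sql` and `encoding/json` imports:]

```go
// querySpecificVariant demonstrates the NULL handling used by
// appendReplacementCandidates: the optional row's columns come back as
// NULL (invalid NullString / nil slice) rather than as an error.
func querySpecificVariant(tx *sql.Tx, digest string) (base, specific string, annotations map[string]string, err error) {
	var sv sql.NullString
	var annBytes []byte
	err = tx.QueryRow("SELECT compressor, specificVariantCompressor, specificVariantAnnotations "+
		"FROM DigestCompressors LEFT JOIN DigestSpecificVariantCompressors USING (digest) WHERE digest = ?", digest).
		Scan(&base, &sv, &annBytes)
	if err != nil {
		return "", "", nil, err // sql.ErrNoRows if the digest is entirely unknown
	}
	if sv.Valid && annBytes != nil { // the optional row exists
		specific = sv.String
		err = json.Unmarshal(annBytes, &annotations)
	}
	return base, specific, annotations, err
}
```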
@@ -561,40 +613,41 @@ func (sqc *cache) candidateLocations(transport types.ImageTransport, scope types
 		if err != nil {
 			return nil, err
 		}
-
-		// FIXME? We could integrate this with appendReplacementCandidates into a single join instead of N+1 queries.
-		// (In the extreme, we could turn _everything_ this function does into a single query.
-		// And going even further, even DestructivelyPrioritizeReplacementCandidates could be turned into SQL.)
-		// For now, we prioritize simplicity, and sharing both code and implementation structure with the other cache implementations.
-		rows, err := tx.Query("SELECT anyDigest FROM DigestUncompressedPairs WHERE uncompressedDigest = ?", uncompressedDigest.String())
-		if err != nil {
-			return nil, fmt.Errorf("querying for other digests: %w", err)
-		}
-		defer rows.Close()
-		for rows.Next() {
-			var otherDigestString string
-			if err := rows.Scan(&otherDigestString); err != nil {
-				return nil, fmt.Errorf("scanning other digest: %w", err)
-			}
-			otherDigest, err := digest.Parse(otherDigestString)
-			if err != nil {
-				return nil, err
-			}
-			if otherDigest != primaryDigest && otherDigest != uncompressedDigest {
-				res, err = sqc.appendReplacementCandidates(res, tx, transport, scope, otherDigest, v2Options)
-				if err != nil {
-					return nil, err
-				}
-			}
-		}
-		if err := rows.Err(); err != nil {
-			return nil, fmt.Errorf("iterating through other digests: %w", err)
-		}
-		if uncompressedDigest != primaryDigest {
-			res, err = sqc.appendReplacementCandidates(res, tx, transport, scope, uncompressedDigest, v2Options)
-			if err != nil {
-				return nil, err
-			}
-		}
+		if uncompressedDigest != "" {
+			// FIXME? We could integrate this with appendReplacementCandidates into a single join instead of N+1 queries.
+			// (In the extreme, we could turn _everything_ this function does into a single query.
+			// And going even further, even DestructivelyPrioritizeReplacementCandidates could be turned into SQL.)
+			// For now, we prioritize simplicity, and sharing both code and implementation structure with the other cache implementations.
+			rows, err := tx.Query("SELECT anyDigest FROM DigestUncompressedPairs WHERE uncompressedDigest = ?", uncompressedDigest.String())
+			if err != nil {
+				return nil, fmt.Errorf("querying for other digests: %w", err)
+			}
+			defer rows.Close()
+			for rows.Next() {
+				var otherDigestString string
+				if err := rows.Scan(&otherDigestString); err != nil {
+					return nil, fmt.Errorf("scanning other digest: %w", err)
+				}
+				otherDigest, err := digest.Parse(otherDigestString)
+				if err != nil {
+					return nil, err
+				}
+				if otherDigest != primaryDigest && otherDigest != uncompressedDigest {
+					res, err = sqc.appendReplacementCandidates(res, tx, transport, scope, otherDigest, v2Options)
+					if err != nil {
+						return nil, err
+					}
+				}
+			}
+			if err := rows.Err(); err != nil {
+				return nil, fmt.Errorf("iterating through other digests: %w", err)
+			}
+			if uncompressedDigest != primaryDigest {
+				res, err = sqc.appendReplacementCandidates(res, tx, transport, scope, uncompressedDigest, v2Options)
+				if err != nil {
+					return nil, err
+				}
+			}
+		}
 	}
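[Editor's note: the last hunk is pure restructuring: the body is unchanged except for indentation, but it now runs only when `uncompressedDigest` is actually known. Previously, an unknown (empty) uncompressed digest still reached the `uncompressedDigest != primaryDigest` check, which is trivially true for `""`, so the code could append a nonsense candidate for the empty digest. A runnable toy model of the old vs. new control flow, with hypothetical digest strings:]

```go
package main

import "fmt"

// candidates models the substitute-lookup flow in candidateLocations;
// uncompressed == "" stands for "uncompressed digest unknown", and
// guarded selects the new behavior.
func candidates(primary, uncompressed string, guarded bool) []string {
	res := []string{primary}
	if guarded && uncompressed == "" {
		return res // new: nothing known to substitute with
	}
	if uncompressed != primary { // old: "" != primary, so "" was appended
		res = append(res, uncompressed)
	}
	return res
}

func main() {
	fmt.Println(candidates("sha256:aaaa", "", false)) // [sha256:aaaa ] (bogus empty entry)
	fmt.Println(candidates("sha256:aaaa", "", true))  // [sha256:aaaa]
}
```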