From babdac8191e9162b47ced80a2e1b39cd2d3882b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Wed, 13 Mar 2024 22:13:30 +0100 Subject: [PATCH] Always record only the base variant information about consumed source blobs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ... because we don't trust the TOC data, if any. This allows us to remove the zstd:chunked hack; we, at least, now record those blobs as zstd. Signed-off-by: Miloslav Trmač --- copy/compression.go | 109 +++++++++++++++++++++++--------------------- 1 file changed, 56 insertions(+), 53 deletions(-) diff --git a/copy/compression.go b/copy/compression.go index e5800aa4d..081c49312 100644 --- a/copy/compression.go +++ b/copy/compression.go @@ -35,10 +35,10 @@ var ( // bpDetectCompressionStepData contains data that the copy pipeline needs about the “detect compression” step. type bpDetectCompressionStepData struct { - isCompressed bool - format compressiontypes.Algorithm // Valid if isCompressed - decompressor compressiontypes.DecompressorFunc // Valid if isCompressed - srcCompressorName string // Compressor name to possibly record in the blob info cache for the source blob. + isCompressed bool + format compressiontypes.Algorithm // Valid if isCompressed + decompressor compressiontypes.DecompressorFunc // Valid if isCompressed + srcCompressorBaseVariantName string // Compressor name to possibly record in the blob info cache for the source blob. } // blobPipelineDetectCompressionStep updates *stream to detect its current compression format. @@ -58,9 +58,9 @@ func blobPipelineDetectCompressionStep(stream *sourceStream, srcInfo types.BlobI decompressor: decompressor, } if res.isCompressed { - res.srcCompressorName = format.Name() + res.srcCompressorBaseVariantName = format.BaseVariantName() } else { - res.srcCompressorName = internalblobinfocache.Uncompressed + res.srcCompressorBaseVariantName = internalblobinfocache.Uncompressed } if expectedBaseFormat, known := expectedBaseCompressionFormats[stream.info.MediaType]; known && res.isCompressed && format.BaseVariantName() != expectedBaseFormat.Name() { @@ -71,13 +71,13 @@ func blobPipelineDetectCompressionStep(stream *sourceStream, srcInfo types.BlobI // bpCompressionStepData contains data that the copy pipeline needs about the compression step. type bpCompressionStepData struct { - operation bpcOperation // What we are actually doing - uploadedOperation types.LayerCompression // Operation to use for updating the blob metadata (matching the end state, not necessarily what we do) - uploadedAlgorithm *compressiontypes.Algorithm // An algorithm parameter for the compressionOperation edits. - uploadedAnnotations map[string]string // Compression-related annotations that should be set on the uploaded blob. WARNING: This is only set after the srcStream.reader is fully consumed. - srcCompressorName string // Compressor name to record in the blob info cache for the source blob. - uploadedCompressorName string // Compressor name to record in the blob info cache for the uploaded blob. - closers []io.Closer // Objects to close after the upload is done, if any. + operation bpcOperation // What we are actually doing + uploadedOperation types.LayerCompression // Operation to use for updating the blob metadata (matching the end state, not necessarily what we do) + uploadedAlgorithm *compressiontypes.Algorithm // An algorithm parameter for the compressionOperation edits. + uploadedAnnotations map[string]string // Compression-related annotations that should be set on the uploaded blob. WARNING: This is only set after the srcStream.reader is fully consumed. + srcCompressorBaseVariantName string // Compressor base variant name to record in the blob info cache for the source blob. + uploadedCompressorName string // Compressor name to record in the blob info cache for the uploaded blob. + closers []io.Closer // Objects to close after the upload is done, if any. } type bpcOperation int @@ -129,11 +129,11 @@ func (ic *imageCopier) bpcPreserveEncrypted(stream *sourceStream, _ bpDetectComp // We can’t do anything with an encrypted blob unless decrypted. logrus.Debugf("Using original blob without modification for encrypted blob") return &bpCompressionStepData{ - operation: bpcOpPreserveOpaque, - uploadedOperation: types.PreserveOriginal, - uploadedAlgorithm: nil, - srcCompressorName: internalblobinfocache.UnknownCompression, - uploadedCompressorName: internalblobinfocache.UnknownCompression, + operation: bpcOpPreserveOpaque, + uploadedOperation: types.PreserveOriginal, + uploadedAlgorithm: nil, + srcCompressorBaseVariantName: internalblobinfocache.UnknownCompression, + uploadedCompressorName: internalblobinfocache.UnknownCompression, }, nil } return nil, nil @@ -158,13 +158,13 @@ func (ic *imageCopier) bpcCompressUncompressed(stream *sourceStream, detected bp Size: -1, } return &bpCompressionStepData{ - operation: bpcOpCompressUncompressed, - uploadedOperation: types.Compress, - uploadedAlgorithm: uploadedAlgorithm, - uploadedAnnotations: annotations, - srcCompressorName: detected.srcCompressorName, - uploadedCompressorName: uploadedAlgorithm.Name(), - closers: []io.Closer{reader}, + operation: bpcOpCompressUncompressed, + uploadedOperation: types.Compress, + uploadedAlgorithm: uploadedAlgorithm, + uploadedAnnotations: annotations, + srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName, + uploadedCompressorName: uploadedAlgorithm.Name(), + closers: []io.Closer{reader}, }, nil } return nil, nil @@ -199,13 +199,13 @@ func (ic *imageCopier) bpcRecompressCompressed(stream *sourceStream, detected bp } succeeded = true return &bpCompressionStepData{ - operation: bpcOpRecompressCompressed, - uploadedOperation: types.PreserveOriginal, - uploadedAlgorithm: ic.compressionFormat, - uploadedAnnotations: annotations, - srcCompressorName: detected.srcCompressorName, - uploadedCompressorName: ic.compressionFormat.Name(), - closers: []io.Closer{decompressed, recompressed}, + operation: bpcOpRecompressCompressed, + uploadedOperation: types.PreserveOriginal, + uploadedAlgorithm: ic.compressionFormat, + uploadedAnnotations: annotations, + srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName, + uploadedCompressorName: ic.compressionFormat.Name(), + closers: []io.Closer{decompressed, recompressed}, }, nil } return nil, nil @@ -226,12 +226,12 @@ func (ic *imageCopier) bpcDecompressCompressed(stream *sourceStream, detected bp Size: -1, } return &bpCompressionStepData{ - operation: bpcOpDecompressCompressed, - uploadedOperation: types.Decompress, - uploadedAlgorithm: nil, - srcCompressorName: detected.srcCompressorName, - uploadedCompressorName: internalblobinfocache.Uncompressed, - closers: []io.Closer{s}, + operation: bpcOpDecompressCompressed, + uploadedOperation: types.Decompress, + uploadedAlgorithm: nil, + srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName, + uploadedCompressorName: internalblobinfocache.Uncompressed, + closers: []io.Closer{s}, }, nil } return nil, nil @@ -269,11 +269,14 @@ func (ic *imageCopier) bpcPreserveOriginal(_ *sourceStream, detected bpDetectCom algorithm = nil } return &bpCompressionStepData{ - operation: bpcOp, - uploadedOperation: uploadedOp, - uploadedAlgorithm: algorithm, - srcCompressorName: detected.srcCompressorName, - uploadedCompressorName: detected.srcCompressorName, + operation: bpcOp, + uploadedOperation: uploadedOp, + uploadedAlgorithm: algorithm, + srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName, + // We only record the base variant of the format on upload; we didn’t do anything with + // the TOC, we don’t know whether it matches the blob digest, so we don’t want to trigger + // reuse of any kind between the blob digest and the TOC digest. + uploadedCompressorName: detected.srcCompressorBaseVariantName, } } @@ -333,9 +336,9 @@ func (d *bpCompressionStepData) recordValidatedDigestData(c *copier, uploadedInf return fmt.Errorf("Internal error: Unexpected d.operation value %#v", d.operation) } } - if d.srcCompressorName == "" || d.uploadedCompressorName == "" { - return fmt.Errorf("internal error: missing compressor names (src: %q, uploaded: %q)", - d.srcCompressorName, d.uploadedCompressorName) + if d.srcCompressorBaseVariantName == "" || d.uploadedCompressorName == "" { + return fmt.Errorf("internal error: missing compressor names (src base: %q, uploaded: %q)", + d.srcCompressorBaseVariantName, d.uploadedCompressorName) } if d.uploadedCompressorName != internalblobinfocache.UnknownCompression { if d.uploadedCompressorName != compressiontypes.ZstdChunkedAlgorithmName { @@ -353,13 +356,13 @@ func (d *bpCompressionStepData) recordValidatedDigestData(c *copier, uploadedInf } } if srcInfo.Digest != "" && srcInfo.Digest != uploadedInfo.Digest && - d.srcCompressorName != internalblobinfocache.UnknownCompression { - if d.srcCompressorName != compressiontypes.ZstdChunkedAlgorithmName { - // HACK: Don’t record zstd:chunked algorithms, see above. - c.blobInfoCache.RecordDigestCompressorData(srcInfo.Digest, internalblobinfocache.DigestCompressorData{ - BaseVariantCompressor: d.srcCompressorName, - }) - } + d.srcCompressorBaseVariantName != internalblobinfocache.UnknownCompression { + // If the source is already using some TOC-dependent variant, we either copied the + // blob as is, or perhaps decompressed it; either way we don’t trust the TOC digest, + // so record neither the variant name, nor the TOC digest. + c.blobInfoCache.RecordDigestCompressorData(srcInfo.Digest, internalblobinfocache.DigestCompressorData{ + BaseVariantCompressor: d.srcCompressorBaseVariantName, + }) } return nil }