From fa48dadb75f95a77ddc51cce89ef098717a47060 Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Tue, 12 Mar 2024 13:20:48 +0100 Subject: [PATCH] compute checksums for s3 uploads to support object locking --- .../instaclustr/esop/impl/hash/HashSpec.java | 16 +++++- .../esop/s3/v2/BaseS3Backuper.java | 53 +++++++++++++++++-- 2 files changed, 64 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/instaclustr/esop/impl/hash/HashSpec.java b/src/main/java/com/instaclustr/esop/impl/hash/HashSpec.java index 27b038c..698c54f 100644 --- a/src/main/java/com/instaclustr/esop/impl/hash/HashSpec.java +++ b/src/main/java/com/instaclustr/esop/impl/hash/HashSpec.java @@ -3,6 +3,7 @@ import java.io.InputStream; import java.security.MessageDigest; import java.util.Arrays; +import java.util.Base64; import java.util.function.Supplier; import java.util.zip.CRC32; import java.util.zip.Checksum; @@ -42,6 +43,8 @@ public HashAlgorithm convert(final String value) { public interface Hasher { String getHash(InputStream is) throws Exception; + + /** Renders an already computed digest in this hasher's string form; implementations may not support it. */ String getHash(byte[] digest) throws Exception; } private static class SHAHasher implements Hasher { @@ -66,10 +69,14 @@ public String getHash(InputStream is) throws Exception byte[] bytes = digest.digest(); + return getHash(bytes); + } + + @Override + public String getHash(byte[] digest) throws Exception { final StringBuilder sb = new StringBuilder(); - //This bytes[] has bytes in decimal format, convert it to hexadecimal format - for (final byte aByte : bytes) { + /* two lower-case hex characters per digest byte */ for (final byte aByte : digest) { sb.append(Integer.toString((aByte & 0xff) + 0x100, 16).substring(1)); } @@ -92,6 +99,11 @@ public String getHash(InputStream is) throws Exception return Long.toString(checksum.getValue()); } + + @Override + public String getHash(byte[] digest) throws Exception { + throw new UnsupportedOperationException("CRC hasher cannot render a pre-computed digest"); + } } public enum HashAlgorithm { diff --git a/src/main/java/com/instaclustr/esop/s3/v2/BaseS3Backuper.java 
b/src/main/java/com/instaclustr/esop/s3/v2/BaseS3Backuper.java index c4e3d73..57603af 100644 --- a/src/main/java/com/instaclustr/esop/s3/v2/BaseS3Backuper.java +++ b/src/main/java/com/instaclustr/esop/s3/v2/BaseS3Backuper.java @@ -4,12 +4,15 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.file.Path; +import java.security.MessageDigest; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Base64; import java.util.List; import java.util.stream.Collectors; +import com.instaclustr.esop.impl.hash.HashSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,6 +29,7 @@ import software.amazon.awssdk.core.waiters.WaiterOverrideConfiguration; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; @@ -39,10 +43,11 @@ import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest; import software.amazon.awssdk.services.s3.model.ListMultipartUploadsResponse; import software.amazon.awssdk.services.s3.model.MultipartUpload; -import software.amazon.awssdk.services.s3.model.NoSuchKeyException; import software.amazon.awssdk.services.s3.model.NoSuchUploadException; import software.amazon.awssdk.services.s3.model.ObjectAttributes; import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectTaggingRequest; +import software.amazon.awssdk.services.s3.model.PutObjectTaggingResponse; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.SdkPartType; import 
software.amazon.awssdk.services.s3.model.StorageClass; @@ -177,7 +182,9 @@ public void uploadText(String text, RemoteObjectReference objectReference) throw byte[] bytes = text.getBytes(UTF_8); s3Clients.getNonEncryptingClient() - .putObject(getPutObjectRequest(objectReference, bytes.length), + .putObject(getPutObjectRequest(objectReference, + bytes.length, + getDigest(prepareMessageDigest().digest(bytes))), RequestBody.fromBytes(bytes)); } @@ -192,18 +199,22 @@ public void uploadEncryptedText(String plainText, RemoteObjectReference objectRe byte[] bytes = plainText.getBytes(UTF_8); s3Clients.getEncryptingClient().get() - .putObject(getPutObjectRequest(objectReference, bytes.length), + .putObject(getPutObjectRequest(objectReference, + bytes.length, + /* NOTE(review): this digest is taken over the plaintext bytes; confirm the encrypting client does not change the payload that S3 checks the SHA-256 against */ getDigest(prepareMessageDigest().digest(bytes))), RequestBody.fromBytes(bytes)); } private PutObjectRequest getPutObjectRequest(RemoteObjectReference s3RemoteObjectReference, long unencryptedSize, + /* Base64-encoded SHA-256 digest of the payload, passed through to checksumSHA256 on the request */ String checksumSHA256, Tag... tags) { return PutObjectRequest.builder() .bucket(request.storageLocation.bucket) .key(s3RemoteObjectReference.canonicalPath) .storageClass(StorageClass.STANDARD_IA) .tagging(Tagging.builder().tagSet(tags).build()) + .checksumSHA256(checksumSHA256) .build(); } @@ -217,12 +228,15 @@ private void uploadFile(S3Client s3Client, .bucket(request.storageLocation.bucket) .key(objectReference.canonicalPath) .tagging(tagging) + .checksumAlgorithm(ChecksumAlgorithm.SHA256) .build(); CreateMultipartUploadResponse multipartUploadResponse = s3Client.createMultipartUpload(multipartUploadRequest); String uploadId = multipartUploadResponse.uploadId(); + /* running SHA-256 digest; fed with each part's byteBuffer once that part has been added to completedParts */ MessageDigest sha256 = prepareMessageDigest(); + try { long partSize = Long.parseLong(System.getProperty("upload.max.part.size", Long.toString(100 * 1024 * 1024))); @@ -244,6 +258,7 @@ private void uploadFile(S3Client s3Client, .key(objectReference.canonicalPath) .uploadId(uploadId) .partNumber(partNumber) + .checksumAlgorithm(ChecksumAlgorithm.SHA256) 
.sdkPartType(partNumber == numberOfParts ? SdkPartType.LAST : SdkPartType.DEFAULT) .build(); @@ -253,7 +268,10 @@ private void uploadFile(S3Client s3Client, completedParts.add(CompletedPart.builder() .partNumber(partNumber) .eTag(partResponse.eTag()) + .checksumSHA256(partResponse.checksumSHA256()) .build()); + + sha256.update(byteBuffer); } // Complete the multipart upload @@ -284,6 +302,22 @@ private void uploadFile(S3Client s3Client, logger.debug("Object under key " + objectReference.canonicalPath + " exists"); + /* MessageDigest.digest() finalises AND resets the digest, so it is computed exactly once here and the bytes are reused for the manifest hash below; a second sha256.digest() call would hash empty input */ byte[] fullObjectDigest = sha256.digest(); Tag checksumTag = Tag.builder() + .key("fullObjectChecksum") + .value(HashSpec.HashAlgorithm.SHA_256.getHasher().getHash(fullObjectDigest)) + .build(); + + /* NOTE(review): PutObjectTagging replaces the object's whole tag set, so tags passed at createMultipartUpload are presumably dropped here - confirm */ PutObjectTaggingResponse putObjectTaggingResponse = s3Client.putObjectTagging(PutObjectTaggingRequest.builder() + .bucket(request.storageLocation.bucket) + .key(objectReference.canonicalPath) + .tagging(Tagging.builder().tagSet(checksumTag).build()).build()); + + if (!putObjectTaggingResponse.sdkHttpResponse().isSuccessful()) { + throw new RuntimeException(String.format("Unsuccessful tagging of %s with checksum, upload id %s", objectReference.canonicalPath, uploadId)); + } else { + logger.debug("Tagged {} with {}", objectReference.canonicalPath, checksumTag.toString()); + } + if (s3Clients.hasEncryptingClient()) { try { GetObjectAttributesResponse objectAttributes = s3Clients.getNonEncryptingClient() @@ -295,6 +329,7 @@ private void uploadFile(S3Client s3Client, .build()); manifestEntry.size = objectAttributes.objectSize(); + manifestEntry.hash = Base64.getEncoder().encodeToString(fullObjectDigest); } catch (Throwable t) { logger.warn("Unable to get attribute {} for key {} by GetObjectAttributes request. 
Please check your permissions.", @@ -368,4 +403,16 @@ public void abortMultipartUpload(String uploadId, } } } + + /** Creates a fresh SHA-256 {@link MessageDigest}; the original failure is chained as the cause so a broken provider setup stays diagnosable. */ private static MessageDigest prepareMessageDigest() { + try { + return MessageDigest.getInstance("SHA-256"); + } catch (Throwable t) { + throw new IllegalStateException("Unable to get instance of SHA-256 message digest", t); + } + } + + /** Returns the Base64 encoding of the given digest bytes. */ private String getDigest(byte[] digest) { + return Base64.getEncoder().encodeToString(digest); + } } \ No newline at end of file