diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 5e3bdf5269cf4..0fb0c53828da3 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -177,15 +177,7 @@ public long readBlobPreferredLength() { */ @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { - assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests"; - SocketAccess.doPrivilegedIOException(() -> { - if (blobSize <= getLargeBlobThresholdInBytes()) { - executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, null); - } else { - executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, null); - } - return null; - }); + writeBlobWithMetadata(blobName, inputStream, null, blobSize, failIfAlreadyExists); } /** @@ -196,7 +188,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b public void writeBlobWithMetadata( String blobName, InputStream inputStream, - Map metadata, + @Nullable Map metadata, long blobSize, boolean failIfAlreadyExists ) throws IOException { @@ -611,7 +603,7 @@ void executeSingleUpload( .acl(blobStore.getCannedACL()) .overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher)); - if (metadata != null) { + if (metadata != null && !metadata.isEmpty()) { putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata); } if (blobStore.serverSideEncryption()) { @@ -668,7 +660,7 @@ void executeMultipartUpload( .acl(blobStore.getCannedACL()) .overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector)); - if (metadata != null) { + if (metadata != null && !metadata.isEmpty()) { createMultipartUploadRequestBuilder.metadata(metadata); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java index 9459a9ab9d88b..2eb63178b19e0 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java @@ -269,7 +269,7 @@ boolean isAborted() { return isStreamAborted.get(); } - public Map getMetadata() { + Map getMetadata() { return this.metadata; } } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java index 9d74ecc64f480..007fe77825b2a 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java @@ -129,9 +129,12 @@ private void uploadInParts( CreateMultipartUploadRequest.Builder createMultipartUploadRequestBuilder = CreateMultipartUploadRequest.builder() .bucket(uploadRequest.getBucket()) - .metadata(uploadRequest.getMetadata()) .key(uploadRequest.getKey()) .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector)); + + if (uploadRequest.getMetadata() != null && !uploadRequest.getMetadata().isEmpty()) { + createMultipartUploadRequestBuilder.metadata(uploadRequest.getMetadata()); + } if (uploadRequest.doRemoteDataIntegrityCheck()) { createMultipartUploadRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32); } @@ -325,10 +328,13 @@ private void uploadInOneChunk( ) { PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder() .bucket(uploadRequest.getBucket()) - .metadata(uploadRequest.getMetadata()) .key(uploadRequest.getKey()) .contentLength(uploadRequest.getContentLength()) .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.putObjectMetricPublisher)); + + if (uploadRequest.getMetadata() != null && !uploadRequest.getMetadata().isEmpty()) { + putObjectRequestBuilder.metadata(uploadRequest.getMetadata()); + } if (uploadRequest.doRemoteDataIntegrityCheck()) { putObjectRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32); putObjectRequestBuilder.checksumCRC32(base64StringFromLong(uploadRequest.getExpectedChecksum())); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java index 079a5de1d3330..b944a72225d36 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java @@ -9,6 +9,7 @@ package org.opensearch.repositories.s3.async; import org.opensearch.common.CheckedConsumer; +import org.opensearch.common.Nullable; import org.opensearch.common.blobstore.stream.write.WritePriority; import java.io.IOException; @@ -49,7 +50,7 @@ public UploadRequest( boolean doRemoteDataIntegrityCheck, Long expectedChecksum, boolean uploadRetryEnabled, - Map metadata + @Nullable Map metadata ) { this.bucket = bucket; this.key = key; diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index e16e75de7f27d..ce1497bbbd2e5 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -32,6 +32,7 @@ package org.opensearch.common.blobstore; +import org.opensearch.common.Nullable; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.core.action.ActionListener; @@ -166,7 +167,7 @@ default long readBlobPreferredLength() { default void writeBlobWithMetadata( String blobName, InputStream inputStream, - Map metadata, + @Nullable Map metadata, long blobSize, boolean failIfAlreadyExists ) throws IOException { @@ -219,7 +220,7 @@ default void writeBlobWithMetadata( default void writeBlobAtomicWithMetadata( String blobName, InputStream inputStream, - Map metadata, + @Nullable Map metadata, long blobSize, boolean failIfAlreadyExists ) throws IOException { diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java index d14800e82e495..9a7fc24f7e484 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java @@ -52,7 +52,7 @@ private WriteContext( CheckedConsumer uploadFinalizer, boolean doRemoteDataIntegrityCheck, @Nullable Long expectedChecksum, - Map metadata + @Nullable Map metadata ) { this.fileName = fileName; this.streamContextSupplier = streamContextSupplier;