diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java index 3870485f03f..ec70b635104 100644 --- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java +++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java @@ -23,12 +23,14 @@ import com.google.common.base.Preconditions; import java.time.Duration; import java.util.UUID; +import javax.annotation.Nullable; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.DataSizeUtils; import org.apache.pinot.spi.utils.TimeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.services.s3.model.StorageClass; /** @@ -49,6 +51,8 @@ public class S3Config { public static final String ENDPOINT = "endpoint"; public static final String DISABLE_ACL_CONFIG_KEY = "disableAcl"; + public static final String STORAGE_CLASS = "storageClass"; + // Encryption related configurations public static final String SERVER_SIDE_ENCRYPTION_CONFIG_KEY = "serverSideEncryption"; public static final String SSE_KMS_KEY_ID_CONFIG_KEY = "ssekmsKeyId"; @@ -76,6 +80,7 @@ public class S3Config { private final String _accessKey; private final String _secretKey; private final String _region; + private final String _storageClass; private final boolean _disableAcl; private final String _endpoint; @@ -100,6 +105,14 @@ public S3Config(PinotConfiguration pinotConfig) { _region = pinotConfig.getProperty(REGION); _endpoint = pinotConfig.getProperty(ENDPOINT); + _storageClass = pinotConfig.getProperty(STORAGE_CLASS); + if (_storageClass != null) { + if (StorageClass.fromValue(_storageClass) == StorageClass.UNKNOWN_TO_SDK_VERSION) { + throw new IllegalStateException( + "unknown s3 storage class: " + _storageClass + " - Valid storage classes: " + StorageClass.knownValues()); + } + } + _serverSideEncryption = pinotConfig.getProperty(SERVER_SIDE_ENCRYPTION_CONFIG_KEY); _ssekmsKeyId = pinotConfig.getProperty(SSE_KMS_KEY_ID_CONFIG_KEY); _ssekmsEncryptionContext = pinotConfig.getProperty(SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY); @@ -247,4 +260,9 @@ public long getMultiPartUploadPartSize() { public ApacheHttpClient.Builder getHttpClientBuilder() { return _httpClientBuilder; } + + @Nullable + public String getStorageClass() { + return _storageClass; + } } diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java index 4fc84f3541c..d1129156a92 100644 --- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java +++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java @@ -77,6 +77,7 @@ import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.S3Object; import software.amazon.awssdk.services.s3.model.ServerSideEncryption; +import software.amazon.awssdk.services.s3.model.StorageClass; import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.UploadPartResponse; import software.amazon.awssdk.services.sts.StsClient; @@ -99,6 +100,7 @@ public class S3PinotFS extends BasePinotFS { private String _ssekmsEncryptionContext; private long _minObjectSizeToUploadInParts; private long _multiPartUploadPartSize; + private @Nullable StorageClass _storageClass; @Override public void init(PinotConfiguration config) { @@ -149,6 +151,12 @@ public void init(PinotConfiguration config) { if (s3Config.getHttpClientBuilder() != null) { s3ClientBuilder.httpClientBuilder(s3Config.getHttpClientBuilder()); } + + if (s3Config.getStorageClass() != null) { + _storageClass = StorageClass.fromValue(s3Config.getStorageClass()); + assert (_storageClass != StorageClass.UNKNOWN_TO_SDK_VERSION); + } + _s3Client = s3ClientBuilder.build(); setMultiPartUploadConfigs(s3Config); } catch (S3Exception e) { @@ -180,6 +188,17 @@ public void init(S3Client s3Client, String serverSideEncryption, PinotConfigurat setDisableAcl(s3Config); } + @VisibleForTesting + void setStorageClass(@Nullable StorageClass storageClass) { + _storageClass = storageClass; + } + + @VisibleForTesting + @Nullable + StorageClass getStorageClass() { + return _storageClass; + } + private void setServerSideEncryption(@Nullable String serverSideEncryption, S3Config s3Config) { if (serverSideEncryption != null) { try { @@ -581,8 +600,13 @@ private void uploadFileInParts(File srcFile, URI dstUri) throws Exception { String bucket = dstUri.getHost(); String prefix = sanitizePath(getBase(dstUri).relativize(dstUri).getPath()); + CreateMultipartUploadRequest.Builder createMultipartUploadRequestBuilder = CreateMultipartUploadRequest.builder(); + createMultipartUploadRequestBuilder.bucket(bucket).key(prefix); + if (_storageClass != null) { + createMultipartUploadRequestBuilder.storageClass(_storageClass); + } CreateMultipartUploadResponse multipartUpload = - _s3Client.createMultipartUpload(CreateMultipartUploadRequest.builder().bucket(bucket).key(prefix).build()); + _s3Client.createMultipartUpload(createMultipartUploadRequestBuilder.build()); String uploadId = multipartUpload.uploadId(); // Upload parts sequentially to overcome the 5GB limit of a single PutObject call. // TODO: parts can be uploaded in parallel for higher throughput, given a thread pool. @@ -699,6 +723,11 @@ private PutObjectRequest generatePutObjectRequest(URI uri, String path) { putReqBuilder.ssekmsEncryptionContext(_ssekmsEncryptionContext); } } + + if (_storageClass != null) { + putReqBuilder.storageClass(_storageClass); + } + return putReqBuilder.build(); } @@ -706,6 +735,9 @@ private CopyObjectRequest generateCopyObjectRequest(String copySource, URI dest, Map metadata) { CopyObjectRequest.Builder copyReqBuilder = CopyObjectRequest.builder().copySource(copySource).destinationBucket(dest.getHost()).destinationKey(path); + if (_storageClass != null) { + copyReqBuilder.storageClass(_storageClass); + } if (metadata != null) { copyReqBuilder.metadata(metadata).metadataDirective(MetadataDirective.REPLACE); } diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3ConfigTest.java b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3ConfigTest.java index 2df60baa9f6..6d51f11ff38 100644 --- a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3ConfigTest.java +++ b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3ConfigTest.java @@ -21,6 +21,7 @@ import org.apache.pinot.spi.env.PinotConfiguration; import org.testng.Assert; import org.testng.annotations.Test; +import software.amazon.awssdk.services.s3.model.StorageClass; public class S3ConfigTest { @@ -44,4 +45,26 @@ public void testParseDuration() { Assert.assertEquals(S3Config.parseDuration("P1DT2H30S"), S3Config.parseDuration("1d2h30s")); S3Config.parseDuration("10"); } + + @Test + public void testDefaultStorageClassIsNull() { + PinotConfiguration pinotConfig = new PinotConfiguration(); + S3Config cfg = new S3Config(pinotConfig); + Assert.assertNull(cfg.getStorageClass()); + } + + @Test + public void testIntelligentTieringStorageClass() { + PinotConfiguration pinotConfig = new PinotConfiguration(); + pinotConfig.setProperty("storageClass", StorageClass.INTELLIGENT_TIERING.toString()); + S3Config cfg = new S3Config(pinotConfig); + Assert.assertEquals(cfg.getStorageClass(), "INTELLIGENT_TIERING"); + } + + @Test(expectedExceptions = IllegalStateException.class) + public void testInvalidStorageClass() { + PinotConfiguration pinotConfig = new PinotConfiguration(); + pinotConfig.setProperty("storageClass", "invalid-storage-class"); + S3Config cfg = new S3Config(pinotConfig); + } } diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java index 34edc0a6444..18ca80f0046 100644 --- a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java +++ b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java @@ -36,6 +36,8 @@ import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; @@ -47,6 +49,7 @@ import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.services.s3.model.StorageClass; @Test @@ -83,9 +86,14 @@ public void tearDown() FileUtils.deleteQuietly(TEMP_FILE); } + @BeforeMethod + public void beforeMethod() { + _s3PinotFS.setStorageClass(null); + } + private void createEmptyFile(String folderName, String fileName) { String fileNameWithFolder = folderName.length() == 0 ? fileName : folderName + DELIMITER + fileName; - _s3Client.putObject(S3TestUtils.getPutObjectRequest(BUCKET, fileNameWithFolder), + _s3Client.putObject(S3TestUtils.getPutObjectRequest(BUCKET, fileNameWithFolder, _s3PinotFS.getStorageClass()), RequestBody.fromBytes(new byte[0])); } @@ -347,9 +355,12 @@ public void testExists() Assert.assertFalse(fileNotExists); } - @Test - public void testCopyFromAndToLocal() + @Test(dataProvider = "storageClasses") + public void testCopyFromAndToLocal(StorageClass storageClass) throws Exception { + + _s3PinotFS.setStorageClass(storageClass); + String fileName = "copyFile.txt"; File fileToCopy = new File(TEMP_FILE, fileName); File fileToDownload = new File(TEMP_FILE, "copyFile_download.txt").getAbsoluteFile(); @@ -366,9 +377,12 @@ public void testCopyFromAndToLocal() } } - @Test - public void testMultiPartUpload() + @Test(dataProvider = "storageClasses") + public void testMultiPartUpload(StorageClass storageClass) throws Exception { + + _s3PinotFS.setStorageClass(storageClass); + String fileName = "copyFile_for_multipart.txt"; File fileToCopy = new File(TEMP_FILE, fileName); File fileToDownload = new File(TEMP_FILE, "copyFile_download_multipart.txt").getAbsoluteFile(); @@ -399,7 +413,9 @@ public void testOpenFile() String fileName = "sample.txt"; String fileContent = "Hello, World"; - _s3Client.putObject(S3TestUtils.getPutObjectRequest(BUCKET, fileName), RequestBody.fromString(fileContent)); + _s3Client.putObject( + S3TestUtils.getPutObjectRequest(BUCKET, fileName, _s3PinotFS.getStorageClass()), + RequestBody.fromString(fileContent)); InputStream is = _s3PinotFS.open(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName))); String actualContents = IOUtils.toString(is, StandardCharsets.UTF_8); @@ -418,25 +434,28 @@ public void testMkdir() Assert.assertTrue(headObjectResponse.sdkHttpResponse().isSuccessful()); } - @Test - public void testMoveFile() + @Test(dataProvider = "storageClasses") + public void testMoveFile(StorageClass storageClass) throws Exception { - String fileName = "file-to-move"; + _s3PinotFS.setStorageClass(storageClass); + + String sourceFilename = "source-file-" + System.currentTimeMillis(); + String targetFilename = "target-file-" + System.currentTimeMillis(); int fileSize = 5000; - File file = new File(TEMP_FILE, fileName); + File file = new File(TEMP_FILE, sourceFilename); try { createDummyFile(file, fileSize); - URI sourceUri = URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)); + URI sourceUri = URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, sourceFilename)); _s3PinotFS.copyFromLocalFile(file, sourceUri); HeadObjectResponse sourceHeadObjectResponse = - _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, fileName)); + _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, sourceFilename)); - URI targetUri = URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, "move-target")); + URI targetUri = URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, targetFilename)); boolean moveResult = _s3PinotFS.move(sourceUri, targetUri, false); Assert.assertTrue(moveResult); @@ -445,7 +464,7 @@ public void testMoveFile() Assert.assertTrue(_s3PinotFS.exists(targetUri)); HeadObjectResponse targetHeadObjectResponse = - _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, "move-target")); + _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, targetFilename)); Assert.assertEquals(targetHeadObjectResponse.contentLength(), fileSize); Assert.assertEquals(targetHeadObjectResponse.storageClass(), @@ -469,6 +488,15 @@ public void testMoveFile() } } + @DataProvider(name = "storageClasses") + public Object[][] createStorageClasses() { + return new Object[][] { + { null }, + { StorageClass.STANDARD }, + { StorageClass.INTELLIGENT_TIERING } + }; + } + private static void createDummyFile(File file, int size) throws IOException { FileUtils.deleteQuietly(file); diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3TestUtils.java b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3TestUtils.java index e7938de1093..05a42f8a5fc 100644 --- a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3TestUtils.java +++ b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3TestUtils.java @@ -18,17 +18,23 @@ */ package org.apache.pinot.plugin.filesystem; +import javax.annotation.Nullable; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.StorageClass; public class S3TestUtils { private S3TestUtils() { } - public static PutObjectRequest getPutObjectRequest(String bucket, String key) { - return PutObjectRequest.builder().bucket(bucket).key(key).build(); + public static PutObjectRequest getPutObjectRequest(String bucket, String key, @Nullable StorageClass storageClass) { + PutObjectRequest.Builder builder = PutObjectRequest.builder().bucket(bucket).key(key); + if (storageClass != null) { + builder.storageClass(storageClass); + } + return builder.build(); } public static HeadObjectRequest getHeadObjectRequest(String bucket, String key) {