diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPusher.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPusher.java index e352926de7d4..a4ddde49b213 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPusher.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPusher.java @@ -83,7 +83,7 @@ public void insert(final File file, final String contentType, final String path) try { RetryUtils.retry( (RetryUtils.Task) () -> { - storage.insert(config.getBucket(), path, new FileContent(contentType, file)); + storage.insert(config.getBucket(), path, new FileContent(contentType, file), null); return null; }, GoogleUtils::isRetryable, diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java index 6a5d17f7c242..52bd576ee9d8 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java @@ -44,6 +44,9 @@ public class GoogleStorage { + private static final Logger log = new Logger(GoogleStorage.class); + private static final HumanReadableBytes DEFAULT_WRITE_CHUNK_SIZE = new HumanReadableBytes("4MiB"); + /** * Some segment processing tools such as DataSegmentKiller are initialized when an ingestion job starts * if the extension is loaded, even when the implementation of DataSegmentKiller is not used. As a result, @@ -53,20 +56,34 @@ public class GoogleStorage *

* See OmniDataSegmentKiller for how DataSegmentKillers are initialized. */ - private static final Logger log = new Logger(GoogleStorage.class); - private final Supplier storage; - private final HumanReadableBytes DEFAULT_WRITE_CHUNK_SIZE = new HumanReadableBytes("4MiB"); - public GoogleStorage(final Supplier storage) { this.storage = storage; } - public void insert(final String bucket, final String path, AbstractInputStreamContent mediaContent) throws IOException + /** + * Upload an object. From {@link Storage#createFrom(BlobInfo, InputStream, int, Storage.BlobWriteOption...)}, + * "larger buffer sizes might improve the upload performance but require more memory." + * + * @param bucket target bucket + * @param path target path + * @param mediaContent content to upload + * @param bufferSize size of upload buffer, or null to use the upstream default (15 MB as of this writing) + */ + public void insert( + final String bucket, + final String path, + final AbstractInputStreamContent mediaContent, + @Nullable final Integer bufferSize + ) throws IOException { - storage.get().createFrom(getBlobInfo(bucket, path), mediaContent.getInputStream()); + if (bufferSize == null) { + storage.get().createFrom(getBlobInfo(bucket, path), mediaContent.getInputStream()); + } else { + storage.get().createFrom(getBlobInfo(bucket, path), mediaContent.getInputStream(), bufferSize); + } } public InputStream getInputStream(final String bucket, final String path) throws IOException @@ -148,13 +165,13 @@ public GoogleStorageObjectMetadata getMetadata( /** * Deletes an object in a bucket on the specified path - + * * A false response from GCS delete API is indicative of file not found. Any other error is raised as a StorageException * and should be explicitly handled. 
- Ref: HttpStorageRpc.java + * Ref: HttpStorageRpc.java * * @param bucket GCS bucket - * @param path Object path + * @param path Object path */ public void delete(final String bucket, final String path) { @@ -202,9 +219,12 @@ public long size(final String bucket, final String path) throws IOException * Return the etag for an object. This is a value that changes whenever the object's data or metadata changes and is * typically but not always the MD5 hash of the object. Ref: * ETags + * * @param bucket * @param path + * * @return + * * @throws IOException */ public String version(final String bucket, final String path) throws IOException diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java index a11694f4a2f6..2825f4033350 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java @@ -39,6 +39,12 @@ public class GoogleTaskLogs implements TaskLogs { private static final Logger LOG = new Logger(GoogleTaskLogs.class); + /** + * Use a 1 MB upload buffer rather than the API client's 15 MB default, mainly because MMs may upload logs + * in parallel and typically have small heaps, so the default-sized 15 MB buffers add up quickly. 
+ */ + static final int UPLOAD_BUFFER_SIZE = 1024 * 1024; + private final GoogleTaskLogsConfig config; private final GoogleStorage storage; private final GoogleInputDataConfig inputDataConfig; @@ -92,7 +98,7 @@ private void pushTaskFile(final File logFile, final String taskKey) throws IOExc try { RetryUtils.retry( (RetryUtils.Task) () -> { - storage.insert(config.getBucket(), taskKey, mediaContent); + storage.insert(config.getBucket(), taskKey, mediaContent, UPLOAD_BUFFER_SIZE); return null; }, GoogleUtils::isRetryable, diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java index 15fbde3c5211..dcead48639e2 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java @@ -19,9 +19,11 @@ package org.apache.druid.storage.google; +import com.google.api.client.http.AbstractInputStreamContent; import com.google.api.gax.paging.Page; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageException; import com.google.common.collect.ImmutableList; @@ -31,7 +33,9 @@ import org.junit.Before; import org.junit.Test; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.List; @@ -65,12 +69,51 @@ public void setUp() blob = EasyMock.mock(Blob.class); } + @Test + public void testInsertDefaultBufferSize() throws IOException + { + final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]); + final Capture inputStreamCapture = Capture.newInstance(); + final AbstractInputStreamContent 
httpContent = EasyMock.createMock(AbstractInputStreamContent.class); + EasyMock.expect(httpContent.getInputStream()).andReturn(inputStream); + EasyMock.expect( + mockStorage.createFrom( + EasyMock.eq(BlobInfo.newBuilder(BlobId.of(BUCKET, PATH)).build()), + EasyMock.capture(inputStreamCapture) + ) + ).andReturn(blob); + EasyMock.replay(httpContent, mockStorage, blob); + googleStorage.insert(BUCKET, PATH, httpContent, null); + EasyMock.verify(httpContent, mockStorage, blob); + } + + @Test + public void testInsertCustomBufferSize() throws IOException + { + final int bufferSize = 100; + final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]); + final Capture inputStreamCapture = Capture.newInstance(); + final AbstractInputStreamContent httpContent = EasyMock.createMock(AbstractInputStreamContent.class); + EasyMock.expect(httpContent.getInputStream()).andReturn(inputStream); + EasyMock.expect( + mockStorage.createFrom( + EasyMock.eq(BlobInfo.newBuilder(BlobId.of(BUCKET, PATH)).build()), + EasyMock.capture(inputStreamCapture), + EasyMock.eq(bufferSize) + ) + ).andReturn(blob); + EasyMock.replay(httpContent, mockStorage, blob); + googleStorage.insert(BUCKET, PATH, httpContent, bufferSize); + EasyMock.verify(httpContent, mockStorage, blob); + } + @Test public void testDeleteSuccess() { EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(true); EasyMock.replay(mockStorage); googleStorage.delete(BUCKET, PATH); + EasyMock.verify(mockStorage); } @Test @@ -79,6 +122,7 @@ public void testDeleteFileNotFound() EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(false); EasyMock.replay(mockStorage); googleStorage.delete(BUCKET, PATH); + EasyMock.verify(mockStorage); } @Test @@ -87,6 +131,7 @@ public void testDeleteFailure() EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andThrow(STORAGE_EXCEPTION); EasyMock.replay(mockStorage); 
Assert.assertThrows(StorageException.class, () -> googleStorage.delete(BUCKET, PATH)); + EasyMock.verify(mockStorage); } @Test @@ -107,7 +152,7 @@ public void testBatchDeleteSuccess() assertTrue(paths.size() == recordedPaths.size() && paths.containsAll(recordedPaths) && recordedPaths.containsAll( paths)); assertEquals(BUCKET, recordedBlobIds.get(0).getBucket()); - + EasyMock.verify(mockStorage); } @Test @@ -129,7 +174,7 @@ public void testBatchDeleteFileNotFound() assertTrue(paths.containsAll(recordedPaths)); assertTrue(recordedPaths.containsAll(paths)); assertEquals(BUCKET, recordedBlobIds.get(0).getBucket()); - + EasyMock.verify(mockStorage); } @Test @@ -140,6 +185,7 @@ public void testBatchDeleteFailure() .andThrow(STORAGE_EXCEPTION); EasyMock.replay(mockStorage); Assert.assertThrows(StorageException.class, () -> googleStorage.batchDelete(BUCKET, paths)); + EasyMock.verify(mockStorage); } @Test @@ -164,6 +210,7 @@ public void testGetMetadataMatch() throws IOException new GoogleStorageObjectMetadata(BUCKET, PATH, SIZE, UPDATE_TIME.toEpochSecond() * 1000) ); + EasyMock.verify(mockStorage); } @Test @@ -172,6 +219,7 @@ public void testExistsTrue() EasyMock.expect(mockStorage.get(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(blob); EasyMock.replay(mockStorage); assertTrue(googleStorage.exists(BUCKET, PATH)); + EasyMock.verify(mockStorage); } @Test @@ -180,6 +228,7 @@ public void testExistsFalse() EasyMock.expect(mockStorage.get(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(null); EasyMock.replay(mockStorage); assertFalse(googleStorage.exists(BUCKET, PATH)); + EasyMock.verify(mockStorage); } @Test @@ -198,6 +247,7 @@ public void testSize() throws IOException long size = googleStorage.size(BUCKET, PATH); assertEquals(size, SIZE); + EasyMock.verify(mockStorage, blob); } @Test @@ -215,6 +265,7 @@ public void testVersion() throws IOException EasyMock.replay(mockStorage, blob); assertEquals(etag, googleStorage.version(BUCKET, PATH)); + 
EasyMock.verify(mockStorage, blob); } @Test @@ -279,5 +330,7 @@ public void testList() throws IOException assertEquals(objectPage.getObjectList().get(0), objectMetadata1); assertEquals(objectPage.getObjectList().get(1), objectMetadata2); assertEquals(objectPage.getNextPageToken(), nextPageToken); + + EasyMock.verify(mockStorage, blobPage, blob1, blob2); } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java index 438d4b8ed6f5..82797331e3ed 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java @@ -91,7 +91,8 @@ public void testPushTaskLog() throws Exception storage.insert( EasyMock.eq(BUCKET), EasyMock.eq(PREFIX + "/" + TASKID), - EasyMock.anyObject(InputStreamContent.class) + EasyMock.anyObject(InputStreamContent.class), + EasyMock.eq(GoogleTaskLogs.UPLOAD_BUFFER_SIZE) ); EasyMock.expectLastCall(); @@ -120,7 +121,8 @@ public void testPushTaskStatus() throws Exception storage.insert( EasyMock.eq(BUCKET), EasyMock.eq(PREFIX + "/" + TASKID), - EasyMock.anyObject(InputStreamContent.class) + EasyMock.anyObject(InputStreamContent.class), + EasyMock.eq(GoogleTaskLogs.UPLOAD_BUFFER_SIZE) ); EasyMock.expectLastCall(); diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/GcsTestUtil.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/GcsTestUtil.java index ec67269e46d6..1babde1e0cb2 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/GcsTestUtil.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/GcsTestUtil.java @@ -93,9 +93,11 @@ public void uploadFileToGcs(String filePath, String contentType) throws IOExcept { 
LOG.info("Uploading file %s at path %s in bucket %s", filePath, GOOGLE_PREFIX, GOOGLE_BUCKET); File file = new File(filePath); - googleStorageClient.insert(GOOGLE_BUCKET, - GOOGLE_PREFIX + "/" + file.getName(), - new FileContent(contentType, file) + googleStorageClient.insert( + GOOGLE_BUCKET, + GOOGLE_PREFIX + "/" + file.getName(), + new FileContent(contentType, file), + null ); }