From 5e5cf9af99be1b842a7c351a7b5526215979cc75 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 8 Apr 2024 12:54:31 -0700 Subject: [PATCH] Reduce upload buffer size in GoogleTaskLogs. (#16236) * Reduce upload buffer size in GoogleTaskLogs. Use a 1 MB upload buffer, rather than the default of 15 MB in the API client. This is mainly because MiddleManagers (MMs) may upload logs in parallel, and typically have small heaps. The default-sized 15 MB buffers add up quickly and can cause an MM to run out of memory. * Make bufferSize a nullable Integer. Add tests. --- .../google/GoogleDataSegmentPusher.java | 2 +- .../druid/storage/google/GoogleStorage.java | 38 ++++++++++--- .../druid/storage/google/GoogleTaskLogs.java | 8 ++- .../storage/google/GoogleStorageTest.java | 57 ++++++++++++++++++- .../storage/google/GoogleTaskLogsTest.java | 6 +- .../druid/testsEx/utils/GcsTestUtil.java | 8 ++- 6 files changed, 101 insertions(+), 18 deletions(-) diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPusher.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPusher.java index e352926de7d4..a4ddde49b213 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPusher.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPusher.java @@ -83,7 +83,7 @@ public void insert(final File file, final String contentType, final String path) try { RetryUtils.retry( (RetryUtils.Task) () -> { - storage.insert(config.getBucket(), path, new FileContent(contentType, file)); + storage.insert(config.getBucket(), path, new FileContent(contentType, file), null); return null; }, GoogleUtils::isRetryable, diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java 
b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java index 6a5d17f7c242..52bd576ee9d8 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java @@ -44,6 +44,9 @@ public class GoogleStorage { + private static final Logger log = new Logger(GoogleStorage.class); + private static final HumanReadableBytes DEFAULT_WRITE_CHUNK_SIZE = new HumanReadableBytes("4MiB"); + /** * Some segment processing tools such as DataSegmentKiller are initialized when an ingestion job starts * if the extension is loaded, even when the implementation of DataSegmentKiller is not used. As a result, @@ -53,20 +56,34 @@ public class GoogleStorage *

* See OmniDataSegmentKiller for how DataSegmentKillers are initialized. */ - private static final Logger log = new Logger(GoogleStorage.class); - private final Supplier storage; - private final HumanReadableBytes DEFAULT_WRITE_CHUNK_SIZE = new HumanReadableBytes("4MiB"); - public GoogleStorage(final Supplier storage) { this.storage = storage; } - public void insert(final String bucket, final String path, AbstractInputStreamContent mediaContent) throws IOException + /** + * Upload an object. From {@link Storage#createFrom(BlobInfo, InputStream, int, Storage.BlobWriteOption...)}, + * "larger buffer sizes might improve the upload performance but require more memory." + * + * @param bucket target bucket + * @param path target path + * @param mediaContent content to upload + * @param bufferSize size of upload buffer, or null to use the upstream default (15 MB as of this writing) + */ + public void insert( + final String bucket, + final String path, + final AbstractInputStreamContent mediaContent, + @Nullable final Integer bufferSize + ) throws IOException { - storage.get().createFrom(getBlobInfo(bucket, path), mediaContent.getInputStream()); + if (bufferSize == null) { + storage.get().createFrom(getBlobInfo(bucket, path), mediaContent.getInputStream()); + } else { + storage.get().createFrom(getBlobInfo(bucket, path), mediaContent.getInputStream(), bufferSize); + } } public InputStream getInputStream(final String bucket, final String path) throws IOException @@ -148,13 +165,13 @@ public GoogleStorageObjectMetadata getMetadata( /** * Deletes an object in a bucket on the specified path - + * * A false response from GCS delete API is indicative of file not found. Any other error is raised as a StorageException * and should be explicitly handled. 
- Ref: HttpStorageRpc.java + * Ref: HttpStorageRpc.java * * @param bucket GCS bucket - * @param path Object path + * @param path Object path */ public void delete(final String bucket, final String path) { @@ -202,9 +219,12 @@ public long size(final String bucket, final String path) throws IOException * Return the etag for an object. This is a value that changes whenever the object's data or metadata changes and is * typically but not always the MD5 hash of the object. Ref: * ETags + * * @param bucket * @param path + * * @return + * * @throws IOException */ public String version(final String bucket, final String path) throws IOException diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java index a11694f4a2f6..2825f4033350 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java @@ -39,6 +39,12 @@ public class GoogleTaskLogs implements TaskLogs { private static final Logger LOG = new Logger(GoogleTaskLogs.class); + /** + * Use 1MB upload buffer, rather than the default of 15 MB in the API client. Mainly because MMs may upload logs + * in parallel, and typically have small heaps. The default-sized 15 MB buffers add up quickly. 
+ */ + static final int UPLOAD_BUFFER_SIZE = 1024 * 1024; + private final GoogleTaskLogsConfig config; private final GoogleStorage storage; private final GoogleInputDataConfig inputDataConfig; @@ -92,7 +98,7 @@ private void pushTaskFile(final File logFile, final String taskKey) throws IOExc try { RetryUtils.retry( (RetryUtils.Task) () -> { - storage.insert(config.getBucket(), taskKey, mediaContent); + storage.insert(config.getBucket(), taskKey, mediaContent, UPLOAD_BUFFER_SIZE); return null; }, GoogleUtils::isRetryable, diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java index 15fbde3c5211..dcead48639e2 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java @@ -19,9 +19,11 @@ package org.apache.druid.storage.google; +import com.google.api.client.http.AbstractInputStreamContent; import com.google.api.gax.paging.Page; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageException; import com.google.common.collect.ImmutableList; @@ -31,7 +33,9 @@ import org.junit.Before; import org.junit.Test; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.List; @@ -65,12 +69,51 @@ public void setUp() blob = EasyMock.mock(Blob.class); } + @Test + public void testInsertDefaultBufferSize() throws IOException + { + final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]); + final Capture inputStreamCapture = Capture.newInstance(); + final AbstractInputStreamContent 
httpContent = EasyMock.createMock(AbstractInputStreamContent.class); + EasyMock.expect(httpContent.getInputStream()).andReturn(inputStream); + EasyMock.expect( + mockStorage.createFrom( + EasyMock.eq(BlobInfo.newBuilder(BlobId.of(BUCKET, PATH)).build()), + EasyMock.capture(inputStreamCapture) + ) + ).andReturn(blob); + EasyMock.replay(httpContent, mockStorage, blob); + googleStorage.insert(BUCKET, PATH, httpContent, null); + EasyMock.verify(httpContent, mockStorage, blob); + } + + @Test + public void testInsertCustomBufferSize() throws IOException + { + final int bufferSize = 100; + final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]); + final Capture inputStreamCapture = Capture.newInstance(); + final AbstractInputStreamContent httpContent = EasyMock.createMock(AbstractInputStreamContent.class); + EasyMock.expect(httpContent.getInputStream()).andReturn(inputStream); + EasyMock.expect( + mockStorage.createFrom( + EasyMock.eq(BlobInfo.newBuilder(BlobId.of(BUCKET, PATH)).build()), + EasyMock.capture(inputStreamCapture), + EasyMock.eq(bufferSize) + ) + ).andReturn(blob); + EasyMock.replay(httpContent, mockStorage, blob); + googleStorage.insert(BUCKET, PATH, httpContent, bufferSize); + EasyMock.verify(httpContent, mockStorage, blob); + } + @Test public void testDeleteSuccess() { EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(true); EasyMock.replay(mockStorage); googleStorage.delete(BUCKET, PATH); + EasyMock.verify(mockStorage); } @Test @@ -79,6 +122,7 @@ public void testDeleteFileNotFound() EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(false); EasyMock.replay(mockStorage); googleStorage.delete(BUCKET, PATH); + EasyMock.verify(mockStorage); } @Test @@ -87,6 +131,7 @@ public void testDeleteFailure() EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andThrow(STORAGE_EXCEPTION); EasyMock.replay(mockStorage); 
Assert.assertThrows(StorageException.class, () -> googleStorage.delete(BUCKET, PATH)); + EasyMock.verify(mockStorage); } @Test @@ -107,7 +152,7 @@ public void testBatchDeleteSuccess() assertTrue(paths.size() == recordedPaths.size() && paths.containsAll(recordedPaths) && recordedPaths.containsAll( paths)); assertEquals(BUCKET, recordedBlobIds.get(0).getBucket()); - + EasyMock.verify(mockStorage); } @Test @@ -129,7 +174,7 @@ public void testBatchDeleteFileNotFound() assertTrue(paths.containsAll(recordedPaths)); assertTrue(recordedPaths.containsAll(paths)); assertEquals(BUCKET, recordedBlobIds.get(0).getBucket()); - + EasyMock.verify(mockStorage); } @Test @@ -140,6 +185,7 @@ public void testBatchDeleteFailure() .andThrow(STORAGE_EXCEPTION); EasyMock.replay(mockStorage); Assert.assertThrows(StorageException.class, () -> googleStorage.batchDelete(BUCKET, paths)); + EasyMock.verify(mockStorage); } @Test @@ -164,6 +210,7 @@ public void testGetMetadataMatch() throws IOException new GoogleStorageObjectMetadata(BUCKET, PATH, SIZE, UPDATE_TIME.toEpochSecond() * 1000) ); + EasyMock.verify(mockStorage); } @Test @@ -172,6 +219,7 @@ public void testExistsTrue() EasyMock.expect(mockStorage.get(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(blob); EasyMock.replay(mockStorage); assertTrue(googleStorage.exists(BUCKET, PATH)); + EasyMock.verify(mockStorage); } @Test @@ -180,6 +228,7 @@ public void testExistsFalse() EasyMock.expect(mockStorage.get(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(null); EasyMock.replay(mockStorage); assertFalse(googleStorage.exists(BUCKET, PATH)); + EasyMock.verify(mockStorage); } @Test @@ -198,6 +247,7 @@ public void testSize() throws IOException long size = googleStorage.size(BUCKET, PATH); assertEquals(size, SIZE); + EasyMock.verify(mockStorage, blob); } @Test @@ -215,6 +265,7 @@ public void testVersion() throws IOException EasyMock.replay(mockStorage, blob); assertEquals(etag, googleStorage.version(BUCKET, PATH)); + 
EasyMock.verify(mockStorage, blob); } @Test @@ -279,5 +330,7 @@ public void testList() throws IOException assertEquals(objectPage.getObjectList().get(0), objectMetadata1); assertEquals(objectPage.getObjectList().get(1), objectMetadata2); assertEquals(objectPage.getNextPageToken(), nextPageToken); + + EasyMock.verify(mockStorage, blobPage, blob1, blob2); } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java index 438d4b8ed6f5..82797331e3ed 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java @@ -91,7 +91,8 @@ public void testPushTaskLog() throws Exception storage.insert( EasyMock.eq(BUCKET), EasyMock.eq(PREFIX + "/" + TASKID), - EasyMock.anyObject(InputStreamContent.class) + EasyMock.anyObject(InputStreamContent.class), + EasyMock.eq(GoogleTaskLogs.UPLOAD_BUFFER_SIZE) ); EasyMock.expectLastCall(); @@ -120,7 +121,8 @@ public void testPushTaskStatus() throws Exception storage.insert( EasyMock.eq(BUCKET), EasyMock.eq(PREFIX + "/" + TASKID), - EasyMock.anyObject(InputStreamContent.class) + EasyMock.anyObject(InputStreamContent.class), + EasyMock.eq(GoogleTaskLogs.UPLOAD_BUFFER_SIZE) ); EasyMock.expectLastCall(); diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/GcsTestUtil.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/GcsTestUtil.java index ec67269e46d6..1babde1e0cb2 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/GcsTestUtil.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/GcsTestUtil.java @@ -93,9 +93,11 @@ public void uploadFileToGcs(String filePath, String contentType) throws IOExcept { 
LOG.info("Uploading file %s at path %s in bucket %s", filePath, GOOGLE_PREFIX, GOOGLE_BUCKET); File file = new File(filePath); - googleStorageClient.insert(GOOGLE_BUCKET, - GOOGLE_PREFIX + "/" + file.getName(), - new FileContent(contentType, file) + googleStorageClient.insert( + GOOGLE_BUCKET, + GOOGLE_PREFIX + "/" + file.getName(), + new FileContent(contentType, file), + null ); }