Reduce upload buffer size in GoogleTaskLogs. (apache#16236)
* Reduce upload buffer size in GoogleTaskLogs.

Use a 1 MB upload buffer, rather than the default of 15 MB in the API client. This is
mainly because MiddleManagers (MMs) may upload logs in parallel, and typically have small heaps. The
default-sized 15 MB buffers add up quickly and can cause an MM to run out of memory.
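(For scale, with a purely illustrative ten uploads in flight at once: 10 × 15 MB = 150 MB of buffers under the client default, versus 10 × 1 MB = 10 MB after this change.)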

* Make bufferSize a nullable Integer. Add tests.
gianm authored Apr 8, 2024
1 parent 4ff7e2c commit 5e5cf9a
Showing 6 changed files with 101 additions and 18 deletions.
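Before the per-file diffs, a minimal illustrative sketch (not part of this commit) of how the new four-argument GoogleStorage.insert is meant to be called. The class name, bucket, object paths, and local files below are made up; 1024 * 1024 mirrors the UPLOAD_BUFFER_SIZE constant introduced in GoogleTaskLogs.

// Illustrative usage only; not part of this commit's diff.
import com.google.api.client.http.FileContent;
import org.apache.druid.storage.google.GoogleStorage;

import java.io.File;
import java.io.IOException;

class GoogleStorageBufferSizeSketch
{
  static void uploadExamples(GoogleStorage googleStorage, String bucket) throws IOException
  {
    // Task logs: cap the upload buffer at 1 MiB so that many parallel uploads
    // stay cheap on a small MiddleManager heap (see GoogleTaskLogs.UPLOAD_BUFFER_SIZE).
    googleStorage.insert(
        bucket,
        "druid/logs/example-task.log",
        new FileContent("text/plain", new File("/tmp/example-task.log")),
        1024 * 1024
    );

    // Segment files: pass null to keep the API client's default buffer
    // (15 MB as of this writing), trading memory for upload throughput.
    googleStorage.insert(
        bucket,
        "druid/segments/example/index.zip",
        new FileContent("application/zip", new File("/tmp/index.zip")),
        null
    );
  }
}

The nullable bufferSize keeps a single insert entry point: callers that don't care pass null and get the client default, while memory-sensitive callers such as GoogleTaskLogs pass an explicit size.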
@@ -83,7 +83,7 @@ public void insert(final File file, final String contentType, final String path)
try {
RetryUtils.retry(
(RetryUtils.Task<Void>) () -> {
storage.insert(config.getBucket(), path, new FileContent(contentType, file));
storage.insert(config.getBucket(), path, new FileContent(contentType, file), null);
return null;
},
GoogleUtils::isRetryable,
@@ -44,6 +44,9 @@

public class GoogleStorage
{
private static final Logger log = new Logger(GoogleStorage.class);
private static final HumanReadableBytes DEFAULT_WRITE_CHUNK_SIZE = new HumanReadableBytes("4MiB");

/**
* Some segment processing tools such as DataSegmentKiller are initialized when an ingestion job starts
* if the extension is loaded, even when the implementation of DataSegmentKiller is not used. As a result,
@@ -53,20 +56,34 @@ public class GoogleStorage
* <p>
* See OmniDataSegmentKiller for how DataSegmentKillers are initialized.
*/
private static final Logger log = new Logger(GoogleStorage.class);

private final Supplier<Storage> storage;

private final HumanReadableBytes DEFAULT_WRITE_CHUNK_SIZE = new HumanReadableBytes("4MiB");

public GoogleStorage(final Supplier<Storage> storage)
{
this.storage = storage;
}

public void insert(final String bucket, final String path, AbstractInputStreamContent mediaContent) throws IOException
/**
* Upload an object. From {@link Storage#createFrom(BlobInfo, InputStream, int, Storage.BlobWriteOption...)},
* "larger buffer sizes might improve the upload performance but require more memory."
*
* @param bucket target bucket
* @param path target path
* @param mediaContent content to upload
* @param bufferSize size of upload buffer, or null to use the upstream default (15 MB as of this writing)
*/
public void insert(
final String bucket,
final String path,
final AbstractInputStreamContent mediaContent,
@Nullable final Integer bufferSize
) throws IOException
{
storage.get().createFrom(getBlobInfo(bucket, path), mediaContent.getInputStream());
if (bufferSize == null) {
storage.get().createFrom(getBlobInfo(bucket, path), mediaContent.getInputStream());
} else {
storage.get().createFrom(getBlobInfo(bucket, path), mediaContent.getInputStream(), bufferSize);
}
}

public InputStream getInputStream(final String bucket, final String path) throws IOException
@@ -148,13 +165,13 @@ public GoogleStorageObjectMetadata getMetadata(

/**
* Deletes an object in a bucket on the specified path
*
* A false response from GCS delete API is indicative of file not found. Any other error is raised as a StorageException
* and should be explicitly handled.
Ref: <a href="https://github.com/googleapis/java-storage/blob/v2.29.1/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java">HttpStorageRpc.java</a>
* Ref: <a href="https://github.com/googleapis/java-storage/blob/v2.29.1/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java">HttpStorageRpc.java</a>
*
* @param bucket GCS bucket
* @param path Object path
* @param path Object path
*/
public void delete(final String bucket, final String path)
{
@@ -202,9 +219,12 @@ public long size(final String bucket, final String path) throws IOException
* Return the etag for an object. This is a value that changes whenever the object's data or metadata changes and is
* typically but not always the MD5 hash of the object. Ref:
* <a href="https://cloud.google.com/storage/docs/hashes-etags#etags">ETags</a>
*
* @param bucket
* @param path
*
* @return
*
* @throws IOException
*/
public String version(final String bucket, final String path) throws IOException
@@ -39,6 +39,12 @@ public class GoogleTaskLogs implements TaskLogs
{
private static final Logger LOG = new Logger(GoogleTaskLogs.class);

/**
* Use 1MB upload buffer, rather than the default of 15 MB in the API client. Mainly because MMs may upload logs
* in parallel, and typically have small heaps. The default-sized 15 MB buffers add up quickly.
*/
static final int UPLOAD_BUFFER_SIZE = 1024 * 1024;

private final GoogleTaskLogsConfig config;
private final GoogleStorage storage;
private final GoogleInputDataConfig inputDataConfig;
@@ -92,7 +98,7 @@ private void pushTaskFile(final File logFile, final String taskKey) throws IOExc
try {
RetryUtils.retry(
(RetryUtils.Task<Void>) () -> {
storage.insert(config.getBucket(), taskKey, mediaContent);
storage.insert(config.getBucket(), taskKey, mediaContent, UPLOAD_BUFFER_SIZE);
return null;
},
GoogleUtils::isRetryable,
@@ -19,9 +19,11 @@

package org.apache.druid.storage.google;

import com.google.api.client.http.AbstractInputStreamContent;
import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.collect.ImmutableList;
@@ -31,7 +33,9 @@
import org.junit.Before;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
@@ -65,12 +69,51 @@ public void setUp()
blob = EasyMock.mock(Blob.class);
}

@Test
public void testInsertDefaultBufferSize() throws IOException
{
final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]);
final Capture<InputStream> inputStreamCapture = Capture.newInstance();
final AbstractInputStreamContent httpContent = EasyMock.createMock(AbstractInputStreamContent.class);
EasyMock.expect(httpContent.getInputStream()).andReturn(inputStream);
EasyMock.expect(
mockStorage.createFrom(
EasyMock.eq(BlobInfo.newBuilder(BlobId.of(BUCKET, PATH)).build()),
EasyMock.capture(inputStreamCapture)
)
).andReturn(blob);
EasyMock.replay(httpContent, mockStorage, blob);
googleStorage.insert(BUCKET, PATH, httpContent, null);
EasyMock.verify(httpContent, mockStorage, blob);
}

@Test
public void testInsertCustomBufferSize() throws IOException
{
final int bufferSize = 100;
final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]);
final Capture<InputStream> inputStreamCapture = Capture.newInstance();
final AbstractInputStreamContent httpContent = EasyMock.createMock(AbstractInputStreamContent.class);
EasyMock.expect(httpContent.getInputStream()).andReturn(inputStream);
EasyMock.expect(
mockStorage.createFrom(
EasyMock.eq(BlobInfo.newBuilder(BlobId.of(BUCKET, PATH)).build()),
EasyMock.capture(inputStreamCapture),
EasyMock.eq(bufferSize)
)
).andReturn(blob);
EasyMock.replay(httpContent, mockStorage, blob);
googleStorage.insert(BUCKET, PATH, httpContent, bufferSize);
EasyMock.verify(httpContent, mockStorage, blob);
}

@Test
public void testDeleteSuccess()
{
EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(true);
EasyMock.replay(mockStorage);
googleStorage.delete(BUCKET, PATH);
EasyMock.verify(mockStorage);
}

@Test
@@ -79,6 +122,7 @@ public void testDeleteFileNotFound()
EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(false);
EasyMock.replay(mockStorage);
googleStorage.delete(BUCKET, PATH);
EasyMock.verify(mockStorage);
}

@Test
@@ -87,6 +131,7 @@ public void testDeleteFailure()
EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andThrow(STORAGE_EXCEPTION);
EasyMock.replay(mockStorage);
Assert.assertThrows(StorageException.class, () -> googleStorage.delete(BUCKET, PATH));
EasyMock.verify(mockStorage);
}

@Test
@@ -107,7 +152,7 @@ public void testBatchDeleteSuccess()
assertTrue(paths.size() == recordedPaths.size() && paths.containsAll(recordedPaths) && recordedPaths.containsAll(
paths));
assertEquals(BUCKET, recordedBlobIds.get(0).getBucket());

EasyMock.verify(mockStorage);
}

@Test
@@ -129,7 +174,7 @@ public void testBatchDeleteFileNotFound()
assertTrue(paths.containsAll(recordedPaths));
assertTrue(recordedPaths.containsAll(paths));
assertEquals(BUCKET, recordedBlobIds.get(0).getBucket());

EasyMock.verify(mockStorage);
}

@Test
@@ -140,6 +185,7 @@ public void testBatchDeleteFailure()
.andThrow(STORAGE_EXCEPTION);
EasyMock.replay(mockStorage);
Assert.assertThrows(StorageException.class, () -> googleStorage.batchDelete(BUCKET, paths));
EasyMock.verify(mockStorage);
}

@Test
@@ -164,6 +210,7 @@ public void testGetMetadataMatch() throws IOException
new GoogleStorageObjectMetadata(BUCKET, PATH, SIZE, UPDATE_TIME.toEpochSecond() * 1000)
);

EasyMock.verify(mockStorage);
}

@Test
@@ -172,6 +219,7 @@ public void testExistsTrue()
EasyMock.expect(mockStorage.get(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(blob);
EasyMock.replay(mockStorage);
assertTrue(googleStorage.exists(BUCKET, PATH));
EasyMock.verify(mockStorage);
}

@Test
@@ -180,6 +228,7 @@ public void testExistsFalse()
EasyMock.expect(mockStorage.get(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(null);
EasyMock.replay(mockStorage);
assertFalse(googleStorage.exists(BUCKET, PATH));
EasyMock.verify(mockStorage);
}

@Test
@@ -198,6 +247,7 @@ public void testSize() throws IOException
long size = googleStorage.size(BUCKET, PATH);

assertEquals(size, SIZE);
EasyMock.verify(mockStorage, blob);
}

@Test
@@ -215,6 +265,7 @@ public void testVersion() throws IOException
EasyMock.replay(mockStorage, blob);

assertEquals(etag, googleStorage.version(BUCKET, PATH));
EasyMock.verify(mockStorage, blob);
}

@Test
@@ -279,5 +330,7 @@ public void testList() throws IOException
assertEquals(objectPage.getObjectList().get(0), objectMetadata1);
assertEquals(objectPage.getObjectList().get(1), objectMetadata2);
assertEquals(objectPage.getNextPageToken(), nextPageToken);

EasyMock.verify(mockStorage, blobPage, blob1, blob2);
}
}
@@ -91,7 +91,8 @@ public void testPushTaskLog() throws Exception
storage.insert(
EasyMock.eq(BUCKET),
EasyMock.eq(PREFIX + "/" + TASKID),
EasyMock.anyObject(InputStreamContent.class)
EasyMock.anyObject(InputStreamContent.class),
EasyMock.eq(GoogleTaskLogs.UPLOAD_BUFFER_SIZE)
);
EasyMock.expectLastCall();

@@ -120,7 +121,8 @@ public void testPushTaskStatus() throws Exception
storage.insert(
EasyMock.eq(BUCKET),
EasyMock.eq(PREFIX + "/" + TASKID),
EasyMock.anyObject(InputStreamContent.class)
EasyMock.anyObject(InputStreamContent.class),
EasyMock.eq(GoogleTaskLogs.UPLOAD_BUFFER_SIZE)
);
EasyMock.expectLastCall();

@@ -93,9 +93,11 @@ public void uploadFileToGcs(String filePath, String contentType) throws IOExcept
{
LOG.info("Uploading file %s at path %s in bucket %s", filePath, GOOGLE_PREFIX, GOOGLE_BUCKET);
File file = new File(filePath);
googleStorageClient.insert(GOOGLE_BUCKET,
GOOGLE_PREFIX + "/" + file.getName(),
new FileContent(contentType, file)
googleStorageClient.insert(
GOOGLE_BUCKET,
GOOGLE_PREFIX + "/" + file.getName(),
new FileContent(contentType, file),
null
);
}

