Skip to content

Commit

Permalink
Add MSQ Durable Storage Connector for Google Cloud Storage and change…
Browse files Browse the repository at this point in the history
… current Google Cloud Storage client library (apache#15398)

The PR addresses 2 things:

    Add MSQ durable storage connector for GCS
    Change GCS client library from the old Google API Client Library to the recommended Google Cloud Client Library. Ref: https://cloud.google.com/apis/docs/client-libraries-explained
  • Loading branch information
gargvishesh authored Dec 14, 2023
1 parent 0436eda commit e43bb74
Show file tree
Hide file tree
Showing 39 changed files with 1,996 additions and 547 deletions.
1 change: 1 addition & 0 deletions distribution/bin/check-licenses.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ def build_compatible_license_names():
compatible_licenses['The BSD 3-Clause License'] = 'BSD-3-Clause License'
compatible_licenses['Revised BSD'] = 'BSD-3-Clause License'
compatible_licenses['New BSD License'] = 'BSD-3-Clause License'
compatible_licenses['BSD New license'] = 'BSD-3-Clause License'
compatible_licenses['3-Clause BSD License'] = 'BSD-3-Clause License'
compatible_licenses['BSD 3-Clause'] = 'BSD-3-Clause License'
compatible_licenses['BSD-3-Clause'] = 'BSD-3-Clause License'
Expand Down
8 changes: 4 additions & 4 deletions docs/multi-stage-query/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,24 +356,24 @@ SQL-based ingestion supports using durable storage to store intermediate files t

### Durable storage configurations

Durable storage is supported on Amazon S3 storage and Microsoft's Azure Blob Storage.
Durable storage is supported on Amazon S3 storage, Microsoft's Azure Blob Storage and Google Cloud Storage.
There are common configurations that control the behavior regardless of which storage service you use. Apart from these common configurations, there are a few properties specific to S3 and to Azure.

Common properties to configure the behavior of durable storage

|Parameter | Required | Description | Default |
|--|--|--|
|`druid.msq.intermediate.storage.enable` | Yes | Whether to enable durable storage for the cluster. Set it to true to enable durable storage. For more information about enabling durable storage, see [Durable storage](../operations/durable-storage.md). | false |
|`druid.msq.intermediate.storage.type` | Yes | The type of storage to use. Set it to `s3` for S3 and `azure` for Azure | n/a |
|`druid.msq.intermediate.storage.type` | Yes | The type of storage to use. Set it to `s3` for S3, `azure` for Azure and `google` for Google | n/a |
|`druid.msq.intermediate.storage.tempDir`| Yes | Directory path on the local disk to store temporary files required while uploading and downloading the data | n/a |
|`druid.msq.intermediate.storage.maxRetry` | No | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors. | 10 |
|`druid.msq.intermediate.storage.chunkSize` | No | Defines the size of each chunk to temporarily store in `druid.msq.intermediate.storage.tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls made to the durable storage, however it requires more disk space to store the temporary chunks. Druid uses a default of 100MiB if the value is not provided.| 100MiB |

To use S3 for durable storage, you also need to configure the following properties:
To use S3 or Google for durable storage, you also need to configure the following properties:

|Parameter | Required | Description | Default |
|-------------------|----------------------------------------|----------------------| --|
|`druid.msq.intermediate.storage.bucket` | Yes | The S3 bucket where the files are uploaded to and download from | n/a |
|`druid.msq.intermediate.storage.bucket` | Yes | The S3 or Google bucket where the files are uploaded to and download from | n/a |
|`druid.msq.intermediate.storage.prefix` | Yes | Path prepended to all the paths uploaded to the bucket to namespace the connector's files. Provide a unique value for the prefix and do not share the same prefix between different clusters. If the location includes other files or directories, then they might get cleaned up as well. | n/a |

To use Azure for durable storage, you also need to configure the following properties:
Expand Down
2 changes: 1 addition & 1 deletion docs/operations/durable-storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ sidebar_label: "Durable storage"

You can use durable storage to improve querying from deep storage and SQL-based ingestion.

> Note that only S3 is supported as a durable storage location.
> Note that S3, Azure and Google are all supported as durable storage locations.
Durable storage for queries from deep storage provides a location where you can write the results of deep storage queries to. Durable storage for SQL-based ingestion is used to temporarily house intermediate files, which can improve reliability.

Expand Down
22 changes: 13 additions & 9 deletions extensions-core/google-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,9 @@
</dependency>

<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-storage</artifactId>
<version>${com.google.apis.storage.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
</exclusion>
</exclusions>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
<version>${com.google.cloud.storage.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
Expand Down Expand Up @@ -125,6 +119,16 @@
<version>2.0.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
<version>2.37.0</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-core</artifactId>
<version>2.27.0</version>
</dependency>
<!-- Tests -->
<dependency>
<groupId>org.apache.druid</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.collect.Iterators;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputSplit;
Expand All @@ -37,12 +36,12 @@
import org.apache.druid.storage.google.GoogleInputDataConfig;
import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.storage.google.GoogleStorageDruidModule;
import org.apache.druid.storage.google.GoogleStorageObjectMetadata;
import org.apache.druid.storage.google.GoogleUtils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.math.BigInteger;
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -139,23 +138,23 @@ public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> pre
@Override
public long getObjectSize(CloudObjectLocation location) throws IOException
{
final StorageObject storageObject = storage.getMetadata(location.getBucket(), location.getPath());
final GoogleStorageObjectMetadata storageObject = storage.getMetadata(location.getBucket(), location.getPath());
return getSize(storageObject);
}
}

return new SplitWidget();
}

private static long getSize(final StorageObject object)
private static long getSize(final GoogleStorageObjectMetadata object)
{
final BigInteger sizeInBigInteger = object.getSize();
final Long sizeInLong = object.getSize();

if (sizeInBigInteger == null) {
if (sizeInLong == null) {
return Long.MAX_VALUE;
} else {
try {
return sizeInBigInteger.longValueExact();
return sizeInLong;
}
catch (ArithmeticException e) {
LOG.warn(
Expand All @@ -164,7 +163,7 @@ private static long getSize(final StorageObject object)
+ "The max long value will be used for its size instead.",
object.getBucket(),
object.getName(),
sizeInBigInteger
sizeInLong
);
return Long.MAX_VALUE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ public String getPath()
@Override
public InputStream openStream() throws IOException
{
return storage.get(bucket, path);
return storage.getInputStream(bucket, path);
}

public InputStream openStream(long start) throws IOException
{
return storage.get(bucket, path, start);
return storage.getInputStream(bucket, path, start);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ FileUtils.FileCopyResult getSegmentFiles(final String bucket, final String path,
public InputStream getInputStream(URI uri) throws IOException
{
String path = StringUtils.maybeRemoveLeadingSlash(uri.getPath());
return storage.get(uri.getHost() != null ? uri.getHost() : uri.getAuthority(), path);
return storage.getInputStream(uri.getHost() != null ? uri.getHost() : uri.getAuthority(), path);
}

@Override
Expand Down
Loading

0 comments on commit e43bb74

Please sign in to comment.