Azure deep storage (#58)

Co-authored-by: fabricebaranski <[email protected]>
juhoautio-rovio and fabricebaranski authored Sep 12, 2024
1 parent 31153c7 commit 349e396
Showing 9 changed files with 472 additions and 2 deletions.
18 changes: 17 additions & 1 deletion README.md
@@ -331,6 +331,22 @@ These are the options for `DruidSource`, to be passed with `write.options()`.
| `druid.segment_storage.hdfs.security.kerberos.principal` | Kerberos principal |
| `druid.segment_storage.hdfs.security.kerberos.keytab` | Kerberos keytab |

4. **If Deep Storage is `azure`:**

| Property | Description |
| --- | --- |
| `druid.azure.account` | Azure storage account name |
| `druid.azure.key` | Azure storage account key |
| `druid.azure.sharedAccessStorageToken` | Azure shared access storage (SAS) token, used if no key is given |
| `druid.azure.useAzureCredentialsChain` | Use `DefaultAzureCredential` for authentication |
| `druid.azure.managedIdentityClientId` | Client id of the managed identity to use with `DefaultAzureCredential`; requires `druid.azure.useAzureCredentialsChain` to be `true` |
| `druid.azure.endpointSuffix` | Endpoint suffix to use; override the default (`core.windows.net`) to connect to another Azure endpoint |
| `druid.azure.container` | Azure container name |
| `druid.azure.prefix` | Key prefix for segments within the container |
| `druid.azure.protocol` | Protocol to use (`http` or `https`) |
| `druid.azure.maxTries` | Max number of tries for Azure requests |
| `druid.azure.maxListingLength` | Max number of blobs to return per listing request |
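
   For example, a write to Azure deep storage could be configured as in the sketch below. This is illustrative only: the format name, all option values, and the omitted required options (datasource name, time column, metadata database, etc.) are assumptions, not part of this change.

   ```java
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.SaveMode;

   public class AzureWriteSketch {
       // Writes a prepared Dataset to Druid with Azure deep storage (values are placeholders).
       public static void write(Dataset<Row> dataset) {
           dataset.write()
               .format("com.rovio.ingest.DruidSource")              // assumed format name
               .option("druid.segment_storage.type", "azure")
               .option("druid.azure.account", "mystorageaccount")   // placeholder
               .option("druid.azure.key", "<storage-account-key>")  // placeholder; or SAS token / credentials chain
               .option("druid.azure.container", "druid-segments")   // placeholder
               .option("druid.azure.prefix", "segments")            // placeholder
               .mode(SaveMode.Overwrite)
               .save();
       }
   }
   ```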

#### Optional properties

| Property | Description | Default |
@@ -342,7 +358,7 @@ These are the options for `DruidSource`, to be passed with `write.options()`.
| `druid.exclude_dimensions` | Comma-separated list of Spark input columns to exclude from Druid ingestion | |
| `druid.segment.max_rows` | Max number of rows per segment | `5000000` |
| `druid.memory.max_rows` | Max number of rows to keep in memory in spark data writer | `75000` |
- | `druid.segment_storage.type` | Type of Deep Storage to use. Allowed values: `s3`, `local`, `hdfs`. | `s3` |
+ | `druid.segment_storage.type` | Type of Deep Storage to use. Allowed values: `s3`, `local`, `hdfs`, `azure`. | `s3` |
| `druid.segment_storage.s3.disableacl` | Whether to disable ACL in S3 config. | `false` |
| `druid.datasource.init` | Boolean flag for (re-)initializing Druid datasource. If `true`, any pre-existing segments for the datasource are marked as unused. | `false` |
| `druid.bitmap_factory` | Compression format for bitmap indexes. Possible values: `concise`, `roaring`. For type `roaring`, the boolean property `compressRunOnSerialization` is always set to `true`. `rovio-ingest` uses `concise` by default regardless of Druid library version. | `concise` |
5 changes: 5 additions & 0 deletions pom.xml
@@ -206,6 +206,11 @@
<artifactId>druid-hdfs-storage</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-azure-extensions</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-datasketches</artifactId>
84 changes: 83 additions & 1 deletion src/main/java/com/rovio/ingest/WriterContext.java
@@ -61,6 +61,17 @@ public class WriterContext implements Serializable {
private final String hdfsDefaultFS;
private final String hdfsSecurityKerberosPrincipal;
private final String hdfsSecurityKerberosKeytab;
private final String azureAccount;
private final String azureKey;
private final String azureSharedAccessStorageToken;
private final Boolean azureUseAzureCredentialsChain;
private final String azureContainer;
private final String azurePrefix;
private final String azureManagedIdentityClientId;
private final String azureProtocol;
private final int azureMaxTries;
private final int azureMaxListingLength;
private final String azureEndpointSuffix;
private final String deepStorageType;
private final boolean initDataSource;
private final String version;
@@ -108,9 +119,20 @@ private WriterContext(CaseInsensitiveStringMap options, String version) {
this.hdfsDefaultFS = options.getOrDefault(ConfKeys.DEEP_STORAGE_HDFS_DEFAULT_FS, null);
this.hdfsSecurityKerberosPrincipal = options.getOrDefault(ConfKeys.DEEP_STORAGE_HDFS_SECURITY_KERBEROS_PRINCIPAL, null);
this.hdfsSecurityKerberosKeytab = options.getOrDefault(ConfKeys.DEEP_STORAGE_HDFS_SECURITY_KERBEROS_KEYTAB, null);
this.azureAccount = options.getOrDefault(ConfKeys.DEEP_STORAGE_AZURE_ACCOUNT, null);
this.azureKey = options.getOrDefault(ConfKeys.DEEP_STORAGE_AZURE_KEY, null);
this.azureSharedAccessStorageToken = options.getOrDefault(ConfKeys.DEEP_STORAGE_AZURE_SHAREDACCESSSTORAGETOKEN, null);
this.azureUseAzureCredentialsChain = options.getBoolean(ConfKeys.DEEP_STORAGE_AZURE_USEAZURECRENDENTIALSCHAIN, false);
this.azureContainer = options.getOrDefault(ConfKeys.DEEP_STORAGE_AZURE_CONTAINER, null);
this.azurePrefix = options.getOrDefault(ConfKeys.DEEP_STORAGE_AZURE_PREFIX, "");
this.azureProtocol = options.getOrDefault(ConfKeys.DEEP_STORAGE_AZURE_PROTOCOL, "https");
this.azureMaxTries = options.getInt(ConfKeys.DEEP_STORAGE_AZURE_MAXTRIES, 3);
this.azureMaxListingLength = options.getInt(ConfKeys.DEEP_STORAGE_AZURE_MAXLISTINGLENGTH, 1024);
this.azureEndpointSuffix = options.getOrDefault(ConfKeys.DEEP_STORAGE_AZURE_ENDPOINTSUFFIX, "core.windows.net");
this.azureManagedIdentityClientId = options.getOrDefault(ConfKeys.DEEP_STORAGE_AZURE_MANAGEDIDENTITYCLIENTID, null);

this.deepStorageType = options.getOrDefault(ConfKeys.DEEP_STORAGE_TYPE, DEFAULT_DRUID_DEEP_STORAGE_TYPE);
Preconditions.checkArgument(Arrays.asList("s3", "local", "hdfs").contains(this.deepStorageType),
Preconditions.checkArgument(Arrays.asList("s3", "local", "hdfs", "azure").contains(this.deepStorageType),
String.format("Invalid %s: %s", ConfKeys.DEEP_STORAGE_TYPE, this.deepStorageType));

this.initDataSource = options.getBoolean(ConfKeys.DATASOURCE_INIT, false);
@@ -228,6 +250,50 @@ public String getHdfsSecurityKerberosKeytab() {
return hdfsSecurityKerberosKeytab;
}

public String getAzureAccount() {
return azureAccount;
}

public String getAzureKey() {
return azureKey;
}

public String getAzureSharedAccessStorageToken() {
return azureSharedAccessStorageToken;
}

public Boolean getAzureUseAzureCredentialsChain() {
return azureUseAzureCredentialsChain;
}

public String getAzureContainer() {
return azureContainer;
}

public String getAzurePrefix() {
return azurePrefix;
}

public String getAzureProtocol() {
return azureProtocol;
}

public int getAzureMaxTries() {
return azureMaxTries;
}

public int getAzureMaxListingLength() {
return azureMaxListingLength;
}

public String getAzureEndpointSuffix() {
return azureEndpointSuffix;
}

public String getAzureManagedIdentityClientId() {
return azureManagedIdentityClientId;
}

public boolean isInitDataSource() {
return initDataSource;
}
@@ -244,6 +310,10 @@ public boolean isHdfsDeepStorage() {
return "hdfs".equals(deepStorageType);
}

public boolean isAzureDeepStorage() {
return "azure".equals(deepStorageType);
}

public boolean isRollup() {
return rollup;
}
@@ -306,5 +376,17 @@ public static class ConfKeys {
public static final String DEEP_STORAGE_HDFS_DEFAULT_FS = "druid.segment_storage.hdfs.default.fs";
public static final String DEEP_STORAGE_HDFS_SECURITY_KERBEROS_PRINCIPAL = "druid.segment_storage.hdfs.security.kerberos.principal";
public static final String DEEP_STORAGE_HDFS_SECURITY_KERBEROS_KEYTAB = "druid.segment_storage.hdfs.security.kerberos.keytab";
// Azure config
public static final String DEEP_STORAGE_AZURE_ACCOUNT = "druid.azure.account";
public static final String DEEP_STORAGE_AZURE_KEY = "druid.azure.key";
public static final String DEEP_STORAGE_AZURE_SHAREDACCESSSTORAGETOKEN = "druid.azure.sharedAccessStorageToken";
public static final String DEEP_STORAGE_AZURE_USEAZURECRENDENTIALSCHAIN = "druid.azure.useAzureCredentialsChain";
public static final String DEEP_STORAGE_AZURE_CONTAINER = "druid.azure.container";
public static final String DEEP_STORAGE_AZURE_PREFIX = "druid.azure.prefix";
public static final String DEEP_STORAGE_AZURE_PROTOCOL = "druid.azure.protocol";
public static final String DEEP_STORAGE_AZURE_MAXTRIES = "druid.azure.maxTries";
public static final String DEEP_STORAGE_AZURE_MAXLISTINGLENGTH = "druid.azure.maxListingLength";
public static final String DEEP_STORAGE_AZURE_ENDPOINTSUFFIX = "druid.azure.endpointSuffix";
public static final String DEEP_STORAGE_AZURE_MANAGEDIDENTITYCLIENTID = "druid.azure.managedIdentityClientId";
}
}
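
The constructor shown above reads these keys through Spark's `CaseInsensitiveStringMap`, which supplies the typed accessors with defaults (`getOrDefault`, `getInt`, `getBoolean`). A minimal sketch of that behavior, with keys as defined in `ConfKeys` and placeholder values:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class AzureOptionsSketch {
    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put("druid.segment_storage.type", "azure");
        raw.put("druid.azure.account", "mystorageaccount"); // placeholder
        raw.put("druid.azure.maxTries", "5");

        CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(raw);
        System.out.println(options.getOrDefault("druid.azure.account", null)); // mystorageaccount
        System.out.println(options.getInt("druid.azure.maxTries", 3));         // 5
        System.out.println(options.getBoolean("druid.azure.useAzureCredentialsChain", false)); // false (unset)
    }
}
```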
60 changes: 60 additions & 0 deletions src/main/java/com/rovio/ingest/util/SegmentStorageUpdater.java
@@ -20,11 +20,19 @@
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.rovio.ingest.WriterContext;
import com.rovio.ingest.util.azure.LocalAzureAccountConfig;
import com.rovio.ingest.util.azure.LocalAzureClientFactory;
import com.rovio.ingest.util.azure.LocalAzureCloudBlobIterableFactory;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentKiller;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.storage.azure.AzureDataSegmentConfig;
import org.apache.druid.storage.azure.AzureDataSegmentKiller;
import org.apache.druid.storage.azure.AzureDataSegmentPusher;
import org.apache.druid.storage.azure.AzureInputDataConfig;
import org.apache.druid.storage.azure.AzureStorage;
import org.apache.druid.storage.hdfs.HdfsDataSegmentKiller;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPusher;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
@@ -59,6 +67,30 @@ public static DataSegmentPusher createPusher(WriterContext param) {
getHdfsHadoopConfiguration(param.getHdfsCoreSitePath(), param.getHdfsHdfsSitePath(), param.getHdfsDefaultFS()),
MAPPER
);
} else if (param.isAzureDeepStorage()) {
LocalAzureAccountConfig azureAccountConfig = new LocalAzureAccountConfig();
azureAccountConfig.setAccount(param.getAzureAccount());
if (param.getAzureKey() != null && !param.getAzureKey().isEmpty()) {
azureAccountConfig.setKey(param.getAzureKey());
}
if (param.getAzureSharedAccessStorageToken() != null && !param.getAzureSharedAccessStorageToken().isEmpty()) {
azureAccountConfig.setSharedAccessStorageToken(param.getAzureSharedAccessStorageToken());
}
if (param.getAzureEndpointSuffix() != null && !param.getAzureEndpointSuffix().isEmpty()) {
azureAccountConfig.setEndpointSuffix(param.getAzureEndpointSuffix());
}
if (param.getAzureManagedIdentityClientId() != null && !param.getAzureManagedIdentityClientId().isEmpty()) {
azureAccountConfig.setManagedIdentityClientId(param.getAzureManagedIdentityClientId());
}
azureAccountConfig.setUseAzureCredentialsChain(param.getAzureUseAzureCredentialsChain());
azureAccountConfig.setProtocol(param.getAzureProtocol());
azureAccountConfig.setMaxTries(param.getAzureMaxTries());
LocalAzureClientFactory azureClientFactory = new LocalAzureClientFactory(azureAccountConfig);
AzureStorage azureStorage = new AzureStorage(azureClientFactory);
AzureDataSegmentConfig azureDataSegmentConfig = new AzureDataSegmentConfig();
azureDataSegmentConfig.setContainer(param.getAzureContainer());
azureDataSegmentConfig.setPrefix(param.getAzurePrefix());
return new AzureDataSegmentPusher(azureStorage, azureAccountConfig, azureDataSegmentConfig);
} else {
ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3 = getAmazonS3().get();
S3DataSegmentPusherConfig s3Config = new S3DataSegmentPusherConfig();
@@ -84,6 +116,34 @@ public static DataSegmentKiller createKiller(WriterContext param) {
getHdfsHadoopConfiguration(param.getHdfsCoreSitePath(), param.getHdfsHdfsSitePath(), param.getHdfsDefaultFS())
)
);
} else if (param.isAzureDeepStorage()) {

LocalAzureAccountConfig azureAccountConfig = new LocalAzureAccountConfig();
azureAccountConfig.setAccount(param.getAzureAccount());
if (param.getAzureKey() != null && !param.getAzureKey().isEmpty()) {
azureAccountConfig.setKey(param.getAzureKey());
}
if (param.getAzureSharedAccessStorageToken() != null && !param.getAzureSharedAccessStorageToken().isEmpty()) {
azureAccountConfig.setSharedAccessStorageToken(param.getAzureSharedAccessStorageToken());
}
if (param.getAzureEndpointSuffix() != null && !param.getAzureEndpointSuffix().isEmpty()) {
azureAccountConfig.setEndpointSuffix(param.getAzureEndpointSuffix());
}
if (param.getAzureManagedIdentityClientId() != null && !param.getAzureManagedIdentityClientId().isEmpty()) {
azureAccountConfig.setManagedIdentityClientId(param.getAzureManagedIdentityClientId());
}
azureAccountConfig.setUseAzureCredentialsChain(param.getAzureUseAzureCredentialsChain());
azureAccountConfig.setProtocol(param.getAzureProtocol());
azureAccountConfig.setMaxTries(param.getAzureMaxTries());
LocalAzureClientFactory azureClientFactory = new LocalAzureClientFactory(azureAccountConfig);
AzureStorage azureStorage = new AzureStorage(azureClientFactory);
AzureDataSegmentConfig azureDataSegmentConfig = new AzureDataSegmentConfig();
azureDataSegmentConfig.setContainer(param.getAzureContainer());
azureDataSegmentConfig.setPrefix(param.getAzurePrefix());
AzureInputDataConfig azureInputDataConfig = new AzureInputDataConfig();
azureInputDataConfig.setMaxListingLength(param.getAzureMaxListingLength());
LocalAzureCloudBlobIterableFactory azureFactory = new LocalAzureCloudBlobIterableFactory();
return new AzureDataSegmentKiller(azureDataSegmentConfig, azureInputDataConfig, azureAccountConfig, azureStorage, azureFactory);
} else {
Supplier<ServerSideEncryptingAmazonS3> serverSideEncryptingAmazonS3 = getAmazonS3();
S3DataSegmentPusherConfig s3Config = new S3DataSegmentPusherConfig();
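
Note that the Azure account config setup is duplicated between `createPusher` and `createKiller`; a private helper along the following lines could share it. This is a sketch against the code above, not part of the commit:

```java
private static LocalAzureAccountConfig buildAzureAccountConfig(WriterContext param) {
    LocalAzureAccountConfig azureAccountConfig = new LocalAzureAccountConfig();
    azureAccountConfig.setAccount(param.getAzureAccount());
    if (param.getAzureKey() != null && !param.getAzureKey().isEmpty()) {
        azureAccountConfig.setKey(param.getAzureKey());
    }
    if (param.getAzureSharedAccessStorageToken() != null && !param.getAzureSharedAccessStorageToken().isEmpty()) {
        azureAccountConfig.setSharedAccessStorageToken(param.getAzureSharedAccessStorageToken());
    }
    if (param.getAzureEndpointSuffix() != null && !param.getAzureEndpointSuffix().isEmpty()) {
        azureAccountConfig.setEndpointSuffix(param.getAzureEndpointSuffix());
    }
    if (param.getAzureManagedIdentityClientId() != null && !param.getAzureManagedIdentityClientId().isEmpty()) {
        azureAccountConfig.setManagedIdentityClientId(param.getAzureManagedIdentityClientId());
    }
    azureAccountConfig.setUseAzureCredentialsChain(param.getAzureUseAzureCredentialsChain());
    azureAccountConfig.setProtocol(param.getAzureProtocol());
    azureAccountConfig.setMaxTries(param.getAzureMaxTries());
    return azureAccountConfig;
}
```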
34 changes: 34 additions & 0 deletions src/main/java/com/rovio/ingest/util/azure/LocalAzureAccountConfig.java
@@ -0,0 +1,34 @@
/*
* Copyright 2021 Rovio Entertainment Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.rovio.ingest.util.azure;

import com.fasterxml.jackson.annotation.JsonProperty;

import org.apache.druid.storage.azure.AzureAccountConfig;

public class LocalAzureAccountConfig extends AzureAccountConfig {
@JsonProperty
private String managedIdentityClientId;

@SuppressWarnings("unused") // Used by Jackson deserialization?
public void setManagedIdentityClientId(String managedIdentityClientId) {
this.managedIdentityClientId = managedIdentityClientId;
}

public String getManagedIdentityClientId() {
return managedIdentityClientId;
}
}
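
Because the new field carries `@JsonProperty`, it binds from the same configuration tree as the inherited `AzureAccountConfig` fields. A minimal sketch of that binding, assuming Jackson's `ObjectMapper` and placeholder values:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rovio.ingest.util.azure.LocalAzureAccountConfig;

public class ConfigBindingSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Placeholder JSON; the parent class binds "account", the subclass binds "managedIdentityClientId".
        String json = "{\"account\":\"mystorageaccount\",\"managedIdentityClientId\":\"11111111-2222-3333-4444-555555555555\"}";
        LocalAzureAccountConfig config = mapper.readValue(json, LocalAzureAccountConfig.class);
        System.out.println(config.getManagedIdentityClientId()); // 11111111-2222-3333-4444-555555555555
    }
}
```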
84 changes: 84 additions & 0 deletions src/main/java/com/rovio/ingest/util/azure/LocalAzureClientFactory.java
@@ -0,0 +1,84 @@
/*
* Copyright 2021 Rovio Entertainment Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.rovio.ingest.util.azure;

import com.azure.core.http.policy.ExponentialBackoffOptions;
import com.azure.core.http.policy.RetryOptions;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.druid.storage.azure.AzureClientFactory;

public class LocalAzureClientFactory extends AzureClientFactory {

private final LocalAzureAccountConfig config;
private final Map<Integer, BlobServiceClient> cachedBlobServiceClients;

public LocalAzureClientFactory(LocalAzureAccountConfig config) {
super(config);
this.config = config;
this.cachedBlobServiceClients = new HashMap<>();
}

// It's okay to store clients in a map here because all the configs for specifying azure retries are static, and there are only 2 of them.
// The 2 configs are AzureAccountConfig.maxTries and AzureOutputConfig.maxRetry.
// We will only ever have at most 2 clients in cachedBlobServiceClients.
public BlobServiceClient getBlobServiceClient(Integer retryCount) {
if (!cachedBlobServiceClients.containsKey(retryCount)) {
BlobServiceClientBuilder clientBuilder = getAuthenticatedBlobServiceClientBuilder()
.retryOptions(new RetryOptions(
new ExponentialBackoffOptions()
.setMaxRetries(retryCount != null ? retryCount : config.getMaxTries())
.setBaseDelay(Duration.ofMillis(1000))
.setMaxDelay(Duration.ofMillis(60000))
));
cachedBlobServiceClients.put(retryCount, clientBuilder.buildClient());
}

return cachedBlobServiceClients.get(retryCount);
}

// Mainly here to make testing easier.
public BlobBatchClient getBlobBatchClient(BlobContainerClient blobContainerClient) {
return new BlobBatchClientBuilder(blobContainerClient).buildClient();
}

private BlobServiceClientBuilder getAuthenticatedBlobServiceClientBuilder() {
BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder()
.endpoint(config.getProtocol() + "://" + config.getAccount() + "." + config.getBlobStorageEndpoint());

if (config.getKey() != null) {
clientBuilder.credential(new StorageSharedKeyCredential(config.getAccount(), config.getKey()));
} else if (config.getSharedAccessStorageToken() != null) {
clientBuilder.sasToken(config.getSharedAccessStorageToken());
} else if (config.getUseAzureCredentialsChain()) {
// We might not use the managed identity client id in the credential chain but we can just set it here and it will no-op.
DefaultAzureCredentialBuilder defaultAzureCredentialBuilder = new DefaultAzureCredentialBuilder()
.managedIdentityClientId(config.getManagedIdentityClientId());
clientBuilder.credential(defaultAzureCredentialBuilder.build());
}
return clientBuilder;
}
}
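
A minimal usage sketch of the factory, assuming key-based authentication and the default endpoint suffix; all values are placeholders:

```java
import com.azure.storage.blob.BlobServiceClient;
import com.rovio.ingest.util.azure.LocalAzureAccountConfig;
import com.rovio.ingest.util.azure.LocalAzureClientFactory;

public class ClientFactorySketch {
    public static void main(String[] args) {
        LocalAzureAccountConfig config = new LocalAzureAccountConfig();
        config.setAccount("mystorageaccount");  // placeholder
        config.setKey("<storage-account-key>"); // placeholder
        config.setMaxTries(3);

        LocalAzureClientFactory factory = new LocalAzureClientFactory(config);
        BlobServiceClient first = factory.getBlobServiceClient(3);
        BlobServiceClient again = factory.getBlobServiceClient(3);
        // The same retry count returns the cached client, per the caching note above.
        System.out.println(first == again); // true
    }
}
```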
