Skip to content

Commit

Permalink
Refactor Flint Auth
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vmmusings committed Oct 3, 2023
1 parent 9a2151b commit 82a348f
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Map;

public enum AuthenticationType {
NOAUTH("noauth"),
BASICAUTH("basicauth"),
AWSSIGV4AUTH("awssigv4");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.auth.AuthenticationType;
import org.opensearch.sql.datasources.utils.DatasourceValidationUtils;
import org.opensearch.sql.storage.DataSourceFactory;

Expand All @@ -20,9 +21,14 @@ public class GlueDataSourceFactory implements DataSourceFactory {
// Glue configuration properties
public static final String GLUE_AUTH_TYPE = "glue.auth.type";
public static final String GLUE_ROLE_ARN = "glue.auth.role_arn";
public static final String FLINT_URI = "glue.indexstore.opensearch.uri";
public static final String FLINT_AUTH = "glue.indexstore.opensearch.auth";
public static final String FLINT_REGION = "glue.indexstore.opensearch.region";
public static final String GLUE_INDEX_STORE_OPENSEARCH_URI = "glue.indexstore.opensearch.uri";
public static final String GLUE_INDEX_STORE_OPENSEARCH_AUTH = "glue.indexstore.opensearch.auth";
public static final String GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME =
"glue.indexstore.opensearch.auth.username";
public static final String GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD =
"glue.indexstore.opensearch.auth.password";
public static final String GLUE_INDEX_STORE_OPENSEARCH_REGION =
"glue.indexstore.opensearch.region";

@Override
public DataSourceType getDataSourceType() {
Expand All @@ -46,11 +52,34 @@ public DataSource createDataSource(DataSourceMetadata metadata) {

private void validateGlueDataSourceConfiguration(Map<String, String> dataSourceMetadataConfig)
throws URISyntaxException, UnknownHostException {
DatasourceValidationUtils.validateLengthAndRequiredFields(
dataSourceMetadataConfig,
Set.of(GLUE_AUTH_TYPE, GLUE_ROLE_ARN, FLINT_URI, FLINT_REGION, FLINT_AUTH));
AuthenticationType authenticationType =
AuthenticationType.get(dataSourceMetadataConfig.get(GLUE_INDEX_STORE_OPENSEARCH_AUTH));
if (AuthenticationType.BASICAUTH.equals(authenticationType)) {
DatasourceValidationUtils.validateLengthAndRequiredFields(
dataSourceMetadataConfig,
Set.of(
GLUE_AUTH_TYPE,
GLUE_ROLE_ARN,
GLUE_INDEX_STORE_OPENSEARCH_URI,
GLUE_INDEX_STORE_OPENSEARCH_REGION,
GLUE_INDEX_STORE_OPENSEARCH_AUTH,
GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME,
GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD));
} else if (AuthenticationType.NOAUTH.equals(authenticationType)) {
DatasourceValidationUtils.validateLengthAndRequiredFields(
dataSourceMetadataConfig,
Set.of(
GLUE_AUTH_TYPE,
GLUE_ROLE_ARN,
GLUE_INDEX_STORE_OPENSEARCH_URI,
GLUE_INDEX_STORE_OPENSEARCH_REGION,
GLUE_INDEX_STORE_OPENSEARCH_AUTH));
} else {
throw new UnsupportedOperationException(
String.format("Unsupported IndexStore Authentication Type : %s", authenticationType));
}
DatasourceValidationUtils.validateHost(
dataSourceMetadataConfig.get(FLINT_URI),
dataSourceMetadataConfig.get(GLUE_INDEX_STORE_OPENSEARCH_URI),
pluginSettings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.auth.AuthenticationType;
import org.opensearch.sql.datasources.encryptor.Encryptor;
import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException;
import org.opensearch.sql.datasources.service.DataSourceMetadataStorage;
Expand Down Expand Up @@ -252,26 +251,13 @@ private List<DataSourceMetadata> searchInDataSourcesIndex(QueryBuilder query) {
}
}

@SuppressWarnings("missingswitchdefault")
// Encrypt and Decrypt irrespective of auth type.If properties name ends in username, password,
// secret_key and access_key.
private DataSourceMetadata encryptDecryptAuthenticationData(
DataSourceMetadata dataSourceMetadata, Boolean isEncryption) {
Map<String, String> propertiesMap = dataSourceMetadata.getProperties();
Optional<AuthenticationType> authTypeOptional =
propertiesMap.keySet().stream()
.filter(s -> s.endsWith("auth.type"))
.findFirst()
.map(propertiesMap::get)
.map(AuthenticationType::get);
if (authTypeOptional.isPresent()) {
switch (authTypeOptional.get()) {
case BASICAUTH:
handleBasicAuthPropertiesEncryptionDecryption(propertiesMap, isEncryption);
break;
case AWSSIGV4AUTH:
handleSigV4PropertiesEncryptionDecryption(propertiesMap, isEncryption);
break;
}
}
handleBasicAuthPropertiesEncryptionDecryption(propertiesMap, isEncryption);
handleSigV4PropertiesEncryptionDecryption(propertiesMap, isEncryption);
return dataSourceMetadata;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void testCreateGLueDatSource() {
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "false");
properties.put("glue.indexstore.opensearch.auth", "noauth");
properties.put("glue.indexstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
Expand All @@ -59,6 +59,89 @@ void testCreateGLueDatSource() {
"Glue storage engine is not supported.", unsupportedOperationException.getMessage());
}

@Test
@SneakyThrows
void testCreateGLueDatSourceWithBasicAuthForIndexStore() {
when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST))
.thenReturn(Collections.emptyList());
GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings);

DataSourceMetadata metadata = new DataSourceMetadata();
HashMap<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "basicauth");
properties.put("glue.indexstore.opensearch.auth.username", "username");
properties.put("glue.indexstore.opensearch.auth.password", "password");
properties.put("glue.indexstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
metadata.setConnector(DataSourceType.S3GLUE);
metadata.setProperties(properties);
DataSource dataSource = glueDatasourceFactory.createDataSource(metadata);
Assertions.assertEquals(DataSourceType.S3GLUE, dataSource.getConnectorType());
UnsupportedOperationException unsupportedOperationException =
Assertions.assertThrows(
UnsupportedOperationException.class,
() ->
dataSource
.getStorageEngine()
.getTable(new DataSourceSchemaName("my_glue", "default"), "alb_logs"));
Assertions.assertEquals(
"Glue storage engine is not supported.", unsupportedOperationException.getMessage());
}

@Test
@SneakyThrows
void testCreateGLueDatSourceWithAwsSigV4AuthForIndexStore() {
GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings);

DataSourceMetadata metadata = new DataSourceMetadata();
HashMap<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "awssigv4");
properties.put("glue.indexstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
metadata.setConnector(DataSourceType.S3GLUE);
metadata.setProperties(properties);
UnsupportedOperationException unsupportedOperationException =
Assertions.assertThrows(
UnsupportedOperationException.class,
() -> glueDatasourceFactory.createDataSource(metadata));
Assertions.assertEquals(
"Unsupported IndexStore Authentication Type : AWSSIGV4AUTH",
unsupportedOperationException.getMessage());
}

@Test
@SneakyThrows
void testCreateGLueDatSourceWithBasicAuthForIndexStoreAndMissingFields() {
GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings);

DataSourceMetadata metadata = new DataSourceMetadata();
HashMap<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "basicauth");
properties.put("glue.indexstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
metadata.setConnector(DataSourceType.S3GLUE);
metadata.setProperties(properties);
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata));
Assertions.assertEquals(
"Missing [glue.indexstore.opensearch.auth.password,"
+ " glue.indexstore.opensearch.auth.username] fields in the connector properties.",
illegalArgumentException.getMessage());
}

@Test
@SneakyThrows
void testCreateGLueDatSourceWithInvalidFlintHost() {
Expand All @@ -71,7 +154,7 @@ void testCreateGLueDatSourceWithInvalidFlintHost() {
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "false");
properties.put("glue.indexstore.opensearch.auth", "noauth");
properties.put("glue.indexstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
Expand Down Expand Up @@ -100,7 +183,7 @@ void testCreateGLueDatSourceWithInvalidFlintHostSyntax() {
properties.put(
"glue.indexstore.opensearch.uri",
"http://dummyprometheus.com:9090? paramt::localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "false");
properties.put("glue.indexstore.opensearch.auth", "noauth");
properties.put("glue.indexstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
Expand Down
7 changes: 4 additions & 3 deletions docs/user/ppl/admin/connectors/s3glue_connector.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ Glue Connector Properties.
* This parameters provides the Opensearch domain host information for glue connector. This opensearch instance is used for writing index data back and also
* ``glue.indexstore.opensearch.uri`` [Required]
* ``glue.indexstore.opensearch.auth`` [Required]
* Default value for auth is ``false``.
* Accepted values include ["noauth", "basicauth"]
* ``glue.indexstore.opensearch.region`` [Required]
* Default value for auth is ``us-west-2``.

Sample Glue dataSource configuration
========================================
Expand All @@ -55,7 +54,9 @@ Glue datasource configuration::
"glue.auth.type": "iam_role",
"glue.auth.role_arn": "role_arn",
"glue.indexstore.opensearch.uri": "http://localhost:9200",
"glue.indexstore.opensearch.auth" :"false",
"glue.indexstore.opensearch.auth" :"basicauth",
"glue.indexstore.opensearch.auth.username" :"username"
"glue.indexstore.opensearch.auth.password" :"password"
"glue.indexstore.opensearch.region": "us-west-2"
}
}]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public class PrometheusStorageFactory implements DataSourceFactory {
public static final String REGION = "prometheus.auth.region";
public static final String ACCESS_KEY = "prometheus.auth.access_key";
public static final String SECRET_KEY = "prometheus.auth.secret_key";
private static final Integer MAX_LENGTH_FOR_CONFIG_PROPERTY = 1000;

private final Settings settings;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public class SparkConstants {
public static final String FLINT_INDEX_STORE_PORT_KEY = "spark.datasource.flint.port";
public static final String FLINT_INDEX_STORE_SCHEME_KEY = "spark.datasource.flint.scheme";
public static final String FLINT_INDEX_STORE_AUTH_KEY = "spark.datasource.flint.auth";
public static final String FLINT_INDEX_STORE_AUTH_USERNAME =
"spark.datasource.flint.auth.username";
public static final String FLINT_INDEX_STORE_AUTH_PASSWORD =
"spark.datasource.flint.auth.password";
public static final String FLINT_INDEX_STORE_AWSREGION_KEY = "spark.datasource.flint.region";
public static final String FLINT_CREDENTIALS_PROVIDER_KEY =
"spark.datasource.flint.customAWSCredentialsProvider";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@

package org.opensearch.sql.spark.dispatcher;

import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.DRIVER_ENV_ASSUME_ROLE_ARN_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DELEGATE_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_PASSWORD;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_USERNAME;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY;
Expand All @@ -27,6 +33,7 @@
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.auth.AuthenticationType;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
import org.opensearch.sql.spark.asyncquery.model.S3GlueSparkSubmitParameters;
import org.opensearch.sql.spark.client.EMRServerlessClient;
Expand Down Expand Up @@ -117,19 +124,38 @@ private String constructSparkParameters(String datasourceName) {
String.format(
"Bad URI in indexstore configuration of the : %s datasoure.", datasourceName));
}
String auth = dataSourceMetadata.getProperties().get("glue.indexstore.opensearch.auth");
String region = dataSourceMetadata.getProperties().get("glue.indexstore.opensearch.region");
String auth = dataSourceMetadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH);
String region = dataSourceMetadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_REGION);

s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_HOST_KEY, uri.getHost());
s3GlueSparkSubmitParameters.addParameter(
FLINT_INDEX_STORE_PORT_KEY, String.valueOf(uri.getPort()));
s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_SCHEME_KEY, uri.getScheme());
s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_KEY, auth);
setFlintIndexStoreAuthProperties(dataSourceMetadata, s3GlueSparkSubmitParameters, auth);
s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AWSREGION_KEY, region);
s3GlueSparkSubmitParameters.addParameter(
"spark.sql.catalog." + datasourceName, FLINT_DELEGATE_CATALOG);
return s3GlueSparkSubmitParameters.toString();
}

private static void setFlintIndexStoreAuthProperties(DataSourceMetadata dataSourceMetadata,
S3GlueSparkSubmitParameters s3GlueSparkSubmitParameters,
String authType) {
if (AuthenticationType.get(authType).equals(AuthenticationType.BASICAUTH)) {
s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_KEY, authType);
String username =
dataSourceMetadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME);
String password =
dataSourceMetadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD);
s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_USERNAME, username);
s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_PASSWORD, password);

Check warning on line 151 in spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java#L145-L151

Added lines #L145 - L151 were not covered by tests
} else if(AuthenticationType.get(authType).equals(AuthenticationType.AWSSIGV4AUTH)) {
s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_KEY, "sigv4");
} else {
s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_KEY, authType);

Check warning on line 155 in spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java#L155

Added line #L155 was not covered by tests
}
}

private StartJobRequest getStartJobRequestForNonIndexQueries(
DispatchQueryRequest dispatchQueryRequest) {
StartJobRequest startJobRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ private DataSourceMetadata constructMyGlueDataSourceMetadata() {
properties.put(
"glue.indexstore.opensearch.uri",
"https://search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com");
properties.put("glue.indexstore.opensearch.auth", "sigv4");
properties.put("glue.indexstore.opensearch.auth", "awssigv4");
properties.put("glue.indexstore.opensearch.region", "eu-west-1");
dataSourceMetadata.setProperties(properties);
return dataSourceMetadata;
Expand All @@ -415,7 +415,7 @@ private DataSourceMetadata constructMyGlueDataSourceMetadataWithBadURISyntax() {
properties.put(
"glue.auth.role_arn", "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9090? param");
properties.put("glue.indexstore.opensearch.auth", "sigv4");
properties.put("glue.indexstore.opensearch.auth", "awssigv4");
properties.put("glue.indexstore.opensearch.region", "eu-west-1");
dataSourceMetadata.setProperties(properties);
return dataSourceMetadata;
Expand Down

0 comments on commit 82a348f

Please sign in to comment.