From c7d2bca08d172129c790cdf257d7c770c4812b98 Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Wed, 6 Sep 2023 22:54:19 -0700 Subject: [PATCH] Glue datasource support Signed-off-by: Vamsi Manohar --- .../sql/datasource/model/DataSourceType.java | 4 +- .../glue/GlueDataSourceFactory.java | 56 +++++++++ .../utils/DatasourceValidationUtils.java | 3 +- .../glue/GlueDataSourceFactoryTest.java | 115 ++++++++++++++++++ .../utils/DatasourceValidationUtilsTest.java | 4 +- .../{ => connectors}/prometheus_connector.rst | 0 .../ppl/admin/connectors/s3glue_connector.rst | 68 +++++++++++ .../{ => connectors}/spark_connector.rst | 0 .../org/opensearch/sql/plugin/SQLPlugin.java | 2 + .../storage/PrometheusStorageFactoryTest.java | 6 +- 10 files changed, 250 insertions(+), 8 deletions(-) create mode 100644 datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java create mode 100644 datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java rename docs/user/ppl/admin/{ => connectors}/prometheus_connector.rst (100%) create mode 100644 docs/user/ppl/admin/connectors/s3glue_connector.rst rename docs/user/ppl/admin/{ => connectors}/spark_connector.rst (100%) diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java index 5010e41942..a3c7c73d6b 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java @@ -8,7 +8,9 @@ public enum DataSourceType { PROMETHEUS("prometheus"), OPENSEARCH("opensearch"), - SPARK("spark"); + SPARK("spark"), + S3GLUE("s3glue"); + private String text; DataSourceType(String text) { diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java b/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java new file mode 100644 index 0000000000..24f94376bf --- /dev/null +++ b/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java @@ -0,0 +1,56 @@ +package org.opensearch.sql.datasources.glue; + +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.Map; +import java.util.Set; +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.datasources.utils.DatasourceValidationUtils; +import org.opensearch.sql.storage.DataSourceFactory; + +@RequiredArgsConstructor +public class GlueDataSourceFactory implements DataSourceFactory { + + private final Settings pluginSettings; + + // Glue configuration properties + public static final String GLUE_AUTH_TYPE = "glue.auth.type"; + public static final String GLUE_ROLE_ARN = "glue.auth.role_arn"; + public static final String FLINT_URI = "glue.indexstore.opensearch.uri"; + public static final String FLINT_AUTH = "glue.indexstore.opensearch.auth"; + public static final String FLINT_REGION = "glue.indexstore.opensearch.region"; + + @Override + public DataSourceType getDataSourceType() { + return DataSourceType.S3GLUE; + } + + @Override + public DataSource createDataSource(DataSourceMetadata metadata) { + try { + validateGlueDataSourceConfiguration(metadata.getProperties()); + return new DataSource( + metadata.getName(), + metadata.getConnector(), + (dataSourceSchemaName, tableName) -> { + throw new UnsupportedOperationException("Glue storage engine is not supported."); + }); + } catch (URISyntaxException | UnknownHostException e) { + throw new IllegalArgumentException("Invalid flint host in properties."); + } + } + + private void validateGlueDataSourceConfiguration(Map dataSourceMetadataConfig) + throws URISyntaxException, UnknownHostException { + DatasourceValidationUtils.validateLengthAndRequiredFields( + dataSourceMetadataConfig, + Set.of(GLUE_AUTH_TYPE, GLUE_ROLE_ARN, FLINT_URI, FLINT_REGION, FLINT_AUTH)); + DatasourceValidationUtils.validateHost( + dataSourceMetadataConfig.get(FLINT_URI), + pluginSettings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)); + } +} diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/utils/DatasourceValidationUtils.java b/datasources/src/main/java/org/opensearch/sql/datasources/utils/DatasourceValidationUtils.java index ba7458d0b4..86d3d65d30 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/utils/DatasourceValidationUtils.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/utils/DatasourceValidationUtils.java @@ -42,8 +42,7 @@ public static void validateLengthAndRequiredFields( StringBuilder errorStringBuilder = new StringBuilder(); if (missingFields.size() > 0) { errorStringBuilder.append( - String.format( - "Missing %s fields in the Prometheus connector properties.", missingFields)); + String.format("Missing %s fields in the connector properties.", missingFields)); } if (invalidLengthFields.size() > 0) { diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java new file mode 100644 index 0000000000..b018e5f9dc --- /dev/null +++ b/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java @@ -0,0 +1,115 @@ +package org.opensearch.sql.datasources.glue; + +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.DataSourceSchemaName; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; + +@ExtendWith(MockitoExtension.class) +public class GlueDataSourceFactoryTest { + + @Mock private Settings settings; + + @Test + void testGetConnectorType() { + GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); + Assertions.assertEquals(DataSourceType.S3GLUE, glueDatasourceFactory.getDataSourceType()); + } + + @Test + @SneakyThrows + void testCreateGLueDatSource() { + when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)) + .thenReturn(Collections.emptyList()); + GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); + + DataSourceMetadata metadata = new DataSourceMetadata(); + HashMap properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "false"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + + metadata.setName("my_glue"); + metadata.setConnector(DataSourceType.S3GLUE); + metadata.setProperties(properties); + DataSource dataSource = glueDatasourceFactory.createDataSource(metadata); + Assertions.assertEquals(DataSourceType.S3GLUE, dataSource.getConnectorType()); + UnsupportedOperationException unsupportedOperationException = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> + dataSource + .getStorageEngine() + .getTable(new DataSourceSchemaName("my_glue", "default"), "alb_logs")); + Assertions.assertEquals( + "Glue storage engine is not supported.", unsupportedOperationException.getMessage()); + } + + @Test + @SneakyThrows + void testCreateGLueDatSourceWithInvalidFlintHost() { + when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)) + .thenReturn(List.of("127.0.0.0/8")); + GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); + + DataSourceMetadata metadata = new DataSourceMetadata(); + HashMap properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "false"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + + metadata.setName("my_glue"); + metadata.setConnector(DataSourceType.S3GLUE); + metadata.setProperties(properties); + IllegalArgumentException illegalArgumentException = + Assertions.assertThrows( + IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata)); + Assertions.assertEquals( + "Disallowed hostname in the uri. " + + "Validate with plugins.query.datasources.uri.hosts.denylist config", + illegalArgumentException.getMessage()); + } + + @Test + @SneakyThrows + void testCreateGLueDatSourceWithInvalidFlintHostSyntax() { + when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)) + .thenReturn(List.of("127.0.0.0/8")); + GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); + + DataSourceMetadata metadata = new DataSourceMetadata(); + HashMap properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put( + "glue.indexstore.opensearch.uri", + "http://dummyprometheus.com:9090? paramt::localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "false"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + + metadata.setName("my_glue"); + metadata.setConnector(DataSourceType.S3GLUE); + metadata.setProperties(properties); + IllegalArgumentException illegalArgumentException = + Assertions.assertThrows( + IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata)); + Assertions.assertEquals( + "Invalid flint host in properties.", illegalArgumentException.getMessage()); + } +} diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/utils/DatasourceValidationUtilsTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/utils/DatasourceValidationUtilsTest.java index 836c61f647..114179aa45 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/utils/DatasourceValidationUtilsTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/utils/DatasourceValidationUtilsTest.java @@ -62,7 +62,7 @@ public void testValidateLengthAndRequiredFieldsWithAbsentField() { DatasourceValidationUtils.validateLengthAndRequiredFields( config, Set.of("s3.uri", "s3.auth.type"))); Assertions.assertEquals( - "Missing [s3.auth.type] fields in the Prometheus connector properties.", + "Missing [s3.auth.type] fields in the connector properties.", illegalArgumentException.getMessage()); } @@ -77,7 +77,7 @@ public void testValidateLengthAndRequiredFieldsWithInvalidLength() { DatasourceValidationUtils.validateLengthAndRequiredFields( config, Set.of("s3.uri", "s3.auth.type"))); Assertions.assertEquals( - "Missing [s3.auth.type] fields in the Prometheus connector properties.Fields " + "Missing [s3.auth.type] fields in the connector properties.Fields " + "[s3.uri] exceeds more than 1000 characters.", illegalArgumentException.getMessage()); } diff --git a/docs/user/ppl/admin/prometheus_connector.rst b/docs/user/ppl/admin/connectors/prometheus_connector.rst similarity index 100% rename from docs/user/ppl/admin/prometheus_connector.rst rename to docs/user/ppl/admin/connectors/prometheus_connector.rst diff --git a/docs/user/ppl/admin/connectors/s3glue_connector.rst b/docs/user/ppl/admin/connectors/s3glue_connector.rst new file mode 100644 index 0000000000..640eb90283 --- /dev/null +++ b/docs/user/ppl/admin/connectors/s3glue_connector.rst @@ -0,0 +1,68 @@ +.. highlight:: sh + +==================== +S3Glue Connector +==================== + +.. rubric:: Table of contents + +.. contents:: + :local: + :depth: 1 + + +Introduction +============ + +s3Glue connector provides a way to query s3 files using glue as metadata store and spark as execution engine. +This page covers s3Glue datasource configuration and also how to query and s3Glue datasource. + + +Required resources for s3 Glue Connector +=================================== +* S3: This is where the data lies. +* Spark Execution Engine: Query Execution happens on spark. +* Glue Metadata store: Glue takes care of table metadata. +* Opensearch: Index for s3 data lies in opensearch and also acts as temporary buffer for query results. + +We currently only support emr-serverless as spark execution engine and Glue as metadata store. we will add more support in future. + +Glue Connector Properties in DataSource Configuration +======================================================== +Glue Connector Properties. + +* ``glue.auth.type`` [Required] + * This parameters provides the authentication type information required for execution engine to connect to glue. + * S3 Glue connector currently only supports ``iam_role`` authentication and the below parameters is required. + * ``glue.auth.role_arn`` +* ``glue.indexstore.opensearch.*`` [Required] + * This parameters provides the Opensearch domain host information for glue connector. This opensearch instance is used for writing index data back and also + * ``glue.indexstore.opensearch.uri`` [Required] + * ``glue.indexstore.opensearch.auth`` [Required] + * Default value for auth is ``false``. + * ``glue.indexstore.opensearch.region`` [Required] + * Default value for auth is ``us-west-2``. + +Sample Glue dataSource configuration +======================================== + +Glue datasource configuration:: + + [{ + "name" : "my_glue", + "connector": "s3glue", + "properties" : { + "glue.auth.type": "iam_role", + "glue.auth.role_arn": "role_arn", + "glue.indexstore.opensearch.uri": "http://localhost:9200", + "glue.indexstore.opensearch.auth" :"false", + "glue.indexstore.opensearch.region": "us-west-2" + } + }] + + +Sample s3Glue datasource queries +================================ + + + diff --git a/docs/user/ppl/admin/spark_connector.rst b/docs/user/ppl/admin/connectors/spark_connector.rst similarity index 100% rename from docs/user/ppl/admin/spark_connector.rst rename to docs/user/ppl/admin/connectors/spark_connector.rst diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index d2c1e61c3a..80e1a6b1a3 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -51,6 +51,7 @@ import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelper; import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; import org.opensearch.sql.datasources.encryptor.EncryptorImpl; +import org.opensearch.sql.datasources.glue.GlueDataSourceFactory; import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse; import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionResponse; import org.opensearch.sql.datasources.model.transport.GetDataSourceActionResponse; @@ -264,6 +265,7 @@ private DataSourceServiceImpl createDataSourceService() { new OpenSearchNodeClient(this.client), pluginSettings)) .add(new PrometheusStorageFactory(pluginSettings)) .add(new SparkStorageFactory(this.client, pluginSettings)) + .add(new GlueDataSourceFactory(pluginSettings)) .build(), dataSourceMetadataStorage, dataSourceUserAuthorizationHelper); diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java index 46658699ca..bd64f98d2b 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java @@ -81,7 +81,7 @@ void testGetStorageEngineWithMissingURI() { IllegalArgumentException.class, () -> prometheusStorageFactory.getStorageEngine(properties)); Assertions.assertEquals( - "Missing [prometheus.uri] fields " + "in the Prometheus connector properties.", + "Missing [prometheus.uri] fields " + "in the connector properties.", exception.getMessage()); } @@ -99,7 +99,7 @@ void testGetStorageEngineWithMissingRegionInAWS() { IllegalArgumentException.class, () -> prometheusStorageFactory.getStorageEngine(properties)); Assertions.assertEquals( - "Missing [prometheus.auth.region] fields in the " + "Prometheus connector properties.", + "Missing [prometheus.auth.region] fields in the connector properties.", exception.getMessage()); } @@ -118,7 +118,7 @@ void testGetStorageEngineWithLongConfigProperties() { () -> prometheusStorageFactory.getStorageEngine(properties)); Assertions.assertEquals( "Missing [prometheus.auth.region] fields in the " - + "Prometheus connector properties." + + "connector properties." + "Fields [prometheus.uri] exceeds more than 1000 characters.", exception.getMessage()); }