diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java index 3e5d07a65b..f9f0b8ed8d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java @@ -45,8 +45,7 @@ private Builder() { config.put( HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY, DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY); - config.put(SPARK_JARS_KEY, GLUE_CATALOG_HIVE_JAR + "," + FLINT_CATALOG_JAR); - config.put(SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE); + config.put(SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE + "," + SPARK_LAUNCHER_PACKAGE); config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY); config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); @@ -81,6 +80,7 @@ public Builder dataSource(DataSourceMetadata metadata) { () -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME), () -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD), () -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_REGION)); + config.put("spark.flint.datasource.name", metadata.getName()); return this; } throw new UnsupportedOperationException( diff --git a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index 3a0ad8f4a9..326acee529 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -17,9 +17,9 @@ public class SparkConstants { public static final String ERROR_FIELD = "error"; - // TODO should be replaced with mvn jar. + // EMR-S will download JAR to local maven public static final String SPARK_SQL_APPLICATION_JAR = - "s3://flint-data-dp-eu-west-1-beta/code/flint/sql-job.jar"; + "file:///home/hadoop/.ivy2/jars/org.opensearch_opensearch-spark-sql-application_2.12-0.1.0-SNAPSHOT.jar"; public static final String SPARK_RESPONSE_BUFFER_INDEX_NAME = ".query_execution_result"; // TODO should be replaced with mvn jar. public static final String FLINT_INTEGRATION_JAR = @@ -72,11 +72,14 @@ public class SparkConstants { "com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory"; public static final String SPARK_STANDALONE_PACKAGE = "org.opensearch:opensearch-spark-standalone_2.12:0.1.0-SNAPSHOT"; + public static final String SPARK_LAUNCHER_PACKAGE = + "org.opensearch:opensearch-spark-sql-application_2.12:0.1.0-SNAPSHOT"; public static final String AWS_SNAPSHOT_REPOSITORY = "https://aws.oss.sonatype.org/content/repositories/snapshots"; public static final String GLUE_HIVE_CATALOG_FACTORY_CLASS = "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"; - public static final String FLINT_DELEGATE_CATALOG = "org.opensearch.sql.FlintDelegateCatalog"; + public static final String FLINT_DELEGATE_CATALOG = + "org.opensearch.sql.FlintDelegatingSessionCatalog"; public static final String FLINT_SQL_EXTENSION = "org.opensearch.flint.spark.FlintSparkExtensions"; public static final String EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER = diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 48f3b89ca1..4c04381f36 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -774,9 +774,7 @@ private String constructExpectedSparkSubmitParameterString( + " --conf" + " spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory" + " --conf" - + " spark.jars=s3://flint-data-dp-eu-west-1-beta/code/flint/AWSGlueDataCatalogHiveMetaStoreAuth-1.0.jar,s3://flint-data-dp-eu-west-1-beta/code/flint/flint-catalog.jar" - + " --conf" - + " spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.1.0-SNAPSHOT" + + " spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.1.0-SNAPSHOT,org.opensearch:opensearch-spark-sql-application_2.12:0.1.0-SNAPSHOT" + " --conf" + " spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots" + " --conf" @@ -797,8 +795,9 @@ private String constructExpectedSparkSubmitParameterString( + " spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole" + " --conf" + " spark.hive.metastore.glue.role.arn=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole" - + " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegateCatalog " - + authParamConfigBuilder; + + " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegatingSessionCatalog " + + authParamConfigBuilder + + " --conf spark.flint.datasource.name=my_glue "; } private String withStructuredStreaming(String parameters) {