Skip to content

Commit

Permalink
align with latest spark-sql application
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo committed Oct 5, 2023
1 parent 9df968a commit 3397e4a
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.DRIVER_ENV_ASSUME_ROLE_ARN_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER;
import static org.opensearch.sql.spark.data.constants.SparkConstants.EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_CATALOG_JAR;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_CREDENTIALS_PROVIDER_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_AUTH;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_HOST;
Expand All @@ -45,6 +44,7 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_REPOSITORIES_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_LAUNCHER_PACKAGE;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_EXTENSIONS_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_STANDALONE_PACKAGE;

Expand Down Expand Up @@ -80,8 +80,8 @@ private Builder() {
config.put(
HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY,
DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY);
config.put(SPARK_JARS_KEY, GLUE_CATALOG_HIVE_JAR + "," + FLINT_CATALOG_JAR);
config.put(SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE);
config.put(SPARK_JARS_KEY, GLUE_CATALOG_HIVE_JAR);
config.put(SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE + "," + SPARK_LAUNCHER_PACKAGE);
config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY);
config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
Expand Down Expand Up @@ -115,6 +115,7 @@ public Builder dataSource(DataSourceMetadata metadata) {
() -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME),
() -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD),
() -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_REGION));
config.put("spark.flint.datasource.name", metadata.getName());
return this;
}
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public class SparkConstants {
public static final String STEP_ID_FIELD = "stepId.keyword";
// TODO should be replaced with mvn jar.
public static final String SPARK_SQL_APPLICATION_JAR =
"s3://flint-data-dp-eu-west-1-beta/code/flint/sql-job.jar";
"s3://flint-data-dp-eu-west-1-beta/code/flint/opensearch-spark-sql-application_2.12-0.1.0-SNAPSHOT.jar";
public static final String SPARK_RESPONSE_BUFFER_INDEX_NAME = ".query_execution_result";
// TODO should be replaced with mvn jar.
public static final String FLINT_INTEGRATION_JAR =
Expand All @@ -26,7 +26,7 @@ public class SparkConstants {
public static final String FLINT_DEFAULT_SCHEME = "http";
public static final String FLINT_DEFAULT_AUTH = "noauth";
public static final String FLINT_DEFAULT_REGION = "us-west-2";
public static final String DEFAULT_CLASS_NAME = "org.opensearch.sql.FlintJob";
public static final String DEFAULT_CLASS_NAME = "org.apache.spark.sql.FlintJob";
public static final String S3_AWS_CREDENTIALS_PROVIDER_KEY =
"spark.hadoop.fs.s3.customAWSCredentialsProvider";
public static final String DRIVER_ENV_ASSUME_ROLE_ARN_KEY =
Expand Down Expand Up @@ -62,11 +62,14 @@ public class SparkConstants {
"com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory";
public static final String SPARK_STANDALONE_PACKAGE =
"org.opensearch:opensearch-spark-standalone_2.12:0.1.0-SNAPSHOT";
public static final String SPARK_LAUNCHER_PACKAGE =
"org.opensearch:opensearch-spark-sql-application_2.12:0.1.0-SNAPSHOT";
public static final String AWS_SNAPSHOT_REPOSITORY =
"https://aws.oss.sonatype.org/content/repositories/snapshots";
public static final String GLUE_HIVE_CATALOG_FACTORY_CLASS =
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory";
public static final String FLINT_DELEGATE_CATALOG = "org.opensearch.sql.FlintDelegateCatalog";
public static final String FLINT_DELEGATE_CATALOG =
"org.opensearch.sql.FlintDelegatingSessionCatalog";
public static final String FLINT_SQL_EXTENSION =
"org.opensearch.flint.spark.FlintSparkExtensions";
public static final String EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.opensearch.sql.spark.response;

import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME;
import static org.opensearch.sql.spark.data.constants.SparkConstants.STEP_ID_FIELD;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -34,7 +33,7 @@ public JobExecutionResponseReader(Client client) {
}

public JSONObject getResultFromOpensearchIndex(String jobId) {
return searchInSparkIndex(QueryBuilders.termQuery(STEP_ID_FIELD, jobId));
return searchInSparkIndex(QueryBuilders.termQuery("jobRunId", jobId));
}

private JSONObject searchInSparkIndex(QueryBuilder query) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,14 +597,14 @@ private String constructExpectedSparkSubmitParameterString(
authParamConfigBuilder.append(authParams.get(key));
authParamConfigBuilder.append(" ");
}
return " --class org.opensearch.sql.FlintJob --conf"
return " --class org.apache.spark.sql.FlintJob --conf"
+ " spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider"
+ " --conf"
+ " spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory"
+ " --conf"
+ " spark.jars=s3://flint-data-dp-eu-west-1-beta/code/flint/AWSGlueDataCatalogHiveMetaStoreAuth-1.0.jar,s3://flint-data-dp-eu-west-1-beta/code/flint/flint-catalog.jar"
+ " spark.jars=s3://flint-data-dp-eu-west-1-beta/code/flint/AWSGlueDataCatalogHiveMetaStoreAuth-1.0.jar"
+ " --conf"
+ " spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.1.0-SNAPSHOT"
+ " spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.1.0-SNAPSHOT,org.opensearch:opensearch-spark-sql-application_2.12:0.1.0-SNAPSHOT"
+ " --conf"
+ " spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots"
+ " --conf"
Expand All @@ -625,8 +625,9 @@ private String constructExpectedSparkSubmitParameterString(
+ " spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"
+ " --conf"
+ " spark.hive.metastore.glue.role.arn=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"
+ " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegateCatalog "
+ authParamConfigBuilder;
+ " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegatingSessionCatalog "
+ authParamConfigBuilder
+ " --conf spark.flint.datasource.name=my_glue ";
}

private String withStructuredStreaming(String parameters) {
Expand Down

0 comments on commit 3397e4a

Please sign in to comment.