From 696d4217edb0eeb2139a249257294cb69ba4148e Mon Sep 17 00:00:00 2001
From: Rupal Mahajan
Date: Fri, 27 Dec 2024 10:41:46 -0800
Subject: [PATCH] Remove spark-sql-application (#1916)

Signed-off-by: Rupal Mahajan
---
 spark-sql-application/.gitignore              |  14 ---
 spark-sql-application/README.md               | 109 -----------------
 spark-sql-application/build.sbt               |  28 -----
 .../project/build.properties                  |   1 -
 spark-sql-application/project/plugins.sbt     |   6 -
 spark-sql-application/scalastyle-config.xml   | 106 ----------------
 .../scala/org/opensearch/sql/SQLJob.scala     | 110 ------------------
 .../scala/org/opensearch/sql/SQLJobTest.scala |  56 ---------
 8 files changed, 430 deletions(-)
 delete mode 100644 spark-sql-application/.gitignore
 delete mode 100644 spark-sql-application/README.md
 delete mode 100644 spark-sql-application/build.sbt
 delete mode 100644 spark-sql-application/project/build.properties
 delete mode 100644 spark-sql-application/project/plugins.sbt
 delete mode 100644 spark-sql-application/scalastyle-config.xml
 delete mode 100644 spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala
 delete mode 100644 spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala

diff --git a/spark-sql-application/.gitignore b/spark-sql-application/.gitignore
deleted file mode 100644
index ec13a702be..0000000000
--- a/spark-sql-application/.gitignore
+++ /dev/null
@@ -1,14 +0,0 @@
-# Compiled output
-target/
-project/target/
-
-# sbt-specific files
-.sbtserver
-.sbt/
-.bsp/
-
-# Miscellaneous
-.DS_Store
-*.class
-*.log
-*.zip
\ No newline at end of file
diff --git a/spark-sql-application/README.md b/spark-sql-application/README.md
deleted file mode 100644
index 6422f294cd..0000000000
--- a/spark-sql-application/README.md
+++ /dev/null
@@ -1,109 +0,0 @@
-# Spark SQL Application
-
-This application executes a SQL query and stores the result in an OpenSearch index in the following format:
-```
-"stepId":"<emr-step-id>",
-"applicationId":"<spark-application-id>",
-"schema": "json blob",
-"result": "json blob"
-```
-
-## Prerequisites
-
-+ Spark 3.3.1
-+ Scala 2.12.15
-+ flint-spark-integration
-
-## Usage
-
-To use this application, run Spark with the Flint extension:
-
-```
-./bin/spark-submit \
-  --class org.opensearch.sql.SQLJob \
-  --jars <flint-spark-integration-jar> \
-  sql-job.jar \
-  <spark-sql-query> \
-  <opensearch-index> \
-  <opensearch-host> \
-  <opensearch-port> \
-  <opensearch-scheme> \
-  <opensearch-auth> \
-  <opensearch-region>
-```
-
-## Result Specifications
-
-The following example shows how the result is written to the OpenSearch index after query execution.
-
-Assume the SQL query result is
-```
-+------+------+
-|Letter|Number|
-+------+------+
-|A     |1     |
-|B     |2     |
-|C     |3     |
-+------+------+
-```
-The OpenSearch index document will look like
-```json
-{
-  "_index" : ".query_execution_result",
-  "_id" : "A2WOsYgBMUoqCqlDJHrn",
-  "_score" : 1.0,
-  "_source" : {
-    "result" : [
-      "{'Letter':'A','Number':1}",
-      "{'Letter':'B','Number':2}",
-      "{'Letter':'C','Number':3}"
-    ],
-    "schema" : [
-      "{'column_name':'Letter','data_type':'string'}",
-      "{'column_name':'Number','data_type':'integer'}"
-    ],
-    "stepId" : "s-JZSB1139WIVU",
-    "applicationId" : "application_1687726870985_0003"
-  }
-}
-```
-
-## Build
-
-To build the application and publish it to your local Ivy repository, run:
-
-```
-sbt clean publishLocal
-```
-
-## Test
-
-To run the tests, use:
-
-```
-sbt test
-```
-
-## Scalastyle
-
-To check the code with Scalastyle, run:
-
-```
-sbt scalastyle
-```
-
-## Code of Conduct
-
-This project has adopted an [Open Source Code of Conduct](../CODE_OF_CONDUCT.md).
-
-## Security
-
-If you discover a potential security issue in this project we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). Please do **not** create a public GitHub issue.
-
-## License
-
-See the [LICENSE](../LICENSE.txt) file for our project's licensing. We will ask you to confirm the licensing of your contribution.
-
-## Copyright
-
-Copyright OpenSearch Contributors. See [NOTICE](../NOTICE) for details.
\ No newline at end of file
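The removed README's "Result Specifications" section is easiest to grasp by running the transformation locally. Below is a minimal sketch (the object name and the local-mode session are illustrative, not part of the removed code) that reproduces the single-quoted JSON strings shown in the `result` array above, assuming Spark 3.3.x and Scala 2.12 on the classpath:

```scala
import org.apache.spark.sql.SparkSession

object ResultFormatSketch {
  def main(args: Array[String]): Unit = {
    // Local session purely for illustration; the removed job ran with the Flint extension on EMR.
    val spark = SparkSession.builder().appName("ResultFormatSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // The README's example table.
    val df = Seq(("A", 1), ("B", 2), ("C", 3)).toDF("Letter", "Number")

    // SQLJob stored each row as its JSON form with double quotes rewritten to single quotes.
    df.toJSON.collect().foreach(json => println(json.replaceAll("\"", "'")))
    // Prints {'Letter':'A','Number':1} and so on, matching the "result" array above.

    spark.stop()
  }
}
```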
diff --git a/spark-sql-application/build.sbt b/spark-sql-application/build.sbt
deleted file mode 100644
index 79d69a30d1..0000000000
--- a/spark-sql-application/build.sbt
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-name := "sql-job"
-
-version := "1.0"
-
-scalaVersion := "2.12.15"
-
-val sparkVersion = "3.3.2"
-
-mainClass := Some("org.opensearch.sql.SQLJob")
-
-artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
-  "sql-job.jar"
-}
-
-resolvers ++= Seq(
-  ("apache-snapshots" at "http://repository.apache.org/snapshots/").withAllowInsecureProtocol(true)
-)
-
-libraryDependencies ++= Seq(
-  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
-  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
-  "org.scalatest" %% "scalatest" % "3.2.15" % Test
-)
diff --git a/spark-sql-application/project/build.properties b/spark-sql-application/project/build.properties
deleted file mode 100644
index 46e43a97ed..0000000000
--- a/spark-sql-application/project/build.properties
+++ /dev/null
@@ -1 +0,0 @@
-sbt.version=1.8.2
diff --git a/spark-sql-application/project/plugins.sbt b/spark-sql-application/project/plugins.sbt
deleted file mode 100644
index 4d14ba6c10..0000000000
--- a/spark-sql-application/project/plugins.sbt
+++ /dev/null
@@ -1,6 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
\ No newline at end of file
diff --git a/spark-sql-application/scalastyle-config.xml b/spark-sql-application/scalastyle-config.xml
deleted file mode 100644
index 37b1978cd7..0000000000
--- a/spark-sql-application/scalastyle-config.xml
+++ /dev/null
@@ -1,106 +0,0 @@
-Scalastyle standard configuration
[the remaining deleted lines were XML check definitions whose markup did not survive extraction; only the configuration name above is recoverable]
\ No newline at end of file
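A note for anyone resurrecting this build: the Spark dependencies in the deleted build.sbt are `provided`, so a plain `sbt run` would fail locally because they are absent from the runtime classpath. The usual workaround, taken from the sbt FAQ rather than from the removed build, is to point `run` at the compile classpath:

```scala
// In build.sbt: let `sbt run` use the compile classpath, which includes provided dependencies.
Compile / run := Defaults.runTask(
  Compile / fullClasspath,
  Compile / run / mainClass,
  Compile / run / runner
).evaluated
```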
diff --git a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala
deleted file mode 100644
index 98a3a08134..0000000000
--- a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.sql
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{DataFrame, SparkSession, Row}
-import org.apache.spark.sql.types._
-
-/**
- * Spark SQL Application entrypoint
- *
- * @param args(0)
- *   sql query
- * @param args(1)
- *   OpenSearch index name
- * @param args(2-6)
- *   OpenSearch connection values required for the flint-integration jar:
- *   host, port, scheme, auth, region respectively.
- * @return
- *   write sql query result to given OpenSearch index
- */
-object SQLJob {
-  def main(args: Array[String]) {
-    // Get the SQL query and OpenSearch config from the command line arguments
-    val query = args(0)
-    val index = args(1)
-    val host = args(2)
-    val port = args(3)
-    val scheme = args(4)
-    val auth = args(5)
-    val region = args(6)
-
-    val conf: SparkConf = new SparkConf()
-      .setAppName("SQLJob")
-      .set("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
-      .set("spark.datasource.flint.host", host)
-      .set("spark.datasource.flint.port", port)
-      .set("spark.datasource.flint.scheme", scheme)
-      .set("spark.datasource.flint.auth", auth)
-      .set("spark.datasource.flint.region", region)
-
-    // Create a SparkSession
-    val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
-
-    try {
-      // Execute SQL query
-      val result: DataFrame = spark.sql(query)
-
-      // Get data
-      val data = getFormattedData(result, spark)
-
-      // Write data to OpenSearch index
-      val aos = Map(
-        "host" -> host,
-        "port" -> port,
-        "scheme" -> scheme,
-        "auth" -> auth,
-        "region" -> region)
-
-      data.write
-        .format("flint")
-        .options(aos)
-        .mode("append")
-        .save(index)
-
-    } finally {
-      // Stop SparkSession
-      spark.stop()
-    }
-  }
-
-  /**
-   * Create a new formatted dataframe with json result, json schema and EMR_STEP_ID.
-   *
-   * @param result
-   *   sql query result dataframe
-   * @param spark
-   *   spark session
-   * @return
-   *   dataframe with result, schema and emr step id
-   */
-  def getFormattedData(result: DataFrame, spark: SparkSession): DataFrame = {
-    // Create the schema dataframe
-    val schemaRows = result.schema.fields.map { field =>
-      Row(field.name, field.dataType.typeName)
-    }
-    val resultSchema = spark.createDataFrame(
-      spark.sparkContext.parallelize(schemaRows),
-      StructType(Seq(
-        StructField("column_name", StringType, nullable = false),
-        StructField("data_type", StringType, nullable = false))))
-
-    // Define the data schema
-    val schema = StructType(Seq(
-      StructField("result", ArrayType(StringType, containsNull = true), nullable = true),
-      StructField("schema", ArrayType(StringType, containsNull = true), nullable = true),
-      StructField("stepId", StringType, nullable = true),
-      StructField("applicationId", StringType, nullable = true)))
-
-    // Create the data rows
-    val rows = Seq((
-      result.toJSON.collect.toList.map(_.replaceAll("'", "\\\\'").replaceAll("\"", "'")),
-      resultSchema.toJSON.collect.toList.map(_.replaceAll("\"", "'")),
-      sys.env.getOrElse("EMR_STEP_ID", "unknown"),
-      spark.sparkContext.applicationId))
-
-    // Create the DataFrame for data
-    spark.createDataFrame(rows).toDF(schema.fields.map(_.name): _*)
-  }
-}
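The quote handling in `getFormattedData` is easy to misread: embedded single quotes in a row's JSON are backslash-escaped first, and only then are double quotes rewritten to single quotes, so the two `replaceAll` calls cannot be swapped. A standalone check of that ordering (the sample value is hypothetical):

```scala
object EscapingCheck extends App {
  // Order matters: escape existing single quotes, then swap quote styles.
  val rowJson = """{"name":"O'Brien"}""" // a hypothetical row emitted by Dataset.toJSON
  val stored = rowJson.replaceAll("'", "\\\\'").replaceAll("\"", "'")
  println(stored) // {'name':'O\'Brien'}
}
```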
StructField("Letter", StringType, nullable = false), - StructField("Number", IntegerType, nullable = false) - )) - val inputRows = Seq( - Row("A", 1), - Row("B", 2), - Row("C", 3) - ) - val input: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(inputRows), inputSchema) - - test("Test getFormattedData method") { - // Define expected dataframe - val expectedSchema = StructType(Seq( - StructField("result", ArrayType(StringType, containsNull = true), nullable = true), - StructField("schema", ArrayType(StringType, containsNull = true), nullable = true), - StructField("stepId", StringType, nullable = true), - StructField("applicationId", StringType, nullable = true) - )) - val expectedRows = Seq( - Row( - Array("{'Letter':'A','Number':1}","{'Letter':'B','Number':2}", "{'Letter':'C','Number':3}"), - Array("{'column_name':'Letter','data_type':'string'}", "{'column_name':'Number','data_type':'integer'}"), - "unknown", - spark.sparkContext.applicationId - ) - ) - val expected: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(expectedRows), expectedSchema) - - // Compare the result - val result = SQLJob.getFormattedData(input, spark) - assertEqualDataframe(expected, result) - } - - def assertEqualDataframe(expected: DataFrame, result: DataFrame): Unit ={ - assert(expected.schema === result.schema) - assert(expected.collect() === result.collect()) - } -}