From fd5a8793040498deffbcdff22e09c91e828a067e Mon Sep 17 00:00:00 2001
From: Max Lapan
Date: Fri, 1 Sep 2023 16:22:18 +0200
Subject: [PATCH] Take parallelism into account, refactor writer code (#204)

* Rearrange the connection order

* Intermediate state, still failing in local[1] case

* Logic works, need to cleanup

* Cleanup

* Handle error in main connection creation

* Update version and changelog

* Add project-keeper plugin

* Revert "Add project-keeper plugin"

This reverts commit 98e668582cf7ac493c6f979773a3e895e8a6b2cc.

* Upgrade pk in workflow

* Unit test on main connection exception

* Unit test on rollback of transaction

* Useless catch

* Fix unused import

* Fix grammar

* Rethrow exception

* Refactor sqlContext creation

---------

Co-authored-by: Christoph Pirkl
---
 .github/workflows/pk-verify.yml               |  6 +-
 doc/changes/changelog.md                      |  1 +
 doc/changes/changes_2.1.2.md                  | 18 ++++
 .../com/exasol/spark/DefaultSource.scala      | 40 ++++++++++--
 .../scala/com/exasol/spark/ExasolWriter.scala | 57 ++---------------
 .../com/exasol/spark/rdd/ExasolRDD.scala      |  2 +-
 .../spark/util/ExasolConnectionManager.scala  |  4 +-
 .../com/exasol/spark/DefaultSourceSuite.scala | 61 ++++++++++++++++---
 parent-pom/pk_generated_parent.pom            |  3 +-
 parent-pom/pom.xml                            |  2 +-
 10 files changed, 117 insertions(+), 77 deletions(-)
 create mode 100644 doc/changes/changes_2.1.2.md

diff --git a/.github/workflows/pk-verify.yml b/.github/workflows/pk-verify.yml
index 2ee0d3b6..5f2d1e04 100644
--- a/.github/workflows/pk-verify.yml
+++ b/.github/workflows/pk-verify.yml
@@ -22,9 +22,9 @@ jobs:
       - name: Set up JDK 11
         uses: actions/setup-java@v3
         with:
-          distribution: 'temurin'
+          distribution: "temurin"
           java-version: 11
-          cache: 'maven'
+          cache: "maven"
       - name: Cache SonarCloud packages
         uses: actions/cache@v3
         with:
@@ -32,4 +32,4 @@
           key: ${{ runner.os }}-sonar
           restore-keys: ${{ runner.os }}-sonar
       - name: Run Project Keeper Separately
-        run: mvn --batch-mode -DtrimStackTrace=false com.exasol:project-keeper-maven-plugin:2.9.9:verify --projects .
+        run: mvn --batch-mode -DtrimStackTrace=false com.exasol:project-keeper-maven-plugin:2.9.11:verify --projects .
diff --git a/doc/changes/changelog.md b/doc/changes/changelog.md
index cd32feb3..8d25b8d7 100644
--- a/doc/changes/changelog.md
+++ b/doc/changes/changelog.md
@@ -1,5 +1,6 @@
 # Changes
 
+* [2.1.2](changes_2.1.2.md)
 * [2.1.1](changes_2.1.1.md)
 * [2.1.0](changes_2.1.0.md)
 * [2.0.0](changes_2.0.0.md)
diff --git a/doc/changes/changes_2.1.2.md b/doc/changes/changes_2.1.2.md
new file mode 100644
index 00000000..a7bcbfae
--- /dev/null
+++ b/doc/changes/changes_2.1.2.md
@@ -0,0 +1,18 @@
+# Spark Connector 2.1.2, released 2023-??-??
+
+Code name:
+
+## Summary
+
+## Features
+
+* 194: Wrong name of Exasol JDBC format in documentation
+* 197: Committing transaction in the finally handler
+
+## Dependency Updates
+
+### Spark Exasol Connector Parent POM
+
+#### Plugin Dependency Updates
+
+* Updated `org.apache.maven.plugins:maven-enforcer-plugin:3.3.0` to `3.4.0`
diff --git a/exasol-jdbc/src/main/scala/com/exasol/spark/DefaultSource.scala b/exasol-jdbc/src/main/scala/com/exasol/spark/DefaultSource.scala
index 214e575e..7693dbec 100644
--- a/exasol-jdbc/src/main/scala/com/exasol/spark/DefaultSource.scala
+++ b/exasol-jdbc/src/main/scala/com/exasol/spark/DefaultSource.scala
@@ -4,7 +4,6 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
-
 import com.exasol.errorreporting.ExaError
 import com.exasol.spark.common.ExasolOptions
 import com.exasol.spark.util.Constants._
@@ -15,6 +14,7 @@ import com.exasol.spark.writer.ExasolWriter
 import com.exasol.sql.StatementFactory
 import com.exasol.sql.dql.select.rendering.SelectRenderer
 import com.exasol.sql.rendering.StringRendererConfig
+import org.apache.spark.internal.Logging
 
 /**
  * The default entry source for creating integration between Exasol and Spark.
@@ -26,7 +26,8 @@ class DefaultSource
     extends RelationProvider
     with DataSourceRegister
     with SchemaRelationProvider
-    with CreatableRelationProvider {
+    with CreatableRelationProvider
+    with Logging {
 
   override def shortName(): String = "exasol"
 
@@ -154,11 +155,38 @@ class DefaultSource
     options: ExasolOptions,
     manager: ExasolConnectionManager
   ): Unit = {
-    val writer = new ExasolWriter(sqlContext.sparkContext, tableName, df.schema, options, manager)
-    val exaNodesCnt = writer.startParallel()
-    val newDF = repartitionPerNode(df, exaNodesCnt)
+    val maxParallel = sqlContext.sparkContext.defaultParallelism
+    logInfo(s"saveDataFrame, maxParallellelism=$maxParallel")
+    val mainConnection = manager.writerMainConnection()
+
+    if (mainConnection == null) {
+      throw new RuntimeException(
+        ExaError
+          .messageBuilder("F-SEC-7")
+          .message("Could not create main JDBC connection to Exasol cluster.")
+          .mitigation("Please make sure that there is a network connection between Spark and Exasol clusters.")
+          .toString()
+      )
+    }
 
-    newDF.rdd.foreachPartition(iter => writer.insertPartition(iter))
+    try {
+      val exaNodesCnt = manager.initParallel(mainConnection, maxParallel)
+      val hosts = manager.subConnections(mainConnection)
+      val newDF = repartitionPerNode(df, exaNodesCnt)
+      val writer = new ExasolWriter(sqlContext.sparkContext, tableName, df.schema, options, hosts, manager)
+
+      logInfo(s"save with nodes=$exaNodesCnt")
+      newDF.rdd.foreachPartition(iter => writer.insertPartition(iter))
+      mainConnection.commit()
+    } catch {
+      case ex: Exception => {
+        logError("Exception during writing, roll back transaction", ex)
+        mainConnection.rollback()
+        throw ex
+      }
+    } finally {
+      mainConnection.close()
+    }
   }
 
   // Creates an Exasol table that match Spark dataframe
diff --git a/exasol-jdbc/src/main/scala/com/exasol/spark/ExasolWriter.scala b/exasol-jdbc/src/main/scala/com/exasol/spark/ExasolWriter.scala
index b0d4ecf1..00dd921c 100644
--- a/exasol-jdbc/src/main/scala/com/exasol/spark/ExasolWriter.scala
+++ b/exasol-jdbc/src/main/scala/com/exasol/spark/ExasolWriter.scala
@@ -1,16 +1,9 @@
 package com.exasol.spark.writer
 
-import java.sql.SQLException
-
 import org.apache.spark.SparkContext
 import org.apache.spark.TaskContext
-import org.apache.spark.scheduler.SparkListener
-import org.apache.spark.scheduler.SparkListenerApplicationEnd
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.StructType
-
-import com.exasol.errorreporting.ExaError
-import com.exasol.jdbc.EXAConnection
 import com.exasol.spark.common.ExasolOptions
 import com.exasol.spark.util.Constants._
 import com.exasol.spark.util.Converter
@@ -24,45 +17,9 @@ class ExasolWriter(
     tableName: String,
     rddSchema: StructType,
     options: ExasolOptions,
+    hosts: Seq[String],
     manager: ExasolConnectionManager
 ) extends Serializable {
-
-  @transient private var mainConnection: EXAConnection = null
-  private var hosts: Seq[String] = null
-
-  def closeMainResources(): Unit =
-    if (mainConnection != null && !mainConnection.isClosed) {
-      mainConnection.close()
-    }
-
-  def startParallel(): Int = {
-    mainConnection = manager.writerMainConnection()
-
-    if (mainConnection == null) {
-      throw new RuntimeException(
-        ExaError
-          .messageBuilder("F-SEC-7")
-          .message("Could not create main JDBC connection to Exasol cluster.")
-          .mitigation("Please make sure that there network connection between Spark and Exasol clusters.")
-          .toString()
-      )
-    }
-
-    val cnt = manager.initParallel(mainConnection)
-
-    // Close Exasol main connection when SparkContext finishes. This is a lifetime of a Spark
-    // application.
-    sc.addSparkListener(new SparkListener {
-      override def onApplicationEnd(appEnd: SparkListenerApplicationEnd): Unit =
-        closeMainResources()
-    })
-
-    // Populate hosts
-    hosts = manager.subConnections(mainConnection)
-
-    cnt
-  }
-
   def insertStmt(): String = {
     val columns = rddSchema.fields.map(_.name).mkString(",")
     val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
@@ -72,9 +29,8 @@
   def insertPartition(iter: Iterator[Row]): Unit = {
     val partitionId = TaskContext.getPartitionId()
     val subConnectionUrl = hosts(partitionId)
-    val subConn = manager.subConnection(subConnectionUrl)
-
-    val stmt = subConn.prepareStatement(insertStmt())
+    val subConnection = manager.subConnection(subConnectionUrl)
+    val stmt = subConnection.prepareStatement(insertStmt())
     val setters = rddSchema.fields.map(f => Converter.makeSetter(f.dataType))
     val nullTypes = rddSchema.fields.map(f => Types.jdbcTypeFromSparkDataType(f.dataType))
 
@@ -109,15 +65,10 @@
         val _ = stmt.executeBatch()
         totalCnt += rowCnt
       }
 
-      ()
-    } catch {
-      case ex: SQLException =>
-        throw ex
     } finally {
       stmt.close()
-      subConn.commit()
-      subConn.close()
+      subConnection.close()
     }
   }
 
diff --git a/exasol-jdbc/src/main/scala/com/exasol/spark/rdd/ExasolRDD.scala b/exasol-jdbc/src/main/scala/com/exasol/spark/rdd/ExasolRDD.scala
index 0cac9b8e..b77466eb 100644
--- a/exasol-jdbc/src/main/scala/com/exasol/spark/rdd/ExasolRDD.scala
+++ b/exasol-jdbc/src/main/scala/com/exasol/spark/rdd/ExasolRDD.scala
@@ -64,7 +64,7 @@ class ExasolRDD(
       )
     }
 
-    val cnt = manager.initParallel(conn)
+    val cnt = manager.initParallel(conn, sc.defaultParallelism)
     logInfo(s"Initiated $cnt parallel exasol (sub) connections")
 
     // Close Exasol main connection when SparkContext finishes. This is a
diff --git a/exasol-jdbc/src/main/scala/com/exasol/spark/util/ExasolConnectionManager.scala b/exasol-jdbc/src/main/scala/com/exasol/spark/util/ExasolConnectionManager.scala
index 5c181232..804ebef8 100644
--- a/exasol-jdbc/src/main/scala/com/exasol/spark/util/ExasolConnectionManager.scala
+++ b/exasol-jdbc/src/main/scala/com/exasol/spark/util/ExasolConnectionManager.scala
@@ -61,9 +61,9 @@ final case class ExasolConnectionManager(options: ExasolOptions) {
    * @param mainConnection the main connection
    * @return the number of parallel connections
    */
-  def initParallel(mainConnection: EXAConnection): Int = {
+  def initParallel(mainConnection: EXAConnection, maxParallelism: Int): Int = {
     val max_nodes = if (options.containsKey(MAX_NODES)) options.get(MAX_NODES).toInt else DEFAULT_MAX_NODES
-    mainConnection.EnterParallel(max_nodes)
+    mainConnection.EnterParallel(max_nodes.min(maxParallelism))
   }
 
   /**
diff --git a/exasol-jdbc/src/test/scala/com/exasol/spark/DefaultSourceSuite.scala b/exasol-jdbc/src/test/scala/com/exasol/spark/DefaultSourceSuite.scala
index b8db8014..b8e30caf 100644
--- a/exasol-jdbc/src/test/scala/com/exasol/spark/DefaultSourceSuite.scala
+++ b/exasol-jdbc/src/test/scala/com/exasol/spark/DefaultSourceSuite.scala
@@ -1,23 +1,32 @@
 package com.exasol.spark
 
+import com.exasol.jdbc.EXAConnection
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.SaveMode
-
-import com.exasol.spark.common.ExasolValidationException
-
-import org.mockito.Mockito.when
+import com.exasol.spark.common.{ExasolOptions, ExasolValidationException}
+import com.exasol.spark.util.ExasolConnectionManager
+import org.apache.spark.SparkContext
+import org.mockito.Mockito.{times, verify, when}
+import org.scalatest.PrivateMethodTester
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.matchers.should.Matchers
 import org.scalatestplus.mockito.MockitoSugar
 
-class DefaultSourceSuite extends AnyFunSuite with Matchers with MockitoSugar {
-
-  test("when reading should throw an Exception if no `query` parameter is provided") {
+class DefaultSourceSuite extends AnyFunSuite with Matchers with MockitoSugar with PrivateMethodTester {
+  def mockedSqlContext(parallelism: Int = 1): SQLContext = {
     val sqlContext = mock[SQLContext]
+    val sparkContext = mock[SparkContext]
+    when(sqlContext.sparkContext).thenReturn(sparkContext)
+    when(sparkContext.defaultParallelism).thenReturn(parallelism)
     when(sqlContext.getAllConfs).thenReturn(Map.empty[String, String])
+    sqlContext
+  }
+
+  test("when reading should throw an Exception if no `query` parameter is provided") {
+    val sqlContext = mockedSqlContext()
     val thrown = intercept[ExasolValidationException] {
       new DefaultSource().createRelation(sqlContext, Map[String, String]())
     }
@@ -37,8 +46,7 @@
 
   test("when saving should throw an Exception if no `table` parameter is provided") {
     val df = mock[DataFrame]
-    val sqlContext = mock[SQLContext]
-    when(sqlContext.getAllConfs).thenReturn(Map.empty[String, String])
+    val sqlContext = mockedSqlContext()
     val thrown = intercept[ExasolValidationException] {
       new DefaultSource().createRelation(sqlContext, SaveMode.Append, Map[String, String](), df)
     }
@@ -96,4 +104,39 @@
     // should not contains irrelevant options for exasol
     assert(!newConf.contains("spark.other.options") && !newConf.contains("options"))
   }
+
+  test("`saveDataFrame` should throw exception on null main connection") {
+    val sqlContext = mockedSqlContext()
+    val manager = mock[ExasolConnectionManager]
+    when(manager.writerMainConnection()).thenReturn(null)
+
+    val df = mock[DataFrame]
+    val options = mock[ExasolOptions]
+
+    val saveDataFrame = PrivateMethod[Unit](Symbol("saveDataFrame"))
+
+    val thrown = intercept[RuntimeException] {
+      (new DefaultSource()).invokePrivate(saveDataFrame(sqlContext, df, "TEST", options, manager))
+    }
+    assert(thrown.getMessage().startsWith("F-SEC-7"))
+  }
+
+  test("`saveDataFrame` rolls back transaction on exception") {
+    val sqlContext = mockedSqlContext(2)
+    val manager = mock[ExasolConnectionManager]
+    val exaConn = mock[EXAConnection]
+    when(manager.writerMainConnection()).thenReturn(exaConn)
+    when(manager.initParallel(exaConn, 2)).thenThrow(new RuntimeException())
+
+    val df = mock[DataFrame]
+    val options = mock[ExasolOptions]
+
+    val saveDataFrame = PrivateMethod[Unit](Symbol("saveDataFrame"))
+
+    intercept[RuntimeException] {
+      (new DefaultSource()).invokePrivate(saveDataFrame(sqlContext, df, "TEST", options, manager))
+    }
+    verify(exaConn, times(1)).rollback()
+    verify(exaConn, times(0)).commit()
+  }
 }
diff --git a/parent-pom/pk_generated_parent.pom b/parent-pom/pk_generated_parent.pom
index 499591bc..481a18c9 100644
--- a/parent-pom/pk_generated_parent.pom
+++ b/parent-pom/pk_generated_parent.pom
@@ -51,7 +51,7 @@
                 org.apache.maven.plugins
                 maven-enforcer-plugin
-                3.3.0
+                3.4.0
                 enforce-maven
@@ -157,7 +157,6 @@
                 true
                 true
                 false
-                true
                 true
                 false
diff --git a/parent-pom/pom.xml b/parent-pom/pom.xml
index 0c4bdca6..e73707eb 100644
--- a/parent-pom/pom.xml
+++ b/parent-pom/pom.xml
@@ -15,7 +15,7 @@
         pk_generated_parent.pom
-        2.1.1
+        2.1.2
         8
         2.20.0
         5.10.0