Take parallelism into account, refactor writer code (#204)
* Rearrange the connection order

* Intermediate state, still failing in local[1] case

* Logic works, need to cleanup

* Cleanup

* Handle error in main connection creation

* Update version and changelog

* Add project-keeper plugin

* Revert "Add project-keeper plugin"

This reverts commit 98e6685.

* Upgrade pk in workflow

* Unit test on main connection exception

* Unit test on rollback of transaction

* Useless catch

* Fix unused import

* Fix grammar

* Rethrow exception

* Refactor sqlContext creation

---------

Co-authored-by: Christoph Pirkl <[email protected]>
Shmuma and kaklakariada authored Sep 1, 2023
1 parent 881f5a8 commit fd5a879
Showing 10 changed files with 117 additions and 77 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/pk-verify.yml
@@ -22,14 +22,14 @@ jobs:
- name: Set up JDK 11
uses: actions/setup-java@v3
with:
distribution: 'temurin'
distribution: "temurin"
java-version: 11
cache: 'maven'
cache: "maven"
- name: Cache SonarCloud packages
uses: actions/cache@v3
with:
path: ~/.sonar/cache
key: ${{ runner.os }}-sonar
restore-keys: ${{ runner.os }}-sonar
- name: Run Project Keeper Separately
run: mvn --batch-mode -DtrimStackTrace=false com.exasol:project-keeper-maven-plugin:2.9.9:verify --projects .
run: mvn --batch-mode -DtrimStackTrace=false com.exasol:project-keeper-maven-plugin:2.9.11:verify --projects .
1 change: 1 addition & 0 deletions doc/changes/changelog.md

Some generated files are not rendered by default.

18 changes: 18 additions & 0 deletions doc/changes/changes_2.1.2.md
@@ -0,0 +1,18 @@
# Spark Connector 2.1.2, released 2023-??-??

Code name:

## Summary

## Features

* 194: Wrong name of Exasol JDBC format in documentation
* 197: Committing transaction in the finally handler

## Dependency Updates

### Spark Exasol Connector Parent POM

#### Plugin Dependency Updates

* Updated `org.apache.maven.plugins:maven-enforcer-plugin:3.3.0` to `3.4.0`
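
Issue 197 above concerns the old write path, which committed the transaction from a finally handler and could therefore commit partially failed writes. The DefaultSource changes below switch to commit-on-success: commit only after all partitions were written, roll back on any exception, and close the connection in finally. A minimal sketch of that shape, using a plain java.sql.Connection for illustration rather than the connector's EXAConnection:

    import java.sql.Connection

    // Sketch only: commit happens only if `body` succeeds, a failure rolls
    // the transaction back, and the connection is released either way.
    def withTransaction[T](connection: Connection)(body: => T): T =
      try {
        val result = body
        connection.commit()
        result
      } catch {
        case ex: Exception =>
          connection.rollback()
          throw ex
      } finally {
        connection.close()
      }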
40 changes: 34 additions & 6 deletions exasol-jdbc/src/main/scala/com/exasol/spark/DefaultSource.scala
@@ -4,7 +4,6 @@ import org.apache.spark.sql._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import com.exasol.errorreporting.ExaError
import com.exasol.spark.common.ExasolOptions
import com.exasol.spark.util.Constants._
@@ -15,6 +14,7 @@ import com.exasol.spark.writer.ExasolWriter
import com.exasol.sql.StatementFactory
import com.exasol.sql.dql.select.rendering.SelectRenderer
import com.exasol.sql.rendering.StringRendererConfig
import org.apache.spark.internal.Logging

/**
* The default entry source for creating integration between Exasol and Spark.
@@ -26,7 +26,8 @@ class DefaultSource
extends RelationProvider
with DataSourceRegister
with SchemaRelationProvider
with CreatableRelationProvider {
with CreatableRelationProvider
with Logging {

override def shortName(): String = "exasol"

@@ -154,11 +155,38 @@ class DefaultSource
options: ExasolOptions,
manager: ExasolConnectionManager
): Unit = {
val writer = new ExasolWriter(sqlContext.sparkContext, tableName, df.schema, options, manager)
val exaNodesCnt = writer.startParallel()
val newDF = repartitionPerNode(df, exaNodesCnt)
val maxParallel = sqlContext.sparkContext.defaultParallelism
logInfo(s"saveDataFrame, maxParallellelism=$maxParallel")
val mainConnection = manager.writerMainConnection()

if (mainConnection == null) {
throw new RuntimeException(
ExaError
.messageBuilder("F-SEC-7")
.message("Could not create main JDBC connection to Exasol cluster.")
.mitigation("Please make sure that there is a network connection between Spark and Exasol clusters.")
.toString()
)
}

newDF.rdd.foreachPartition(iter => writer.insertPartition(iter))
try {
val exaNodesCnt = manager.initParallel(mainConnection, maxParallel)
val hosts = manager.subConnections(mainConnection)
val newDF = repartitionPerNode(df, exaNodesCnt)
val writer = new ExasolWriter(sqlContext.sparkContext, tableName, df.schema, options, hosts, manager)

logInfo(s"save with nodes=$exaNodesCnt")
newDF.rdd.foreachPartition(iter => writer.insertPartition(iter))
mainConnection.commit()
} catch {
case ex: Exception => {
logError("Exception during writing, roll back transaction", ex)
mainConnection.rollback()
throw ex
}
} finally {
mainConnection.close()
}
}

// Creates an Exasol table that match Spark dataframe
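The save path above is reached through the regular DataFrame writer API. A hypothetical usage example follows; the option keys ("host", "port", "username", "password", "table") are illustrative, so check the connector documentation for the exact names:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("exasol-write-example").getOrCreate()
    val df = spark.range(100).selectExpr("id AS SALES_ID", "id * 2 AS AMOUNT")

    // Illustrative option keys; "exasol" is the short name registered by DefaultSource.
    df.write
      .format("exasol")
      .mode("append")
      .option("host", "10.0.0.11")
      .option("port", "8563")
      .option("username", "sys")
      .option("password", "exasol")
      .option("table", "RETAIL.SALES")
      .save()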
57 changes: 4 additions & 53 deletions exasol-jdbc/src/main/scala/com/exasol/spark/ExasolWriter.scala
@@ -1,16 +1,9 @@
package com.exasol.spark.writer

import java.sql.SQLException

import org.apache.spark.SparkContext
import org.apache.spark.TaskContext
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.scheduler.SparkListenerApplicationEnd
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

import com.exasol.errorreporting.ExaError
import com.exasol.jdbc.EXAConnection
import com.exasol.spark.common.ExasolOptions
import com.exasol.spark.util.Constants._
import com.exasol.spark.util.Converter
@@ -24,45 +17,9 @@ class ExasolWriter(
tableName: String,
rddSchema: StructType,
options: ExasolOptions,
hosts: Seq[String],
manager: ExasolConnectionManager
) extends Serializable {

@transient private var mainConnection: EXAConnection = null
private var hosts: Seq[String] = null

def closeMainResources(): Unit =
if (mainConnection != null && !mainConnection.isClosed) {
mainConnection.close()
}

def startParallel(): Int = {
mainConnection = manager.writerMainConnection()

if (mainConnection == null) {
throw new RuntimeException(
ExaError
.messageBuilder("F-SEC-7")
.message("Could not create main JDBC connection to Exasol cluster.")
.mitigation("Please make sure that there network connection between Spark and Exasol clusters.")
.toString()
)
}

val cnt = manager.initParallel(mainConnection)

// Close Exasol main connection when SparkContext finishes. This is a lifetime of a Spark
// application.
sc.addSparkListener(new SparkListener {
override def onApplicationEnd(appEnd: SparkListenerApplicationEnd): Unit =
closeMainResources()
})

// Populate hosts
hosts = manager.subConnections(mainConnection)

cnt
}

def insertStmt(): String = {
val columns = rddSchema.fields.map(_.name).mkString(",")
val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
@@ -72,9 +29,8 @@ class ExasolWriter(
def insertPartition(iter: Iterator[Row]): Unit = {
val partitionId = TaskContext.getPartitionId()
val subConnectionUrl = hosts(partitionId)
val subConn = manager.subConnection(subConnectionUrl)

val stmt = subConn.prepareStatement(insertStmt())
val subConnection = manager.subConnection(subConnectionUrl)
val stmt = subConnection.prepareStatement(insertStmt())

val setters = rddSchema.fields.map(f => Converter.makeSetter(f.dataType))
val nullTypes = rddSchema.fields.map(f => Types.jdbcTypeFromSparkDataType(f.dataType))
@@ -109,15 +65,10 @@ class ExasolWriter(
val _ = stmt.executeBatch()
totalCnt += rowCnt
}

()
} catch {
case ex: SQLException =>
throw ex
} finally {
stmt.close()
subConn.commit()
subConn.close()
subConnection.close()
}

}
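insertPartition above relies on a one-to-one mapping between Spark partition indices and Exasol sub-connection URLs, which is why DefaultSource repartitions the DataFrame to the number of sub-connections before writing. A simplified sketch of that mapping, leaving out the batching limits and type conversion the real writer does:

    import org.apache.spark.TaskContext
    import org.apache.spark.sql.Row

    // Sketch only: `hosts` holds one JDBC sub-connection URL per Exasol node and
    // the DataFrame was repartitioned to hosts.size partitions, so the partition
    // id is a valid index into `hosts`.
    def writePartition(hosts: Seq[String], insertSql: String)(rows: Iterator[Row]): Unit = {
      val url = hosts(TaskContext.getPartitionId())
      val connection = java.sql.DriverManager.getConnection(url)
      val stmt = connection.prepareStatement(insertSql)
      try {
        rows.foreach { row =>
          stmt.setLong(1, row.getLong(0)) // the real writer sets every column with a type-aware setter
          stmt.addBatch()
        }
        val _ = stmt.executeBatch()
      } finally {
        stmt.close()
        connection.close() // the commit happens on the main connection in DefaultSource
      }
    }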
@@ -64,7 +64,7 @@ class ExasolRDD(
)
}

val cnt = manager.initParallel(conn)
val cnt = manager.initParallel(conn, sc.defaultParallelism)
logInfo(s"Initiated $cnt parallel exasol (sub) connections")

// Close Exasol main connection when SparkContext finishes. This is a
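Both the reader (ExasolRDD above) and the writer now pass sc.defaultParallelism into initParallel. Spark derives that value from the scheduler backend, for example the N in local[N] or the total executor cores on a cluster. The illustration below shows how it varies with the master setting; exact values depend on the deployment and on spark.default.parallelism:

    import org.apache.spark.sql.SparkSession

    // Illustration only: defaultParallelism follows the master setting.
    val local1 = SparkSession.builder().master("local[1]").appName("p1").getOrCreate()
    println(local1.sparkContext.defaultParallelism) // 1
    local1.stop()

    val local4 = SparkSession.builder().master("local[4]").appName("p4").getOrCreate()
    println(local4.sparkContext.defaultParallelism) // 4
    local4.stop()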
@@ -61,9 +61,9 @@ final case class ExasolConnectionManager(options: ExasolOptions) {
* @param mainConnection the main connection
* @return the number of parallel connections
*/
def initParallel(mainConnection: EXAConnection): Int = {
def initParallel(mainConnection: EXAConnection, maxParallelism: Int): Int = {
val max_nodes = if (options.containsKey(MAX_NODES)) options.get(MAX_NODES).toInt else DEFAULT_MAX_NODES
mainConnection.EnterParallel(max_nodes)
mainConnection.EnterParallel(max_nodes.min(maxParallelism))
}

/**
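initParallel now caps the configured MAX_NODES by Spark's available parallelism; the commit history notes that writes failed in the local[1] case before this cap, when more sub-connections were requested than Spark had task slots to consume. A minimal illustration of the capping rule; the default of 200 nodes is a placeholder, not necessarily the connector's DEFAULT_MAX_NODES:

    // Sketch of the capping rule used by initParallel.
    def effectiveParallelism(
        configuredMaxNodes: Option[Int],
        defaultMaxNodes: Int,
        sparkDefaultParallelism: Int
    ): Int = {
      val maxNodes = configuredMaxNodes.getOrElse(defaultMaxNodes)
      maxNodes.min(sparkDefaultParallelism)
    }

    // local[1]: effectiveParallelism(None, 200, 1) == 1
    // local[8]: effectiveParallelism(None, 200, 8) == 8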
@@ -1,23 +1,32 @@
package com.exasol.spark

import com.exasol.jdbc.EXAConnection
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode

import com.exasol.spark.common.ExasolValidationException

import org.mockito.Mockito.when
import com.exasol.spark.common.{ExasolOptions, ExasolValidationException}
import com.exasol.spark.util.ExasolConnectionManager
import org.apache.spark.SparkContext
import org.mockito.Mockito.{times, verify, when}
import org.scalatest.PrivateMethodTester
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar

class DefaultSourceSuite extends AnyFunSuite with Matchers with MockitoSugar {

test("when reading should throw an Exception if no `query` parameter is provided") {
class DefaultSourceSuite extends AnyFunSuite with Matchers with MockitoSugar with PrivateMethodTester {
def mockedSqlContext(parallelism: Int = 1): SQLContext = {
val sqlContext = mock[SQLContext]
val sparkContext = mock[SparkContext]
when(sqlContext.sparkContext).thenReturn(sparkContext)
when(sparkContext.defaultParallelism).thenReturn(parallelism)
when(sqlContext.getAllConfs).thenReturn(Map.empty[String, String])
sqlContext
}

test("when reading should throw an Exception if no `query` parameter is provided") {
val sqlContext = mockedSqlContext()
val thrown = intercept[ExasolValidationException] {
new DefaultSource().createRelation(sqlContext, Map[String, String]())
}
@@ -37,8 +46,7 @@ class DefaultSourceSuite extends AnyFunSuite with Matchers with MockitoSugar {

test("when saving should throw an Exception if no `table` parameter is provided") {
val df = mock[DataFrame]
val sqlContext = mock[SQLContext]
when(sqlContext.getAllConfs).thenReturn(Map.empty[String, String])
val sqlContext = mockedSqlContext()
val thrown = intercept[ExasolValidationException] {
new DefaultSource().createRelation(sqlContext, SaveMode.Append, Map[String, String](), df)
}
@@ -96,4 +104,39 @@ class DefaultSourceSuite extends AnyFunSuite with Matchers with MockitoSugar {
// should not contains irrelevant options for exasol
assert(!newConf.contains("spark.other.options") && !newConf.contains("options"))
}

test("`saveDataFrame` should throw exception on null main connection") {
val sqlContext = mockedSqlContext()
val manager = mock[ExasolConnectionManager]
when(manager.writerMainConnection()).thenReturn(null)

val df = mock[DataFrame]
val options = mock[ExasolOptions]

val saveDataFrame = PrivateMethod[Unit](Symbol("saveDataFrame"))

val thrown = intercept[RuntimeException] {
(new DefaultSource()).invokePrivate(saveDataFrame(sqlContext, df, "TEST", options, manager))
}
assert(thrown.getMessage().startsWith("F-SEC-7"))
}

test("`saveDataFrame` rolls back transaction on exception") {
val sqlContext = mockedSqlContext(2)
val manager = mock[ExasolConnectionManager]
val exaConn = mock[EXAConnection]
when(manager.writerMainConnection()).thenReturn(exaConn)
when(manager.initParallel(exaConn, 2)).thenThrow(new RuntimeException())

val df = mock[DataFrame]
val options = mock[ExasolOptions]

val saveDataFrame = PrivateMethod[Unit](Symbol("saveDataFrame"))

intercept[RuntimeException] {
(new DefaultSource()).invokePrivate(saveDataFrame(sqlContext, df, "TEST", options, manager))
}
verify(exaConn, times(1)).rollback()
verify(exaConn, times(0)).commit()
}
}
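
The new tests reach the private saveDataFrame method through ScalaTest's PrivateMethodTester and stub the Spark and Exasol collaborators with Mockito. A condensed, self-contained sketch of the PrivateMethodTester part; the class and method here are made up for illustration:

    import org.scalatest.PrivateMethodTester
    import org.scalatest.funsuite.AnyFunSuite

    class PrivateMethodSketch extends AnyFunSuite with PrivateMethodTester {
      // Stand-in for DefaultSource: any class with a private method works.
      class Target {
        private def greet(name: String): String = s"hello, $name"
      }

      test("a private method can be invoked by its symbol") {
        val greet = PrivateMethod[String](Symbol("greet"))
        assert((new Target).invokePrivate(greet("spark")) == "hello, spark")
      }
    }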
3 changes: 1 addition & 2 deletions parent-pom/pk_generated_parent.pom

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion parent-pom/pom.xml
@@ -15,7 +15,7 @@
<relativePath>pk_generated_parent.pom</relativePath>
</parent>
<properties>
<revision>2.1.1</revision>
<revision>2.1.2</revision>
<java.version>8</java.version>
<log4j.version>2.20.0</log4j.version>
<junit.version>5.10.0</junit.version>
