Skip to content

Commit

Permalink
Merge pull request #48 from tarantool/spark-3.x
Browse files Browse the repository at this point in the history
Add support for Apache Spark 3.x
  • Loading branch information
akudiyar authored Apr 10, 2023
2 parents 5d92ed9 + 8d54d91 commit 4910236
Show file tree
Hide file tree
Showing 14 changed files with 142 additions and 76 deletions.
29 changes: 17 additions & 12 deletions .github/workflows/scala.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@ jobs:
strategy:
matrix:
include:
- scala: 2.11.12
tarantool: 2.8
router_port: 3301
router_api_port: 8081
- scala: 2.12.14
tarantool: 2.8
router_port: 3331
router_api_port: 8381
- scala: "2.11.12"
tarantool: "2.8"
router_port: "3301"
router_api_port: "8081"
- scala: "2.12.16"
tarantool: "2.8"
router_port: "3331"
router_api_port: "8381"
- scala: "2.13.10"
tarantool: "2.8"
router_port: "3361"
router_api_port: "8681"
steps:
- uses: actions/checkout@v2
- name: Setup Scala
Expand Down Expand Up @@ -58,7 +62,7 @@ jobs:
TARANTOOL_TOPOLOGY_FILE: 'cartridge/topology_${{ matrix.scala }}_${{ matrix.tarantool }}.lua'
TARANTOOL_ROUTER_PORT: ${{ matrix.router_port }}
TARANTOOL_ROUTER_API_PORT: ${{ matrix.router_api_port }}
run: sbt ++${{ matrix.scala }} coverageOn test coverageReport coverageOff
run: sbt ++${{ matrix.scala }}! test
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v1
with:
Expand All @@ -69,8 +73,9 @@ jobs:
strategy:
matrix:
scala:
- 2.11.12
- 2.12.14
- "2.11.12"
- "2.12.16"
- "2.13.10"
steps:
- uses: actions/checkout@v2
- name: Setup Scala
Expand All @@ -85,4 +90,4 @@ jobs:
~/.sbt
key: ${{ runner.os }}-sbt-${{ matrix.scala }}-${{ hashFiles('**/build.sbt') }}
- name: Check formatting
run: sbt ++${{ matrix.scala }} scalafmtSbtCheck scalafmtCheck test:scalafmtCheck
run: sbt ++${{ matrix.scala }}! scalafmtSbtCheck scalafmtCheck test:scalafmtCheck
1 change: 1 addition & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ continuationIndent {
assumeStandardLibraryStripMargin = true

project.git = true
project.excludeFilters = [".*/metals.sbt", ".*/target/.*"]

align = some
align.tokens = [
Expand Down
71 changes: 53 additions & 18 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import sbt.Keys._
import sbtrelease.ReleaseStateTransformations._

val scala211 = "2.11.12"
val scala212 = "2.12.14"
val supportedScalaVersions = Seq(scala212, scala211)
val scala212 = "2.12.16"
val scala213 = "2.13.10"
val supportedScalaVersions = Seq(scala213, scala212, scala211)

ThisBuild / description := "Spark Connector for Tarantool and Tarantool Cartridge"
ThisBuild / homepage := Some(url("https://github.com/tarantool/cartridge-spark"))
Expand Down Expand Up @@ -58,20 +60,43 @@ lazy val root = (project in file("."))
crossScalaVersions := supportedScalaVersions,
// Dependencies
libraryDependencies ++= (
commonDependencies ++ Seq(
"org.apache.spark" %% "spark-core" % "2.4.8" % "provided",
"org.apache.spark" %% "spark-sql" % "2.4.8" % "provided",
"org.apache.spark" %% "spark-hive" % "2.4.8" % "provided"
).map(
commonDependencies ++ ({
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, scalaMajor)) if scalaMajor < 12 =>
Seq(
"org.apache.spark" %% "spark-core" % "2.4.8" % "provided",
"org.apache.spark" %% "spark-sql" % "2.4.8" % "provided",
"org.apache.spark" %% "spark-hive" % "2.4.8" % "provided"
)
case _ =>
Seq(
"org.apache.spark" %% "spark-core" % "3.3.2" % "provided",
"org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
"org.apache.spark" %% "spark-hive" % "3.3.2" % "provided"
)
}
}).map(
_.exclude("org.slf4j", "slf4j-log4j12")
)
),
dependencyOverrides ++= Seq(
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.7.1",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7.3",
"com.fasterxml.jackson.core" % "jackson-core" % "2.6.7",
"io.netty" % "netty-all" % "4.1.70.Final",
"org.slf4j" % "slf4j-api" % "1.7.36" % Test
dependencyOverrides ++= {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, scalaMajor)) if scalaMajor < 12 =>
Seq(
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.7.1",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7.3",
"com.fasterxml.jackson.core" % "jackson-core" % "2.6.7"
)
case _ =>
Seq(
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.14.2",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.14.2",
"com.fasterxml.jackson.core" % "jackson-core" % "2.14.2"
)
}
} ++ Seq(
"io.netty" % "netty-all" % "4.1.74.Final",
"org.slf4j" % "slf4j-api" % "1.7.36" % Test
),
// Compiler options
javacOptions ++= Seq(
Expand Down Expand Up @@ -102,6 +127,21 @@ lazy val root = (project in file("."))
Some("releases".at(nexus + "service/local/staging/deploy/maven2"))
},
publishMavenStyle := true,
// Release settings
crossScalaVersions := Nil,
releaseProcess := Seq[ReleaseStep](
checkSnapshotDependencies,
inquireVersions,
runClean,
releaseStepCommandAndRemaining("+test"),
setReleaseVersion,
commitReleaseVersion,
tagRelease,
releaseStepCommandAndRemaining("+publishSigned"),
setNextVersion,
commitNextVersion,
pushChanges
),
releaseUseGlobalVersion := false,
releasePublishArtifactsAction := PgpKeys.publishSigned.value
)
Expand All @@ -123,8 +163,3 @@ ThisBuild / Test / javaOptions ++= Seq(

// ScalaTest
ThisBuild / Test / logBuffered := false

// Settings for scoverage plug-in
// Disabled until this bug is fixed: https://github.com/scoverage/sbt-scoverage/issues/306
// ThisBuild / coverageEnabled := true
ThisBuild / coverageHighlighting := true
2 changes: 0 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.13")

addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.3")

addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.8.2")

addSbtPlugin("com.github.sbt" % "sbt-release" % "1.1.0")

addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.1.2")
30 changes: 10 additions & 20 deletions src/main/scala/io/tarantool/spark/connector/Logging.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,17 @@ trait Logging {
// be serialized and used on another machine
@transient private var _log: Logger = _

protected def logInfo(msg: => String) {
protected def logInfo(msg: => String): Unit =
if (log.isInfoEnabled) log.info(msg)
}

protected def logDebug(msg: => String) {
protected def logDebug(msg: => String): Unit =
if (log.isDebugEnabled) log.debug(msg)
}

protected def logTrace(msg: => String) {
protected def logTrace(msg: => String): Unit =
if (log.isTraceEnabled) log.trace(msg)
}

protected def logWarning(msg: => String) {
protected def logWarning(msg: => String): Unit =
if (log.isWarnEnabled) log.warn(msg)
}

protected def log: Logger = {
if (_log == null) {
Expand All @@ -43,29 +39,23 @@ trait Logging {
// Ignore trailing $'s in the class names for Scala objects
this.getClass.getName.stripSuffix("$")

protected def logError(msg: => String) {
protected def logError(msg: => String): Unit =
if (log.isErrorEnabled) log.error(msg)
}

protected def logInfo(msg: => String, throwable: Throwable) {
protected def logInfo(msg: => String, throwable: Throwable): Unit =
if (log.isInfoEnabled) log.info(msg, throwable)
}

protected def logDebug(msg: => String, throwable: Throwable) {
protected def logDebug(msg: => String, throwable: Throwable): Unit =
if (log.isDebugEnabled) log.debug(msg, throwable)
}

protected def logTrace(msg: => String, throwable: Throwable) {
protected def logTrace(msg: => String, throwable: Throwable): Unit =
if (log.isTraceEnabled) log.trace(msg, throwable)
}

protected def logWarning(msg: => String, throwable: Throwable) {
protected def logWarning(msg: => String, throwable: Throwable): Unit =
if (log.isWarnEnabled) log.warn(msg, throwable)
}

protected def logError(msg: => String, throwable: Throwable) {
protected def logError(msg: => String, throwable: Throwable): Unit =
if (log.isErrorEnabled) log.error(msg, throwable)
}

protected def isTraceEnabled: Boolean =
log.isTraceEnabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ object TarantoolConfig {
hosts = Array(new TarantoolServerAddress(TarantoolDefaults.DEFAULT_HOST))
}

hosts
hosts.toSeq
}

def parseTimeouts(cfg: SparkConf): Timeouts =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class TarantoolReadRDD[R] private[spark] (
val connection = TarantoolConnection()
context.addTaskCompletionListener {
new Function1[TaskContext, Unit] {
def apply(context: TaskContext) { connection.close() }
def apply(context: TaskContext): Unit = connection.close()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import java.io.{OptionalDataException, PrintWriter, StringWriter}
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicLong
import java.util.{Collections, LinkedList => JLinkedList, List => JList}
import scala.collection.JavaConversions.asScalaBuffer
import scala.collection.JavaConverters
import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag
Expand Down Expand Up @@ -271,13 +270,13 @@ class TarantoolWriteRDD[R] private[spark] (
var savedException: Throwable = null
try {
CompletableFuture
.allOf(allFutures: _*)
.allOf(allFutures.toSeq: _*)
.handle(toJavaBiFunction { (_: Void, exception: Throwable) =>
if (!failedRowsExceptions.isEmpty) {
val sw: StringWriter = new StringWriter()
val pw: PrintWriter = new PrintWriter(sw)
try {
failedRowsExceptions.foreach { exception =>
JavaConverters.collectionAsScalaIterableConverter(failedRowsExceptions).asScala.foreach { exception =>
pw.append("\n\n")
exception.printStackTrace(pw)
}
Expand Down
18 changes: 9 additions & 9 deletions src/main/scala/org/apache/spark/sql/tarantool/MapFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,16 @@ object MapFunctions {

def mapSimpleValue(value: Any): Any =
value match {
case value: BigInt => value.underlying()
case value: BigDecimal => value.underlying()
case value: BigInt => value
case value: BigDecimal => value
case value: Boolean => value.booleanValue().asInstanceOf[JBoolean]
case value: Byte => value.underlying().asInstanceOf[JByte]
case value: Char => value.underlying().asInstanceOf[JCharacter]
case value: Short => value.underlying().asInstanceOf[JShort]
case value: Int => value.underlying().asInstanceOf[JInteger]
case value: Long => value.underlying().asInstanceOf[JLong]
case value: Float => value.underlying().asInstanceOf[JFloat]
case value: Double => value.underlying().asInstanceOf[JDouble]
case value: Byte => value.asInstanceOf[JByte]
case value: Char => value.asInstanceOf[JCharacter]
case value: Short => value.asInstanceOf[JShort]
case value: Int => value.asInstanceOf[JInteger]
case value: Long => value.asInstanceOf[JLong]
case value: Float => value.asInstanceOf[JFloat]
case value: Double => value.asInstanceOf[JDouble]
case value: Any => identity(value)
}
}
5 changes: 0 additions & 5 deletions src/test/resources/cartridge/instances_2.11.12_2.8.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,3 @@ testapp.s2-replica:
workdir: ./tmp/db_dev/3305
advertise_uri: localhost:3305
http_port: 8085

testapp-stateboard:
workdir: ./tmp/db_dev/3310
listen: localhost:3310
password: passwd
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,3 @@ testapp.s2-replica:
workdir: ./tmp/db_dev/3335
advertise_uri: localhost:3335
http_port: 8385

testapp-stateboard:
workdir: ./tmp/db_dev/3340
listen: localhost:3340
password: passwd
29 changes: 29 additions & 0 deletions src/test/resources/cartridge/instances_2.13.10_2.8.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
testapp.router:
workdir: ./tmp/db_dev/3361
advertise_uri: 0.0.0.0:3361
http_port: 8681

testapp.second-router:
workdir: ./tmp/db_dev/3371
advertise_uri: 0.0.0.0:3371
http_port: 8691

testapp.s1-master:
workdir: ./tmp/db_dev/3362
advertise_uri: localhost:3362
http_port: 8682

testapp.s1-replica:
workdir: ./tmp/db_dev/3363
advertise_uri: localhost:3363
http_port: 8683

testapp.s2-master:
workdir: ./tmp/db_dev/3364
advertise_uri: localhost:3364
http_port: 8684

testapp.s2-replica:
workdir: ./tmp/db_dev/3365
advertise_uri: localhost:3365
http_port: 8685
19 changes: 19 additions & 0 deletions src/test/resources/cartridge/topology_2.13.10_2.8.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
cartridge = require('cartridge')
replicasets = {{
alias = 'app-router',
roles = {'vshard-router', 'app.roles.api_router'},
join_servers = {{uri = '0.0.0.0:3361'}}
}, {
alias = 'app-router-second',
roles = {'vshard-router', 'app.roles.api_router'},
join_servers = {{uri = '0.0.0.0:3371'}}
}, {
alias = 's1-storage',
roles = {'vshard-storage', 'app.roles.api_storage'},
join_servers = {{uri = 'localhost:3362'}, {uri = 'localhost:3363'}}
}, {
alias = 's2-storage',
roles = {'vshard-storage', 'app.roles.api_storage'},
join_servers = {{uri = 'localhost:3364'}, {uri = 'localhost:3365'}}
}}
return cartridge.admin_edit_topology({replicasets = replicasets})

0 comments on commit 4910236

Please sign in to comment.