Commit

Merge pull request #1223 from datastax/YannMoisan-SPARKC-458-scala-212-support

Yann moisan sparkc 458 scala 212 support
RussellSpitzer authored Nov 25, 2019
2 parents 29e31d3 + d55e33d commit 912201b
Showing 13 changed files with 138 additions and 39 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -20,5 +20,7 @@ metastore_db
.worksheet
.idea
.idea_modules
*.ipr
*.iws

checkpoint
1 change: 1 addition & 0 deletions .travis.yml
@@ -7,6 +7,7 @@ sudo: required
dist: trusty
scala:
- 2.11.12
- 2.12.10

env:
- CASSANDRA_VERSION=2.1.15
3 changes: 3 additions & 0 deletions CHANGES.txt
@@ -1,3 +1,6 @@
2.4.2
* Support for Scala 2.12 (SPARKC-458)

2.4.1
* Includes all up to 2.3.3

3 changes: 2 additions & 1 deletion project/Settings.scala
@@ -45,7 +45,8 @@ object Settings extends Build {
lazy val buildSettings = Seq(
organization := "com.datastax.spark",
version in ThisBuild := currentVersion,
scalaVersion := Versions.scalaVersion,
scalaVersion := Versions.scala211,
crossScalaVersions := Seq(Versions.scala211, Versions.scala212),
crossVersion := CrossVersion.binary,
versionStatus := Versions.status(scalaVersion.value, scalaBinaryVersion.value)
)
2 changes: 1 addition & 1 deletion project/SparkCassandraConnectorBuild.scala
@@ -39,7 +39,7 @@ object CassandraSparkBuild extends Build {
lazy val cassandraServerProject = Project(
id = "cassandra-server",
base = file(namespace),
settings = defaultSettings ++ Seq(
settings = defaultSettings ++ sparkPackageSettings ++ Seq(
libraryDependencies ++= Seq(Artifacts.cassandraServer % "it", Artifacts.airlift),
Testing.cassandraServerClasspath := {
(fullClasspath in IntegrationTest).value.map(_.data.getAbsoluteFile).mkString(File.pathSeparator)
5 changes: 3 additions & 2 deletions project/Versions.scala
@@ -18,10 +18,11 @@ import scala.util.Properties
object Versions {


lazy val scalaVersion = "2.11.12"
lazy val scala211 = "2.11.7"
lazy val scala212 = "2.12.8"

/* For `scalaBinaryVersion.value` outside an sbt task. */
lazy val scalaBinary = scalaVersion.dropRight(2)
lazy val scalaBinary = scala212.dropRight(2)

val Akka = "2.3.4"
val Cassandra = "3.11.3"
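A note on scalaBinary above: dropRight(2) only yields the right binary version while the patch component is a single digit. A quick sketch of that assumption (not part of this commit):

// "2.12.8".dropRight(2) strips the trailing ".8", leaving the binary version.
assert("2.12.8".dropRight(2) == "2.12")
// A two-digit patch release would leave a trailing dot behind instead:
assert("2.12.10".dropRight(2) == "2.12.")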
4 changes: 2 additions & 2 deletions project/plugins.sbt
@@ -18,9 +18,9 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.8.5")

addSbtPlugin("com.scalapenos" % "sbt-prompt" % "1.0.0")

addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.8.0")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.1")

addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.0.4")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.8.0")

//SbtAssembly 0.12.0 is included in sbt-spark-package
resolvers += "Spark Packages Main repo" at "https://dl.bintray.com/spark-packages/maven"
SparkRepl.scala (new file)
@@ -0,0 +1,42 @@
package com.datastax.spark.connector.embedded

import java.io._
import java.net.URLClassLoader

import org.apache.spark.SparkConf
import org.apache.spark.repl.{Main, SparkILoop}

import scala.collection.mutable.ArrayBuffer
import scala.tools.nsc.GenericRunnerSettings

object SparkRepl {

def runInterpreter(input: String, conf: SparkConf): String = {
val in = new BufferedReader(new StringReader(input + "\n"))
val out = new StringWriter()
val cl = getClass.getClassLoader
val paths = new ArrayBuffer[String]
cl match {
case urlLoader: URLClassLoader =>
for (url <- urlLoader.getURLs) {
if (url.getProtocol == "file") {
paths += url.getFile
}
}
case _ =>
}

Main.conf.setAll(conf.getAll)
val interp = new SparkILoop(Some(in), new PrintWriter(out))
Main.interp = interp
val separator = System.getProperty("path.separator")
val settings = new GenericRunnerSettings(s => throw new RuntimeException(s"Scala options error: $s"))
settings.processArguments(List("-classpath", paths.mkString(separator)), true)
interp.process(settings) // start the REPL; it evaluates the supplied input until it is exhausted
Main.interp = null
Option(Main.sparkContext).foreach(_.stop())
System.clearProperty("spark.driver.port")
out.toString
}

}
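For context, a minimal usage sketch of SparkRepl.runInterpreter (hypothetical test code, not part of this commit): feed a snippet to the embedded REPL and inspect the captured output.

// Hypothetical sketch: run a one-liner through SparkRepl and check what it printed.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("spark-repl-smoke-test")

val output = SparkRepl.runInterpreter("println(21 * 2)", conf)
assert(output.contains("42"), s"unexpected REPL output: $output")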
Reflect.scala (new file)
@@ -0,0 +1,21 @@
package com.datastax.spark.connector.util

import scala.reflect.runtime.universe._

private[connector] object Reflect {

def constructor(tpe: Type): Symbol = tpe.decl(termNames.CONSTRUCTOR)

def member(tpe: Type, name: String): Symbol = tpe.member(TermName(name))

def methodSymbol(tpe: Type): MethodSymbol = {
val constructors = constructor(tpe).asTerm.alternatives.map(_.asMethod)
val paramCount = constructors.map(_.paramLists.flatten.size).max
constructors.filter(_.paramLists.flatten.size == paramCount) match {
case List(onlyOne) => onlyOne
case _ => throw new IllegalArgumentException(
"Multiple constructors with the same number of parameters not allowed.")
}
}
}
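A short sketch of how this helper behaves (hypothetical example class, not from this commit); note that callers must live inside the com.datastax.spark.connector package, since Reflect is package-private.

// Hypothetical sketch: methodSymbol resolves the constructor with the largest
// parameter count and throws if that arity is ambiguous.
import scala.reflect.runtime.universe._

case class Address(street: String, zip: Int)

val ctor: MethodSymbol = Reflect.methodSymbol(typeOf[Address])
assert(ctor.paramLists.flatten.map(_.name.toString) == List("street", "zip"))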

CassandraJavaUtilTest.java
@@ -5,6 +5,10 @@
import java.util.Map;
import java.util.Set;

import com.datastax.spark.connector.japi.CassandraJavaUtil;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import scala.reflect.api.TypeTags;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;
@@ -31,61 +35,71 @@
@SuppressWarnings("unchecked")
public class CassandraJavaUtilTest {

/**
 * Scala reflection type tags change the string representation of some types: in Scala 2.11 the
 * java.lang prefix is included, while in Scala 2.12 it is dropped. To avoid this mismatch we
 * always strip the java.lang portion before comparing.
 */
private String removeJavaLang(String target) {
return target.replaceAll("java.lang.", "");
}

private final String STRING = removeJavaLang(String.class.getName());
private final String LIST_STRING =
removeJavaLang(String.format("%s[%s]", List.class.getName(), String.class.getName()));
private final String MAP_STRING_INT =
removeJavaLang(String.format("%s[%s,%s]", Map.class.getName(), String.class.getName(), Integer.class.getName()));
private final String LIST_SET_MAP_STRING_INT =
removeJavaLang(String.format("%s[%s[%s[%s,%s]]]", List.class.getName(), Set.class.getName(), Map.class.getName(), String.class.getName(), Integer.class.getName()));

@Test
public void testTypeTag1() throws Exception {
TypeTags.TypeTag<String> tt = typeTag(String.class);
assertThat(tt.tpe().toString(), is(String.class.getName()));
assertThat(removeJavaLang(tt.tpe().toString()), is(STRING));
}

@Test
public void testTypeTag2() throws Exception {
TypeTags.TypeTag<List> tt1 = typeTag(List.class, String.class);
assertThat(tt1.tpe().toString(), is(String.format("%s[%s]",
List.class.getName(), String.class.getName())));
assertThat(removeJavaLang(tt1.tpe().toString()), is(LIST_STRING));

TypeTags.TypeTag<Map> tt2 = typeTag(Map.class, String.class, Integer.class);
assertThat(tt2.tpe().toString(), is(String.format("%s[%s,%s]",
Map.class.getName(), String.class.getName(), Integer.class.getName())));
assertThat(removeJavaLang(tt2.tpe().toString()), is(MAP_STRING_INT));
}

@Test
public void testTypeTag3() throws Exception {
TypeTags.TypeTag<List> tt = typeTag(List.class, typeTag(Set.class, typeTag(Map.class, typeTag(String.class), typeTag(Integer.class))));
assertThat(tt.tpe().toString(), is(String.format("%s[%s[%s[%s,%s]]]",
List.class.getName(), Set.class.getName(), Map.class.getName(), String.class.getName(), Integer.class.getName())));
assertThat(removeJavaLang(tt.tpe().toString()), is(LIST_SET_MAP_STRING_INT));
}

@Test
public void testTypeConverter1() throws Exception {
TypeConverter<List<String>> tc = typeConverter(String.class);
assertThat(tc.targetTypeName(), is(String.class.getSimpleName()));
assertThat(removeJavaLang(tc.targetTypeName()), is(STRING));
}

@Test
public void testTypeConverter2() throws Exception {
TypeConverter<List<String>> tc1 = typeConverter(List.class, String.class);
assertThat(tc1.targetTypeName(), is(String.format("%s[%s]",
List.class.getName(), String.class.getSimpleName())));
assertThat(removeJavaLang(tc1.targetTypeName()), is(LIST_STRING));

TypeConverter<Map<String, Integer>> tc2 = typeConverter(Map.class, String.class, Integer.class);
assertThat(tc2.targetTypeName(), is(String.format("%s[%s,%s]",
Map.class.getName(), String.class.getSimpleName(), Integer.class.getName())));
assertThat(removeJavaLang(tc2.targetTypeName()), is(MAP_STRING_INT));

}

@Test
public void testTypeConverter3() throws Exception {
TypeConverter<List> tc = typeConverter(List.class, typeTag(Set.class, typeTag(Map.class, typeTag(String.class), typeTag(Integer.class))));
assertThat(tc.targetTypeName(), is(String.format("%s[%s[%s[%s,%s]]]",
List.class.getName(), Set.class.getName(), Map.class.getName(), String.class.getSimpleName(), Integer.class.getName())));
assertThat(removeJavaLang(tc.targetTypeName()), is(LIST_SET_MAP_STRING_INT));
}

@Test
public void testTypeConverter4() throws Exception {
TypeTags.TypeTag<List> tt = typeTag(List.class, typeTag(Set.class, typeTag(Map.class, typeTag(String.class), typeTag(Integer.class))));
TypeConverter<List> tc = typeConverter(tt);
assertThat(tc.targetTypeName(), is(String.format("%s[%s[%s[%s,%s]]]",
List.class.getName(), Set.class.getName(), Map.class.getName(), String.class.getSimpleName(), Integer.class.getName())));
assertThat(removeJavaLang(tc.targetTypeName()), is(LIST_SET_MAP_STRING_INT));
}

@Test
AbstractCassandraJoin.scala
@@ -11,6 +11,7 @@ import com.datastax.spark.connector.util.{CountingIterator, CqlWhereParser}
import com.datastax.spark.connector.writer._
import org.apache.spark.metrics.InputMetricsUpdater
import org.apache.spark.rdd.RDD
import org.apache.spark.util.TaskCompletionListener
import org.apache.spark.{Partition, TaskContext}

import scala.collection.JavaConverters._
@@ -167,15 +168,19 @@ private[rdd] trait AbstractCassandraJoin[L, R] {
val rowIterator = fetchIterator(session, bsb, rowMetadata, left.iterator(split, context))
val countingIterator = new CountingIterator(rowIterator, None)

context.addTaskCompletionListener { (context) =>
val duration = metricsUpdater.finish() / 1000000000d
logDebug(
f"Fetched ${countingIterator.count} rows " +
f"from $keyspaceName.$tableName " +
f"for partition ${split.index} in $duration%.3f s."
)
session.close()
val listener: TaskCompletionListener = new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = {
val duration = metricsUpdater.finish() / 1000000000d
logDebug(
f"Fetched ${countingIterator.count} rows " +
f"from $keyspaceName.$tableName " +
f"for partition ${split.index} in $duration%.3f s."
)
session.close()
}
}

context.addTaskCompletionListener(listener)
countingIterator
}

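The closure overload of addTaskCompletionListener is presumably avoided because, under Scala 2.12's SAM conversion, a lambda can match both the TaskContext => Unit overload and the TaskCompletionListener overload, making the call ambiguous. A hedged sketch of the same pattern factored into a reusable helper (hypothetical, not part of this commit); the same change appears again in CassandraTableScanRDD.scala below.

// Hypothetical helper: wrap a cleanup thunk in an explicit TaskCompletionListener
// so the overload choice compiles unambiguously on both Scala 2.11 and 2.12.
import org.apache.spark.TaskContext
import org.apache.spark.util.TaskCompletionListener

def addCleanup(ctx: TaskContext)(cleanup: => Unit): Unit =
  ctx.addTaskCompletionListener(new TaskCompletionListener {
    override def onTaskCompletion(context: TaskContext): Unit = cleanup
  })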
CassandraTableScanRDD.scala
@@ -16,6 +16,7 @@ import com.datastax.spark.connector.util.{CountingIterator, CqlWhereParser, Refl
import com.datastax.spark.connector.writer.RowWriterFactory
import org.apache.spark.metrics.InputMetricsUpdater
import org.apache.spark.rdd.{PartitionCoalescer, RDD}
import org.apache.spark.util.TaskCompletionListener
import org.apache.spark.{Partition, Partitioner, SparkContext, TaskContext}

import scala.collection.JavaConversions._
@@ -367,12 +368,16 @@ class CassandraTableScanRDD[R] private[connector](
fetchTokenRange(scanner, _: CqlTokenRange[_, _], metricsUpdater))
val countingIterator = new CountingIterator(rowIterator, limitForIterator(limit))

context.addTaskCompletionListener { (context) =>
val duration = metricsUpdater.finish() / 1000000000d
logDebug(f"Fetched ${countingIterator.count} rows from $keyspaceName.$tableName " +
f"for partition ${partition.index} in $duration%.3f s.")
scanner.close()
val listener: TaskCompletionListener = new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = {
val duration = metricsUpdater.finish() / 1000000000d
logDebug(f"Fetched ${countingIterator.count} rows from $keyspaceName.$tableName " +
f"for partition ${partition.index} in $duration%.3f s.")
scanner.close()
}
}

context.addTaskCompletionListener(listener)
countingIterator
}

CassandraSourceRelation.scala
@@ -10,7 +10,7 @@ import com.datastax.spark.connector.rdd.partitioner.dht.TokenFactory.forSystemLo
import com.datastax.spark.connector.types.{InetType, UUIDType, VarIntType}
import com.datastax.spark.connector.util.Quote._
import com.datastax.spark.connector.util.{ConfigParameter, Logging, ReflectionUtil}
import com.datastax.spark.connector.writer.{SqlRowWriter, WriteConf}
import com.datastax.spark.connector.writer.{RowWriterFactory, SqlRowWriter, WriteConf}
import com.datastax.spark.connector.{SomeColumns, _}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
@@ -43,6 +43,10 @@ private[cassandra] class CassandraSourceRelation(
with PrunedFilteredScan
with Logging {

implicit val readconf: ReadConf = readConf
implicit val rwf: RowWriterFactory[Row] = SqlRowWriter.Factory
implicit val cassandraConnector: CassandraConnector = connector

private[this] val tableDef = Schema.tableFromCassandra(
connector,
tableRef.keyspace,
@@ -52,6 +56,7 @@
userSpecifiedSchema.getOrElse(StructType(tableDef.columns.map(toStructField)))
}


override def insert(data: DataFrame, overwrite: Boolean): Unit = {
if (overwrite) {
if (confirmTruncate) {
@@ -71,18 +76,17 @@

}

implicit val rwf = SqlRowWriter.Factory
val columns = SomeColumns(data.columns.map(x => x: ColumnRef): _*)
data.rdd.saveToCassandra(tableRef.keyspace, tableRef.table, columns, writeConf)
}



override def sizeInBytes: Long = {
// If it's not found, use SQLConf default setting
tableSizeInBytes.getOrElse(sqlContext.conf.defaultSizeInBytes)
}

implicit val cassandraConnector = connector
implicit val readconf = readConf
private[this] val baseRdd =
sqlContext.sparkContext.cassandraTable[CassandraSQLRow](tableRef.keyspace, tableRef.table)

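The implicits above (readconf, rwf, cassandraConnector) are hoisted to class level with explicit type annotations so that cassandraTable and saveToCassandra can resolve them from any method in the class, instead of re-declaring rwf inside insert. A small sketch of the general pattern with hypothetical names (not from this commit):

// Hypothetical sketch: an implicit val declared once at class level, with an
// explicit type, is visible implicitly to every method in the class.
trait Writes[A] { def write(a: A): String }

class Sink(prefix: String) {
  implicit val intWrites: Writes[Int] =
    new Writes[Int] { def write(a: Int): String = prefix + a.toString }

  private def emit[A](a: A)(implicit w: Writes[A]): String = w.write(a)

  def emitInt(i: Int): String = emit(i) // intWrites is picked up implicitly
}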
