From c81cd77b76ac849403000a81b580f9c66844e8f4 Mon Sep 17 00:00:00 2001 From: Malcolm Greaves Date: Fri, 6 Jan 2017 19:03:35 -0800 Subject: [PATCH] Published 0.0.0 on sonatype, scalafmt --- build.sbt | 52 ++++----- data-tc-extra/build.sbt | 13 +-- .../src/main/scala/fif/ops/ToMap.scala | 6 +- .../src/test/scala/fif/TestHelpers.scala | 2 +- .../src/test/scala/fif/ops/SumTest.scala | 1 - .../src/test/scala/fif/ops/ToMapTest.scala | 5 +- data-tc-flink/build.sbt | 11 +- .../scala/com/nitro/absnlp/FlinkData.scala | 97 ++++++++-------- .../scala/com/nitro/absnlp/FlinkHelper.scala | 41 ++++--- .../com/nitro/absnlp/FlinkDataTest.scala | 105 +++++++++--------- data-tc-scala/build.sbt | 12 +- .../src/main/scala/fif/ArrayData.scala | 25 +++-- data-tc-scala/src/main/scala/fif/Data.scala | 27 +++-- .../src/main/scala/fif/SeqData.scala | 20 ++-- .../src/main/scala/fif/TravData.scala | 29 +++-- .../src/test/scala/fif/ArrayDataTest.scala | 2 +- .../test/scala/fif/CollectionDataTest.scala | 11 +- .../fif/ImplicitCollectionsDataTest.scala | 2 +- .../src/test/scala/fif/SeqDataTest.scala | 2 +- .../src/test/scala/fif/TravDataTest.scala | 2 +- data-tc-spark/build.sbt | 17 +-- .../src/main/scala/fif/ImplicitRddData.scala | 2 +- .../src/main/scala/fif/RddData.scala | 22 ++-- .../fif/spark/KryoSerializationWrapper.scala | 67 +++++------ .../main/scala/fif/spark/KryoSerializer.scala | 34 +++--- .../scala/fif/spark/RddSerializedOps.scala | 78 +++++++------ .../fif/spark/avroparquet/RddHelpers.scala | 44 +++++--- .../src/test/scala/fif/RddDataTest.scala | 19 +++- .../fif/spark/KryoSerializationTest.scala | 19 ++-- .../fif/spark/RddSerializedOpsTest.scala | 21 ++-- .../spark/avroparquet/RddHelpersTest.scala | 18 +-- .../fif/spark/avroparquet/SampleEntity.scala | 56 ++++++---- project/SharedBuild.scala | 47 ++++---- project/plugins.sbt | 6 +- 34 files changed, 485 insertions(+), 430 deletions(-) diff --git a/build.sbt b/build.sbt index c0af1c9..5d31a04 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ name := "data-tc" organization in ThisBuild := "io.malcolmgreaves" -version in ThisBuild := { +version in ThisBuild := { val major: Int = 0 val minor: Int = 0 val patch: Int = 0 @@ -8,9 +8,7 @@ version in ThisBuild := { } import SharedBuild._ -com.typesafe.sbt.SbtScalariform.defaultScalariformSettings -ScalariformKeys.preferences := sharedCodeFmt - + lazy val root = project .in(file(".")) .aggregate( @@ -21,35 +19,26 @@ lazy val root = project ) .settings { publishArtifact := false - publishLocal := {} - publish := {} + publishLocal := {} + publish := {} } -lazy val `data-tc-scala` = project - .in(file("data-tc-scala")) - .settings { - publishArtifact := true - } +lazy val `data-tc-scala` = project.in(file("data-tc-scala")).settings { + publishArtifact := true +} -lazy val `data-tc-spark` = project - .in(file("data-tc-spark")) - .dependsOn(`data-tc-scala`) - .settings { +lazy val `data-tc-spark` = + project.in(file("data-tc-spark")).dependsOn(`data-tc-scala`).settings { publishArtifact := true } -lazy val `data-tc-flink` = project - .in(file("data-tc-flink")) - .dependsOn(`data-tc-scala`) - .settings { +lazy val `data-tc-flink` = + project.in(file("data-tc-flink")).dependsOn(`data-tc-scala`).settings { publishArtifact := true } - -lazy val `data-tc-extra` = project - .in(file("data-tc-extra")) - .dependsOn(`data-tc-scala`) - .settings { +lazy val `data-tc-extra` = + project.in(file("data-tc-extra")).dependsOn(`data-tc-scala`).settings { publishArtifact := true } @@ -58,22 +47,20 @@ lazy val publishTasks = 
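// Illustrative aside (not part of this patch): with organization "io.malcolmgreaves",
// version 0.0.0, and Scala 2.11.8 as configured above -- and the commit message noting
// that 0.0.0 was published to Sonatype -- a downstream sbt build would depend on these
// modules roughly as follows (the module choice here is just an example):
libraryDependencies ++= Seq(
  "io.malcolmgreaves" %% "data-tc-scala" % "0.0.0",
  "io.malcolmgreaves" %% "data-tc-spark" % "0.0.0"
)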
subprojects.map { publish.in } resolvers in ThisBuild := Seq( // sonatype, maven central - "Sonatype Releases" at "https://oss.sonatype.org/content/repositories/releases/", + "Sonatype Releases" at "https://oss.sonatype.org/content/repositories/releases/", "Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/", - // bintray "Scalaz Bintray" at "http://dl.bintray.com/scalaz/releases", Resolver.bintrayRepo("mfglabs", "maven"), Resolver.bintrayRepo("dwhjames", "maven"), - // etc. "Confluent" at "http://packages.confluent.io/maven/" ) // runtime & compiliation -lazy val javaV = "1.8" -scalaVersion in ThisBuild := "2.11.8" +lazy val javaV = "1.8" +scalaVersion in ThisBuild := "2.11.8" scalacOptions in ThisBuild := Seq( "-optimize", "-deprecation", @@ -96,12 +83,13 @@ scalacOptions in ThisBuild := Seq( "-Xfatal-warnings" // Every warning is esclated to an error. ) javacOptions in ThisBuild := Seq("-source", javaV, "-target", javaV) -javaOptions in ThisBuild := Seq( - "-server", - "-XX:+AggressiveOpts", +javaOptions in ThisBuild := Seq( + "-server", + "-XX:+AggressiveOpts", "-XX:+TieredCompilation", "-XX:CompileThreshold=100", "-Xmx3000M", "-XX:+UseG1GC" ) +publishArtifact := false diff --git a/data-tc-extra/build.sbt b/data-tc-extra/build.sbt index 3412059..7c9abd6 100644 --- a/data-tc-extra/build.sbt +++ b/data-tc-extra/build.sbt @@ -2,15 +2,11 @@ name := "data-tc-extra" import SharedBuild._ -com.typesafe.sbt.SbtScalariform.defaultScalariformSettings -ScalariformKeys.preferences := sharedCodeFmt - addCompilerPlugin(scalaMacros) -libraryDependencies ++= +libraryDependencies ++= extraDeps ++ - testDeps - + testDeps // doc hacks @@ -21,7 +17,8 @@ sources in (Compile, doc) ~= (_ filter (_.getName endsWith "DataOps.scala")) // // test, runtime settings // -fork in run := true -fork in Test := true +fork in run := true +fork in Test := true parallelExecution in Test := true +pomExtra := pomExtraInfo diff --git a/data-tc-extra/src/main/scala/fif/ops/ToMap.scala b/data-tc-extra/src/main/scala/fif/ops/ToMap.scala index c78be49..6adb595 100644 --- a/data-tc-extra/src/main/scala/fif/ops/ToMap.scala +++ b/data-tc-extra/src/main/scala/fif/ops/ToMap.scala @@ -29,12 +29,14 @@ object ToMap extends Serializable { } } - def apply[T: ClassTag, U: ClassTag: Semigroup, D[_]: Data](data: D[(T, U)]): Map[T, U] = { + def apply[T: ClassTag, U: ClassTag: Semigroup, D[_]: Data]( + data: D[(T, U)]): Map[T, U] = { implicit val _ = identity[(T, U)] _ apply[(T, U), T, U, D](data) } - def apply[A, T: ClassTag, U: ClassTag: Semigroup, D[_]: Data](data: D[A])(implicit ev: A <:< (T, U)): Map[T, U] = { + def apply[A, T: ClassTag, U: ClassTag: Semigroup, D[_]: Data](data: D[A])( + implicit ev: A <:< (T, U)): Map[T, U] = { val sg = implicitly[Semigroup[U]] diff --git a/data-tc-extra/src/test/scala/fif/TestHelpers.scala b/data-tc-extra/src/test/scala/fif/TestHelpers.scala index 3dc4382..df9ee5f 100644 --- a/data-tc-extra/src/test/scala/fif/TestHelpers.scala +++ b/data-tc-extra/src/test/scala/fif/TestHelpers.scala @@ -8,4 +8,4 @@ object TestHelpers { override def combine(a: Int, b: Int) = a + b } -} \ No newline at end of file +} diff --git a/data-tc-extra/src/test/scala/fif/ops/SumTest.scala b/data-tc-extra/src/test/scala/fif/ops/SumTest.scala index 3f04b2c..fdfe1f9 100644 --- a/data-tc-extra/src/test/scala/fif/ops/SumTest.scala +++ b/data-tc-extra/src/test/scala/fif/ops/SumTest.scala @@ -27,4 +27,3 @@ class SumTest extends FunSuite { } } - diff --git a/data-tc-extra/src/test/scala/fif/ops/ToMapTest.scala 
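// Illustrative aside (not part of this patch): ToMap, exercised by ToMapTest just below,
// folds values that share a key using a Semigroup. The Semigroup import is not visible in
// these hunks, so the instance here mirrors TestHelpers above and its origin is an
// assumption; TravData from data-tc-scala supplies the Data[Traversable] instance.
import fif.{Data, TravData}
import fif.ops.ToMap

implicit val travIsData: Data[Traversable] = TravData
implicit val addInts = new Semigroup[Int] {
  override def combine(a: Int, b: Int) = a + b // duplicate keys are summed
}

val merged: Map[String, Int] =
  ToMap(Traversable(("hello", 10), ("hello", 20), ("world", 30)))
// merged == Map("hello" -> 30, "world" -> 30)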
b/data-tc-extra/src/test/scala/fif/ops/ToMapTest.scala index 513baaa..d5a73a1 100644 --- a/data-tc-extra/src/test/scala/fif/ops/ToMapTest.scala +++ b/data-tc-extra/src/test/scala/fif/ops/ToMapTest.scala @@ -19,7 +19,8 @@ class ToMapTest extends FunSuite { test("ToMap list of many elements") { val l = Traversable( - ("hello", 10), ("hello", 20), + ("hello", 10), + ("hello", 20), ("world", 30), ("sunday funday", 40), ("sunday funday", 50), @@ -56,4 +57,4 @@ class ToMapTest extends FunSuite { assert(ToMap.combine(first, second) == expected) } -} \ No newline at end of file +} diff --git a/data-tc-flink/build.sbt b/data-tc-flink/build.sbt index 29cd07e..182047d 100644 --- a/data-tc-flink/build.sbt +++ b/data-tc-flink/build.sbt @@ -2,16 +2,15 @@ name := "data-tc-flink" import SharedBuild._ -com.typesafe.sbt.SbtScalariform.defaultScalariformSettings -ScalariformKeys.preferences := sharedCodeFmt - addCompilerPlugin(scalaMacros) -libraryDependencies ++= +libraryDependencies ++= flinkTcDeps ++ - testDeps + testDeps testOptions in Test += Tests.Argument(TestFrameworks.JUnit, "-v") testOptions in Test += Tests.Argument("-oF") -fork in Test := true +fork in Test := true parallelExecution in Test := true + +pomExtra := pomExtraInfo diff --git a/data-tc-flink/src/main/scala/com/nitro/absnlp/FlinkData.scala b/data-tc-flink/src/main/scala/com/nitro/absnlp/FlinkData.scala index e8d8614..9c8d4cd 100644 --- a/data-tc-flink/src/main/scala/com/nitro/absnlp/FlinkData.scala +++ b/data-tc-flink/src/main/scala/com/nitro/absnlp/FlinkData.scala @@ -10,11 +10,11 @@ import scala.util.Try import fif.Data /** - * Implementation of the Data typeclass with the Flink DataSet type. - * - * Since DataSet instances are lazy, this typeclass implementation's methods - * have lazy (vs. eager) semantics. - */ + * Implementation of the Data typeclass with the Flink DataSet type. + * + * Since DataSet instances are lazy, this typeclass implementation's methods + * have lazy (vs. eager) semantics. + */ case object FlinkData extends Data[DataSet] { import Data.ops._ @@ -24,7 +24,8 @@ case object FlinkData extends Data[DataSet] { data.map(f) } - override def mapParition[A, B: ClassTag](d: DataSet[A])(f: Iterable[A] => Iterable[B]): DataSet[B] = { + override def mapParition[A, B: ClassTag](d: DataSet[A])( + f: Iterable[A] => Iterable[B]): DataSet[B] = { implicit val ti = FlinkHelper.typeInfo[B] d.mapPartition { partition => f(partition.toIterable) @@ -41,7 +42,8 @@ case object FlinkData extends Data[DataSet] { () } - override def foreachPartition[A](d: DataSet[A])(f: Iterable[A] => Any): Unit = { + override def foreachPartition[A](d: DataSet[A])( + f: Iterable[A] => Any): Unit = { // ignore this map operation's return type implicit val ti = FlinkHelper.unitTypeInformation d.mapPartition { partition => @@ -54,24 +56,22 @@ case object FlinkData extends Data[DataSet] { override def filter[A](d: DataSet[A])(f: A => Boolean): DataSet[A] = d.filter(f) - override def aggregate[A, B: ClassTag](data: DataSet[A])(zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B = { + override def aggregate[A, B: ClassTag](data: DataSet[A])( + zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B = { implicit val ti = FlinkHelper.typeInfo[B] - data - .mapPartition { partition => - Seq(partition.foldLeft(zero)(seqOp)) - } - .reduce(combOp) - .collect() - .reduce(combOp) + data.mapPartition { partition => + Seq(partition.foldLeft(zero)(seqOp)) + }.reduce(combOp).collect().reduce(combOp) } /** - * Unimplemented! 
- * - * Flink doesn't support an API for total sorting. Must determine a correct - * implementation using the lower-level API. - */ - override def sortBy[A, B: ClassTag](d: DataSet[A])(f: (A) ⇒ B)(implicit ord: math.Ordering[B]): DataSet[A] = + * Unimplemented! + * + * Flink doesn't support an API for total sorting. Must determine a correct + * implementation using the lower-level API. + */ + override def sortBy[A, B: ClassTag](d: DataSet[A])(f: (A) ⇒ B)( + implicit ord: math.Ordering[B]): DataSet[A] = ??? override def take[A](d: DataSet[A])(k: Int): Traversable[A] = @@ -83,40 +83,38 @@ case object FlinkData extends Data[DataSet] { override def toSeq[A](d: DataSet[A]): Seq[A] = d.collect() - override def flatMap[A, B: ClassTag](d: DataSet[A])(f: A => TraversableOnce[B]): DataSet[B] = { + override def flatMap[A, B: ClassTag](d: DataSet[A])( + f: A => TraversableOnce[B]): DataSet[B] = { implicit val ti = FlinkHelper.typeInfo[B] d.flatMap(f) } - override def flatten[A, B: ClassTag](d: DataSet[A])(implicit asDataSet: A => TraversableOnce[B]): DataSet[B] = + override def flatten[A, B: ClassTag](d: DataSet[A])( + implicit asDataSet: A => TraversableOnce[B]): DataSet[B] = flatMap(d)(asDataSet) - override def groupBy[A, B: ClassTag](data: DataSet[A])(f: A => B): DataSet[(B, Iterable[A])] = { + override def groupBy[A, B: ClassTag](data: DataSet[A])( + f: A => B): DataSet[(B, Iterable[A])] = { val reducedToMaps = { - implicit val ti: TypeInformation[scala.collection.immutable.Map[B, Iterable[A]]] = + implicit val ti: TypeInformation[ + scala.collection.immutable.Map[B, Iterable[A]]] = FlinkHelper.typeInfo(ClassTag(classOf[Map[B, Iterable[A]]])) - data - .mapPartition { partition => - Seq( - partition - .toIterable - .groupBy(f) - .map { - case (key, values) => (key, values.toIterable) - } - ) - } - .reduce(FlinkHelper.mapCombine[B, A] _) + data.mapPartition { partition => + Seq( + partition.toIterable.groupBy(f).map { + case (key, values) => (key, values.toIterable) + } + ) + }.reduce(FlinkHelper.mapCombine[B, A] _) } implicit val ti: TypeInformation[(B, Iterable[A])] = FlinkHelper.typeInfo(ClassTag(classOf[(B, Iterable[A])])) - reducedToMaps - .flatMap(_.toSeq) + reducedToMaps.flatMap(_.toSeq) } override def reduce[A](d: DataSet[A])(op: (A, A) => A): A = @@ -129,19 +127,20 @@ case object FlinkData extends Data[DataSet] { size(d) == 0 /** - * Unimplemented! - * - * Waiting on support coming in Flink 0.10 ! - */ - override def zip[A, B: ClassTag](d: DataSet[A])(that: DataSet[B]): DataSet[(A, B)] = + * Unimplemented! + * + * Waiting on support coming in Flink 0.10 ! + */ + override def zip[A, B: ClassTag](d: DataSet[A])( + that: DataSet[B]): DataSet[(A, B)] = ??? /** - * Unimplemented! - * - * Waiting on support coming in Flink 0.10 ! - */ + * Unimplemented! + * + * Waiting on support coming in Flink 0.10 ! + */ override def zipWithIndex[A](d: DataSet[A]): DataSet[(A, Long)] = ??? -} \ No newline at end of file +} diff --git a/data-tc-flink/src/main/scala/com/nitro/absnlp/FlinkHelper.scala b/data-tc-flink/src/main/scala/com/nitro/absnlp/FlinkHelper.scala index 92ebe17..7edfd9e 100644 --- a/data-tc-flink/src/main/scala/com/nitro/absnlp/FlinkHelper.scala +++ b/data-tc-flink/src/main/scala/com/nitro/absnlp/FlinkHelper.scala @@ -9,11 +9,11 @@ import scala.language.higherKinds import scala.reflect.ClassTag /** - * Methods, values, and functions that provide some common functionality - * necessary for interacting with Flink DataSet objects. 
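// Illustrative aside (not part of this patch): because FlinkData is a Data[DataSet],
// code written once against the Data typeclass runs on Flink DataSets, as FlinkDataTest
// below demonstrates. Note the lazy semantics called out above: nothing executes until a
// terminal call such as collect() forces the plan. The environment setup here is a sketch
// and assumes the standard Flink Scala API.
import org.apache.flink.api.scala._
import fif.Data
import Data.ops._
import com.nitro.absnlp.FlinkData

implicit val flinkIsData: Data[DataSet] = FlinkData

def addTen[D[_]: Data](xs: D[Int]): D[Int] = xs.map(_ + 10)

val env  = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromCollection(Seq(1, 2, 3))
addTen(data).collect() // Seq(11, 12, 13)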
Most importantly - * is the typeInfo method that generates a TypeInformation instance from - * ClassTag evidence. - */ + * Methods, values, and functions that provide some common functionality + * necessary for interacting with Flink DataSet objects. Most importantly + * is the typeInfo method that generates a TypeInformation instance from + * ClassTag evidence. + */ object FlinkHelper extends Serializable { private[absnlp] val productClass: Class[Product] = @@ -24,7 +24,6 @@ object FlinkHelper extends Serializable { val fields = c.getFields if (fields.isEmpty) 1 - else fields.foldLeft(0) { case (result, field) => @@ -32,9 +31,9 @@ object FlinkHelper extends Serializable { } } - private type _M[B,A]=Map[B, Iterable[A]] + private type _M[B, A] = Map[B, Iterable[A]] - def mapCombine[B, A](m1: _M[B,A], m2: _M[B,A]): _M[B,A] = { + def mapCombine[B, A](m1: _M[B, A], m2: _M[B, A]): _M[B, A] = { val (larger, smaller) = if (m1.size > m2.size) @@ -66,7 +65,7 @@ object FlinkHelper extends Serializable { new TypeInformation[A] { - override def canEqual(x: Any): Boolean = + override def canEqual(x: Any): Boolean = x.getClass.isAssignableFrom(ct.runtimeClass) override lazy val isBasicType: Boolean = @@ -84,21 +83,19 @@ object FlinkHelper extends Serializable { override lazy val getTypeClass: Class[A] = ct.runtimeClass.asInstanceOf[Class[A]] - override lazy val getGenericParameters: java.util.List[TypeInformation[_]] = { + override lazy val getGenericParameters: java.util.List[TypeInformation[ + _]] = { import scala.collection.JavaConversions._ val tVars = ct.getClass.getTypeParameters if (tVars.isEmpty) emptyTypeInfoList - else - tVars - .map { typeVariable => - val genericClass = typeVariable.getGenericDeclaration - typeInfo(ClassTag(genericClass)) - } - .toList + tVars.map { typeVariable => + val genericClass = typeVariable.getGenericDeclaration + typeInfo(ClassTag(genericClass)) + }.toList } override lazy val isKeyType: Boolean = @@ -107,19 +104,19 @@ object FlinkHelper extends Serializable { override lazy val isSortKeyType: Boolean = isKeyType - override def createSerializer(config: ExecutionConfig): TypeSerializer[A] = + override def createSerializer( + config: ExecutionConfig): TypeSerializer[A] = new KryoSerializer[A](getTypeClass, config) - override val toString: String = + override val toString: String = s"TypeInformation for ${ct.runtimeClass.toString}" - override def equals(x: Any): Boolean = + override def equals(x: Any): Boolean = x != null && x.isInstanceOf[TypeInformation[_]] && this == x - override val hashCode: Int = + override val hashCode: Int = ct.hashCode } } } - diff --git a/data-tc-flink/src/test/scala/com/nitro/absnlp/FlinkDataTest.scala b/data-tc-flink/src/test/scala/com/nitro/absnlp/FlinkDataTest.scala index 5619d6d..12eb090 100644 --- a/data-tc-flink/src/test/scala/com/nitro/absnlp/FlinkDataTest.scala +++ b/data-tc-flink/src/test/scala/com/nitro/absnlp/FlinkDataTest.scala @@ -24,11 +24,11 @@ class FlinkDataTest extends FunSuite { test("test map") { - def addElementwise10[D[_]: Data](data: D[Int]): D[Int] = - data.map(_ + 10) + def addElementwise10[D[_]: Data](data: D[Int]): D[Int] = + data.map(_ + 10) - def addElementwise10_tc[D[_]](data: D[Int])(implicit ev: Data[D]): D[Int] = - ev.map(data)(_ + 10) + def addElementwise10_tc[D[_]](data: D[Int])(implicit ev: Data[D]): D[Int] = + ev.map(data)(_ + 10) { val changed = addElementwise10(data) @@ -47,8 +47,10 @@ class FlinkDataTest extends FunSuite { test("mapPartition") { - def mapParition10[D[_]: Data](data: D[Int]): D[Int] = - 
data.mapParition { elements => elements.map(_ + 10) } + def mapParition10[D[_]: Data](data: D[Int]): D[Int] = + data.mapParition { elements => + elements.map(_ + 10) + } val changed = mapParition10(data) assert(changed != data) @@ -57,64 +59,67 @@ class FlinkDataTest extends FunSuite { test("foreach") { - def testForeach[D[_]: Data](data: D[Int]): Unit = - data.foreach { x => - val res = x >= 1 && x <= 3 - if (!res) throw new RuntimeException - } + def testForeach[D[_]: Data](data: D[Int]): Unit = + data.foreach { x => + val res = x >= 1 && x <= 3 + if (!res) throw new RuntimeException + } testForeach(data) } test("foreachPartition") { - def testForeachPart[D[_]: Data](data: D[Int]): Unit = - data.foreachPartition(_.foreach { x => - val res = x >= 1 && x <= 3 - if (!res) throw new RuntimeException - }) + def testForeachPart[D[_]: Data](data: D[Int]): Unit = + data.foreachPartition(_.foreach { x => + val res = x >= 1 && x <= 3 + if (!res) throw new RuntimeException + }) testForeachPart(data) } test("aggregate") { - def aggregateTest[D[_]: Data](data: D[Int]): Int = - data.aggregate(0)(_ + _, _ + _) + def aggregateTest[D[_]: Data](data: D[Int]): Int = + data.aggregate(0)(_ + _, _ + _) assert(aggregateTest(data) == 6) } ignore("sortBy") { - def reverseSort[D[_]: Data](data: D[Int]): D[Int] = - data.sortBy(x => -x) + def reverseSort[D[_]: Data](data: D[Int]): D[Int] = + data.sortBy(x => -x) assert(reverseSort(data).collect() == Seq(3, 2, 1)) } test("take") { - def testTake[D[_]: Data](data: D[Int]): Boolean = - data.take(1) == Seq(1) && data.take(2) == Seq(1, 2) && data.take(3) == Seq(1, 2, 3) + def testTake[D[_]: Data](data: D[Int]): Boolean = + data.take(1) == Seq(1) && data.take(2) == Seq(1, 2) && data.take(3) == Seq( + 1, + 2, + 3) assert(testTake(data)) } test("toSeq") { - def testToSeqIs123[D[_]: Data](data: D[Int]): Boolean = - data.toSeq == Seq(1, 2, 3) + def testToSeqIs123[D[_]: Data](data: D[Int]): Boolean = + data.toSeq == Seq(1, 2, 3) assert(testToSeqIs123(data)) } test("flatMap") { - def testFlat[D[_]: Data](data: D[Int]): D[Int] = - data.flatMap { number => - (0 until number).map(_ => number) - } + def testFlat[D[_]: Data](data: D[Int]): D[Int] = + data.flatMap { number => + (0 until number).map(_ => number) + } val result = testFlat(data) assert(result.collect() == Seq(1, 2, 2, 3, 3, 3)) @@ -122,8 +127,8 @@ class FlinkDataTest extends FunSuite { test("flatten") { - def flattenTest[D[_]: Data](data: D[Seq[Int]]): D[Int] = - data.flatten + def flattenTest[D[_]: Data](data: D[Seq[Int]]): D[Int] = + data.flatten val expanded = data.map(x => Seq(x)) val flattened = flattenTest(expanded) @@ -132,8 +137,8 @@ class FlinkDataTest extends FunSuite { test("groupBy") { - def groupIt[D[_]: Data](data: D[Int]) = - data.groupBy { _ % 2 == 0 } + def groupIt[D[_]: Data](data: D[Int]) = + data.groupBy { _ % 2 == 0 } val evenGroup = groupIt(data).toSeq.toMap @@ -148,42 +153,41 @@ class FlinkDataTest extends FunSuite { test("size") { - def sizeIs3[D[_]: Data](data: D[Int]): Boolean = - data.size == 3 + def sizeIs3[D[_]: Data](data: D[Int]): Boolean = + data.size == 3 assert(sizeIs3(data)) } test("reduce") { - def foo[D[_]: Data](data: D[Int]): Int = - data.reduce { - case (a, b) => 1 + a + b - } + def foo[D[_]: Data](data: D[Int]): Int = + data.reduce { + case (a, b) => 1 + a + b + } val result = foo(data) assert(result == 8) } - test("filter") { - def f[D[_]: Data](data: D[Int]): D[Int] = - data.filter(_ % 2 == 0) + def f[D[_]: Data](data: D[Int]): D[Int] = + data.filter(_ % 2 == 0) 
assert(f(data).collect() == Seq(2)) } test("headOption") { - def h[D[_]: Data](data: D[Int]): Option[Int] = - data.headOption + def h[D[_]: Data](data: D[Int]): Option[Int] = + data.headOption assert(h(data) == Some(1)) assert(h(empty[Int]) == None) } test("isEmpty") { - def e[D[_]: Data](data: D[_]): Boolean = - data.isEmpty + def e[D[_]: Data](data: D[_]): Boolean = + data.isEmpty assert(!e(data)) assert(e(empty[Int])) @@ -196,7 +200,6 @@ class FlinkDataTest extends FunSuite { // assert(toM(data) == Map(1 -> 1, 2 -> 2, 3 -> 3)) // } - // test("sum") { // def s[D[_]: Data](data: D[Int]): Int = // fif.ops.Sum(data) @@ -205,17 +208,17 @@ class FlinkDataTest extends FunSuite { // } ignore("zipWithIndex") { - def foo[D[_]: Data](data: D[Int]): Unit = - assert(data.zipWithIndex == Seq((1, 0), (2, 1), (3, 2))) + def foo[D[_]: Data](data: D[Int]): Unit = + assert(data.zipWithIndex == Seq((1, 0), (2, 1), (3, 2))) foo(data) } ignore("zip") { - def foo[D[_]: Data](data: D[Int]): D[(Int, Int)] = - data.zip(data) + def foo[D[_]: Data](data: D[Int]): D[(Int, Int)] = + data.zip(data) assert(foo(data).collect() == Seq((1, 1), (2, 2), (3, 3))) } -} \ No newline at end of file +} diff --git a/data-tc-scala/build.sbt b/data-tc-scala/build.sbt index 16f946e..8ecb913 100644 --- a/data-tc-scala/build.sbt +++ b/data-tc-scala/build.sbt @@ -2,19 +2,17 @@ name := "data-tc-scala" import SharedBuild._ -com.typesafe.sbt.SbtScalariform.defaultScalariformSettings -ScalariformKeys.preferences := sharedCodeFmt - addCompilerPlugin(scalaMacros) -libraryDependencies ++= +libraryDependencies ++= scalaTcDeps ++ - testDeps + testDeps // // test, runtime settings // -fork in run := true -fork in Test := true +fork in run := true +fork in Test := true parallelExecution in Test := true +pomExtra := pomExtraInfo diff --git a/data-tc-scala/src/main/scala/fif/ArrayData.scala b/data-tc-scala/src/main/scala/fif/ArrayData.scala index 5aa18c5..ecf24d4 100644 --- a/data-tc-scala/src/main/scala/fif/ArrayData.scala +++ b/data-tc-scala/src/main/scala/fif/ArrayData.scala @@ -1,6 +1,6 @@ package fif -import scala.language.{ higherKinds, implicitConversions } +import scala.language.{higherKinds, implicitConversions} import scala.reflect.ClassTag object ArrayData extends Data[Array] { @@ -9,7 +9,8 @@ object ArrayData extends Data[Array] { override def map[A, B: ClassTag](data: Array[A])(f: (A) => B): Array[B] = data.map(f) - override def mapParition[A, B: ClassTag](d: Array[A])(f: Iterable[A] => Iterable[B]): Array[B] = + override def mapParition[A, B: ClassTag](d: Array[A])( + f: Iterable[A] => Iterable[B]): Array[B] = f(d.toIterable).toArray /** Apply a side-effecting function to each element. */ @@ -23,11 +24,13 @@ object ArrayData extends Data[Array] { override def filter[A](d: Array[A])(f: A => Boolean): Array[A] = d.filter(f) - override def aggregate[A, B: ClassTag](d: Array[A])(zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B = + override def aggregate[A, B: ClassTag](d: Array[A])( + zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B = d.aggregate(zero)(seqOp, combOp) /** Sort the dataset using a function f that evaluates each element to an orderable type */ - override def sortBy[A, B: ClassTag](d: Array[A])(f: (A) => B)(implicit ord: math.Ordering[B]): Array[A] = + override def sortBy[A, B: ClassTag](d: Array[A])(f: (A) => B)( + implicit ord: math.Ordering[B]): Array[A] = d.sortBy(f) /** Construct a traversable for the first k elements of a dataset. Will load into main mem. 
*/ @@ -41,13 +44,16 @@ object ArrayData extends Data[Array] { override def toSeq[A](d: Array[A]): Seq[A] = d.toSeq - override def flatMap[A, B: ClassTag](d: Array[A])(f: A => TraversableOnce[B]): Array[B] = + override def flatMap[A, B: ClassTag](d: Array[A])( + f: A => TraversableOnce[B]): Array[B] = d.flatMap(f) - override def flatten[A, B: ClassTag](d: Array[A])(implicit asTraversable: A => TraversableOnce[B]): Array[B] = + override def flatten[A, B: ClassTag](d: Array[A])( + implicit asTraversable: A => TraversableOnce[B]): Array[B] = d.flatMap(asTraversable) - override def groupBy[A, B: ClassTag](d: Array[A])(f: A => B): Array[(B, Iterable[A])] = + override def groupBy[A, B: ClassTag](d: Array[A])( + f: A => B): Array[(B, Iterable[A])] = d.groupBy(f).map { case (a, b) => (a, b.toIterable) }.toArray /** This has type A as opposed to B >: A due to the RDD limitations */ @@ -60,10 +66,11 @@ object ArrayData extends Data[Array] { override def isEmpty[A](d: Array[A]): Boolean = d.isEmpty - override def zip[A, B: ClassTag](d: Array[A])(that: Array[B]): Array[(A, B)] = + override def zip[A, B: ClassTag](d: Array[A])( + that: Array[B]): Array[(A, B)] = d.zip(that) override def zipWithIndex[A](d: Array[A]): Array[(A, Long)] = d.zipWithIndex.map { case (a, i) => (a, i.toLong) } -} \ No newline at end of file +} diff --git a/data-tc-scala/src/main/scala/fif/Data.scala b/data-tc-scala/src/main/scala/fif/Data.scala index 03a3e35..d88e4bd 100644 --- a/data-tc-scala/src/main/scala/fif/Data.scala +++ b/data-tc-scala/src/main/scala/fif/Data.scala @@ -6,11 +6,12 @@ import scala.reflect.ClassTag import simulacrum._ /** - * Trait that abstractly represents operations that can be performed on a dataset. - * The implementation of Data is suitable for both large-scale, distributed data - * or in-memory structures. - */ -@typeclass trait Data[D[_]] extends Serializable { + * Trait that abstractly represents operations that can be performed on a dataset. + * The implementation of Data is suitable for both large-scale, distributed data + * or in-memory structures. + */ +@typeclass +trait Data[D[_]] extends Serializable { /** Transform a dataset by applying f to each element. */ def map[A, B: ClassTag](d: D[A])(f: A => B): D[B] @@ -25,13 +26,15 @@ import simulacrum._ def filter[A](d: D[A])(f: A => Boolean): D[A] /** - * Starting from a defined zero value, perform an operation seqOp on each element - * of a dataset. Combine results of seqOp using combOp for a final value. - */ - def aggregate[A, B: ClassTag](d: D[A])(zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B + * Starting from a defined zero value, perform an operation seqOp on each element + * of a dataset. Combine results of seqOp using combOp for a final value. + */ + def aggregate[A, B: ClassTag](d: D[A])(zero: B)(seqOp: (B, A) => B, + combOp: (B, B) => B): B /** Sort the dataset using a function f that evaluates each element to an orderable type */ - def sortBy[A, B: ClassTag](d: D[A])(f: (A) => B)(implicit ord: math.Ordering[B]): D[A] + def sortBy[A, B: ClassTag](d: D[A])(f: (A) => B)( + implicit ord: math.Ordering[B]): D[A] /** Construct a traversable for the first k elements of a dataset. Will load into main mem. 
*/ def take[A](d: D[A])(k: Int): Traversable[A] @@ -43,7 +46,8 @@ import simulacrum._ def flatMap[A, B: ClassTag](d: D[A])(f: A => TraversableOnce[B]): D[B] - def flatten[A, B: ClassTag](d: D[A])(implicit asTraversable: A => TraversableOnce[B]): D[B] + def flatten[A, B: ClassTag](d: D[A])( + implicit asTraversable: A => TraversableOnce[B]): D[B] def groupBy[A, B: ClassTag](d: D[A])(f: A => B): D[(B, Iterable[A])] @@ -58,4 +62,3 @@ import simulacrum._ def zipWithIndex[A](d: D[A]): D[(A, Long)] } - diff --git a/data-tc-scala/src/main/scala/fif/SeqData.scala b/data-tc-scala/src/main/scala/fif/SeqData.scala index 9618b24..508cd94 100644 --- a/data-tc-scala/src/main/scala/fif/SeqData.scala +++ b/data-tc-scala/src/main/scala/fif/SeqData.scala @@ -1,6 +1,6 @@ package fif -import scala.language.{ higherKinds, implicitConversions } +import scala.language.{higherKinds, implicitConversions} import scala.reflect.ClassTag object SeqData extends Data[Seq] { @@ -9,7 +9,8 @@ object SeqData extends Data[Seq] { override def map[A, B: ClassTag](data: Seq[A])(f: (A) => B): Seq[B] = data.map(f) - override def mapParition[A, B: ClassTag](d: Seq[A])(f: Iterable[A] => Iterable[B]): Seq[B] = + override def mapParition[A, B: ClassTag](d: Seq[A])( + f: Iterable[A] => Iterable[B]): Seq[B] = f(d.toIterable).toSeq /** Apply a side-effecting function to each element. */ @@ -23,11 +24,13 @@ object SeqData extends Data[Seq] { override def filter[A](d: Seq[A])(f: A => Boolean): Seq[A] = d.filter(f) - override def aggregate[A, B: ClassTag](d: Seq[A])(zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B = + override def aggregate[A, B: ClassTag](d: Seq[A])( + zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B = d.aggregate(zero)(seqOp, combOp) /** Sort the dataset using a function f that evaluates each element to an orderable type */ - override def sortBy[A, B: ClassTag](d: Seq[A])(f: (A) => B)(implicit ord: math.Ordering[B]): Seq[A] = + override def sortBy[A, B: ClassTag](d: Seq[A])(f: (A) => B)( + implicit ord: math.Ordering[B]): Seq[A] = d.toSeq.sortBy(f) /** Construct a traversable for the first k elements of a dataset. Will load into main mem. */ @@ -41,13 +44,16 @@ object SeqData extends Data[Seq] { override def toSeq[A](d: Seq[A]): Seq[A] = d.toSeq - override def flatMap[A, B: ClassTag](d: Seq[A])(f: A => TraversableOnce[B]): Seq[B] = + override def flatMap[A, B: ClassTag](d: Seq[A])( + f: A => TraversableOnce[B]): Seq[B] = d.flatMap(f) - override def flatten[A, B: ClassTag](d: Seq[A])(implicit asTraversable: A => TraversableOnce[B]): Seq[B] = + override def flatten[A, B: ClassTag](d: Seq[A])( + implicit asTraversable: A => TraversableOnce[B]): Seq[B] = d.flatten - override def groupBy[A, B: ClassTag](d: Seq[A])(f: A => B): Seq[(B, Iterable[A])] = + override def groupBy[A, B: ClassTag](d: Seq[A])( + f: A => B): Seq[(B, Iterable[A])] = d.groupBy(f).toSeq.map { case (a, b) => (a, b.toIterable) } /** This has type A as opposed to B >: A due to the RDD limitations */ diff --git a/data-tc-scala/src/main/scala/fif/TravData.scala b/data-tc-scala/src/main/scala/fif/TravData.scala index 4e50cf3..a073513 100644 --- a/data-tc-scala/src/main/scala/fif/TravData.scala +++ b/data-tc-scala/src/main/scala/fif/TravData.scala @@ -1,33 +1,38 @@ package fif -import scala.language.{ implicitConversions, higherKinds } +import scala.language.{implicitConversions, higherKinds} import scala.reflect.ClassTag object TravData extends Data[Traversable] { /** Transform a dataset by applying f to each element. 
*/ - override def map[A, B: ClassTag](data: Traversable[A])(f: (A) => B): Traversable[B] = + override def map[A, B: ClassTag](data: Traversable[A])( + f: (A) => B): Traversable[B] = data.map(f) - override def mapParition[A, B: ClassTag](d: Traversable[A])(f: Iterable[A] => Iterable[B]): Traversable[B] = + override def mapParition[A, B: ClassTag](d: Traversable[A])( + f: Iterable[A] => Iterable[B]): Traversable[B] = f(d.toIterable).toTraversable /** Apply a side-effecting function to each element. */ override def foreach[A](d: Traversable[A])(f: A => Any): Unit = d.foreach(f) - override def foreachPartition[A](d: Traversable[A])(f: Iterable[A] => Any): Unit = { + override def foreachPartition[A](d: Traversable[A])( + f: Iterable[A] => Any): Unit = { val _ = f(d.toIterable) } override def filter[A](d: Traversable[A])(f: A => Boolean): Traversable[A] = d.filter(f) - override def aggregate[A, B: ClassTag](d: Traversable[A])(zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B = + override def aggregate[A, B: ClassTag](d: Traversable[A])( + zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B = d.aggregate(zero)(seqOp, combOp) /** Sort the dataset using a function f that evaluates each element to an orderable type */ - override def sortBy[A, B: ClassTag](d: Traversable[A])(f: (A) => B)(implicit ord: math.Ordering[B]): Traversable[A] = + override def sortBy[A, B: ClassTag](d: Traversable[A])(f: (A) => B)( + implicit ord: math.Ordering[B]): Traversable[A] = d.toSeq.sortBy(f) /** Construct a traversable for the first k elements of a dataset. Will load into main mem. */ @@ -41,13 +46,16 @@ object TravData extends Data[Traversable] { override def toSeq[A](d: Traversable[A]): Seq[A] = d.toSeq - override def flatMap[A, B: ClassTag](d: Traversable[A])(f: A => TraversableOnce[B]): Traversable[B] = + override def flatMap[A, B: ClassTag](d: Traversable[A])( + f: A => TraversableOnce[B]): Traversable[B] = d.flatMap(f) - override def flatten[A, B: ClassTag](d: Traversable[A])(implicit asTraversable: A => TraversableOnce[B]): Traversable[B] = + override def flatten[A, B: ClassTag](d: Traversable[A])( + implicit asTraversable: A => TraversableOnce[B]): Traversable[B] = d.flatten - override def groupBy[A, B: ClassTag](d: Traversable[A])(f: A => B): Traversable[(B, Iterable[A])] = + override def groupBy[A, B: ClassTag](d: Traversable[A])( + f: A => B): Traversable[(B, Iterable[A])] = d.groupBy(f).toTraversable.map { case (a, b) => (a, b.toIterable) } /** This has type A as opposed to B >: A due to the RDD limitations */ @@ -60,7 +68,8 @@ object TravData extends Data[Traversable] { override def isEmpty[A](d: Traversable[A]): Boolean = d.isEmpty - override def zip[A, B: ClassTag](d: Traversable[A])(that: Traversable[B]): Traversable[(A, B)] = + override def zip[A, B: ClassTag](d: Traversable[A])( + that: Traversable[B]): Traversable[(A, B)] = d.toSeq.zip(that.toSeq) override def zipWithIndex[A](d: Traversable[A]): Traversable[(A, Long)] = diff --git a/data-tc-scala/src/test/scala/fif/ArrayDataTest.scala b/data-tc-scala/src/test/scala/fif/ArrayDataTest.scala index e65a914..a723b66 100644 --- a/data-tc-scala/src/test/scala/fif/ArrayDataTest.scala +++ b/data-tc-scala/src/test/scala/fif/ArrayDataTest.scala @@ -11,4 +11,4 @@ class ArrayDataTest extends CollectionDataTest[Array] { override val empty = Array.empty[Int] -} \ No newline at end of file +} diff --git a/data-tc-scala/src/test/scala/fif/CollectionDataTest.scala b/data-tc-scala/src/test/scala/fif/CollectionDataTest.scala index 4b27529..a6a5ccd 100644 --- 
a/data-tc-scala/src/test/scala/fif/CollectionDataTest.scala +++ b/data-tc-scala/src/test/scala/fif/CollectionDataTest.scala @@ -27,7 +27,9 @@ protected trait CollectionDataTest[D[_]] extends FunSuite { test("mapPartition") { def mapParition10(data: D[Int]): D[Int] = - data.mapParition { elements => elements.map(_ + 10) } + data.mapParition { elements => + elements.map(_ + 10) + } val changed = mapParition10(data) assert(changed.toSeq !== data) @@ -69,7 +71,10 @@ protected trait CollectionDataTest[D[_]] extends FunSuite { test("take") { def testTake(data: D[Int]): Boolean = - data.take(1) == Seq(1) && data.take(2) == Seq(1, 2) && data.take(3) == Seq(1, 2, 3) + data.take(1) == Seq(1) && data.take(2) == Seq(1, 2) && data.take(3) == Seq( + 1, + 2, + 3) assert(testTake(data)) assert(data.take(0).toSeq === Seq.empty[Int]) @@ -176,4 +181,4 @@ protected trait CollectionDataTest[D[_]] extends FunSuite { assert(foo(data).toSeq === Seq((1, 1), (2, 2), (3, 3))) } -} \ No newline at end of file +} diff --git a/data-tc-scala/src/test/scala/fif/ImplicitCollectionsDataTest.scala b/data-tc-scala/src/test/scala/fif/ImplicitCollectionsDataTest.scala index 4f52876..f41df15 100644 --- a/data-tc-scala/src/test/scala/fif/ImplicitCollectionsDataTest.scala +++ b/data-tc-scala/src/test/scala/fif/ImplicitCollectionsDataTest.scala @@ -24,4 +24,4 @@ class ImplicitCollectionsDataTest extends FunSuite { def simpleTest[D[_]: Data](data: D[Int]): Unit = data.foreach(x => assert(x > 0 && x < 10)) -} \ No newline at end of file +} diff --git a/data-tc-scala/src/test/scala/fif/SeqDataTest.scala b/data-tc-scala/src/test/scala/fif/SeqDataTest.scala index 3ffeb19..7a2d528 100644 --- a/data-tc-scala/src/test/scala/fif/SeqDataTest.scala +++ b/data-tc-scala/src/test/scala/fif/SeqDataTest.scala @@ -11,4 +11,4 @@ class SeqDataTest extends CollectionDataTest[Seq] { override val empty = Seq.empty[Int] -} \ No newline at end of file +} diff --git a/data-tc-scala/src/test/scala/fif/TravDataTest.scala b/data-tc-scala/src/test/scala/fif/TravDataTest.scala index faa20cc..41a622a 100644 --- a/data-tc-scala/src/test/scala/fif/TravDataTest.scala +++ b/data-tc-scala/src/test/scala/fif/TravDataTest.scala @@ -11,4 +11,4 @@ class TravDataTest extends CollectionDataTest[Traversable] { override val empty = Traversable.empty[Int] -} \ No newline at end of file +} diff --git a/data-tc-spark/build.sbt b/data-tc-spark/build.sbt index 260bcfb..aff17f5 100644 --- a/data-tc-spark/build.sbt +++ b/data-tc-spark/build.sbt @@ -2,23 +2,24 @@ name := "data-tc-spark" import SharedBuild._ -com.typesafe.sbt.SbtScalariform.defaultScalariformSettings -ScalariformKeys.preferences := sharedCodeFmt - // >>= scalacOptions := { val badOptionsWhenUsingSpark151 = Set("-Yopt:_") - scalacOptions.value.filter { opt => !badOptionsWhenUsingSpark151.contains(opt) } + scalacOptions.value.filter { opt => + !badOptionsWhenUsingSpark151.contains(opt) + } } addCompilerPlugin(scalaMacros) -libraryDependencies ++= +libraryDependencies ++= sparkTcDeps ++ - testDeps + testDeps // test & misc. 
configuration // -fork in Test := false +fork in Test := false parallelExecution in Test := false -fork in run := false +fork in run := false + +pomExtra := pomExtraInfo diff --git a/data-tc-spark/src/main/scala/fif/ImplicitRddData.scala b/data-tc-spark/src/main/scala/fif/ImplicitRddData.scala index ae35cbf..b923c63 100644 --- a/data-tc-spark/src/main/scala/fif/ImplicitRddData.scala +++ b/data-tc-spark/src/main/scala/fif/ImplicitRddData.scala @@ -6,4 +6,4 @@ object ImplicitRddData extends Serializable { implicit val rddIsData: Data[RDD] = RddData -} \ No newline at end of file +} diff --git a/data-tc-spark/src/main/scala/fif/RddData.scala b/data-tc-spark/src/main/scala/fif/RddData.scala index 0d8cc1a..4d7ee5f 100644 --- a/data-tc-spark/src/main/scala/fif/RddData.scala +++ b/data-tc-spark/src/main/scala/fif/RddData.scala @@ -2,7 +2,7 @@ package fif import org.apache.spark.rdd.RDD -import scala.language.{ higherKinds, implicitConversions } +import scala.language.{higherKinds, implicitConversions} import scala.reflect.ClassTag import scala.util.Try @@ -12,7 +12,8 @@ object RddData extends Data[RDD] with Serializable { override def map[A, B: ClassTag](data: RDD[A])(f: (A) => B): RDD[B] = data.map(f) - override def mapParition[A, B: ClassTag](d: RDD[A])(f: Iterable[A] => Iterable[B]): RDD[B] = + override def mapParition[A, B: ClassTag](d: RDD[A])( + f: Iterable[A] => Iterable[B]): RDD[B] = d.mapPartitions { partition => f(partition.toIterable).toIterator } @@ -33,11 +34,13 @@ object RddData extends Data[RDD] with Serializable { override def filter[A](d: RDD[A])(f: A => Boolean): RDD[A] = d.filter(f) - override def aggregate[A, B: ClassTag](d: RDD[A])(zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B = + override def aggregate[A, B: ClassTag](d: RDD[A])( + zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B = d.aggregate(zero)(seqOp, combOp) /** Sort the dataset using a function f that evaluates each element to an orderable type */ - override def sortBy[A, B: ClassTag](d: RDD[A])(f: (A) ⇒ B)(implicit ord: math.Ordering[B]): RDD[A] = + override def sortBy[A, B: ClassTag](d: RDD[A])(f: (A) ⇒ B)( + implicit ord: math.Ordering[B]): RDD[A] = d.sortBy(f) /** Construct a traversable for the first k elements of a dataset. Will load into main mem. 
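// Illustrative aside (not part of this patch): importing ImplicitRddData (defined above)
// puts a Data[RDD] instance in scope, so the same typeclass-driven code runs on Spark
// unchanged. SparkContext creation is elided; `sc` is assumed to exist.
import scala.language.higherKinds
import org.apache.spark.rdd.RDD
import fif.Data
import fif.ImplicitRddData._
import Data.ops._

def firstEven[D[_]: Data](xs: D[Int]): Option[Int] =
  xs.filter(_ % 2 == 0).headOption

// val rdd: RDD[Int] = sc.parallelize(Seq(1, 2, 3))
// firstEven(rdd)  // Some(2)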
*/ @@ -51,13 +54,16 @@ object RddData extends Data[RDD] with Serializable { override def toSeq[A](d: RDD[A]): Seq[A] = d.collect().toSeq - override def flatMap[A, B: ClassTag](d: RDD[A])(f: A => TraversableOnce[B]): RDD[B] = + override def flatMap[A, B: ClassTag](d: RDD[A])( + f: A => TraversableOnce[B]): RDD[B] = d.flatMap(f) - override def flatten[A, B: ClassTag](d: RDD[A])(implicit asRDD: A => TraversableOnce[B]): RDD[B] = + override def flatten[A, B: ClassTag](d: RDD[A])( + implicit asRDD: A => TraversableOnce[B]): RDD[B] = d.flatMap(asRDD) - override def groupBy[A, B: ClassTag](d: RDD[A])(f: A => B): RDD[(B, Iterable[A])] = + override def groupBy[A, B: ClassTag](d: RDD[A])( + f: A => B): RDD[(B, Iterable[A])] = d.groupBy(f).map { case (a, b) => (a, b) } /** This has type A as opposed to B >: A due to the RDD limitations */ @@ -76,4 +82,4 @@ object RddData extends Data[RDD] with Serializable { override def zipWithIndex[A](d: RDD[A]): RDD[(A, Long)] = d.zipWithIndex() -} \ No newline at end of file +} diff --git a/data-tc-spark/src/main/scala/fif/spark/KryoSerializationWrapper.scala b/data-tc-spark/src/main/scala/fif/spark/KryoSerializationWrapper.scala index 02f7ef4..f78d4c8 100644 --- a/data-tc-spark/src/main/scala/fif/spark/KryoSerializationWrapper.scala +++ b/data-tc-spark/src/main/scala/fif/spark/KryoSerializationWrapper.scala @@ -3,18 +3,18 @@ package fif.spark import scala.reflect.ClassTag /** - * Wraps a value of an unserialized type T in a KryoSerializationWrapper[T], - * which gives one a way to serialize T. - * - * - * NOTE: - * The vast majority of this code is copied / based off of the classes with the same - * name in the Apache Shark project. - * - * Original file is here (accessed on April 20, 2015): - * https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala - * - */ + * Wraps a value of an unserialized type T in a KryoSerializationWrapper[T], + * which gives one a way to serialize T. + * + * + * NOTE: + * The vast majority of this code is copied / based off of the classes with the same + * name in the Apache Shark project. + * + * Original file is here (accessed on April 20, 2015): + * https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala + * + */ object KryoSerializationWrapper extends Serializable { def apply[T: ClassTag](value: T): KryoSerializationWrapper[T] = @@ -22,19 +22,20 @@ object KryoSerializationWrapper extends Serializable { } /** - * A wrapper around some unserializable objects that make them both Java - * serializable. Internally, Kryo is used for serialization. - * - * Use KryoSerializationWrapper(value) to create a wrapper. - * - * Note that the value contained in the wrapper is mutable. It must be - * initialized using Java Serialization (which calls a private readObject - * method that does the byte-by-byte deserialization). - * - * Also note that this class is both abstract and sealed. The only valid place - * to create such a wrapper is the companion object's apply method. - */ -sealed abstract class KryoSerializationWrapper[T: ClassTag] extends Serializable { + * A wrapper around some unserializable objects that make them both Java + * serializable. Internally, Kryo is used for serialization. + * + * Use KryoSerializationWrapper(value) to create a wrapper. + * + * Note that the value contained in the wrapper is mutable. 
It must be + * initialized using Java Serialization (which calls a private readObject + * method that does the byte-by-byte deserialization). + * + * Also note that this class is both abstract and sealed. The only valid place + * to create such a wrapper is the companion object's apply method. + */ +sealed abstract class KryoSerializationWrapper[T: ClassTag] + extends Serializable { // the wrapped value // MUST BE TRANSIENT SO THAT IT IS NOT SERIALIZED @@ -45,8 +46,8 @@ sealed abstract class KryoSerializationWrapper[T: ClassTag] extends Serializable private var valueSerialized: Array[Byte] = _ /** - * The only valid constructor. For safety, do not use the no-arg constructor. - */ + * The only valid constructor. For safety, do not use the no-arg constructor. + */ def this(initialValue: T) = { this() this.value = initialValue @@ -57,11 +58,11 @@ sealed abstract class KryoSerializationWrapper[T: ClassTag] extends Serializable value /** - * Gets the currently serialized value as a Sequence of bytes. - * - * If the sequence is empty, then it means that one has not called doSerializeValue(). - * Or the internal value may be null. - */ + * Gets the currently serialized value as a Sequence of bytes. + * + * If the sequence is empty, then it means that one has not called doSerializeValue(). + * Or the internal value may be null. + */ def getValueSerialized: Seq[Byte] = valueSerialized.toSeq @@ -74,4 +75,4 @@ sealed abstract class KryoSerializationWrapper[T: ClassTag] extends Serializable in.defaultReadObject() this.value = KryoSerializer.deserialize[T](this.valueSerialized) } -} \ No newline at end of file +} diff --git a/data-tc-spark/src/main/scala/fif/spark/KryoSerializer.scala b/data-tc-spark/src/main/scala/fif/spark/KryoSerializer.scala index 723c769..9128772 100644 --- a/data-tc-spark/src/main/scala/fif/spark/KryoSerializer.scala +++ b/data-tc-spark/src/main/scala/fif/spark/KryoSerializer.scala @@ -2,31 +2,29 @@ package fif.spark import java.nio.ByteBuffer -import org.apache.spark.serializer.{ KryoSerializer => SparkKryoSerializer } -import org.apache.spark.{ SparkConf, SparkEnv } +import org.apache.spark.serializer.{KryoSerializer => SparkKryoSerializer} +import org.apache.spark.{SparkConf, SparkEnv} import scala.reflect.ClassTag /** - * JVM object serialization using Kryo. This is much more efficient, but Kryo - * sometimes is buggy to use. We use this mainly to serialize the object - * inspectors. - * - * - * NOTE: - * The vast majority of this code is copied / based off of the classes with the same - * name in the Apache Shark project. - * - * Original file is here: - * https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala - */ + * JVM object serialization using Kryo. This is much more efficient, but Kryo + * sometimes is buggy to use. We use this mainly to serialize the object + * inspectors. + * + * + * NOTE: + * The vast majority of this code is copied / based off of the classes with the same + * name in the Apache Shark project. 
+ * + * Original file is here: + * https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala + */ object KryoSerializer extends Serializable { @transient private[this] lazy val s = new SparkKryoSerializer( - Option(SparkEnv.get) - .map(_.conf) - .getOrElse(new SparkConf()) + Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()) ) def serialize[T: ClassTag](o: T): Array[Byte] = @@ -35,4 +33,4 @@ object KryoSerializer extends Serializable { def deserialize[T: ClassTag](bytes: Array[Byte]): T = s.newInstance().deserialize[T](ByteBuffer.wrap(bytes)) -} \ No newline at end of file +} diff --git a/data-tc-spark/src/main/scala/fif/spark/RddSerializedOps.scala b/data-tc-spark/src/main/scala/fif/spark/RddSerializedOps.scala index 4fd5e7f..112e428 100644 --- a/data-tc-spark/src/main/scala/fif/spark/RddSerializedOps.scala +++ b/data-tc-spark/src/main/scala/fif/spark/RddSerializedOps.scala @@ -5,29 +5,29 @@ import org.apache.spark.rdd.RDD import scala.reflect.ClassTag /** - * Methods that either wrap, or operate on wrapped, values so that - * common RDD operations are available with a natural, functional syntax. - * - * Let's look at Map as an example: - * - * {{{ - * // implemented using librray that is not extendable and doesn't implement Serialzable - * val f: A => B = ... - * - * // can be anywhere, error will occur even if in local mode - * val data: RDD[A] = ... - * - * // cannot do - * data.map(f) - * // runtime exception :( - * // as f does not implement Serializable - * - * // instead do - * Map(f)(data) - * // will serialize it using Kryo and safely - * // deserialize to perform map on the data RDD - * }}} - */ + * Methods that either wrap, or operate on wrapped, values so that + * common RDD operations are available with a natural, functional syntax. + * + * Let's look at Map as an example: + * + * {{{ + * // implemented using librray that is not extendable and doesn't implement Serialzable + * val f: A => B = ... + * + * // can be anywhere, error will occur even if in local mode + * val data: RDD[A] = ... 
+ * + * // cannot do + * data.map(f) + * // runtime exception :( + * // as f does not implement Serializable + * + * // instead do + * Map(f)(data) + * // will serialize it using Kryo and safely + * // deserialize to perform map on the data RDD + * }}} + */ object RddSerializedOps extends Serializable { object Map extends Serializable { @@ -35,7 +35,8 @@ object RddSerializedOps extends Serializable { def apply[A, B: ClassTag](f: A => B): (RDD[A] => RDD[B]) = apply(KryoSerializationWrapper(f)) - def apply[A, B: ClassTag](fnSerialized: KryoSerializationWrapper[A => B]): (RDD[A] => RDD[B]) = + def apply[A, B: ClassTag]( + fnSerialized: KryoSerializationWrapper[A => B]): (RDD[A] => RDD[B]) = (data: RDD[A]) => data.mapPartitions(partition => { val f = fnSerialized.getValue @@ -48,7 +49,9 @@ object RddSerializedOps extends Serializable { def apply[A, B: ClassTag](f: A => TraversableOnce[B]): (RDD[A] => RDD[B]) = apply(KryoSerializationWrapper(f)) - def apply[A, B: ClassTag](fnSerialized: KryoSerializationWrapper[A => TraversableOnce[B]]): (RDD[A] => RDD[B]) = + def apply[A, B: ClassTag]( + fnSerialized: KryoSerializationWrapper[A => TraversableOnce[B]]) + : (RDD[A] => RDD[B]) = (data: RDD[A]) => data.mapPartitions(partition => { val f = fnSerialized.getValue @@ -61,7 +64,8 @@ object RddSerializedOps extends Serializable { def apply[A](f: A => Any): (RDD[A] => Unit) = apply(KryoSerializationWrapper(f)) - def apply[A](fnSerialized: KryoSerializationWrapper[A => Any]): (RDD[A] => Unit) = + def apply[A]( + fnSerialized: KryoSerializationWrapper[A => Any]): (RDD[A] => Unit) = (data: RDD[A]) => data.foreachPartition(partition => { val f = fnSerialized.getValue @@ -71,27 +75,29 @@ object RddSerializedOps extends Serializable { object Aggregate extends Serializable { - def apply[A, B: ClassTag](zero: B, seqOp: (B, A) => B, combOp: (B, B) => B): (RDD[A] => B) = - apply(zero, KryoSerializationWrapper(seqOp), KryoSerializationWrapper(combOp)) + def apply[A, B: ClassTag](zero: B, + seqOp: (B, A) => B, + combOp: (B, B) => B): (RDD[A] => B) = + apply(zero, + KryoSerializationWrapper(seqOp), + KryoSerializationWrapper(combOp)) def apply[A, B: ClassTag]( - zero: B, - serSeqOp: KryoSerializationWrapper[(B, A) => B], - serCombOp: KryoSerializationWrapper[(B, B) => B]): (RDD[A] => B) = - + zero: B, + serSeqOp: KryoSerializationWrapper[(B, A) => B], + serCombOp: KryoSerializationWrapper[(B, B) => B]): (RDD[A] => B) = (data: RDD[A]) => data.aggregate(zero)( { case (b, a) => val f = serSeqOp.getValue f(b, a) - }, - { + }, { case (b1, b2) => val f = serCombOp.getValue f(b1, b2) } - ) + ) } -} \ No newline at end of file +} diff --git a/data-tc-spark/src/main/scala/fif/spark/avroparquet/RddHelpers.scala b/data-tc-spark/src/main/scala/fif/spark/avroparquet/RddHelpers.scala index 06a00b0..de08004 100644 --- a/data-tc-spark/src/main/scala/fif/spark/avroparquet/RddHelpers.scala +++ b/data-tc-spark/src/main/scala/fif/spark/avroparquet/RddHelpers.scala @@ -1,6 +1,10 @@ package fif.spark.avroparquet -import com.nitro.scalaAvro.runtime.{ GeneratedMessage, Message, GeneratedMessageCompanion } +import com.nitro.scalaAvro.runtime.{ + GeneratedMessage, + Message, + GeneratedMessageCompanion +} import fif.spark.RddSerializedOps import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job @@ -8,7 +12,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import parquet.avro._ -import parquet.hadoop.{ ParquetOutputFormat, 
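// Illustrative aside (not part of this patch): the Aggregate wrapper above, used directly.
// Both operator closures are Kryo-wrapped before being handed to RDD.aggregate, so they do
// not need to be java.io.Serializable themselves.
import org.apache.spark.rdd.RDD
import fif.spark.RddSerializedOps

def sumRdd(xs: RDD[Int]): Int =
  RddSerializedOps.Aggregate(0, (acc: Int, x: Int) => acc + x, (a: Int, b: Int) => a + b)(xs)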
ParquetInputFormat } +import parquet.hadoop.{ParquetOutputFormat, ParquetInputFormat} import scala.language.postfixOps import scala.reflect.ClassTag @@ -29,30 +33,32 @@ object RddHelpers extends Serializable { (null, x) /** - * A unique path (either local or network) to a file or resource. - */ + * A unique path (either local or network) to a file or resource. + */ type Path = String def getRdd[K: ClassTag, W: ClassTag, F <: FileInputFormat[K, W]: ClassTag]( - sc: SparkContext + sc: SparkContext )( - p: Path + p: Path ): RDD[(K, W)] = sc.newAPIHadoopFile[K, W, F](p) /** - * Sets ParquetInputFormat's read support for a type of AvroReadSupport[V]. - * Evaluates to an RDD containg Vs, using Parquet + Avro for reading. - */ - def rddFromParquet[V <: GeneratedMessage with Message[V]: ClassTag: GeneratedMessageCompanion]( - sc: SparkContext + * Sets ParquetInputFormat's read support for a type of AvroReadSupport[V]. + * Evaluates to an RDD containg Vs, using Parquet + Avro for reading. + */ + def rddFromParquet[ + V <: GeneratedMessage with Message[V]: ClassTag: GeneratedMessageCompanion]( + sc: SparkContext )( - p: Path + p: Path ): RDD[V] = { // protect against deprecated error...can't get around not using Job val job = Job.getInstance(new Configuration()) - ParquetInputFormat.setReadSupportClass(job, classOf[GenericAvroReadSupport[V]]) + ParquetInputFormat + .setReadSupportClass(job, classOf[GenericAvroReadSupport[V]]) job.getConfiguration.set( GenericAvroReadSupport.HAS_GENERIC_RECORD_KEY, @@ -60,8 +66,7 @@ object RddHelpers extends Serializable { ) // load up that RDD! - sc - .newAPIHadoopFile( + sc.newAPIHadoopFile( p, classOf[ParquetInputFormat[V]], classOf[Void], @@ -75,7 +80,9 @@ object RddHelpers extends Serializable { .map { case (_, value) => value } } - def saveRddAsParquet[V <: GeneratedMessage with Message[V]: ClassTag: GeneratedMessageCompanion](sc: SparkContext)(p: Path)(data: RDD[V]): Unit = { + def saveRddAsParquet[ + V <: GeneratedMessage with Message[V]: ClassTag: GeneratedMessageCompanion]( + sc: SparkContext)(p: Path)(data: RDD[V]): Unit = { // protect against deprecated error...can't get around not using Job val job = Job.getInstance(new Configuration()) @@ -85,7 +92,8 @@ object RddHelpers extends Serializable { implicitly[GeneratedMessageCompanion[V]].schema ) - val mapF = RddSerializedOps.Map(implicitly[GeneratedMessageCompanion[V]].toMutable _) + val mapF = RddSerializedOps.Map( + implicitly[GeneratedMessageCompanion[V]].toMutable _) mapF(data) .map(asVoidTuple) @@ -98,4 +106,4 @@ object RddHelpers extends Serializable { ) } -} \ No newline at end of file +} diff --git a/data-tc-spark/src/test/scala/fif/RddDataTest.scala b/data-tc-spark/src/test/scala/fif/RddDataTest.scala index 7d87132..eb7ae58 100644 --- a/data-tc-spark/src/test/scala/fif/RddDataTest.scala +++ b/data-tc-spark/src/test/scala/fif/RddDataTest.scala @@ -1,9 +1,9 @@ package fif import com.holdenkarau.spark.testing.SharedSparkContext -import org.apache.spark.{ SparkConf, SparkContext } +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD -import org.scalatest.{ BeforeAndAfterAll, FunSuite } +import org.scalatest.{BeforeAndAfterAll, FunSuite} import scala.language.higherKinds import scala.reflect.ClassTag @@ -47,7 +47,9 @@ class RddDataTest extends FunSuite with SharedSparkContext with Serializable { test("mapPartition") { def mapParition10[D[_]: Data](data: D[Int]): D[Int] = - data.mapParition { elements => elements.map(_ + 10) } + data.mapParition { elements => + 
elements.map(_ + 10) + } val changed = mapParition10(data) assert(changed !== data) @@ -95,7 +97,10 @@ class RddDataTest extends FunSuite with SharedSparkContext with Serializable { test("take") { def testTake[D[_]: Data](data: D[Int]): Boolean = - data.take(1) == Seq(1) && data.take(2) == Seq(1, 2) && data.take(3) == Seq(1, 2, 3) + data.take(1) == Seq(1) && data.take(2) == Seq(1, 2) && data.take(3) == Seq( + 1, + 2, + 3) assert(testTake(data)) } @@ -132,7 +137,9 @@ class RddDataTest extends FunSuite with SharedSparkContext with Serializable { test("groupBy") { def groupIt[D[_]: Data](data: D[Int]): D[(Boolean, Iterable[Int])] = - data.groupBy { n => n % 2 == 0 } + data.groupBy { n => + n % 2 == 0 + } val evenGroup = groupIt(data).toSeq.toMap @@ -209,4 +216,4 @@ class RddDataTest extends FunSuite with SharedSparkContext with Serializable { // assert(ToMap(data.map(x => (x, x))) === Map(1 -> 1, 2 -> 2, 3 -> 3)) // } -} \ No newline at end of file +} diff --git a/data-tc-spark/src/test/scala/fif/spark/KryoSerializationTest.scala b/data-tc-spark/src/test/scala/fif/spark/KryoSerializationTest.scala index 4ee70ac..c31face 100644 --- a/data-tc-spark/src/test/scala/fif/spark/KryoSerializationTest.scala +++ b/data-tc-spark/src/test/scala/fif/spark/KryoSerializationTest.scala @@ -3,21 +3,20 @@ package fif.spark import org.scalatest.FunSuite /** - * Tests KryoSerializationWrapper's correctness and attempts - * to test for presence of race conditions. - */ + * Tests KryoSerializationWrapper's correctness and attempts + * to test for presence of race conditions. + */ class KryoSerializationTest extends FunSuite { test("test simple Kryo serialization with wrapper class") { val serialized = KryoSerializationWrapper(KryoSerializationTest) - serialized.getValue.dumbData - .foreach(x => { - val ba = serialized.getValue.foo(x) - val serBa = KryoSerializationWrapper(ba) - assert(ba == serBa.getValue) - }) + serialized.getValue.dumbData.foreach(x => { + val ba = serialized.getValue.foo(x) + val serBa = KryoSerializationWrapper(ba) + assert(ba == serBa.getValue) + }) } } @@ -30,4 +29,4 @@ object KryoSerializationTest { def foo(x: String) = new BadApple("foo foo!") -} \ No newline at end of file +} diff --git a/data-tc-spark/src/test/scala/fif/spark/RddSerializedOpsTest.scala b/data-tc-spark/src/test/scala/fif/spark/RddSerializedOpsTest.scala index ba02af9..2eabef2 100644 --- a/data-tc-spark/src/test/scala/fif/spark/RddSerializedOpsTest.scala +++ b/data-tc-spark/src/test/scala/fif/spark/RddSerializedOpsTest.scala @@ -4,21 +4,20 @@ import com.holdenkarau.spark.testing.SharedSparkContext import org.scalatest.FunSuite /** - * Tests SparkModule higher-order-functions. - */ + * Tests SparkModule higher-order-functions. + */ class RddSerializedOpsTest extends FunSuite with SharedSparkContext { import RddSerializedOpsTest._ /** - * In class (vs. companion object) so that we have access to `assert` from `FunSuite`. - */ + * In class (vs. companion object) so that we have access to `assert` from `FunSuite`. 
+ */ private def checkResults[A](correct: Seq[A], results: Seq[A]): Unit = - correct.zip(results) - .foreach { - case (c, r) => - assert(c === r) - } + correct.zip(results).foreach { + case (c, r) => + assert(c === r) + } test("Map") { val f = (s: String) => s"${s}_$s" @@ -43,7 +42,7 @@ class RddSerializedOpsTest extends FunSuite with SharedSparkContext { if (!(dumbDataSet contains s)) throw new RuntimeException(s"unexpected input: $s") else - Unit + Unit val foreacher = RddSerializedOps.Foreach(f) foreacher(sc.parallelize(dumbData)) @@ -69,4 +68,4 @@ object RddSerializedOpsTest { val dumbDataSet = dumbData.toSet -} \ No newline at end of file +} diff --git a/data-tc-spark/src/test/scala/fif/spark/avroparquet/RddHelpersTest.scala b/data-tc-spark/src/test/scala/fif/spark/avroparquet/RddHelpersTest.scala index b370172..283a3e9 100644 --- a/data-tc-spark/src/test/scala/fif/spark/avroparquet/RddHelpersTest.scala +++ b/data-tc-spark/src/test/scala/fif/spark/avroparquet/RddHelpersTest.scala @@ -23,12 +23,13 @@ class RddHelpersTest extends FunSuite with SharedSparkContext { test("serialize and deserialize same RDD, ensure contents do not change") { runWithTemp(deleteBefore = true) { temp => - val rddEntities = sc.parallelize(entities) - RddHelpers.saveRddAsParquet[SampleEntity](sc)(temp.getAbsolutePath)(rddEntities) + RddHelpers.saveRddAsParquet[SampleEntity](sc)(temp.getAbsolutePath)( + rddEntities) val loadedRddEntities = - RddHelpers.rddFromParquet[SampleEntity](sc)(temp.getAbsolutePath) + RddHelpers + .rddFromParquet[SampleEntity](sc)(temp.getAbsolutePath) .sortBy(sortFn) rddEntities @@ -45,11 +46,13 @@ class RddHelpersTest extends FunSuite with SharedSparkContext { object RddHelpersTest { - val preSerializedRddDocumentEntitiesPath = "data-tc-spark/src/test/resources/avroparquet_sample_entities_rdd/" + val preSerializedRddDocumentEntitiesPath = + "data-tc-spark/src/test/resources/avroparquet_sample_entities_rdd/" def runWithTemp(deleteBefore: Boolean)(f: File => Unit): Unit = synchronized { - val temp = File.createTempFile("sparkmod-RddHelpersTest", UUID.randomUUID().toString) + val temp = File.createTempFile("sparkmod-RddHelpersTest", + UUID.randomUUID().toString) try { if (deleteBefore) temp.delete() @@ -99,7 +102,6 @@ object RddHelpersTest { "MONEY", Vector(42) ) - ) - .sortBy(sortFn) + ).sortBy(sortFn) -} \ No newline at end of file +} diff --git a/data-tc-spark/src/test/scala/fif/spark/avroparquet/SampleEntity.scala b/data-tc-spark/src/test/scala/fif/spark/avroparquet/SampleEntity.scala index 1f49c21..ac33bee 100644 --- a/data-tc-spark/src/test/scala/fif/spark/avroparquet/SampleEntity.scala +++ b/data-tc-spark/src/test/scala/fif/spark/avroparquet/SampleEntity.scala @@ -1,44 +1,60 @@ package fif.spark.avroparquet -/** - * Code generated from avro schemas by scalaAvro. Do not modify. - * "ALL THESE FILES ARE YOURS—EXCEPT SAMPLEENTITY.SCALA / ATTEMPT NO MODIFICATIONS THERE" - */ -final case class SampleEntity( - entityName: String, - entityType: String, - pages: Vector[Int]) extends com.nitro.scalaAvro.runtime.GeneratedMessage with com.nitro.scalaAvro.runtime.Message[SampleEntity] { +/** + * Code generated from avro schemas by scalaAvro. Do not modify. 
+ * "ALL THESE FILES ARE YOURS—EXCEPT SAMPLEENTITY.SCALA / ATTEMPT NO MODIFICATIONS THERE" + */ +final case class SampleEntity(entityName: String, + entityType: String, + pages: Vector[Int]) + extends com.nitro.scalaAvro.runtime.GeneratedMessage + with com.nitro.scalaAvro.runtime.Message[SampleEntity] { def withEntityName(__v: String): SampleEntity = copy(entityName = __v) def withEntityType(__v: String): SampleEntity = copy(entityType = __v) def withPages(__v: Vector[Int]): SampleEntity = copy(pages = __v) def toMutable: org.apache.avro.generic.GenericRecord = { - val __out__ = new org.apache.avro.generic.GenericData.Record(SampleEntity.schema) + val __out__ = + new org.apache.avro.generic.GenericData.Record(SampleEntity.schema) __out__.put("entityName", entityName) __out__.put("entityType", entityType) - __out__.put("pages", scala.collection.JavaConversions.asJavaCollection(pages.map(_e => _e))) + __out__.put( + "pages", + scala.collection.JavaConversions.asJavaCollection(pages.map(_e => _e))) __out__ } def companion = SampleEntity } -object SampleEntity extends com.nitro.scalaAvro.runtime.GeneratedMessageCompanion[SampleEntity] { - implicit def messageCompanion: com.nitro.scalaAvro.runtime.GeneratedMessageCompanion[SampleEntity] = this +object SampleEntity + extends com.nitro.scalaAvro.runtime.GeneratedMessageCompanion[SampleEntity] { + implicit def messageCompanion: com.nitro.scalaAvro.runtime.GeneratedMessageCompanion[ + SampleEntity] = this def schema: org.apache.avro.Schema = - new org.apache.avro.Schema.Parser().parse("""{"type":"record","name":"SampleEntity","namespace":"fif.spark.avroparquet","fields":[{"name":"entityName","type":"string"},{"name":"entityType","type":"string"},{"name":"pages","type":{"type":"array","items":"int"}}]}""") + new org.apache.avro.Schema.Parser().parse( + """{"type":"record","name":"SampleEntity","namespace":"fif.spark.avroparquet","fields":[{"name":"entityName","type":"string"},{"name":"entityType","type":"string"},{"name":"pages","type":{"type":"array","items":"int"}}]}""") val _arbitrary: org.scalacheck.Gen[SampleEntity] = for { entityName <- com.nitro.scalaAvro.runtime.AvroGenUtils.genAvroString entityType <- com.nitro.scalaAvro.runtime.AvroGenUtils.genAvroString pages <- com.nitro.scalaAvro.runtime.AvroGenUtils.genAvroArray( org.scalacheck.Arbitrary.arbInt.arbitrary ) - } yield SampleEntity( - entityName = entityName, - entityType = entityType, - pages = pages - ) - def fromMutable(generic: org.apache.avro.generic.GenericRecord): SampleEntity = + } yield + SampleEntity( + entityName = entityName, + entityType = entityType, + pages = pages + ) + def fromMutable( + generic: org.apache.avro.generic.GenericRecord): SampleEntity = SampleEntity( entityName = convertString(generic.get("entityName")), entityType = convertString(generic.get("entityType")), - pages = scala.collection.JavaConversions.asScalaIterator(generic.get("pages").asInstanceOf[org.apache.avro.generic.GenericArray[Any]].iterator()).map(_elem => _elem.asInstanceOf[Int]).toVector + pages = scala.collection.JavaConversions + .asScalaIterator( + generic + .get("pages") + .asInstanceOf[org.apache.avro.generic.GenericArray[Any]] + .iterator()) + .map(_elem => _elem.asInstanceOf[Int]) + .toVector ) } diff --git a/project/SharedBuild.scala b/project/SharedBuild.scala index 28cf432..5a3af15 100644 --- a/project/SharedBuild.scala +++ b/project/SharedBuild.scala @@ -52,29 +52,28 @@ object SharedBuild { lazy val scalaMacros = "org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full - 
//////////////////////////////////////////////////
-  // Code formatting settings for scalariform     //
-  //////////////////////////////////////////////////
-  lazy val sharedCodeFmt = {
-    import scalariform.formatter.preferences._
-    FormattingPreferences()
-      .setPreference(AlignParameters, true )
-      .setPreference(AlignSingleLineCaseStatements, true )
-      .setPreference(CompactControlReadability, false )
-      .setPreference(CompactStringConcatenation, true )
-      .setPreference(DoubleIndentClassDeclaration, true )
-      .setPreference(FormatXml, true )
-      .setPreference(IndentLocalDefs, true )
-      .setPreference(IndentPackageBlocks, true )
-      .setPreference(IndentSpaces, 2 )
-      .setPreference(MultilineScaladocCommentsStartOnFirstLine, false )
-      .setPreference(PreserveDanglingCloseParenthesis, true )
-      .setPreference(PreserveSpaceBeforeArguments, false )
-      .setPreference(RewriteArrowSymbols, false )
-      .setPreference(SpaceBeforeColon, false )
-      .setPreference(SpaceInsideBrackets, false )
-      .setPreference(SpacesWithinPatternBinders, true )
-  }
+  lazy val pomExtraInfo = {
+    <url>https://github.com/malcolmgreaves/data-tc</url>
+      <licenses>
+        <license>
+          <name>Apache 2.0</name>
+          <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+          <distribution>repo</distribution>
+        </license>
+      </licenses>
+      <scm>
+        <url>git@github.com:malcolmgreaves/data-tc.git</url>
+        <connection>scm:git@github.com:malcolmgreaves/data-tc.git</connection>
+      </scm>
+      <developers>
+        <developer>
+          <id>malcolmgreaves</id>
+          <name>Malcolm Greaves</name>
+          <email>greaves.malcolm@gmail.com</email>
+          <url>https://malcolmgreaves.io/</url>
+        </developer>
+      </developers>
+  }
 
-}
+}
\ No newline at end of file
diff --git a/project/plugins.sbt b/project/plugins.sbt
index ad130bb..7f05603 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,8 +1,8 @@
 logLevel := Level.Warn
 
-addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.3.0")
-addSbtPlugin("com.gonitro" % "avro-codegen-compiler" % "0.3.4")
+addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "0.4.10")
+
+addSbtPlugin("com.gonitro" % "avro-codegen-compiler" % "0.3.4")
 
 addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.0.4")
 addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.0.0")
-
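For reference, a minimal usage sketch of the avroparquet helpers reformatted in this patch; it is illustrative only and not part of the patch. The RddHelpers and SampleEntity signatures are taken from the diffs above; the app name, master, output path, and sample values are assumptions, and a plain local SparkContext is assumed to suffice, as in RddHelpersTest (note SampleEntity lives in the test sources).

import org.apache.spark.{SparkConf, SparkContext}
import fif.spark.avroparquet.{RddHelpers, SampleEntity}

object ParquetRoundTripSketch {

  def main(args: Array[String]): Unit = {
    // Local SparkContext for illustration; any configured context works.
    val sc = new SparkContext(
      new SparkConf().setAppName("parquet-round-trip").setMaster("local[2]"))

    // A small RDD of the scalaAvro-generated case class.
    val entities = sc.parallelize(
      Seq(SampleEntity("Washington", "LOCATION", Vector(1, 2)),
          SampleEntity("$100", "MONEY", Vector(42))))

    // Hypothetical output location.
    val out = "/tmp/sample_entities_parquet"

    // Write with Parquet + Avro, then read the same records back.
    RddHelpers.saveRddAsParquet[SampleEntity](sc)(out)(entities)
    val loaded = RddHelpers.rddFromParquet[SampleEntity](sc)(out)

    assert(loaded.count() == entities.count())
    sc.stop()
  }
}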
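A similar sketch for the Kryo-backed helpers exercised in KryoSerializationTest and RddSerializedOpsTest. Again illustrative and not part of the patch: the sample data and mapping function are assumptions, while RddSerializedOps.Map and KryoSerializationWrapper(...).getValue follow the usage shown in the tests above.

import org.apache.spark.{SparkConf, SparkContext}
import fif.spark.{KryoSerializationWrapper, RddSerializedOps}

object SerializedOpsSketch {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("serialized-ops").setMaster("local[2]"))

    // RddSerializedOps.Map ships the closure to executors via Kryo, then maps it over the RDD.
    val doubler = RddSerializedOps.Map((s: String) => s"${s}_$s")
    val doubled = doubler(sc.parallelize(Seq("a", "b", "c"))).collect().toSeq
    assert(doubled == Seq("a_a", "b_b", "c_c"))

    // KryoSerializationWrapper wraps a value for Kryo-based transport; getValue recovers it,
    // mirroring the round-trip checked in KryoSerializationTest.
    val wrapped = KryoSerializationWrapper("some value")
    assert(wrapped.getValue == "some value")

    sc.stop()
  }
}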