
Published 0.0.0 on sonatype, scalafmt
malcolmgreaves committed Jan 7, 2017
1 parent 315dde7 commit c81cd77
Showing 34 changed files with 485 additions and 430 deletions.
52 changes: 20 additions & 32 deletions build.sbt
@@ -1,16 +1,14 @@
name := "data-tc"
organization in ThisBuild := "io.malcolmgreaves"
version in ThisBuild := {
version in ThisBuild := {
val major: Int = 0
val minor: Int = 0
val patch: Int = 0
s"$major.$minor.$patch"
}

import SharedBuild._
com.typesafe.sbt.SbtScalariform.defaultScalariformSettings
ScalariformKeys.preferences := sharedCodeFmt


lazy val root = project
.in(file("."))
.aggregate(
@@ -21,35 +19,26 @@ lazy val root = project
)
.settings {
publishArtifact := false
publishLocal := {}
publish := {}
publishLocal := {}
publish := {}
}

lazy val `data-tc-scala` = project
.in(file("data-tc-scala"))
.settings {
publishArtifact := true
}
lazy val `data-tc-scala` = project.in(file("data-tc-scala")).settings {
publishArtifact := true
}

lazy val `data-tc-spark` = project
.in(file("data-tc-spark"))
.dependsOn(`data-tc-scala`)
.settings {
lazy val `data-tc-spark` =
project.in(file("data-tc-spark")).dependsOn(`data-tc-scala`).settings {
publishArtifact := true
}

lazy val `data-tc-flink` = project
.in(file("data-tc-flink"))
.dependsOn(`data-tc-scala`)
.settings {
lazy val `data-tc-flink` =
project.in(file("data-tc-flink")).dependsOn(`data-tc-scala`).settings {
publishArtifact := true
}


lazy val `data-tc-extra` = project
.in(file("data-tc-extra"))
.dependsOn(`data-tc-scala`)
.settings {
lazy val `data-tc-extra` =
project.in(file("data-tc-extra")).dependsOn(`data-tc-scala`).settings {
publishArtifact := true
}

@@ -58,22 +47,20 @@ lazy val publishTasks = subprojects.map { publish.in }

resolvers in ThisBuild := Seq(
// sonatype, maven central
"Sonatype Releases" at "https://oss.sonatype.org/content/repositories/releases/",
"Sonatype Releases" at "https://oss.sonatype.org/content/repositories/releases/",
"Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/",

// bintray
"Scalaz Bintray" at "http://dl.bintray.com/scalaz/releases",
Resolver.bintrayRepo("mfglabs", "maven"),
Resolver.bintrayRepo("dwhjames", "maven"),

// etc.
"Confluent" at "http://packages.confluent.io/maven/"
)

// runtime & compilation

lazy val javaV = "1.8"
scalaVersion in ThisBuild := "2.11.8"
lazy val javaV = "1.8"
scalaVersion in ThisBuild := "2.11.8"
scalacOptions in ThisBuild := Seq(
"-optimize",
"-deprecation",
@@ -96,12 +83,13 @@ scalacOptions in ThisBuild := Seq(
"-Xfatal-warnings" // Every warning is esclated to an error.
)
javacOptions in ThisBuild := Seq("-source", javaV, "-target", javaV)
javaOptions in ThisBuild := Seq(
"-server",
"-XX:+AggressiveOpts",
javaOptions in ThisBuild := Seq(
"-server",
"-XX:+AggressiveOpts",
"-XX:+TieredCompilation",
"-XX:CompileThreshold=100",
"-Xmx3000M",
"-XX:+UseG1GC"
)

publishArtifact := false
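
For context, a minimal sketch of the aggregate-root publishing pattern this build.sbt uses (the `core` name is a placeholder, not part of the commit): the root project aggregates the subprojects so everything builds together, but the root itself publishes no artifact; each subproject opts in explicitly.

// Sketch only; `core` stands in for any of the data-tc-* subprojects.
lazy val root = project
  .in(file("."))
  .aggregate(core)
  .settings(
    publishArtifact := false, // the root produces no artifact
    publishLocal := {},       // make the publish tasks no-ops on root
    publish := {}
  )

lazy val core = project
  .in(file("core"))
  .settings(publishArtifact := true) // only subprojects are published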
13 changes: 5 additions & 8 deletions data-tc-extra/build.sbt
@@ -2,15 +2,11 @@ name := "data-tc-extra"

import SharedBuild._

com.typesafe.sbt.SbtScalariform.defaultScalariformSettings
ScalariformKeys.preferences := sharedCodeFmt

addCompilerPlugin(scalaMacros)

libraryDependencies ++=
libraryDependencies ++=
extraDeps ++
testDeps

testDeps

// doc hacks

@@ -21,7 +17,8 @@ sources in (Compile, doc) ~= (_ filter (_.getName endsWith "DataOps.scala"))
//
// test, runtime settings
//
fork in run := true
fork in Test := true
fork in run := true
fork in Test := true
parallelExecution in Test := true

pomExtra := pomExtraInfo
6 changes: 4 additions & 2 deletions data-tc-extra/src/main/scala/fif/ops/ToMap.scala
@@ -29,12 +29,14 @@ object ToMap extends Serializable {
}
}

def apply[T: ClassTag, U: ClassTag: Semigroup, D[_]: Data](data: D[(T, U)]): Map[T, U] = {
def apply[T: ClassTag, U: ClassTag: Semigroup, D[_]: Data](
data: D[(T, U)]): Map[T, U] = {
implicit val _ = identity[(T, U)] _
apply[(T, U), T, U, D](data)
}

def apply[A, T: ClassTag, U: ClassTag: Semigroup, D[_]: Data](data: D[A])(implicit ev: A <:< (T, U)): Map[T, U] = {
def apply[A, T: ClassTag, U: ClassTag: Semigroup, D[_]: Data](data: D[A])(
implicit ev: A <:< (T, U)): Map[T, U] = {

val sg = implicitly[Semigroup[U]]

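
For reference, a hypothetical usage sketch of the ToMap.apply shown above (not part of this commit; it assumes a Data instance for Traversable is in implicit scope, and that Semigroup is the type ToMap's context bound expects, matching the test helper below):

import fif.ops.ToMap

implicit val addInts: Semigroup[Int] = new Semigroup[Int] {
  override def combine(a: Int, b: Int): Int = a + b
}

val pairs = Traversable(("hello", 10), ("hello", 20), ("world", 30))

// Values under duplicate keys are merged via the Semigroup:
// Map("hello" -> 30, "world" -> 30)
val merged: Map[String, Int] = ToMap(pairs)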
2 changes: 1 addition & 1 deletion data-tc-extra/src/test/scala/fif/TestHelpers.scala
@@ -8,4 +8,4 @@ object TestHelpers {
override def combine(a: Int, b: Int) = a + b
}

}
}
1 change: 0 additions & 1 deletion data-tc-extra/src/test/scala/fif/ops/SumTest.scala
@@ -27,4 +27,3 @@ class SumTest extends FunSuite {
}

}

5 changes: 3 additions & 2 deletions data-tc-extra/src/test/scala/fif/ops/ToMapTest.scala
@@ -19,7 +19,8 @@ class ToMapTest extends FunSuite {

test("ToMap list of many elements") {
val l = Traversable(
("hello", 10), ("hello", 20),
("hello", 10),
("hello", 20),
("world", 30),
("sunday funday", 40),
("sunday funday", 50),
@@ -56,4 +57,4 @@
assert(ToMap.combine(first, second) == expected)
}

}
}
11 changes: 5 additions & 6 deletions data-tc-flink/build.sbt
@@ -2,16 +2,15 @@ name := "data-tc-flink"

import SharedBuild._

com.typesafe.sbt.SbtScalariform.defaultScalariformSettings
ScalariformKeys.preferences := sharedCodeFmt

addCompilerPlugin(scalaMacros)

libraryDependencies ++=
libraryDependencies ++=
flinkTcDeps ++
testDeps
testDeps

testOptions in Test += Tests.Argument(TestFrameworks.JUnit, "-v")
testOptions in Test += Tests.Argument("-oF")
fork in Test := true
fork in Test := true
parallelExecution in Test := true

pomExtra := pomExtraInfo
97 changes: 48 additions & 49 deletions data-tc-flink/src/main/scala/com/nitro/absnlp/FlinkData.scala
@@ -10,11 +10,11 @@ import scala.util.Try
import fif.Data

/**
* Implementation of the Data typeclass with the Flink DataSet type.
*
* Since DataSet instances are lazy, this typeclass implementation's methods
* have lazy (vs. eager) semantics.
*/
* Implementation of the Data typeclass with the Flink DataSet type.
*
* Since DataSet instances are lazy, this typeclass implementation's methods
* have lazy (vs. eager) semantics.
*/
case object FlinkData extends Data[DataSet] {

import Data.ops._
@@ -24,7 +24,8 @@ case object FlinkData extends Data[DataSet] {
data.map(f)
}

override def mapParition[A, B: ClassTag](d: DataSet[A])(f: Iterable[A] => Iterable[B]): DataSet[B] = {
override def mapParition[A, B: ClassTag](d: DataSet[A])(
f: Iterable[A] => Iterable[B]): DataSet[B] = {
implicit val ti = FlinkHelper.typeInfo[B]
d.mapPartition { partition =>
f(partition.toIterable)
@@ -41,7 +42,8 @@ case object FlinkData extends Data[DataSet] {
()
}

override def foreachPartition[A](d: DataSet[A])(f: Iterable[A] => Any): Unit = {
override def foreachPartition[A](d: DataSet[A])(
f: Iterable[A] => Any): Unit = {
// ignore this map operation's return type
implicit val ti = FlinkHelper.unitTypeInformation
d.mapPartition { partition =>
@@ -54,24 +56,22 @@ case object FlinkData extends Data[DataSet] {
override def filter[A](d: DataSet[A])(f: A => Boolean): DataSet[A] =
d.filter(f)

override def aggregate[A, B: ClassTag](data: DataSet[A])(zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B = {
override def aggregate[A, B: ClassTag](data: DataSet[A])(
zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B = {
implicit val ti = FlinkHelper.typeInfo[B]
data
.mapPartition { partition =>
Seq(partition.foldLeft(zero)(seqOp))
}
.reduce(combOp)
.collect()
.reduce(combOp)
data.mapPartition { partition =>
Seq(partition.foldLeft(zero)(seqOp))
}.reduce(combOp).collect().reduce(combOp)
}

/**
* Unimplemented!
*
* Flink doesn't support an API for total sorting. Must determine a correct
* implementation using the lower-level API.
*/
override def sortBy[A, B: ClassTag](d: DataSet[A])(f: (A) ⇒ B)(implicit ord: math.Ordering[B]): DataSet[A] =
* Unimplemented!
*
* Flink doesn't support an API for total sorting. Must determine a correct
* implementation using the lower-level API.
*/
override def sortBy[A, B: ClassTag](d: DataSet[A])(f: (A) ⇒ B)(
implicit ord: math.Ordering[B]): DataSet[A] =
???

override def take[A](d: DataSet[A])(k: Int): Traversable[A] =
@@ -83,40 +83,38 @@ case object FlinkData extends Data[DataSet] {
override def toSeq[A](d: DataSet[A]): Seq[A] =
d.collect()

override def flatMap[A, B: ClassTag](d: DataSet[A])(f: A => TraversableOnce[B]): DataSet[B] = {
override def flatMap[A, B: ClassTag](d: DataSet[A])(
f: A => TraversableOnce[B]): DataSet[B] = {
implicit val ti = FlinkHelper.typeInfo[B]
d.flatMap(f)
}

override def flatten[A, B: ClassTag](d: DataSet[A])(implicit asDataSet: A => TraversableOnce[B]): DataSet[B] =
override def flatten[A, B: ClassTag](d: DataSet[A])(
implicit asDataSet: A => TraversableOnce[B]): DataSet[B] =
flatMap(d)(asDataSet)

override def groupBy[A, B: ClassTag](data: DataSet[A])(f: A => B): DataSet[(B, Iterable[A])] = {
override def groupBy[A, B: ClassTag](data: DataSet[A])(
f: A => B): DataSet[(B, Iterable[A])] = {

val reducedToMaps = {

implicit val ti: TypeInformation[scala.collection.immutable.Map[B, Iterable[A]]] =
implicit val ti: TypeInformation[
scala.collection.immutable.Map[B, Iterable[A]]] =
FlinkHelper.typeInfo(ClassTag(classOf[Map[B, Iterable[A]]]))

data
.mapPartition { partition =>
Seq(
partition
.toIterable
.groupBy(f)
.map {
case (key, values) => (key, values.toIterable)
}
)
}
.reduce(FlinkHelper.mapCombine[B, A] _)
data.mapPartition { partition =>
Seq(
partition.toIterable.groupBy(f).map {
case (key, values) => (key, values.toIterable)
}
)
}.reduce(FlinkHelper.mapCombine[B, A] _)
}

implicit val ti: TypeInformation[(B, Iterable[A])] =
FlinkHelper.typeInfo(ClassTag(classOf[(B, Iterable[A])]))

reducedToMaps
.flatMap(_.toSeq)
reducedToMaps.flatMap(_.toSeq)
}

override def reduce[A](d: DataSet[A])(op: (A, A) => A): A =
@@ -129,19 +127,20 @@ case object FlinkData extends Data[DataSet] {
size(d) == 0

/**
* Unimplemented!
*
* Waiting on support coming in Flink 0.10 !
*/
override def zip[A, B: ClassTag](d: DataSet[A])(that: DataSet[B]): DataSet[(A, B)] =
* Unimplemented!
*
* Waiting on support coming in Flink 0.10 !
*/
override def zip[A, B: ClassTag](d: DataSet[A])(
that: DataSet[B]): DataSet[(A, B)] =
???

/**
* Unimplemented!
*
* Waiting on support coming in Flink 0.10 !
*/
* Unimplemented!
*
* Waiting on support coming in Flink 0.10 !
*/
override def zipWithIndex[A](d: DataSet[A]): DataSet[(A, Long)] =
???

}
}
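
To make the lazy-versus-eager split described in the Scaladoc concrete, a hypothetical usage sketch (not part of this commit; `env` and `words` are illustrative names, and only methods visible in the diff above are called):

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val words: DataSet[String] = env.fromElements("a", "bb", "ccc")

// Lazy: these calls only extend the Flink execution plan.
val lengths: DataSet[Int] = FlinkData.map(words)(_.length)
val nonEmpty: DataSet[Int] = FlinkData.filter(lengths)(_ > 0)

// Eager: reduce and aggregate return plain values, forcing execution.
val total: Int = FlinkData.reduce(nonEmpty)(_ + _)
val sum: Int = FlinkData.aggregate(nonEmpty)(0)(_ + _, _ + _)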