Skip to content

Commit

Permalink
added a spark module with more integration
Browse files Browse the repository at this point in the history
  • Loading branch information
kailuowang committed Sep 13, 2019
2 parents d573823 + c65118f commit 522c541
Show file tree
Hide file tree
Showing 11 changed files with 183 additions and 174 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
dynamodb-local
derby.log
metastore_db
logs
release
plots
Expand Down
11 changes: 10 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ lazy val libs =
.addJVM(name = "henkan-convert", version = "0.6.4", org ="com.kailuowang")
.add( name = "play-json", version = "2.7.3", org = "com.typesafe.play")
.add( name = "play", version = "2.7.3", org = "com.typesafe.play")
.add( name = "spark", version = "2.4.4", org = "org.apache.spark", "spark-sql", "spark-core")
.addJava(name ="commons-math3", version = "3.6.1", org ="org.apache.commons")
.addJVM(name = "play-json-derived-codecs", version = "6.0.0", org = "org.julienrf")
.addJVM(name = "newtype", version = "0.4.3", org = "io.estatico")
Expand All @@ -37,7 +38,7 @@ addCommandAlias("validate", s";clean;test;play/IntegrationTest/test;it/Integrati
addCommandAlias("it", s"IntegrationTest/test")

lazy val thomas = project.in(file("."))
.aggregate(playExample, play, client, bandit, it, http4s, cli, mongo, analysis, docs, stress, dynamo)
.aggregate(playExample, play, client, bandit, it, http4s, cli, mongo, analysis, docs, stress, dynamo, spark)
.settings(
rootSettings,
crossScalaVersions := Nil,
Expand Down Expand Up @@ -160,6 +161,14 @@ lazy val dynamo = project
libs.dependencies("lihua-dynamo")
)

// sbt sub-project for the new Spark integration module (artifact "thomas-spark").
// Depends on the `client` project defined elsewhere in this build file.
lazy val spark = project
.dependsOn(client)
.settings(name := "thomas-spark")
.settings(rootSettings)
.settings(
// "spark-sql" is marked "provided": the Spark runtime supplies it at execution
// time, so it is compile-only here and excluded from the published dependencies.
libs.dependency("spark-sql", Some("provided"))
)


lazy val http4s = project
.dependsOn(mongo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ package client

import java.time.OffsetDateTime

import cats.{Functor, Id, MonadError}
import cats.{Functor, MonadError}
import cats.effect._
import com.iheart.thomas.abtest.model._
import lihua.Entity
import _root_.play.api.libs.json._
import cats.implicits._
import com.iheart.thomas.abtest.Error
import com.iheart.thomas.abtest.AssignGroups
import com.iheart.thomas.analysis.KPIDistribution
import abtest.Formats._
import scala.concurrent.ExecutionContext
Expand Down Expand Up @@ -152,12 +151,13 @@ object AbtestClient {
* Shortcuts for getting the assigned group only.
* @param serviceUrl for getting all running tests as of `time`
*/
def assignGroups[F[_]: ConcurrentEffect](
serviceUrl: String,
time: Option[OffsetDateTime])(implicit ec: ExecutionContext): F[AssignGroups[Id]] =
def testsWithFeatures[F[_]: ConcurrentEffect](serviceUrl: String,
time: Option[OffsetDateTime])(
implicit ec: ExecutionContext): F[Vector[(Abtest, Feature)]] =
Http4SAbtestClient
.resource[F](new HttpServiceUrlsPlay("mock") {
override val tests: String = serviceUrl
}, ec)
.use(_.tests(time).map(t => AssignGroups.fromTestsFeatures[Id](t)))
.use(
_.tests(time).map(_.map { case (Entity(_, test), feature) => (test, feature) }))
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
package com.iheart.thomas
package client

import cats.Id
import cats.effect.{ContextShift, IO}
import com.iheart.thomas.abtest.AssignGroups
import com.iheart.thomas.abtest.model.UserGroupQuery

import collection.JavaConverters._
Expand All @@ -15,60 +17,28 @@ class JavaAbtestAssignments private (serviceUrl: String, asOf: Option[Long]) {
private val time = asOf.map(TimeUtil.toDateTime)
import scala.concurrent.ExecutionContext.Implicits.global
implicit val csIo: ContextShift[IO] = IO.contextShift(global)
val assignGroups = AbtestClient.assignGroups[IO](serviceUrl, time).unsafeRunSync()
val testData = AbtestClient.testsWithFeatures[IO](serviceUrl, time).unsafeRunSync()

def assignments(
userId: String,
tags: java.util.ArrayList[String],
meta: java.util.Map[String, String],
features: java.util.ArrayList[String]
): java.util.List[(FeatureName, GroupName)] = {
assignGroups
.assign(
UserGroupQuery(Some(userId),
time,
tags.asScala.toList,
meta.asScala.toMap,
features = features.asScala.toList))
._2
.map {
case (fn, (gn, _)) => (fn, gn)
}
.toList
): java.util.Map[FeatureName, GroupName] = {
AssignGroups
.assign[Id](testData,
UserGroupQuery(Some(userId),
time,
tags.asScala.toList,
meta.asScala.toMap,
features = features.asScala.toList))
.map { case (fn, (gn, _)) => (fn, gn) }
.asJava
}

def assignments(
userIds: java.util.List[UserId],
feature: String,
tags: java.util.List[String],
meta: java.util.Map[String, String]
): java.util.List[(UserId, GroupName)] = {
val tagsL = tags.asScala.toList
val metaS = meta.asScala.toMap
val features = List(feature)
userIds.asScala.toList.flatMap { userId =>
assignGroups
.assign(UserGroupQuery(Some(userId), time, tagsL, metaS, features))
._2
.get(feature)
.map(_._1)
.map((userId, _))
}.asJava
}

def assignments(
userIds: java.util.List[UserId],
feature: String
): java.util.List[(UserId, GroupName)] =
assignments(userIds,
feature,
new java.util.ArrayList[String](),
new java.util.HashMap[String, String]())

def assignments(
userId: String
): java.util.List[(FeatureName, GroupName)] =
): java.util.Map[FeatureName, GroupName] =
assignments(userId,
new java.util.ArrayList[String](),
new java.util.HashMap[String, String](),
Expand All @@ -77,7 +47,7 @@ class JavaAbtestAssignments private (serviceUrl: String, asOf: Option[Long]) {
def assignments(
userId: String,
features: java.util.ArrayList[String]
): java.util.List[(FeatureName, GroupName)] =
): java.util.Map[FeatureName, GroupName] =
assignments(userId,
new java.util.ArrayList[String](),
new java.util.HashMap[String, String](),
Expand Down
28 changes: 13 additions & 15 deletions core/src/main/scala/com/iheart/thomas/abtest/AbtestAlg.scala
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ final class DefaultAbtestAlg[F[_]](refreshPeriod: FiniteDuration)(
time: Option[OffsetDateTime],
userTags: List[Tag]): F[Map[FeatureName, GroupName]] =
validateUserId(userId) >>
getGroupAssignmentsOf(UserGroupQuery(Some(userId), time, userTags)).map(p =>
toGroups(p._2))
getGroupAssignmentsOf(UserGroupQuery(Some(userId), time, userTags)).map(toGroups)

def terminate(testId: TestId): F[Option[Entity[Abtest]]] =
getTest(testId).flatMap { test =>
Expand Down Expand Up @@ -304,13 +303,12 @@ final class DefaultAbtestAlg[F[_]](refreshPeriod: FiniteDuration)(

def getGroupsWithMeta(query: UserGroupQuery): F[UserGroupQueryResult] =
validate(query) >> {
getGroupAssignmentsOf(query).map {
case (at, groupAssignments) =>
val metas = groupAssignments.mapFilter {
case (groupName, test) =>
test.data.groupMetas.get(groupName)
}
UserGroupQueryResult(at, toGroups(groupAssignments), metas)
getGroupAssignmentsOf(query).map { groupAssignments =>
val metas = groupAssignments.mapFilter {
case (groupName, test) =>
test.groupMetas.get(groupName)
}
UserGroupQueryResult(toGroups(groupAssignments), metas)
}
}

Expand Down Expand Up @@ -419,16 +417,16 @@ final class DefaultAbtestAlg[F[_]](refreshPeriod: FiniteDuration)(
created <- doCreate(testSpec, Some(updatedContinueFrom))
} yield created

private def toGroups(assignments: Map[FeatureName, (GroupName, Entity[Abtest])])
: Map[FeatureName, GroupName] =
private def toGroups(
assignments: Map[FeatureName, (GroupName, _)]): Map[FeatureName, GroupName] =
assignments.toList.map { case (k, v) => (k, v._1) }.toMap

private def getGroupAssignmentsOf(query: UserGroupQuery)
: F[(OffsetDateTime, Map[FeatureName, (GroupName, Entity[Abtest])])] =
private def getGroupAssignmentsOf(
query: UserGroupQuery): F[Map[FeatureName, (GroupName, Abtest)]] =
for {
data <- getAllTestsCached(query.at)
r <- AssignGroups.fromTestsFeatures[F](data).assign(query)
} yield r
testFeatures = data.map { case (Entity(_, test), feature) => (test, feature) }
} yield AssignGroups.assign[Id](testFeatures, query)

private def doCreate(newSpec: AbtestSpec,
inheritFrom: Option[Entity[Abtest]]): F[Entity[Abtest]] = {
Expand Down
76 changes: 5 additions & 71 deletions core/src/main/scala/com/iheart/thomas/abtest/AssignGroups.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,83 +6,17 @@
package com.iheart.thomas
package abtest

import java.time.OffsetDateTime

import cats.Monad
import cats.implicits._
import com.iheart.thomas.abtest.model.Abtest.Status.InProgress
import com.iheart.thomas.abtest.model._
import lihua.Entity

trait AssignGroups[F[_]] {
def assign(query: UserGroupQuery)
: F[(OffsetDateTime, Map[FeatureName, (GroupName, Entity[Abtest])])]
}
import henkan.convert.Syntax._

object AssignGroups {

//todo: replace this using the static assign methods
/** Builds an [[AssignGroups]] from an in-memory snapshot of tests and features.
  *
  * NOTE(review): this definition is the pre-image being deleted by this commit
  * (the todo above says to replace it with the static `assign` methods).
  *
  * @param data snapshot of test entities paired with their features
  * @param eligibilityControl decides per-query whether a user is eligible for a test
  * @return a DefaultAssignGroups whose test retriever keeps only tests whose
  *         status is InProgress at the queried time, and whose feature retriever
  *         looks a feature up by name in the same snapshot
  */
def fromTestsFeatures[F[_]: Monad](
data: Vector[(Entity[Abtest], Feature)]
)(implicit eligibilityControl: EligibilityControl[F]): AssignGroups[F] =
new DefaultAssignGroups[F](
// tests retriever: filter the snapshot down to tests running at `ofTime`
ofTime =>
data
.collect {
case (et @ Entity(_, test), _) if test.statusAsOf(ofTime) == InProgress => et
}
.pure[F],
// feature retriever: first feature in the snapshot whose name matches
fn =>
data
.collectFirst {
case (_, f) if f.name == fn => f
}
.pure[F]
)

class DefaultAssignGroups[F[_]: Monad](
testsRetriever: OffsetDateTime => F[Vector[Entity[Abtest]]],
featureRetriever: FeatureName => F[Option[Feature]]
)(implicit eligibilityControl: EligibilityControl[F])
extends AssignGroups[F] {

def assign(query: UserGroupQuery)
: F[(OffsetDateTime, Map[FeatureName, (GroupName, Entity[Abtest])])] = {
val ofTime = query.at.getOrElse(TimeUtil.currentMinute)
val allTests = testsRetriever(ofTime)
val targetTests =
if (query.features.isEmpty) allTests
else allTests.map(_.filter(t => query.features.contains(t.data.feature)))
targetTests
.flatMap(_.traverseFilter { test
eligibilityControl.eligible(query, test.data).flatMap {
eligible =>
featureRetriever(test.data.feature).map {
feature =>
val idToUse = test.data.idToUse(query)
def overriddenGroup = {
(feature, idToUse).mapN((f, uid) => f.overrides.get(uid)).flatten
}
{
if (eligible)
overriddenGroup orElse {
idToUse.flatMap(uid => Bucketing.getGroup(uid, test.data))
} else if (feature.fold(false)(_.overrideEligibility))
overriddenGroup
else
None
}.map(gn => (test.data.feature, (gn, test)))
}
}
}.map(_.toMap))
.map((ofTime, _))
}
}

def assign[F[_]: Monad](test: Abtest, feature: Feature, query: UserGroupQuery)(
implicit eligibilityControl: EligibilityControl[F]): F[Option[GroupName]] = {
eligibilityControl.eligible(query, test).map { eligible =>
val idToUse = test.idToUse(query)
val idToUse = test.idToUse(query.to[UserInfo]())
def overriddenGroup = {
idToUse.map(uid => feature.overrides.get(uid)).flatten
}
Expand All @@ -97,13 +31,13 @@ object AssignGroups {
}
}

def assign[F[_]: Monad](tests: List[(Abtest, Feature)], query: UserGroupQuery)(
def assign[F[_]: Monad](tests: Vector[(Abtest, Feature)], query: UserGroupQuery)(
implicit eligibilityControl: EligibilityControl[F])
: F[Map[FeatureName, GroupName]] = {
: F[Map[FeatureName, (GroupName, Abtest)]] = {
tests
.traverseFilter {
case (test, feature) =>
assign[F](test, feature, query).map(_.map((feature.name, _)))
assign[F](test, feature, query).map(_.map(gn => (feature.name, (gn, test))))
}
.map(_.toMap)

Expand Down
Loading

0 comments on commit 522c541

Please sign in to comment.