diff --git a/.gitignore b/.gitignore
index dccc0e80..45c3c1a4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,6 @@
 dynamodb-local
+derby.log
+metastore_db
 logs
 release
 plots
diff --git a/build.sbt b/build.sbt
index 63d2aa98..0fd5060e 100644
--- a/build.sbt
+++ b/build.sbt
@@ -17,6 +17,7 @@ lazy val libs =
   .addJVM(name = "henkan-convert", version = "0.6.4", org ="com.kailuowang")
   .add( name = "play-json", version = "2.7.3", org = "com.typesafe.play")
   .add( name = "play", version = "2.7.3", org = "com.typesafe.play")
+  .add( name = "spark", version = "2.4.4", org = "org.apache.spark", "spark-sql", "spark-core")
   .addJava(name ="commons-math3", version = "3.6.1", org ="org.apache.commons")
   .addJVM(name = "play-json-derived-codecs", version = "6.0.0", org = "org.julienrf")
   .addJVM(name = "newtype", version = "0.4.3", org = "io.estatico")
@@ -37,7 +38,7 @@ addCommandAlias("validate", s";clean;test;play/IntegrationTest/test;it/Integrati
 addCommandAlias("it", s"IntegrationTest/test")

 lazy val thomas = project.in(file("."))
-  .aggregate(playExample, play, client, bandit, it, http4s, cli, mongo, analysis, docs, stress, dynamo)
+  .aggregate(playExample, play, client, bandit, it, http4s, cli, mongo, analysis, docs, stress, dynamo, spark)
   .settings(
     rootSettings,
     crossScalaVersions := Nil,
@@ -160,6 +161,14 @@ lazy val dynamo = project
     libs.dependencies("lihua-dynamo")
   )

+lazy val spark = project
+  .dependsOn(client)
+  .settings(name := "thomas-spark")
+  .settings(rootSettings)
+  .settings(
+    libs.dependency("spark-sql", Some("provided"))
+  )
+
 lazy val http4s = project
   .dependsOn(mongo)
diff --git a/client/src/main/scala/com/iheart/thomas/client/AbtestClient.scala b/client/src/main/scala/com/iheart/thomas/client/AbtestClient.scala
index 16ecc023..430ab80f 100644
--- a/client/src/main/scala/com/iheart/thomas/client/AbtestClient.scala
+++ b/client/src/main/scala/com/iheart/thomas/client/AbtestClient.scala
@@ -8,14 +8,13 @@ package client

 import java.time.OffsetDateTime

-import cats.{Functor, Id, MonadError}
+import cats.{Functor, MonadError}
 import cats.effect._
 import com.iheart.thomas.abtest.model._
 import lihua.Entity
 import _root_.play.api.libs.json._
 import cats.implicits._
 import com.iheart.thomas.abtest.Error
-import com.iheart.thomas.abtest.AssignGroups
 import com.iheart.thomas.analysis.KPIDistribution
 import abtest.Formats._
 import scala.concurrent.ExecutionContext
@@ -152,12 +151,13 @@ object AbtestClient {
     * Shortcuts for getting the assigned group only.
    * @param serviceUrl for getting all running tests as of `time`
    */
-  def assignGroups[F[_]: ConcurrentEffect](
-      serviceUrl: String,
-      time: Option[OffsetDateTime])(implicit ec: ExecutionContext): F[AssignGroups[Id]] =
+  def testsWithFeatures[F[_]: ConcurrentEffect](serviceUrl: String,
+                                                time: Option[OffsetDateTime])(
+      implicit ec: ExecutionContext): F[Vector[(Abtest, Feature)]] =
     Http4SAbtestClient
       .resource[F](new HttpServiceUrlsPlay("mock") {
         override val tests: String = serviceUrl
       }, ec)
-      .use(_.tests(time).map(t => AssignGroups.fromTestsFeatures[Id](t)))
+      .use(
+        _.tests(time).map(_.map { case (Entity(_, test), feature) => (test, feature) }))
 }
diff --git a/client/src/main/scala/com/iheart/thomas/client/JavaAbtestAssignments.scala b/client/src/main/scala/com/iheart/thomas/client/JavaAbtestAssignments.scala
index 2f9e58a3..ec7df87c 100644
--- a/client/src/main/scala/com/iheart/thomas/client/JavaAbtestAssignments.scala
+++ b/client/src/main/scala/com/iheart/thomas/client/JavaAbtestAssignments.scala
@@ -6,7 +6,9 @@ package com.iheart.thomas
 package client

+import cats.Id
 import cats.effect.{ContextShift, IO}
+import com.iheart.thomas.abtest.AssignGroups
 import com.iheart.thomas.abtest.model.UserGroupQuery

 import collection.JavaConverters._
@@ -15,60 +17,28 @@ class JavaAbtestAssignments private (serviceUrl: String, asOf: Option[Long]) {
   private val time = asOf.map(TimeUtil.toDateTime)
   import scala.concurrent.ExecutionContext.Implicits.global
   implicit val csIo: ContextShift[IO] = IO.contextShift(global)
-  val assignGroups = AbtestClient.assignGroups[IO](serviceUrl, time).unsafeRunSync()
+  val testData = AbtestClient.testsWithFeatures[IO](serviceUrl, time).unsafeRunSync()

   def assignments(
       userId: String,
       tags: java.util.ArrayList[String],
       meta: java.util.Map[String, String],
       features: java.util.ArrayList[String]
-  ): java.util.List[(FeatureName, GroupName)] = {
-    assignGroups
-      .assign(
-        UserGroupQuery(Some(userId),
-                       time,
-                       tags.asScala.toList,
-                       meta.asScala.toMap,
-                       features = features.asScala.toList))
-      ._2
-      .map {
-        case (fn, (gn, _)) => (fn, gn)
-      }
-      .toList
+  ): java.util.Map[FeatureName, GroupName] = {
+    AssignGroups
+      .assign[Id](testData,
+                  UserGroupQuery(Some(userId),
+                                 time,
+                                 tags.asScala.toList,
+                                 meta.asScala.toMap,
+                                 features = features.asScala.toList))
+      .map { case (fn, (gn, _)) => (fn, gn) }
       .asJava
   }

-  def assignments(
-      userIds: java.util.List[UserId],
-      feature: String,
-      tags: java.util.List[String],
-      meta: java.util.Map[String, String]
-  ): java.util.List[(UserId, GroupName)] = {
-    val tagsL = tags.asScala.toList
-    val metaS = meta.asScala.toMap
-    val features = List(feature)
-    userIds.asScala.toList.flatMap { userId =>
-      assignGroups
-        .assign(UserGroupQuery(Some(userId), time, tagsL, metaS, features))
-        ._2
-        .get(feature)
-        .map(_._1)
-        .map((userId, _))
-    }.asJava
-  }
-
-  def assignments(
-      userIds: java.util.List[UserId],
-      feature: String
-  ): java.util.List[(UserId, GroupName)] =
-    assignments(userIds,
-                feature,
-                new java.util.ArrayList[String](),
-                new java.util.HashMap[String, String]())
-
   def assignments(
       userId: String
-  ): java.util.List[(FeatureName, GroupName)] =
+  ): java.util.Map[FeatureName, GroupName] =
     assignments(userId,
                 new java.util.ArrayList[String](),
                 new java.util.HashMap[String, String](),
@@ -77,7 +47,7 @@ class JavaAbtestAssignments private (serviceUrl: String, asOf: Option[Long]) {
   def assignments(
       userId: String,
       features: java.util.ArrayList[String]
-  ): java.util.List[(FeatureName, GroupName)] =
+  ): java.util.Map[FeatureName, GroupName] =
     assignments(userId,
                 new java.util.ArrayList[String](),
                 new java.util.HashMap[String, String](),
diff --git a/core/src/main/scala/com/iheart/thomas/abtest/AbtestAlg.scala b/core/src/main/scala/com/iheart/thomas/abtest/AbtestAlg.scala
index b11d76d9..bdb0d407 100644
--- a/core/src/main/scala/com/iheart/thomas/abtest/AbtestAlg.scala
+++ b/core/src/main/scala/com/iheart/thomas/abtest/AbtestAlg.scala
@@ -248,8 +248,7 @@ final class DefaultAbtestAlg[F[_]](refreshPeriod: FiniteDuration)(
       time: Option[OffsetDateTime],
       userTags: List[Tag]): F[Map[FeatureName, GroupName]] =
     validateUserId(userId) >>
-      getGroupAssignmentsOf(UserGroupQuery(Some(userId), time, userTags)).map(p =>
-        toGroups(p._2))
+      getGroupAssignmentsOf(UserGroupQuery(Some(userId), time, userTags)).map(toGroups)

   def terminate(testId: TestId): F[Option[Entity[Abtest]]] =
     getTest(testId).flatMap { test =>
@@ -304,13 +303,12 @@ final class DefaultAbtestAlg[F[_]](refreshPeriod: FiniteDuration)(

   def getGroupsWithMeta(query: UserGroupQuery): F[UserGroupQueryResult] =
     validate(query) >> {
-      getGroupAssignmentsOf(query).map {
-        case (at, groupAssignments) =>
-          val metas = groupAssignments.mapFilter {
-            case (groupName, test) =>
-              test.data.groupMetas.get(groupName)
-          }
-          UserGroupQueryResult(at, toGroups(groupAssignments), metas)
+      getGroupAssignmentsOf(query).map { groupAssignments =>
+        val metas = groupAssignments.mapFilter {
+          case (groupName, test) =>
+            test.groupMetas.get(groupName)
+        }
+        UserGroupQueryResult(toGroups(groupAssignments), metas)
       }
     }

@@ -419,16 +417,16 @@ final class DefaultAbtestAlg[F[_]](refreshPeriod: FiniteDuration)(
       created <- doCreate(testSpec, Some(updatedContinueFrom))
     } yield created

-  private def toGroups(assignments: Map[FeatureName, (GroupName, Entity[Abtest])])
-    : Map[FeatureName, GroupName] =
+  private def toGroups(
+      assignments: Map[FeatureName, (GroupName, _)]): Map[FeatureName, GroupName] =
     assignments.toList.map { case (k, v) => (k, v._1) }.toMap

-  private def getGroupAssignmentsOf(query: UserGroupQuery)
-    : F[(OffsetDateTime, Map[FeatureName, (GroupName, Entity[Abtest])])] =
+  private def getGroupAssignmentsOf(
+      query: UserGroupQuery): F[Map[FeatureName, (GroupName, Abtest)]] =
     for {
       data <- getAllTestsCached(query.at)
-      r <- AssignGroups.fromTestsFeatures[F](data).assign(query)
-    } yield r
+      testFeatures = data.map { case (Entity(_, test), feature) => (test, feature) }
+    } yield AssignGroups.assign[Id](testFeatures, query)

   private def doCreate(newSpec: AbtestSpec,
                        inheritFrom: Option[Entity[Abtest]]): F[Entity[Abtest]] = {
diff --git a/core/src/main/scala/com/iheart/thomas/abtest/AssignGroups.scala b/core/src/main/scala/com/iheart/thomas/abtest/AssignGroups.scala
index afd494cc..ac81c0b1 100644
--- a/core/src/main/scala/com/iheart/thomas/abtest/AssignGroups.scala
+++ b/core/src/main/scala/com/iheart/thomas/abtest/AssignGroups.scala
@@ -6,83 +6,17 @@ package com.iheart.thomas
 package abtest

-import java.time.OffsetDateTime
-
 import cats.Monad
 import cats.implicits._
-import com.iheart.thomas.abtest.model.Abtest.Status.InProgress
 import com.iheart.thomas.abtest.model._
-import lihua.Entity
-
-trait AssignGroups[F[_]] {
-  def assign(query: UserGroupQuery)
-    : F[(OffsetDateTime, Map[FeatureName, (GroupName, Entity[Abtest])])]
-}
+import henkan.convert.Syntax._

 object AssignGroups {

-  //todo: replace this using the static assign methods
-  def fromTestsFeatures[F[_]: Monad](
-      data: Vector[(Entity[Abtest], Feature)]
-  )(implicit eligibilityControl: EligibilityControl[F]): AssignGroups[F] =
-    new DefaultAssignGroups[F](
-      ofTime =>
-        data
-          .collect {
-            case (et @ Entity(_, test), _) if test.statusAsOf(ofTime) == InProgress => et
-          }
-          .pure[F],
-      fn =>
-        data
-          .collectFirst {
-            case (_, f) if f.name == fn => f
-          }
-          .pure[F]
-    )
-
-  class DefaultAssignGroups[F[_]: Monad](
-      testsRetriever: OffsetDateTime => F[Vector[Entity[Abtest]]],
-      featureRetriever: FeatureName => F[Option[Feature]]
-  )(implicit eligibilityControl: EligibilityControl[F])
-      extends AssignGroups[F] {
-
-    def assign(query: UserGroupQuery)
-      : F[(OffsetDateTime, Map[FeatureName, (GroupName, Entity[Abtest])])] = {
-      val ofTime = query.at.getOrElse(TimeUtil.currentMinute)
-      val allTests = testsRetriever(ofTime)
-      val targetTests =
-        if (query.features.isEmpty) allTests
-        else allTests.map(_.filter(t => query.features.contains(t.data.feature)))
-      targetTests
-        .flatMap(_.traverseFilter { test ⇒
-          eligibilityControl.eligible(query, test.data).flatMap {
-            eligible =>
-              featureRetriever(test.data.feature).map {
-                feature =>
-                  val idToUse = test.data.idToUse(query)
-                  def overriddenGroup = {
-                    (feature, idToUse).mapN((f, uid) => f.overrides.get(uid)).flatten
-                  }
-                  {
-                    if (eligible)
-                      overriddenGroup orElse {
-                        idToUse.flatMap(uid => Bucketing.getGroup(uid, test.data))
-                      } else if (feature.fold(false)(_.overrideEligibility))
-                      overriddenGroup
-                    else
-                      None
-                  }.map(gn => (test.data.feature, (gn, test)))
-              }
-          }
-        }.map(_.toMap))
-        .map((ofTime, _))
-    }
-  }
-
   def assign[F[_]: Monad](test: Abtest, feature: Feature, query: UserGroupQuery)(
       implicit eligibilityControl: EligibilityControl[F]): F[Option[GroupName]] = {
     eligibilityControl.eligible(query, test).map { eligible =>
-      val idToUse = test.idToUse(query)
+      val idToUse = test.idToUse(query.to[UserInfo]())
       def overriddenGroup = {
         idToUse.map(uid => feature.overrides.get(uid)).flatten
       }
@@ -97,13 +31,13 @@ object AssignGroups {
     }
   }

-  def assign[F[_]: Monad](tests: List[(Abtest, Feature)], query: UserGroupQuery)(
+  def assign[F[_]: Monad](tests: Vector[(Abtest, Feature)], query: UserGroupQuery)(
       implicit eligibilityControl: EligibilityControl[F])
-    : F[Map[FeatureName, GroupName]] = {
+    : F[Map[FeatureName, (GroupName, Abtest)]] = {
     tests
       .traverseFilter {
         case (test, feature) =>
-          assign[F](test, feature, query).map(_.map((feature.name, _)))
+          assign[F](test, feature, query).map(_.map(gn => (feature.name, (gn, test))))
       }
       .map(_.toMap)
diff --git a/core/src/main/scala/com/iheart/thomas/abtest/EligibilityControl.scala b/core/src/main/scala/com/iheart/thomas/abtest/EligibilityControl.scala
index 729200e6..3e567a2f 100644
--- a/core/src/main/scala/com/iheart/thomas/abtest/EligibilityControl.scala
+++ b/core/src/main/scala/com/iheart/thomas/abtest/EligibilityControl.scala
@@ -6,65 +6,86 @@ package com.iheart.thomas
 package abtest

+import java.time.OffsetDateTime
+
 import cats.implicits._
 import cats.kernel.Semigroup
 import cats.{Applicative, Id, Monad}
 import com.iheart.thomas.abtest
+import com.iheart.thomas.abtest.model.Abtest.Status.InProgress
 import model._
+
 import scala.util.matching.Regex
+import henkan.convert.Syntax._

 trait EligibilityControl[F[_]] {
   def eligible(
-      userInfo: UserGroupQuery,
-      test: Abtest
+      query: UserGroupQuery,
+      test: Abtest
   ): F[Boolean]
 }

 object EligibilityControl extends EligibilityControlInstances0 {

-  def apply[F[_]](f: (UserGroupQuery, Abtest) => F[Boolean]): EligibilityControl[F] = new EligibilityControl[F] {
-    def eligible(userInfo: UserGroupQuery, test: Abtest): F[Boolean] =
-      f(userInfo, test)
-  }
-
-  implicit def semigroupAnd[F[_]: Monad]: Semigroup[EligibilityControl[F]] = new Semigroup[EligibilityControl[F]] {
-    override def combine(x: EligibilityControl[F], y: EligibilityControl[F]): EligibilityControl[F] =
-      apply[F]((userInfo: UserGroupQuery, test: Abtest) => x.eligible(userInfo, test).flatMap { a =>
-        if (a) {
-          y.eligible(userInfo, test)
-        } else false.pure[F]
-      })
-  }
+  def apply[F[_]](f: (UserGroupQuery, Abtest) => F[Boolean]): EligibilityControl[F] =
+    new EligibilityControl[F] {
+      def eligible(userInfo: UserGroupQuery, test: Abtest): F[Boolean] =
+        f(userInfo, test)
+    }
+
+  implicit def semigroupAnd[F[_]: Monad]: Semigroup[EligibilityControl[F]] =
+    new Semigroup[EligibilityControl[F]] {
+      override def combine(x: EligibilityControl[F],
+                           y: EligibilityControl[F]): EligibilityControl[F] =
+        apply[F]((userInfo: UserGroupQuery, test: Abtest) =>
+          x.eligible(userInfo, test).flatMap { a =>
+            if (a) {
+              y.eligible(userInfo, test)
+            } else false.pure[F]
+          })
+    }
 }

-private[thomas] sealed abstract class EligibilityControlInstances0 extends EligibilityControlInstances1 {
+private[thomas] sealed abstract class EligibilityControlInstances0
+    extends EligibilityControlInstances1 {

   implicit def default: EligibilityControl[Id] =
-    byGroupMeta |+| byRequiredTags |+| bySegRanges
+    byGroupMeta |+| byRequiredTags |+| bySegRanges |+| byTestEffectiveRange

-  lazy val byGroupMeta: EligibilityControl[Id] = abtest.EligibilityControl[Id]((userInfo, test) =>
-    test.matchingUserMeta.forall {
-      case (k, r) => userInfo.meta.get(k).fold(false)(v => new Regex(r).findFirstMatchIn(v).isDefined)
+  lazy val byGroupMeta: EligibilityControl[Id] =
+    abtest.EligibilityControl[Id]((query, test) =>
+      test.matchingUserMeta.forall {
+        case (k, r) =>
+          query.meta.get(k).fold(false)(v => new Regex(r).findFirstMatchIn(v).isDefined)
     })

-  lazy val byRequiredTags: EligibilityControl[Id] = abtest.EligibilityControl[Id]((userInfo: UserGroupQuery, test: Abtest) =>
-    test.requiredTags.forall(userInfo.tags.contains))
-
-  lazy val bySegRanges: EligibilityControl[Id] = abtest.EligibilityControl[Id]((userInfo: UserGroupQuery, test: Abtest) =>
-    if (test.segmentRanges.isEmpty) true
-    else test.segmentRanges.exists { range =>
-      test.idToUse(userInfo).fold(false) { id =>
-        range.contains(Bucketing.md5Double(id))
-      }
-    })
+  lazy val byRequiredTags: EligibilityControl[Id] =
+    abtest.EligibilityControl[Id]((query: UserGroupQuery, test: Abtest) =>
+      test.requiredTags.forall(query.tags.contains))
+
+  lazy val byTestEffectiveRange: EligibilityControl[Id] =
+    abtest.EligibilityControl[Id]((query: UserGroupQuery, test: Abtest) =>
+      test.statusAsOf(query.at.getOrElse(OffsetDateTime.now)) === InProgress)
+
+  lazy val bySegRanges: EligibilityControl[Id] = abtest.EligibilityControl[Id](
+    (query: UserGroupQuery, test: Abtest) =>
+      if (test.segmentRanges.isEmpty) true
+      else
+        test.segmentRanges.exists { range =>
+          test.idToUse(query.to[UserInfo]()).fold(false) { id =>
+            range.contains(Bucketing.md5Double(id))
+          }
+        })
 }

 private[thomas] sealed abstract class EligibilityControlInstances1 {
-  implicit def fromIdEControl[F[_]: Applicative](implicit idec: EligibilityControl[Id]): EligibilityControl[F] = EligibilityControl[F](
-    (u, t) => idec.eligible(u, t).pure[F]
-  )
+  implicit def fromIdEControl[F[_]: Applicative](
+      implicit idec: EligibilityControl[Id]): EligibilityControl[F] =
+    EligibilityControl[F](
+      (u, t) => idec.eligible(u, t).pure[F]
+    )
 }
diff --git a/core/src/main/scala/com/iheart/thomas/abtest/model/package.scala b/core/src/main/scala/com/iheart/thomas/abtest/model/package.scala
index 1ff20e1d..6468d18d 100644
--- a/core/src/main/scala/com/iheart/thomas/abtest/model/package.scala
+++ b/core/src/main/scala/com/iheart/thomas/abtest/model/package.scala
@@ -64,8 +64,8 @@ package model {
     def endsAfter(time: OffsetDateTime) = end.fold(true)(_.isAfter(time))

-    def idToUse(query: UserGroupQuery): Option[String] =
-      alternativeIdName.fold(query.userId)(query.meta.get)
+    def idToUse(ui: UserInfo): Option[String] =
+      alternativeIdName.fold(ui.userId)(ui.meta.get)

   }

   /**
@@ -138,8 +138,12 @@ package model {
       features: List[FeatureName] = Nil
   )

+  case class UserInfo(
+      userId: Option[UserId],
+      meta: UserMeta = Map()
+  )
+
   case class UserGroupQueryResult(
-      at: OffsetDateTime,
       groups: Map[FeatureName, GroupName],
       metas: Map[FeatureName, GroupMeta]
   )
diff --git a/docs/src/main/tut/FAQ.md b/docs/src/main/tut/FAQ.md
index 477eba8a..50fe6765 100644
--- a/docs/src/main/tut/FAQ.md
+++ b/docs/src/main/tut/FAQ.md
@@ -58,6 +58,36 @@ download all the relevant tests and overrides. So please avoid recreating it unn

 `client.assignments(userId, [tags], {user_meta})` returns a Map (or hashmap if you are in python) of assignments. The keys of this Map will be feature names, and the values are the group names, the second and third arguments `[tags]` and `{user_meta}` are optional, ignore them if your tests don't requirement them.

+This solution works fine for PySpark with a small amount of data. For large datasets, however, PySpark interop with the JVM is not efficient.
+
+Thomas also provides a tighter Spark integration module, `thomas-spark`, which provides a UDF and a function that works directly with
+DataFrames. The assignment computation is distributed across executors through the UDF.
+
+Here is an example of how to use it.
+Start Spark with the package:
+
+`pyspark --packages com.iheart:thomas-spark_2.11:LATEST_VERSION`
+
+Inside the pyspark shell:
+
+```python
+from pyspark.mllib.common import _py2java
+from pyspark.mllib.common import _java2py
+
+
+ta = sc._jvm.com.iheart.thomas.spark.Assigner.create("https://MY_ABTEST_SERVICE_HOST/abtest/testsWithFeatures")
+
+mockUserIds = [str(i) for i in range(100000)]
+df = _py2java(sc.parallelize(mockUserIds).toDF("profileId"))
+
+result = _java2py(ta.assignments(df, "My_Test_Feature", "profileId"))
+
+```
+Note that some Python-to-Java conversion is needed since `thomas-spark` is written in Scala.
+
+
+
+
 # How to run Bayesian Analysis

 Since Thomas does not come with an analytics solution, to analyze the A/B test results using Thomas's Bayesian utility, you need to write integration with your analytics solution. Please refer to [the dedicated page](bayesian.html) for detailed guide on this one.
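The FAQ above covers only the PySpark route. For reference, here is a minimal sketch of the same flow from a Scala Spark job using the `Assigner` API introduced below; the service URL, feature name, and column name are placeholders, and the session setup is only illustrative.

```scala
import org.apache.spark.sql.SparkSession
import com.iheart.thomas.spark.Assigner

object AssignmentJobExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("thomas-assignments").getOrCreate()
    import spark.implicits._

    // Fetch the running tests and features once on the driver; Assigner is
    // Serializable, so the assignment logic ships to executors via the UDF.
    val assigner =
      Assigner.create("https://MY_ABTEST_SERVICE_HOST/abtest/testsWithFeatures")

    // A mock DataFrame of user ids; in practice this is your own user id column.
    val userIds = (1 to 100000).map(_.toString).toDF("profileId")

    // Adds an "assignment" column holding the group name for the given feature.
    val assigned = assigner.assignments(userIds, "My_Test_Feature", "profileId")
    assigned.show(10)
  }
}
```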
diff --git a/spark/src/main/scala/com/iheart/thomas/spark/Assigner.scala b/spark/src/main/scala/com/iheart/thomas/spark/Assigner.scala
new file mode 100644
index 00000000..d2f66c17
--- /dev/null
+++ b/spark/src/main/scala/com/iheart/thomas/spark/Assigner.scala
@@ -0,0 +1,41 @@
+package com.iheart.thomas
+package spark
+
+import cats.Id
+import cats.effect.{ContextShift, IO}
+import com.iheart.thomas.abtest.AssignGroups
+import com.iheart.thomas.abtest.model.{Abtest, Feature, UserGroupQuery}
+import com.iheart.thomas.client.AbtestClient
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.functions.{udf, col}
+
+class Assigner(data: Vector[(Abtest, Feature)]) extends Serializable {
+
+  def assignUdf(feature: FeatureName) = udf { (userId: String) =>
+    AssignGroups
+      .assign[Id](data, UserGroupQuery(Some(userId), None, features = List(feature)))
+      .get(feature)
+      .map(_._1)
+      .getOrElse(null)
+  }
+
+  def assignments(userIds: DataFrame,
+                  feature: FeatureName,
+                  idColumn: String): DataFrame = {
+    userIds.withColumn("assignment", assignUdf(feature)(col(idColumn)))
+  }
+}
+
+object Assigner {
+  def create(url: String): Assigner = create(url, None)
+  def create(url: String, asOf: Long): Assigner = create(url, Some(asOf))
+
+  def create(url: String, asOf: Option[Long]): Assigner = {
+    import scala.concurrent.ExecutionContext.Implicits.global
+    val time = asOf.map(TimeUtil.toDateTime)
+    implicit val csIo: ContextShift[IO] = IO.contextShift(global)
+    val data = AbtestClient.testsWithFeatures[IO](url, time).unsafeRunSync()
+
+    new Assigner(data)
+  }
+}
diff --git a/version.sbt b/version.sbt
index 2c4a100d..497572ac 100644
--- a/version.sbt
+++ b/version.sbt
@@ -1 +1 @@
-version in ThisBuild := "0.36"
+version in ThisBuild := "0.37-SNAPSHOT"
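For callers not on Spark, the reworked core API follows the same pattern that `JavaAbtestAssignments` and `Assigner` use above: fetch the running tests once with `AbtestClient.testsWithFeatures`, then assign in memory with `AssignGroups.assign[Id]`. A minimal sketch, with the service URL as a placeholder:

```scala
import cats.Id
import cats.effect.{ContextShift, IO}
import com.iheart.thomas.abtest.AssignGroups
import com.iheart.thomas.abtest.model.UserGroupQuery
import com.iheart.thomas.client.AbtestClient
import scala.concurrent.ExecutionContext.Implicits.global

object AssignInMemoryExample {
  implicit val cs: ContextShift[IO] = IO.contextShift(global)

  // One HTTP round trip to load all running tests and their features.
  val testData =
    AbtestClient
      .testsWithFeatures[IO]("https://MY_ABTEST_SERVICE_HOST/abtest/testsWithFeatures", None)
      .unsafeRunSync()

  // assign[Id] returns Map[FeatureName, (GroupName, Abtest)]; keep just the group names.
  def groupsFor(userId: String): Map[String, String] =
    AssignGroups
      .assign[Id](testData, UserGroupQuery(Some(userId), None))
      .map { case (fn, (gn, _)) => (fn, gn) }
}
```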