WFS optimisations - parallelization by partition #73

Open · wants to merge 8 commits into base: loadtest
WFSExecutor.scala (new file)
@@ -0,0 +1,58 @@
package org.ekstep.analytics.job

import optional.Application
import org.apache.spark.SparkContext
import org.ekstep.analytics.framework.{JobConfig, JobContext}
import org.ekstep.analytics.framework.Level.ERROR
import org.ekstep.analytics.framework.exception.{DataFetcherException, JobNotFoundException}
import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger}

object WFSExecutor extends Application {

implicit val className = "org.ekstep.analytics.job.WFSExecutor"

def main(model: String, fromPartition: String, toPartition: String, config: String) {

JobLogger.start("Started executing WFSExecutor", Option(Map("config" -> config, "model" -> model, "fromPartition" -> fromPartition, "toPartition" -> toPartition)))
val con = JSONUtils.deserialize[JobConfig](config)
val sc = CommonUtil.getSparkContext(con.parallelization.getOrElse(200), con.appName.getOrElse(con.model));
try {
val result = CommonUtil.time({
execute(model, fromPartition.toInt, toPartition.toInt, config)(sc);
})
JobLogger.end("WFSExecutor completed", "SUCCESS", Option(Map("timeTaken" -> result._1)));
} catch {
case ex: Exception =>
JobLogger.log(ex.getMessage, None, ERROR);
JobLogger.end("WFSExecutor failed", "FAILED")
throw ex
} finally {
CommonUtil.closeSparkContext()(sc);
}
}

def execute(model: String, fromPartition: Int, toPartition: Int, config: String)(implicit sc: SparkContext) {

val range = fromPartition to toPartition
range.foreach { partition =>
try {
val jobConfig = config.replace("__partition__", partition.toString)
val job = JobFactory.getJob(model);
JobLogger.log("### Executing wfs for the partition - " + partition + " ###")
job.main(jobConfig)(Option(sc));
} catch {
case ex: DataFetcherException => {
JobLogger.log(ex.getMessage, Option(Map("model_code" -> model, "partition" -> partition)), ERROR)
}
case ex: JobNotFoundException => {
JobLogger.log(ex.getMessage, Option(Map("model_code" -> model, "partition" -> partition)), ERROR)
throw ex;
}
case ex: Exception => {
JobLogger.log(ex.getMessage, Option(Map("model_code" -> model, "partition" -> partition)), ERROR)
ex.printStackTrace()
}
}
}
}
}
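
A minimal sketch of the `__partition__` substitution performed in execute() above, assuming the example config shape from scripts/shell/local/run-wfs.sh below (the fragment here is abridged and hypothetical):

// Each partition in the range gets its own copy of the job config, pointing
// the data fetcher at that partition's blob-store prefix.
val config = """{"search":{"type":"azure","queries":[{"prefix":"unique-partition/__partition__/"}]}}"""
(0 to 2).foreach { partition =>
  val jobConfig = config.replace("__partition__", partition.toString)
  println(jobConfig) // prefix becomes unique-partition/0/, unique-partition/1/, unique-partition/2/
}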
WorkFlowSummaryModel.scala
@@ -1,5 +1,7 @@
package org.ekstep.analytics.model

+ import java.io.Serializable

import org.ekstep.analytics.framework.IBatchModelTemplate
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
@@ -16,6 +18,12 @@ import org.ekstep.analytics.framework.util.JobLogger
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework._

+ @scala.beans.BeanInfo
+ class WFSInputEvent(val eid: String, val ets: Long, val `@timestamp`: String, val ver: String, val mid: String, val actor: Actor, val context: V3Context, val `object`: Option[V3Object], val edata: WFSInputEData, val tags: List[AnyRef] = null) extends AlgoInput with Input {}
+ @scala.beans.BeanInfo
+ class WFSInputEData(val `type`: String, val mode: String, val duration: Long, val pageid: String, val item: Question,
+   val resvalues: Array[Map[String, AnyRef]], val pass: String, val score: Int) extends Serializable {}

case class WorkflowInput(sessionKey: WorkflowIndex, events: Buffer[String]) extends AlgoInput
case class WorkflowOutput(index: WorkflowIndex, summaries: Buffer[MeasuredEvent]) extends AlgoOutput
case class WorkflowIndex(did: String, channel: String, pdataId: String)
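
For orientation, a minimal sketch of deserializing one raw telemetry line into the new input type, mirroring what the model code below now does per event (the fixture is abridged from the tests further down):

// Hypothetical, abridged fixture: the model now parses raw lines into
// WFSInputEvent instead of the framework's V3EventNew, so the raw
// "@timestamp" field travels with the event.
import org.ekstep.analytics.framework.util.JSONUtils
import org.ekstep.analytics.model.WFSInputEvent

val raw = """{"eid":"START","ets":1534595611976,"ver":"3.0","mid":"817e25d0-33f5-48a2-8239-db286aaf3bd8","actor":{"type":"User"},"context":{"channel":"01235953109336064029450","env":"home"},"edata":{"mode":"play","duration":0,"pageid":"collection-detail","score":0},"@timestamp":"2018-08-19T04:08:41.195Z"}"""
val event = JSONUtils.deserialize[WFSInputEvent](raw)
// event.edata.mode == "play"; event.`@timestamp` is what Summary.getEventSyncTS reads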
@@ -53,11 +61,11 @@ object WorkFlowSummaryModel extends IBatchModelTemplate[String, WorkflowInput, M

data.map({ f =>
var summEvents: Buffer[MeasuredEvent] = Buffer();
- val events = f.events.map(f => JSONUtils.deserialize[V3EventNew](f))
+ val events = f.events.map(f => JSONUtils.deserialize[WFSInputEvent](f))
val sortedEvents = events.sortBy { x => x.ets }
var rootSummary: org.ekstep.analytics.util.Summary = null
var currSummary: org.ekstep.analytics.util.Summary = null
- var prevEvent: V3EventNew = sortedEvents.head
+ var prevEvent: WFSInputEvent = sortedEvents.head

sortedEvents.foreach{ x =>

Summary.scala
@@ -1,14 +1,16 @@
package org.ekstep.analytics.util

import org.ekstep.analytics.framework._

import scala.collection.mutable.Buffer
import org.apache.commons.lang3.StringUtils
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.CommonUtil
+ import org.ekstep.analytics.model.WFSInputEvent

case class Item(itemId: String, timeSpent: Option[Double], res: Option[Array[String]], resValues: Option[Array[AnyRef]], mc: Option[AnyRef], mmc: Option[AnyRef], score: Int, time_stamp: Long, maxScore: Option[AnyRef], pass: String, qtitle: Option[String], qdesc: Option[String]);

- class Summary(val firstEvent: V3EventNew) {
+ class Summary(val firstEvent: WFSInputEvent) {

val defaultPData = V3PData(AppConf.getConfig("default.consumption.app.id"), Option("1.0"))
val interactTypes = List("touch", "drag", "drop", "pinch", "zoom", "shake", "rotate", "speak", "listen", "write", "draw", "start", "end", "choose", "activate", "scroll", "click", "edit", "submit", "search", "dnd", "added", "removed", "selected")
@@ -28,7 +30,7 @@ class Summary(val firstEvent: V3EventNew) {
var interactEventsCount: Long = if(StringUtils.equals("INTERACT", firstEvent.eid) && interactTypes.contains(firstEvent.edata.`type`.toLowerCase)) 1l else 0l
var `type`: String = if (null == firstEvent.edata.`type`) "app" else StringUtils.lowerCase(firstEvent.edata.`type`)
var mode: Option[String] = if (firstEvent.edata.mode == null) Option("") else Option(firstEvent.edata.mode)
- var lastEvent: V3EventNew = null
+ var lastEvent: WFSInputEvent = null
var itemResponses: Buffer[Item] = Buffer[Item]()
var endTime: Long = 0l
var timeSpent: Double = 0.0
@@ -37,8 +39,8 @@ class Summary(val firstEvent: V3EventNew) {
var eventsSummary: Map[String, Long] = Map(firstEvent.eid -> 1)
var pageSummary: Iterable[PageSummary] = Iterable[PageSummary]()
var prevEventEts: Long = startTime
- var lastImpression: V3EventNew = null
- var impressionMap: Map[V3EventNew, Double] = Map()
+ var lastImpression: WFSInputEvent = null
+ var impressionMap: Map[WFSInputEvent, Double] = Map()
var summaryEvents: Buffer[MeasuredEvent] = Buffer()

var CHILDREN: Buffer[Summary] = Buffer()
@@ -106,7 +108,7 @@ class Summary(val firstEvent: V3EventNew) {
this.isClosed = false
}

- def add(event: V3EventNew, idleTime: Int) {
+ def add(event: WFSInputEvent, idleTime: Int) {
if(this.startTime == 0l) this.startTime = event.ets
val ts = CommonUtil.getTimeDiff(prevEventEts, event.ets).get
prevEventEts = event.ets
@@ -169,7 +171,7 @@ class Summary(val firstEvent: V3EventNew) {
}
}

- def checkEnd(event: V3EventNew, idleTime: Int, config: Map[String, AnyRef]): Summary = {
+ def checkEnd(event: WFSInputEvent, idleTime: Int, config: Map[String, AnyRef]): Summary = {
val mode = if(event.edata.mode == null) "" else event.edata.mode
if(StringUtils.equalsIgnoreCase(this.`type`, event.edata.`type`) && StringUtils.equals(this.mode.get, mode)) {
if(this.PARENT == null) return this else return PARENT;
@@ -184,7 +186,7 @@ class Summary(val firstEvent: V3EventNew) {
return summ;
}

- def getSimilarEndSummary(event: V3EventNew): Summary = {
+ def getSimilarEndSummary(event: WFSInputEvent): Summary = {
val mode = if(event.edata.mode == null) "" else event.edata.mode
if(StringUtils.equalsIgnoreCase(this.`type`, event.edata.`type`) && StringUtils.equals(this.mode.get, mode)) {
return this;
@@ -220,7 +222,7 @@ class Summary(val firstEvent: V3EventNew) {
val interactEventsPerMin: Double = if (this.interactEventsCount == 0 || this.timeSpent == 0) 0d
else if (this.timeSpent < 60.0) this.interactEventsCount.toDouble
else BigDecimal(this.interactEventsCount / (this.timeSpent / 60)).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble;
- val syncts = CommonUtil.getEventSyncTS(if(this.lastEvent == null) this.firstEvent else this.lastEvent)
+ val syncts = getEventSyncTS(if(this.lastEvent == null) this.firstEvent else this.lastEvent)
val eventsSummary = this.eventsSummary.map(f => EventSummary(f._1, f._2.toInt))
val measures = Map("start_time" -> this.startTime,
"end_time" -> this.endTime,
@@ -267,4 +269,9 @@ class Summary(val firstEvent: V3EventNew) {
} else Iterable[EnvSummary]()
}

+ def getEventSyncTS(event: WFSInputEvent): Long = {
+   val timeInString = event.`@timestamp`;
+   CommonUtil.getEventSyncTS(timeInString);
+ }

}
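
A small sketch of the syncts derivation after this change; the ISO-8601 string-to-epoch parse by CommonUtil.getEventSyncTS(String) is assumed from its use in getEventSyncTS above, and the timestamp value is taken from the test fixtures below:

// syncts is now derived from the raw "@timestamp" string carried on
// WFSInputEvent rather than from the framework accessor for V3EventNew.
import org.ekstep.analytics.framework.util.CommonUtil

val timeInString = "2018-09-11T05:08:04.311Z"
val syncts: Long = CommonUtil.getEventSyncTS(timeInString) // assumed: parses to epoch millis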
TestSummary.scala
@@ -1,15 +1,13 @@
package org.ekstep.analytics.util

- import org.ekstep.analytics.framework.V3EventNew
import org.ekstep.analytics.framework.util.JSONUtils
- import org.ekstep.analytics.model.SparkSpec
+ import org.ekstep.analytics.model.{SparkSpec, WFSInputEvent}

class TestSummary extends SparkSpec {

it should "create summary without uid, pdata and edata.type" in {

val eventStr = "{\"eid\":\"START\",\"ets\":1534595611976,\"ver\":\"3.0\",\"mid\":\"817e25d0-33f5-48a2-8239-db286aaf3bd8\",\"actor\":{\"type\":\"User\"},\"context\":{\"channel\":\"01235953109336064029450\",\"env\":\"home\",\"sid\":\"8f32dbc4-c0d0-4630-9ff6-8c3bce3d15bb\",\"did\":\"a49c706d0402d6db3bb7cb3105cc9e7cf9b2ed7e\",\"cdata\":[]},\"object\":{\"id\":\"do_31250841732493312026783\",\"type\":\"TextBookUnit\",\"rollup\":{\"l1\":\"do_31250841732058316826675\",\"l2\":\"do_31250841732491673626778\",\"l3\":\"do_31250841732493312026783\"}},\"edata\":{\"mode\":\"play\",\"duration\":0,\"pageid\":\"collection-detail\",\"score\":0,\"rating\":0.0,\"index\":0,\"size\":0},\"tags\":[],\"@timestamp\":\"2018-08-19T04:08:41.195Z\"}"
- val event = JSONUtils.deserialize[V3EventNew](eventStr)
+ val event = JSONUtils.deserialize[WFSInputEvent](eventStr)
val summary = new Summary(event)

summary.uid should be("")
@@ -20,20 +18,20 @@ class TestSummary extends SparkSpec {
it should "create summary and add ASSESS event" in {

val eventStr = "{\"eid\":\"START\",\"ets\":1534651548447,\"ver\":\"3.0\",\"mid\":\"b154b70f-17fb-4c16-80f6-130f5ad479a1\",\"actor\":{\"id\":\"79498cf0-cdfa-41fb-b504-f257a8b40955\",\"type\":\"User\"},\"context\":{\"channel\":\"01235953109336064029450\",\"pdata\":{\"id\":\"prod.sunbird.app\",\"ver\":\"2.0.102\",\"pid\":\"sunbird.app.contentplayer\"},\"env\":\"contentplayer\",\"sid\":\"8f32dbc4-c0d0-4630-9ff6-8c3bce3d15bb\",\"did\":\"a49c706d0402d6db3bb7cb3105cc9e7cf9b2ed7e\",\"cdata\":[{\"id\":\"7f37a89e68bceff05ac34fb1b507ad71\",\"type\":\"ContentSession\"}],\"rollup\":{}},\"object\":{\"id\":\"do_31251316520745369623572\",\"type\":\"Content\",\"ver\":\"1.0\"},\"edata\":{\"type\":\"content\",\"mode\":\"play\",\"duration\":1534651548445,\"pageid\":\"\",\"score\":0,\"rating\":0.0,\"index\":0,\"size\":0},\"tags\":[],\"@timestamp\":\"2018-08-19T04:09:13.259Z\"}"
- val event = JSONUtils.deserialize[V3EventNew](eventStr)
+ val event = JSONUtils.deserialize[WFSInputEvent](eventStr)
val summary = new Summary(event)

summary.uid should be("79498cf0-cdfa-41fb-b504-f257a8b40955")
summary.`type` should be("content")
summary.mode.get should be("play")

val assessStr = "{\"actor\":{\"id\":\"580\",\"type\":\"User\"},\"context\":{\"cdata\":[{\"id\":\"de2115eb13f5113ffce9444cfecd3ab7\",\"type\":\"ContentSession\"}],\"channel\":\"in.sunbird\",\"did\":\"b027147870670bc57de790535311fbe5\",\"env\":\"ContentPlayer\",\"pdata\":{\"id\":\"in.sunbird.dev\",\"pid\":\"sunbird.app.contentplayer\",\"ver\":\"2.0.93\"},\"rollup\":{},\"sid\":\"7op5o46hpi2abkmp8ckihjeq72\"},\"edata\":{\"duration\":25,\"index\":8,\"item\":{\"desc\":\"\",\"exlength\":0,\"id\":\"do_31249422333176217625083\",\"maxscore\":1,\"mc\":[],\"mmc\":[],\"params\":[],\"title\":\"3ZMSUQQ9\",\"uri\":\"\"},\"pass\":\"Yes\",\"score\":1},\"eid\":\"ASSESS\",\"ets\":1515497370223,\"mid\":\"01951ddb-cf6e-4a4a-b740-b5e4fd0e483d\",\"object\":{\"id\":\"do_1122852550749306881159\",\"type\":\"Content\",\"ver\":\"1.0\"},\"tags\":[],\"ver\":\"3.0\",\"@timestamp\":\"2018-06-04T04:59:45.328Z\",\"ts\":\"2018-06-04T04:59:33.379+0000\"}"
- val assessEvent = JSONUtils.deserialize[V3EventNew](assessStr)
+ val assessEvent = JSONUtils.deserialize[WFSInputEvent](assessStr)

summary.add(assessEvent, 600)

val endStr = "{\"eid\":\"END\",\"ets\":1536574379460,\"ver\":\"3.0\",\"mid\":\"9200c869-d16f-47af-b220-cf97845f1501\",\"actor\":{\"id\":\"a72482ab-320b-43dc-9821-5e6d1c9bf44a\",\"type\":\"User\"},\"context\":{\"channel\":\"01235953109336064029450\",\"pdata\":{\"id\":\"prod.sunbird.app\",\"ver\":\"2.0.106\",\"pid\":\"sunbird.app.contentplayer\"},\"env\":\"contentplayer\",\"sid\":\"b8bc1f1d-22a6-4aa6-aa5e-de3654e80f96\",\"did\":\"1b21a2906e7de0dd66235e7cf9373adb4aaaa104\",\"cdata\":[{\"id\":\"324168fd6623977b924f8552697cfd09\",\"type\":\"ContentSession\"}],\"rollup\":{\"l1\":\"do_31250890252435456027272\",\"l2\":\"do_31250890925065011217170\",\"l3\":\"do_31250890925065830417174\",\"l4\":\"do_31250890925069107217196\"}},\"object\":{\"id\":\"do_31251478823299481625615\",\"type\":\"Content\",\"ver\":\"1.0\"},\"edata\":{\"type\":\"resource\",\"mode\":\"play\",\"duration\":121972,\"pageid\":\"sunbird-player-Endpage\",\"score\":0,\"rating\":0.0,\"summary\":[{\"progress\":100.0}],\"index\":0,\"size\":0},\"tags\":[],\"@timestamp\":\"2018-09-11T05:08:04.311Z\"}"
- val endEvent = JSONUtils.deserialize[V3EventNew](endStr)
+ val endEvent = JSONUtils.deserialize[WFSInputEvent](endStr)

summary.checkEnd(endEvent, 600, Map())
summary.getSimilarEndSummary(endEvent)
22 changes: 22 additions & 0 deletions scripts/shell/local/run-wfs.sh
@@ -0,0 +1,22 @@
#!/usr/bin/env bash

export SPARK_HOME=<add spark-installation-directory>/spark-2.4.4-bin-hadoop2.7
export MODELS_HOME=<add models directory>/models-2.0
export DP_LOGS=<add logs directory>/logs/data-products

## Job to run daily
cd <add scripts directory>
# example wfs config
# echo '{"search":{"type":"azure","queries":[{"bucket":"'$bucket'","prefix":"unique-partition/__partition__/","endDate":"'$endDate'","delta":0}]},"model":"org.ekstep.analytics.model.WorkflowSummary","modelParams":{"apiVersion":"v2", "parallelization":200},"output":[{"to":"kafka","params":{"brokerList":"'$brokerList'","topic":"'$topic'"}}],"parallelization":200,"appName":"Workflow Summarizer","deviceMapping":true}'
source model-config.sh
today=$(date "+%Y-%m-%d")

job_config=$(config $1)
start_partition=$2
end_partition=$3

echo "Starting the job - WFS" >> "$DP_LOGS/$today-job-execution.log"

$SPARK_HOME/bin/spark-submit --master local[*] --jars $MODELS_HOME/analytics-framework-2.0.jar,$MODELS_HOME/scruid_2.11-2.3.2.jar --class org.ekstep.analytics.job.WFSExecutor $MODELS_HOME/batch-models-2.0.jar --model "$1" --fromPartition $start_partition --toPartition $end_partition --config "$job_config" >> "$DP_LOGS/$today-job-execution.log"

echo "Job execution completed - $1" >> "$DP_LOGS/$today-job-execution.log"