diff --git a/batch-models/src/main/scala/org/ekstep/analytics/job/WFSExecutor.scala b/batch-models/src/main/scala/org/ekstep/analytics/job/WFSExecutor.scala new file mode 100644 index 00000000..b18bcade --- /dev/null +++ b/batch-models/src/main/scala/org/ekstep/analytics/job/WFSExecutor.scala @@ -0,0 +1,58 @@ +package org.ekstep.analytics.job + +import optional.Application +import org.apache.spark.SparkContext +import org.ekstep.analytics.framework.{JobConfig, JobContext} +import org.ekstep.analytics.framework.Level.ERROR +import org.ekstep.analytics.framework.exception.{DataFetcherException, JobNotFoundException} +import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger} + +object WFSExecutor extends Application { + + implicit val className = "org.ekstep.analytics.job.WFSExecutor" + + def main(model: String, fromPartition: String, toPartition: String, config: String) { + + JobLogger.start("Started executing WFSExecutor", Option(Map("config" -> config, "model" -> model, "fromPartition" -> fromPartition, "toPartition" -> toPartition))) + val con = JSONUtils.deserialize[JobConfig](config) + val sc = CommonUtil.getSparkContext(con.parallelization.getOrElse(200), con.appName.getOrElse(con.model)); + try { + val result = CommonUtil.time({ + execute(model, fromPartition.toInt, toPartition.toInt, config)(sc); + }) + JobLogger.end("WFSExecutor completed", "SUCCESS", Option(Map("timeTaken" -> result._1))); + } catch { + case ex: Exception => + JobLogger.log(ex.getMessage, None, ERROR); + JobLogger.end("WFSExecutor failed", "FAILED") + throw ex + } finally { + CommonUtil.closeSparkContext()(sc); + } + } + + def execute(model: String, fromPartition: Int, toPartition: Int, config: String)(implicit sc: SparkContext) { + + val range = fromPartition to toPartition + range.toList.map { partition => + try { + val jobConfig = config.replace("__partition__", partition.toString) + val job = JobFactory.getJob(model); + JobLogger.log("### Executing wfs for the partition - " + partition + " ###") + job.main(jobConfig)(Option(sc)); + } catch { + case ex: DataFetcherException => { + JobLogger.log(ex.getMessage, Option(Map("model_code" -> model, "partition" -> partition)), ERROR) + } + case ex: JobNotFoundException => { + JobLogger.log(ex.getMessage, Option(Map("model_code" -> model, "partition" -> partition)), ERROR) + throw ex; + } + case ex: Exception => { + JobLogger.log(ex.getMessage, Option(Map("model_code" -> model, "partition" -> partition)), ERROR) + ex.printStackTrace() + } + } + } + } +} diff --git a/batch-models/src/main/scala/org/ekstep/analytics/model/WorkFlowSummaryModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/model/WorkFlowSummaryModel.scala index fddf3ba4..ae758cea 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/model/WorkFlowSummaryModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/model/WorkFlowSummaryModel.scala @@ -1,5 +1,7 @@ package org.ekstep.analytics.model +import java.io.Serializable + import org.ekstep.analytics.framework.IBatchModelTemplate import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD @@ -16,6 +18,12 @@ import org.ekstep.analytics.framework.util.JobLogger import org.ekstep.analytics.framework.conf.AppConf import org.ekstep.analytics.framework._ +@scala.beans.BeanInfo +class WFSInputEvent(val eid: String, val ets: Long, val `@timestamp`: String, val ver: String, val mid: String, val actor: Actor, val context: V3Context, val `object`: Option[V3Object], val edata: WFSInputEData, val tags: List[AnyRef] = null) extends AlgoInput with Input {} +@scala.beans.BeanInfo +class WFSInputEData(val `type`: String, val mode: String, val duration: Long, val pageid: String, val item: Question, + val resvalues: Array[Map[String, AnyRef]], val pass: String, val score: Int) extends Serializable {} + case class WorkflowInput(sessionKey: WorkflowIndex, events: Buffer[String]) extends AlgoInput case class WorkflowOutput(index: WorkflowIndex, summaries: Buffer[MeasuredEvent]) extends AlgoOutput case class WorkflowIndex(did: String, channel: String, pdataId: String) @@ -53,11 +61,11 @@ object WorkFlowSummaryModel extends IBatchModelTemplate[String, WorkflowInput, M data.map({ f => var summEvents: Buffer[MeasuredEvent] = Buffer(); - val events = f.events.map(f => JSONUtils.deserialize[V3EventNew](f)) + val events = f.events.map(f => JSONUtils.deserialize[WFSInputEvent](f)) val sortedEvents = events.sortBy { x => x.ets } var rootSummary: org.ekstep.analytics.util.Summary = null var currSummary: org.ekstep.analytics.util.Summary = null - var prevEvent: V3EventNew = sortedEvents.head + var prevEvent: WFSInputEvent = sortedEvents.head sortedEvents.foreach{ x => diff --git a/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala b/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala index 42d3f690..3362909a 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala @@ -1,14 +1,16 @@ package org.ekstep.analytics.util import org.ekstep.analytics.framework._ + import scala.collection.mutable.Buffer import org.apache.commons.lang3.StringUtils import org.ekstep.analytics.framework.conf.AppConf import org.ekstep.analytics.framework.util.CommonUtil +import org.ekstep.analytics.model.WFSInputEvent case class Item(itemId: String, timeSpent: Option[Double], res: Option[Array[String]], resValues: Option[Array[AnyRef]], mc: Option[AnyRef], mmc: Option[AnyRef], score: Int, time_stamp: Long, maxScore: Option[AnyRef], pass: String, qtitle: Option[String], qdesc: Option[String]); -class Summary(val firstEvent: V3EventNew) { +class Summary(val firstEvent: WFSInputEvent) { val defaultPData = V3PData(AppConf.getConfig("default.consumption.app.id"), Option("1.0")) val interactTypes = List("touch", "drag", "drop", "pinch", "zoom", "shake", "rotate", "speak", "listen", "write", "draw", "start", "end", "choose", "activate", "scroll", "click", "edit", "submit", "search", "dnd", "added", "removed", "selected") @@ -28,7 +30,7 @@ class Summary(val firstEvent: V3EventNew) { var interactEventsCount: Long = if(StringUtils.equals("INTERACT", firstEvent.eid) && interactTypes.contains(firstEvent.edata.`type`.toLowerCase)) 1l else 0l var `type`: String = if (null == firstEvent.edata.`type`) "app" else StringUtils.lowerCase(firstEvent.edata.`type`) var mode: Option[String] = if (firstEvent.edata.mode == null) Option("") else Option(firstEvent.edata.mode) - var lastEvent: V3EventNew = null + var lastEvent: WFSInputEvent = null var itemResponses: Buffer[Item] = Buffer[Item]() var endTime: Long = 0l var timeSpent: Double = 0.0 @@ -37,8 +39,8 @@ class Summary(val firstEvent: V3EventNew) { var eventsSummary: Map[String, Long] = Map(firstEvent.eid -> 1) var pageSummary: Iterable[PageSummary] = Iterable[PageSummary]() var prevEventEts: Long = startTime - var lastImpression: V3EventNew = null - var impressionMap: Map[V3EventNew, Double] = Map() + var lastImpression: WFSInputEvent = null + var impressionMap: Map[WFSInputEvent, Double] = Map() var summaryEvents: Buffer[MeasuredEvent] = Buffer() var CHILDREN: Buffer[Summary] = Buffer() @@ -106,7 +108,7 @@ class Summary(val firstEvent: V3EventNew) { this.isClosed = false } - def add(event: V3EventNew, idleTime: Int) { + def add(event: WFSInputEvent, idleTime: Int) { if(this.startTime == 0l) this.startTime = event.ets val ts = CommonUtil.getTimeDiff(prevEventEts, event.ets).get prevEventEts = event.ets @@ -169,7 +171,7 @@ class Summary(val firstEvent: V3EventNew) { } } - def checkEnd(event: V3EventNew, idleTime: Int, config: Map[String, AnyRef]): Summary = { + def checkEnd(event: WFSInputEvent, idleTime: Int, config: Map[String, AnyRef]): Summary = { val mode = if(event.edata.mode == null) "" else event.edata.mode if(StringUtils.equalsIgnoreCase(this.`type`, event.edata.`type`) && StringUtils.equals(this.mode.get, mode)) { if(this.PARENT == null) return this else return PARENT; @@ -184,7 +186,7 @@ class Summary(val firstEvent: V3EventNew) { return summ; } - def getSimilarEndSummary(event: V3EventNew): Summary = { + def getSimilarEndSummary(event: WFSInputEvent): Summary = { val mode = if(event.edata.mode == null) "" else event.edata.mode if(StringUtils.equalsIgnoreCase(this.`type`, event.edata.`type`) && StringUtils.equals(this.mode.get, mode)) { return this; @@ -220,7 +222,7 @@ class Summary(val firstEvent: V3EventNew) { val interactEventsPerMin: Double = if (this.interactEventsCount == 0 || this.timeSpent == 0) 0d else if (this.timeSpent < 60.0) this.interactEventsCount.toDouble else BigDecimal(this.interactEventsCount / (this.timeSpent / 60)).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble; - val syncts = CommonUtil.getEventSyncTS(if(this.lastEvent == null) this.firstEvent else this.lastEvent) + val syncts = getEventSyncTS(if(this.lastEvent == null) this.firstEvent else this.lastEvent) val eventsSummary = this.eventsSummary.map(f => EventSummary(f._1, f._2.toInt)) val measures = Map("start_time" -> this.startTime, "end_time" -> this.endTime, @@ -267,4 +269,9 @@ class Summary(val firstEvent: V3EventNew) { } else Iterable[EnvSummary]() } + def getEventSyncTS(event: WFSInputEvent): Long = { + val timeInString = event.`@timestamp`; + CommonUtil.getEventSyncTS(timeInString); + } + } \ No newline at end of file diff --git a/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala b/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala index 77a43b7e..307ae79c 100644 --- a/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala +++ b/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala @@ -1,15 +1,13 @@ package org.ekstep.analytics.util -import org.ekstep.analytics.framework.V3EventNew import org.ekstep.analytics.framework.util.JSONUtils -import org.ekstep.analytics.model.SparkSpec +import org.ekstep.analytics.model.{SparkSpec, WFSInputEvent} class TestSummary extends SparkSpec { it should "create summary without uid, pdata and edata.type" in { - val eventStr = "{\"eid\":\"START\",\"ets\":1534595611976,\"ver\":\"3.0\",\"mid\":\"817e25d0-33f5-48a2-8239-db286aaf3bd8\",\"actor\":{\"type\":\"User\"},\"context\":{\"channel\":\"01235953109336064029450\",\"env\":\"home\",\"sid\":\"8f32dbc4-c0d0-4630-9ff6-8c3bce3d15bb\",\"did\":\"a49c706d0402d6db3bb7cb3105cc9e7cf9b2ed7e\",\"cdata\":[]},\"object\":{\"id\":\"do_31250841732493312026783\",\"type\":\"TextBookUnit\",\"rollup\":{\"l1\":\"do_31250841732058316826675\",\"l2\":\"do_31250841732491673626778\",\"l3\":\"do_31250841732493312026783\"}},\"edata\":{\"mode\":\"play\",\"duration\":0,\"pageid\":\"collection-detail\",\"score\":0,\"rating\":0.0,\"index\":0,\"size\":0},\"tags\":[],\"@timestamp\":\"2018-08-19T04:08:41.195Z\"}" - val event = JSONUtils.deserialize[V3EventNew](eventStr) + val event = JSONUtils.deserialize[WFSInputEvent](eventStr) val summary = new Summary(event) summary.uid should be("") @@ -20,7 +18,7 @@ class TestSummary extends SparkSpec { it should "create summary and add ASSESS event" in { val eventStr = "{\"eid\":\"START\",\"ets\":1534651548447,\"ver\":\"3.0\",\"mid\":\"b154b70f-17fb-4c16-80f6-130f5ad479a1\",\"actor\":{\"id\":\"79498cf0-cdfa-41fb-b504-f257a8b40955\",\"type\":\"User\"},\"context\":{\"channel\":\"01235953109336064029450\",\"pdata\":{\"id\":\"prod.sunbird.app\",\"ver\":\"2.0.102\",\"pid\":\"sunbird.app.contentplayer\"},\"env\":\"contentplayer\",\"sid\":\"8f32dbc4-c0d0-4630-9ff6-8c3bce3d15bb\",\"did\":\"a49c706d0402d6db3bb7cb3105cc9e7cf9b2ed7e\",\"cdata\":[{\"id\":\"7f37a89e68bceff05ac34fb1b507ad71\",\"type\":\"ContentSession\"}],\"rollup\":{}},\"object\":{\"id\":\"do_31251316520745369623572\",\"type\":\"Content\",\"ver\":\"1.0\"},\"edata\":{\"type\":\"content\",\"mode\":\"play\",\"duration\":1534651548445,\"pageid\":\"\",\"score\":0,\"rating\":0.0,\"index\":0,\"size\":0},\"tags\":[],\"@timestamp\":\"2018-08-19T04:09:13.259Z\"}" - val event = JSONUtils.deserialize[V3EventNew](eventStr) + val event = JSONUtils.deserialize[WFSInputEvent](eventStr) val summary = new Summary(event) summary.uid should be("79498cf0-cdfa-41fb-b504-f257a8b40955") @@ -28,12 +26,12 @@ class TestSummary extends SparkSpec { summary.mode.get should be("play") val assessStr = "{\"actor\":{\"id\":\"580\",\"type\":\"User\"},\"context\":{\"cdata\":[{\"id\":\"de2115eb13f5113ffce9444cfecd3ab7\",\"type\":\"ContentSession\"}],\"channel\":\"in.sunbird\",\"did\":\"b027147870670bc57de790535311fbe5\",\"env\":\"ContentPlayer\",\"pdata\":{\"id\":\"in.sunbird.dev\",\"pid\":\"sunbird.app.contentplayer\",\"ver\":\"2.0.93\"},\"rollup\":{},\"sid\":\"7op5o46hpi2abkmp8ckihjeq72\"},\"edata\":{\"duration\":25,\"index\":8,\"item\":{\"desc\":\"\",\"exlength\":0,\"id\":\"do_31249422333176217625083\",\"maxscore\":1,\"mc\":[],\"mmc\":[],\"params\":[],\"title\":\"3ZMSUQQ9\",\"uri\":\"\"},\"pass\":\"Yes\",\"score\":1},\"eid\":\"ASSESS\",\"ets\":1515497370223,\"mid\":\"01951ddb-cf6e-4a4a-b740-b5e4fd0e483d\",\"object\":{\"id\":\"do_1122852550749306881159\",\"type\":\"Content\",\"ver\":\"1.0\"},\"tags\":[],\"ver\":\"3.0\",\"@timestamp\":\"2018-06-04T04:59:45.328Z\",\"ts\":\"2018-06-04T04:59:33.379+0000\"}" - val assessEvent = JSONUtils.deserialize[V3EventNew](assessStr) + val assessEvent = JSONUtils.deserialize[WFSInputEvent](assessStr) summary.add(assessEvent, 600) val endStr = "{\"eid\":\"END\",\"ets\":1536574379460,\"ver\":\"3.0\",\"mid\":\"9200c869-d16f-47af-b220-cf97845f1501\",\"actor\":{\"id\":\"a72482ab-320b-43dc-9821-5e6d1c9bf44a\",\"type\":\"User\"},\"context\":{\"channel\":\"01235953109336064029450\",\"pdata\":{\"id\":\"prod.sunbird.app\",\"ver\":\"2.0.106\",\"pid\":\"sunbird.app.contentplayer\"},\"env\":\"contentplayer\",\"sid\":\"b8bc1f1d-22a6-4aa6-aa5e-de3654e80f96\",\"did\":\"1b21a2906e7de0dd66235e7cf9373adb4aaaa104\",\"cdata\":[{\"id\":\"324168fd6623977b924f8552697cfd09\",\"type\":\"ContentSession\"}],\"rollup\":{\"l1\":\"do_31250890252435456027272\",\"l2\":\"do_31250890925065011217170\",\"l3\":\"do_31250890925065830417174\",\"l4\":\"do_31250890925069107217196\"}},\"object\":{\"id\":\"do_31251478823299481625615\",\"type\":\"Content\",\"ver\":\"1.0\"},\"edata\":{\"type\":\"resource\",\"mode\":\"play\",\"duration\":121972,\"pageid\":\"sunbird-player-Endpage\",\"score\":0,\"rating\":0.0,\"summary\":[{\"progress\":100.0}],\"index\":0,\"size\":0},\"tags\":[],\"@timestamp\":\"2018-09-11T05:08:04.311Z\"}" - val endEvent = JSONUtils.deserialize[V3EventNew](endStr) + val endEvent = JSONUtils.deserialize[WFSInputEvent](endStr) summary.checkEnd(endEvent, 600, Map()) summary.getSimilarEndSummary(endEvent) diff --git a/scripts/shell/local/run-wfs.sh b/scripts/shell/local/run-wfs.sh new file mode 100644 index 00000000..e6c0b9be --- /dev/null +++ b/scripts/shell/local/run-wfs.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + +export SPARK_HOME=/spark-2.4.4-bin-hadoop2.7 +export MODELS_HOME=/models-2.0 +export DP_LOGS=/logs/data-products + +## Job to run daily +cd +# example wfs config +# echo '{"search":{"type":"azure","queries":[{"bucket":"'$bucket'","prefix":"unique-partition/__partition__/","endDate":"'$endDate'","delta":0}]},"model":"org.ekstep.analytics.model.WorkflowSummary","modelParams":{"apiVersion":"v2", "parallelization":200},"output":[{"to":"kafka","params":{"brokerList":"'$brokerList'","topic":"'$topic'"}}],"parallelization":200,"appName":"Workflow Summarizer","deviceMapping":true}' +source model-config.sh +today=$(date "+%Y-%m-%d") + +job_config=$(config $1) +start_partition=$2 +end_partition=$3 + +echo "Starting the job - WFS" >> "$DP_LOGS/$today-job-execution.log" + +$SPARK_HOME/bin/spark-submit --master local[*] --jars $MODELS_HOME/analytics-framework-2.0.jar,$MODELS_HOME/scruid_2.11-2.3.2.jar --class org.ekstep.analytics.job.WFSExecutor $MODELS_HOME/batch-models-2.0.jar --model "$1" --fromPartition $start_partition --toPartition $end_partition --config "$job_config" >> "$DP_LOGS/$today-job-execution.log" + +echo "Job execution completed - $1" >> "$DP_LOGS/$today-job-execution.log" \ No newline at end of file