From da1518044d9f3cbd6720f82057969293edbfe29f Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Mon, 1 Jun 2020 12:51:42 +0530 Subject: [PATCH 1/8] Issue #0000 fix: WFS optimisations - rename V3Event case class --- .../analytics/model/WorkFlowSummaryModel.scala | 4 ++-- .../scala/org/ekstep/analytics/util/Summary.scala | 14 +++++++------- .../org/ekstep/analytics/util/TestSummary.scala | 10 +++++----- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/batch-models/src/main/scala/org/ekstep/analytics/model/WorkFlowSummaryModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/model/WorkFlowSummaryModel.scala index fddf3ba4..6dbb4cfb 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/model/WorkFlowSummaryModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/model/WorkFlowSummaryModel.scala @@ -53,11 +53,11 @@ object WorkFlowSummaryModel extends IBatchModelTemplate[String, WorkflowInput, M data.map({ f => var summEvents: Buffer[MeasuredEvent] = Buffer(); - val events = f.events.map(f => JSONUtils.deserialize[V3EventNew](f)) + val events = f.events.map(f => JSONUtils.deserialize[ReducedV3Event](f)) val sortedEvents = events.sortBy { x => x.ets } var rootSummary: org.ekstep.analytics.util.Summary = null var currSummary: org.ekstep.analytics.util.Summary = null - var prevEvent: V3EventNew = sortedEvents.head + var prevEvent: ReducedV3Event = sortedEvents.head sortedEvents.foreach{ x => diff --git a/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala b/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala index 42d3f690..d82aaa6b 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala @@ -8,7 +8,7 @@ import org.ekstep.analytics.framework.util.CommonUtil case class Item(itemId: String, timeSpent: Option[Double], res: Option[Array[String]], resValues: Option[Array[AnyRef]], mc: Option[AnyRef], mmc: Option[AnyRef], score: Int, time_stamp: Long, maxScore: Option[AnyRef], pass: String, qtitle: Option[String], qdesc: Option[String]); -class Summary(val firstEvent: V3EventNew) { +class Summary(val firstEvent: ReducedV3Event) { val defaultPData = V3PData(AppConf.getConfig("default.consumption.app.id"), Option("1.0")) val interactTypes = List("touch", "drag", "drop", "pinch", "zoom", "shake", "rotate", "speak", "listen", "write", "draw", "start", "end", "choose", "activate", "scroll", "click", "edit", "submit", "search", "dnd", "added", "removed", "selected") @@ -28,7 +28,7 @@ class Summary(val firstEvent: V3EventNew) { var interactEventsCount: Long = if(StringUtils.equals("INTERACT", firstEvent.eid) && interactTypes.contains(firstEvent.edata.`type`.toLowerCase)) 1l else 0l var `type`: String = if (null == firstEvent.edata.`type`) "app" else StringUtils.lowerCase(firstEvent.edata.`type`) var mode: Option[String] = if (firstEvent.edata.mode == null) Option("") else Option(firstEvent.edata.mode) - var lastEvent: V3EventNew = null + var lastEvent: ReducedV3Event = null var itemResponses: Buffer[Item] = Buffer[Item]() var endTime: Long = 0l var timeSpent: Double = 0.0 @@ -37,8 +37,8 @@ class Summary(val firstEvent: V3EventNew) { var eventsSummary: Map[String, Long] = Map(firstEvent.eid -> 1) var pageSummary: Iterable[PageSummary] = Iterable[PageSummary]() var prevEventEts: Long = startTime - var lastImpression: V3EventNew = null - var impressionMap: Map[V3EventNew, Double] = Map() + var lastImpression: ReducedV3Event = null + var impressionMap: Map[ReducedV3Event, Double] = Map() var summaryEvents: Buffer[MeasuredEvent] = Buffer() var CHILDREN: Buffer[Summary] = Buffer() @@ -106,7 +106,7 @@ class Summary(val firstEvent: V3EventNew) { this.isClosed = false } - def add(event: V3EventNew, idleTime: Int) { + def add(event: ReducedV3Event, idleTime: Int) { if(this.startTime == 0l) this.startTime = event.ets val ts = CommonUtil.getTimeDiff(prevEventEts, event.ets).get prevEventEts = event.ets @@ -169,7 +169,7 @@ class Summary(val firstEvent: V3EventNew) { } } - def checkEnd(event: V3EventNew, idleTime: Int, config: Map[String, AnyRef]): Summary = { + def checkEnd(event: ReducedV3Event, idleTime: Int, config: Map[String, AnyRef]): Summary = { val mode = if(event.edata.mode == null) "" else event.edata.mode if(StringUtils.equalsIgnoreCase(this.`type`, event.edata.`type`) && StringUtils.equals(this.mode.get, mode)) { if(this.PARENT == null) return this else return PARENT; @@ -184,7 +184,7 @@ class Summary(val firstEvent: V3EventNew) { return summ; } - def getSimilarEndSummary(event: V3EventNew): Summary = { + def getSimilarEndSummary(event: ReducedV3Event): Summary = { val mode = if(event.edata.mode == null) "" else event.edata.mode if(StringUtils.equalsIgnoreCase(this.`type`, event.edata.`type`) && StringUtils.equals(this.mode.get, mode)) { return this; diff --git a/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala b/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala index 77a43b7e..454f0516 100644 --- a/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala +++ b/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala @@ -1,6 +1,6 @@ package org.ekstep.analytics.util -import org.ekstep.analytics.framework.V3EventNew +import org.ekstep.analytics.framework.ReducedV3Event import org.ekstep.analytics.framework.util.JSONUtils import org.ekstep.analytics.model.SparkSpec @@ -9,7 +9,7 @@ class TestSummary extends SparkSpec { it should "create summary without uid, pdata and edata.type" in { val eventStr = "{\"eid\":\"START\",\"ets\":1534595611976,\"ver\":\"3.0\",\"mid\":\"817e25d0-33f5-48a2-8239-db286aaf3bd8\",\"actor\":{\"type\":\"User\"},\"context\":{\"channel\":\"01235953109336064029450\",\"env\":\"home\",\"sid\":\"8f32dbc4-c0d0-4630-9ff6-8c3bce3d15bb\",\"did\":\"a49c706d0402d6db3bb7cb3105cc9e7cf9b2ed7e\",\"cdata\":[]},\"object\":{\"id\":\"do_31250841732493312026783\",\"type\":\"TextBookUnit\",\"rollup\":{\"l1\":\"do_31250841732058316826675\",\"l2\":\"do_31250841732491673626778\",\"l3\":\"do_31250841732493312026783\"}},\"edata\":{\"mode\":\"play\",\"duration\":0,\"pageid\":\"collection-detail\",\"score\":0,\"rating\":0.0,\"index\":0,\"size\":0},\"tags\":[],\"@timestamp\":\"2018-08-19T04:08:41.195Z\"}" - val event = JSONUtils.deserialize[V3EventNew](eventStr) + val event = JSONUtils.deserialize[ReducedV3Event](eventStr) val summary = new Summary(event) summary.uid should be("") @@ -20,7 +20,7 @@ class TestSummary extends SparkSpec { it should "create summary and add ASSESS event" in { val eventStr = "{\"eid\":\"START\",\"ets\":1534651548447,\"ver\":\"3.0\",\"mid\":\"b154b70f-17fb-4c16-80f6-130f5ad479a1\",\"actor\":{\"id\":\"79498cf0-cdfa-41fb-b504-f257a8b40955\",\"type\":\"User\"},\"context\":{\"channel\":\"01235953109336064029450\",\"pdata\":{\"id\":\"prod.sunbird.app\",\"ver\":\"2.0.102\",\"pid\":\"sunbird.app.contentplayer\"},\"env\":\"contentplayer\",\"sid\":\"8f32dbc4-c0d0-4630-9ff6-8c3bce3d15bb\",\"did\":\"a49c706d0402d6db3bb7cb3105cc9e7cf9b2ed7e\",\"cdata\":[{\"id\":\"7f37a89e68bceff05ac34fb1b507ad71\",\"type\":\"ContentSession\"}],\"rollup\":{}},\"object\":{\"id\":\"do_31251316520745369623572\",\"type\":\"Content\",\"ver\":\"1.0\"},\"edata\":{\"type\":\"content\",\"mode\":\"play\",\"duration\":1534651548445,\"pageid\":\"\",\"score\":0,\"rating\":0.0,\"index\":0,\"size\":0},\"tags\":[],\"@timestamp\":\"2018-08-19T04:09:13.259Z\"}" - val event = JSONUtils.deserialize[V3EventNew](eventStr) + val event = JSONUtils.deserialize[ReducedV3Event](eventStr) val summary = new Summary(event) summary.uid should be("79498cf0-cdfa-41fb-b504-f257a8b40955") @@ -28,12 +28,12 @@ class TestSummary extends SparkSpec { summary.mode.get should be("play") val assessStr = "{\"actor\":{\"id\":\"580\",\"type\":\"User\"},\"context\":{\"cdata\":[{\"id\":\"de2115eb13f5113ffce9444cfecd3ab7\",\"type\":\"ContentSession\"}],\"channel\":\"in.sunbird\",\"did\":\"b027147870670bc57de790535311fbe5\",\"env\":\"ContentPlayer\",\"pdata\":{\"id\":\"in.sunbird.dev\",\"pid\":\"sunbird.app.contentplayer\",\"ver\":\"2.0.93\"},\"rollup\":{},\"sid\":\"7op5o46hpi2abkmp8ckihjeq72\"},\"edata\":{\"duration\":25,\"index\":8,\"item\":{\"desc\":\"\",\"exlength\":0,\"id\":\"do_31249422333176217625083\",\"maxscore\":1,\"mc\":[],\"mmc\":[],\"params\":[],\"title\":\"3ZMSUQQ9\",\"uri\":\"\"},\"pass\":\"Yes\",\"score\":1},\"eid\":\"ASSESS\",\"ets\":1515497370223,\"mid\":\"01951ddb-cf6e-4a4a-b740-b5e4fd0e483d\",\"object\":{\"id\":\"do_1122852550749306881159\",\"type\":\"Content\",\"ver\":\"1.0\"},\"tags\":[],\"ver\":\"3.0\",\"@timestamp\":\"2018-06-04T04:59:45.328Z\",\"ts\":\"2018-06-04T04:59:33.379+0000\"}" - val assessEvent = JSONUtils.deserialize[V3EventNew](assessStr) + val assessEvent = JSONUtils.deserialize[ReducedV3Event](assessStr) summary.add(assessEvent, 600) val endStr = "{\"eid\":\"END\",\"ets\":1536574379460,\"ver\":\"3.0\",\"mid\":\"9200c869-d16f-47af-b220-cf97845f1501\",\"actor\":{\"id\":\"a72482ab-320b-43dc-9821-5e6d1c9bf44a\",\"type\":\"User\"},\"context\":{\"channel\":\"01235953109336064029450\",\"pdata\":{\"id\":\"prod.sunbird.app\",\"ver\":\"2.0.106\",\"pid\":\"sunbird.app.contentplayer\"},\"env\":\"contentplayer\",\"sid\":\"b8bc1f1d-22a6-4aa6-aa5e-de3654e80f96\",\"did\":\"1b21a2906e7de0dd66235e7cf9373adb4aaaa104\",\"cdata\":[{\"id\":\"324168fd6623977b924f8552697cfd09\",\"type\":\"ContentSession\"}],\"rollup\":{\"l1\":\"do_31250890252435456027272\",\"l2\":\"do_31250890925065011217170\",\"l3\":\"do_31250890925065830417174\",\"l4\":\"do_31250890925069107217196\"}},\"object\":{\"id\":\"do_31251478823299481625615\",\"type\":\"Content\",\"ver\":\"1.0\"},\"edata\":{\"type\":\"resource\",\"mode\":\"play\",\"duration\":121972,\"pageid\":\"sunbird-player-Endpage\",\"score\":0,\"rating\":0.0,\"summary\":[{\"progress\":100.0}],\"index\":0,\"size\":0},\"tags\":[],\"@timestamp\":\"2018-09-11T05:08:04.311Z\"}" - val endEvent = JSONUtils.deserialize[V3EventNew](endStr) + val endEvent = JSONUtils.deserialize[ReducedV3Event](endStr) summary.checkEnd(endEvent, 600, Map()) summary.getSimilarEndSummary(endEvent) From cf1591207d790b6db6d41374a222537066ac64ae Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Mon, 1 Jun 2020 13:28:20 +0530 Subject: [PATCH 2/8] Issue #0000 fix: WFS optimisations - rename V3Event case class --- .../analytics/model/WorkFlowSummaryModel.scala | 4 ++-- .../scala/org/ekstep/analytics/util/Summary.scala | 14 +++++++------- .../org/ekstep/analytics/util/TestSummary.scala | 10 +++++----- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/batch-models/src/main/scala/org/ekstep/analytics/model/WorkFlowSummaryModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/model/WorkFlowSummaryModel.scala index 6dbb4cfb..400b258d 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/model/WorkFlowSummaryModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/model/WorkFlowSummaryModel.scala @@ -53,11 +53,11 @@ object WorkFlowSummaryModel extends IBatchModelTemplate[String, WorkflowInput, M data.map({ f => var summEvents: Buffer[MeasuredEvent] = Buffer(); - val events = f.events.map(f => JSONUtils.deserialize[ReducedV3Event](f)) + val events = f.events.map(f => JSONUtils.deserialize[WFSInputEvent](f)) val sortedEvents = events.sortBy { x => x.ets } var rootSummary: org.ekstep.analytics.util.Summary = null var currSummary: org.ekstep.analytics.util.Summary = null - var prevEvent: ReducedV3Event = sortedEvents.head + var prevEvent: WFSInputEvent = sortedEvents.head sortedEvents.foreach{ x => diff --git a/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala b/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala index d82aaa6b..cd3dd972 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala @@ -8,7 +8,7 @@ import org.ekstep.analytics.framework.util.CommonUtil case class Item(itemId: String, timeSpent: Option[Double], res: Option[Array[String]], resValues: Option[Array[AnyRef]], mc: Option[AnyRef], mmc: Option[AnyRef], score: Int, time_stamp: Long, maxScore: Option[AnyRef], pass: String, qtitle: Option[String], qdesc: Option[String]); -class Summary(val firstEvent: ReducedV3Event) { +class Summary(val firstEvent: WFSInputEvent) { val defaultPData = V3PData(AppConf.getConfig("default.consumption.app.id"), Option("1.0")) val interactTypes = List("touch", "drag", "drop", "pinch", "zoom", "shake", "rotate", "speak", "listen", "write", "draw", "start", "end", "choose", "activate", "scroll", "click", "edit", "submit", "search", "dnd", "added", "removed", "selected") @@ -28,7 +28,7 @@ class Summary(val firstEvent: ReducedV3Event) { var interactEventsCount: Long = if(StringUtils.equals("INTERACT", firstEvent.eid) && interactTypes.contains(firstEvent.edata.`type`.toLowerCase)) 1l else 0l var `type`: String = if (null == firstEvent.edata.`type`) "app" else StringUtils.lowerCase(firstEvent.edata.`type`) var mode: Option[String] = if (firstEvent.edata.mode == null) Option("") else Option(firstEvent.edata.mode) - var lastEvent: ReducedV3Event = null + var lastEvent: WFSInputEvent = null var itemResponses: Buffer[Item] = Buffer[Item]() var endTime: Long = 0l var timeSpent: Double = 0.0 @@ -37,8 +37,8 @@ class Summary(val firstEvent: ReducedV3Event) { var eventsSummary: Map[String, Long] = Map(firstEvent.eid -> 1) var pageSummary: Iterable[PageSummary] = Iterable[PageSummary]() var prevEventEts: Long = startTime - var lastImpression: ReducedV3Event = null - var impressionMap: Map[ReducedV3Event, Double] = Map() + var lastImpression: WFSInputEvent = null + var impressionMap: Map[WFSInputEvent, Double] = Map() var summaryEvents: Buffer[MeasuredEvent] = Buffer() var CHILDREN: Buffer[Summary] = Buffer() @@ -106,7 +106,7 @@ class Summary(val firstEvent: ReducedV3Event) { this.isClosed = false } - def add(event: ReducedV3Event, idleTime: Int) { + def add(event: WFSInputEvent, idleTime: Int) { if(this.startTime == 0l) this.startTime = event.ets val ts = CommonUtil.getTimeDiff(prevEventEts, event.ets).get prevEventEts = event.ets @@ -169,7 +169,7 @@ class Summary(val firstEvent: ReducedV3Event) { } } - def checkEnd(event: ReducedV3Event, idleTime: Int, config: Map[String, AnyRef]): Summary = { + def checkEnd(event: WFSInputEvent, idleTime: Int, config: Map[String, AnyRef]): Summary = { val mode = if(event.edata.mode == null) "" else event.edata.mode if(StringUtils.equalsIgnoreCase(this.`type`, event.edata.`type`) && StringUtils.equals(this.mode.get, mode)) { if(this.PARENT == null) return this else return PARENT; @@ -184,7 +184,7 @@ class Summary(val firstEvent: ReducedV3Event) { return summ; } - def getSimilarEndSummary(event: ReducedV3Event): Summary = { + def getSimilarEndSummary(event: WFSInputEvent): Summary = { val mode = if(event.edata.mode == null) "" else event.edata.mode if(StringUtils.equalsIgnoreCase(this.`type`, event.edata.`type`) && StringUtils.equals(this.mode.get, mode)) { return this; diff --git a/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala b/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala index 454f0516..fe64f3ae 100644 --- a/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala +++ b/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala @@ -1,6 +1,6 @@ package org.ekstep.analytics.util -import org.ekstep.analytics.framework.ReducedV3Event +import org.ekstep.analytics.framework.WFSInputEvent import org.ekstep.analytics.framework.util.JSONUtils import org.ekstep.analytics.model.SparkSpec @@ -9,7 +9,7 @@ class TestSummary extends SparkSpec { it should "create summary without uid, pdata and edata.type" in { val eventStr = "{\"eid\":\"START\",\"ets\":1534595611976,\"ver\":\"3.0\",\"mid\":\"817e25d0-33f5-48a2-8239-db286aaf3bd8\",\"actor\":{\"type\":\"User\"},\"context\":{\"channel\":\"01235953109336064029450\",\"env\":\"home\",\"sid\":\"8f32dbc4-c0d0-4630-9ff6-8c3bce3d15bb\",\"did\":\"a49c706d0402d6db3bb7cb3105cc9e7cf9b2ed7e\",\"cdata\":[]},\"object\":{\"id\":\"do_31250841732493312026783\",\"type\":\"TextBookUnit\",\"rollup\":{\"l1\":\"do_31250841732058316826675\",\"l2\":\"do_31250841732491673626778\",\"l3\":\"do_31250841732493312026783\"}},\"edata\":{\"mode\":\"play\",\"duration\":0,\"pageid\":\"collection-detail\",\"score\":0,\"rating\":0.0,\"index\":0,\"size\":0},\"tags\":[],\"@timestamp\":\"2018-08-19T04:08:41.195Z\"}" - val event = JSONUtils.deserialize[ReducedV3Event](eventStr) + val event = JSONUtils.deserialize[WFSInputEvent](eventStr) val summary = new Summary(event) summary.uid should be("") @@ -20,7 +20,7 @@ class TestSummary extends SparkSpec { it should "create summary and add ASSESS event" in { val eventStr = "{\"eid\":\"START\",\"ets\":1534651548447,\"ver\":\"3.0\",\"mid\":\"b154b70f-17fb-4c16-80f6-130f5ad479a1\",\"actor\":{\"id\":\"79498cf0-cdfa-41fb-b504-f257a8b40955\",\"type\":\"User\"},\"context\":{\"channel\":\"01235953109336064029450\",\"pdata\":{\"id\":\"prod.sunbird.app\",\"ver\":\"2.0.102\",\"pid\":\"sunbird.app.contentplayer\"},\"env\":\"contentplayer\",\"sid\":\"8f32dbc4-c0d0-4630-9ff6-8c3bce3d15bb\",\"did\":\"a49c706d0402d6db3bb7cb3105cc9e7cf9b2ed7e\",\"cdata\":[{\"id\":\"7f37a89e68bceff05ac34fb1b507ad71\",\"type\":\"ContentSession\"}],\"rollup\":{}},\"object\":{\"id\":\"do_31251316520745369623572\",\"type\":\"Content\",\"ver\":\"1.0\"},\"edata\":{\"type\":\"content\",\"mode\":\"play\",\"duration\":1534651548445,\"pageid\":\"\",\"score\":0,\"rating\":0.0,\"index\":0,\"size\":0},\"tags\":[],\"@timestamp\":\"2018-08-19T04:09:13.259Z\"}" - val event = JSONUtils.deserialize[ReducedV3Event](eventStr) + val event = JSONUtils.deserialize[WFSInputEvent](eventStr) val summary = new Summary(event) summary.uid should be("79498cf0-cdfa-41fb-b504-f257a8b40955") @@ -28,12 +28,12 @@ class TestSummary extends SparkSpec { summary.mode.get should be("play") val assessStr = "{\"actor\":{\"id\":\"580\",\"type\":\"User\"},\"context\":{\"cdata\":[{\"id\":\"de2115eb13f5113ffce9444cfecd3ab7\",\"type\":\"ContentSession\"}],\"channel\":\"in.sunbird\",\"did\":\"b027147870670bc57de790535311fbe5\",\"env\":\"ContentPlayer\",\"pdata\":{\"id\":\"in.sunbird.dev\",\"pid\":\"sunbird.app.contentplayer\",\"ver\":\"2.0.93\"},\"rollup\":{},\"sid\":\"7op5o46hpi2abkmp8ckihjeq72\"},\"edata\":{\"duration\":25,\"index\":8,\"item\":{\"desc\":\"\",\"exlength\":0,\"id\":\"do_31249422333176217625083\",\"maxscore\":1,\"mc\":[],\"mmc\":[],\"params\":[],\"title\":\"3ZMSUQQ9\",\"uri\":\"\"},\"pass\":\"Yes\",\"score\":1},\"eid\":\"ASSESS\",\"ets\":1515497370223,\"mid\":\"01951ddb-cf6e-4a4a-b740-b5e4fd0e483d\",\"object\":{\"id\":\"do_1122852550749306881159\",\"type\":\"Content\",\"ver\":\"1.0\"},\"tags\":[],\"ver\":\"3.0\",\"@timestamp\":\"2018-06-04T04:59:45.328Z\",\"ts\":\"2018-06-04T04:59:33.379+0000\"}" - val assessEvent = JSONUtils.deserialize[ReducedV3Event](assessStr) + val assessEvent = JSONUtils.deserialize[WFSInputEvent](assessStr) summary.add(assessEvent, 600) val endStr = "{\"eid\":\"END\",\"ets\":1536574379460,\"ver\":\"3.0\",\"mid\":\"9200c869-d16f-47af-b220-cf97845f1501\",\"actor\":{\"id\":\"a72482ab-320b-43dc-9821-5e6d1c9bf44a\",\"type\":\"User\"},\"context\":{\"channel\":\"01235953109336064029450\",\"pdata\":{\"id\":\"prod.sunbird.app\",\"ver\":\"2.0.106\",\"pid\":\"sunbird.app.contentplayer\"},\"env\":\"contentplayer\",\"sid\":\"b8bc1f1d-22a6-4aa6-aa5e-de3654e80f96\",\"did\":\"1b21a2906e7de0dd66235e7cf9373adb4aaaa104\",\"cdata\":[{\"id\":\"324168fd6623977b924f8552697cfd09\",\"type\":\"ContentSession\"}],\"rollup\":{\"l1\":\"do_31250890252435456027272\",\"l2\":\"do_31250890925065011217170\",\"l3\":\"do_31250890925065830417174\",\"l4\":\"do_31250890925069107217196\"}},\"object\":{\"id\":\"do_31251478823299481625615\",\"type\":\"Content\",\"ver\":\"1.0\"},\"edata\":{\"type\":\"resource\",\"mode\":\"play\",\"duration\":121972,\"pageid\":\"sunbird-player-Endpage\",\"score\":0,\"rating\":0.0,\"summary\":[{\"progress\":100.0}],\"index\":0,\"size\":0},\"tags\":[],\"@timestamp\":\"2018-09-11T05:08:04.311Z\"}" - val endEvent = JSONUtils.deserialize[ReducedV3Event](endStr) + val endEvent = JSONUtils.deserialize[WFSInputEvent](endStr) summary.checkEnd(endEvent, 600, Map()) summary.getSimilarEndSummary(endEvent) From 32057b31f15a992dd914a68ce12424120e7e1fbd Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Mon, 1 Jun 2020 13:42:21 +0530 Subject: [PATCH 3/8] Issue #0000 fix: WFS optimisations - Add WFSInputEvent case class --- .../org/ekstep/analytics/model/WorkFlowSummaryModel.scala | 8 ++++++++ .../main/scala/org/ekstep/analytics/util/Summary.scala | 8 +++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/batch-models/src/main/scala/org/ekstep/analytics/model/WorkFlowSummaryModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/model/WorkFlowSummaryModel.scala index 400b258d..ae758cea 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/model/WorkFlowSummaryModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/model/WorkFlowSummaryModel.scala @@ -1,5 +1,7 @@ package org.ekstep.analytics.model +import java.io.Serializable + import org.ekstep.analytics.framework.IBatchModelTemplate import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD @@ -16,6 +18,12 @@ import org.ekstep.analytics.framework.util.JobLogger import org.ekstep.analytics.framework.conf.AppConf import org.ekstep.analytics.framework._ +@scala.beans.BeanInfo +class WFSInputEvent(val eid: String, val ets: Long, val `@timestamp`: String, val ver: String, val mid: String, val actor: Actor, val context: V3Context, val `object`: Option[V3Object], val edata: WFSInputEData, val tags: List[AnyRef] = null) extends AlgoInput with Input {} +@scala.beans.BeanInfo +class WFSInputEData(val `type`: String, val mode: String, val duration: Long, val pageid: String, val item: Question, + val resvalues: Array[Map[String, AnyRef]], val pass: String, val score: Int) extends Serializable {} + case class WorkflowInput(sessionKey: WorkflowIndex, events: Buffer[String]) extends AlgoInput case class WorkflowOutput(index: WorkflowIndex, summaries: Buffer[MeasuredEvent]) extends AlgoOutput case class WorkflowIndex(did: String, channel: String, pdataId: String) diff --git a/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala b/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala index cd3dd972..a5e9ab93 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala @@ -1,6 +1,7 @@ package org.ekstep.analytics.util import org.ekstep.analytics.framework._ + import scala.collection.mutable.Buffer import org.apache.commons.lang3.StringUtils import org.ekstep.analytics.framework.conf.AppConf @@ -220,7 +221,7 @@ class Summary(val firstEvent: WFSInputEvent) { val interactEventsPerMin: Double = if (this.interactEventsCount == 0 || this.timeSpent == 0) 0d else if (this.timeSpent < 60.0) this.interactEventsCount.toDouble else BigDecimal(this.interactEventsCount / (this.timeSpent / 60)).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble; - val syncts = CommonUtil.getEventSyncTS(if(this.lastEvent == null) this.firstEvent else this.lastEvent) + val syncts = getEventSyncTS(if(this.lastEvent == null) this.firstEvent else this.lastEvent) val eventsSummary = this.eventsSummary.map(f => EventSummary(f._1, f._2.toInt)) val measures = Map("start_time" -> this.startTime, "end_time" -> this.endTime, @@ -267,4 +268,9 @@ class Summary(val firstEvent: WFSInputEvent) { } else Iterable[EnvSummary]() } + def getEventSyncTS(event: WFSInputEvent): Long = { + val timeInString = event.`@timestamp`; + CommonUtil.getEventSyncTS(timeInString); + } + } \ No newline at end of file From 7f2415a6c7a257e5a15af029628722ce94fff310 Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Mon, 1 Jun 2020 13:43:13 +0530 Subject: [PATCH 4/8] Issue #0000 fix: WFS optimisations - Add WFSInputEvent case class --- .../src/main/scala/org/ekstep/analytics/util/Summary.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala b/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala index a5e9ab93..3362909a 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/util/Summary.scala @@ -6,6 +6,7 @@ import scala.collection.mutable.Buffer import org.apache.commons.lang3.StringUtils import org.ekstep.analytics.framework.conf.AppConf import org.ekstep.analytics.framework.util.CommonUtil +import org.ekstep.analytics.model.WFSInputEvent case class Item(itemId: String, timeSpent: Option[Double], res: Option[Array[String]], resValues: Option[Array[AnyRef]], mc: Option[AnyRef], mmc: Option[AnyRef], score: Int, time_stamp: Long, maxScore: Option[AnyRef], pass: String, qtitle: Option[String], qdesc: Option[String]); From 545efa4d81670369382cf708dc206a6856127a71 Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Mon, 1 Jun 2020 13:45:18 +0530 Subject: [PATCH 5/8] Issue #0000 fix: WFS optimisations - Add WFSInputEvent case class --- .../src/test/scala/org/ekstep/analytics/util/TestSummary.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala b/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala index fe64f3ae..1b393771 100644 --- a/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala +++ b/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala @@ -1,8 +1,7 @@ package org.ekstep.analytics.util -import org.ekstep.analytics.framework.WFSInputEvent import org.ekstep.analytics.framework.util.JSONUtils -import org.ekstep.analytics.model.SparkSpec +import org.ekstep.analytics.model.{SparkSpec, WFSInputEvent} class TestSummary extends SparkSpec { From b26830f875a2016f8d0a26f25e71e6b86203978c Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Mon, 1 Jun 2020 14:25:30 +0530 Subject: [PATCH 6/8] Issue #0000 fix: WFS optimisations - Dummy commit --- .../src/test/scala/org/ekstep/analytics/util/TestSummary.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala b/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala index 1b393771..307ae79c 100644 --- a/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala +++ b/batch-models/src/test/scala/org/ekstep/analytics/util/TestSummary.scala @@ -6,7 +6,6 @@ import org.ekstep.analytics.model.{SparkSpec, WFSInputEvent} class TestSummary extends SparkSpec { it should "create summary without uid, pdata and edata.type" in { - val eventStr = "{\"eid\":\"START\",\"ets\":1534595611976,\"ver\":\"3.0\",\"mid\":\"817e25d0-33f5-48a2-8239-db286aaf3bd8\",\"actor\":{\"type\":\"User\"},\"context\":{\"channel\":\"01235953109336064029450\",\"env\":\"home\",\"sid\":\"8f32dbc4-c0d0-4630-9ff6-8c3bce3d15bb\",\"did\":\"a49c706d0402d6db3bb7cb3105cc9e7cf9b2ed7e\",\"cdata\":[]},\"object\":{\"id\":\"do_31250841732493312026783\",\"type\":\"TextBookUnit\",\"rollup\":{\"l1\":\"do_31250841732058316826675\",\"l2\":\"do_31250841732491673626778\",\"l3\":\"do_31250841732493312026783\"}},\"edata\":{\"mode\":\"play\",\"duration\":0,\"pageid\":\"collection-detail\",\"score\":0,\"rating\":0.0,\"index\":0,\"size\":0},\"tags\":[],\"@timestamp\":\"2018-08-19T04:08:41.195Z\"}" val event = JSONUtils.deserialize[WFSInputEvent](eventStr) val summary = new Summary(event) From 72daa0fdfe6bb1ce0068a4a9946e2de4ced8c743 Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Mon, 1 Jun 2020 15:52:02 +0530 Subject: [PATCH 7/8] Issue #0000 fix: WFS optimisations - parallelization by partition --- .../ekstep/analytics/job/WFSExecutor.scala | 58 +++++++++++++++++++ scripts/shell/local/run-wfs.sh | 22 +++++++ 2 files changed, 80 insertions(+) create mode 100644 batch-models/src/main/scala/org/ekstep/analytics/job/WFSExecutor.scala create mode 100644 scripts/shell/local/run-wfs.sh diff --git a/batch-models/src/main/scala/org/ekstep/analytics/job/WFSExecutor.scala b/batch-models/src/main/scala/org/ekstep/analytics/job/WFSExecutor.scala new file mode 100644 index 00000000..4e2eaa5d --- /dev/null +++ b/batch-models/src/main/scala/org/ekstep/analytics/job/WFSExecutor.scala @@ -0,0 +1,58 @@ +package org.ekstep.analytics.job + +import optional.Application +import org.apache.spark.SparkContext +import org.ekstep.analytics.framework.{JobConfig, JobContext} +import org.ekstep.analytics.framework.Level.ERROR +import org.ekstep.analytics.framework.exception.{DataFetcherException, JobNotFoundException} +import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger} + +object WFSExecutor extends Application { + + implicit val className = "org.ekstep.analytics.job.WFSExecutor" + + def main(model: String, fromPartition: String, toPartition: String, config: String) { + + JobLogger.start("Started executing WFSExecutor", Option(Map("config" -> config, "model" -> model, "fromPartition" -> fromPartition, "toPartition" -> toPartition))) + val con = JSONUtils.deserialize[JobConfig](config) + val sc = CommonUtil.getSparkContext(JobContext.parallelization, con.appName.getOrElse(con.model)); + try { + val result = CommonUtil.time({ + execute(model, fromPartition.toInt, toPartition.toInt, config)(sc); + }) + JobLogger.end("WFSExecutor completed", "SUCCESS", Option(Map("timeTaken" -> result._1))); + } catch { + case ex: Exception => + JobLogger.log(ex.getMessage, None, ERROR); + JobLogger.end("WFSExecutor failed", "FAILED") + throw ex + } finally { + CommonUtil.closeSparkContext()(sc); + } + } + + def execute(model: String, fromPartition: Int, toPartition: Int, config: String)(implicit sc: SparkContext) { + + val range = fromPartition to toPartition + range.toList.map { partition => + try { + val jobConfig = config.replace("__partition__", partition.toString) + val job = JobFactory.getJob(model); + JobLogger.log("### Executing wfs for the partition - " + partition + " ###") + job.main(jobConfig)(Option(sc)); + } catch { + case ex: DataFetcherException => { + JobLogger.log(ex.getMessage, Option(Map("model_code" -> model, "partition" -> partition)), ERROR) + } + case ex: JobNotFoundException => { + JobLogger.log(ex.getMessage, Option(Map("model_code" -> model, "partition" -> partition)), ERROR) + throw ex; + } + case ex: Exception => { + JobLogger.log(ex.getMessage, Option(Map("model_code" -> model, "partition" -> partition)), ERROR) + ex.printStackTrace() + } + } + } + } +} diff --git a/scripts/shell/local/run-wfs.sh b/scripts/shell/local/run-wfs.sh new file mode 100644 index 00000000..e6c0b9be --- /dev/null +++ b/scripts/shell/local/run-wfs.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + +export SPARK_HOME=/spark-2.4.4-bin-hadoop2.7 +export MODELS_HOME=/models-2.0 +export DP_LOGS=/logs/data-products + +## Job to run daily +cd +# example wfs config +# echo '{"search":{"type":"azure","queries":[{"bucket":"'$bucket'","prefix":"unique-partition/__partition__/","endDate":"'$endDate'","delta":0}]},"model":"org.ekstep.analytics.model.WorkflowSummary","modelParams":{"apiVersion":"v2", "parallelization":200},"output":[{"to":"kafka","params":{"brokerList":"'$brokerList'","topic":"'$topic'"}}],"parallelization":200,"appName":"Workflow Summarizer","deviceMapping":true}' +source model-config.sh +today=$(date "+%Y-%m-%d") + +job_config=$(config $1) +start_partition=$2 +end_partition=$3 + +echo "Starting the job - WFS" >> "$DP_LOGS/$today-job-execution.log" + +$SPARK_HOME/bin/spark-submit --master local[*] --jars $MODELS_HOME/analytics-framework-2.0.jar,$MODELS_HOME/scruid_2.11-2.3.2.jar --class org.ekstep.analytics.job.WFSExecutor $MODELS_HOME/batch-models-2.0.jar --model "$1" --fromPartition $start_partition --toPartition $end_partition --config "$job_config" >> "$DP_LOGS/$today-job-execution.log" + +echo "Job execution completed - $1" >> "$DP_LOGS/$today-job-execution.log" \ No newline at end of file From abac6122039c1139d77c9274890112a5ac55bbd5 Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Thu, 4 Jun 2020 11:25:59 +0530 Subject: [PATCH 8/8] Issue #0000 fix: WFS optimisations - parallelization by partition --- .../src/main/scala/org/ekstep/analytics/job/WFSExecutor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch-models/src/main/scala/org/ekstep/analytics/job/WFSExecutor.scala b/batch-models/src/main/scala/org/ekstep/analytics/job/WFSExecutor.scala index 4e2eaa5d..b18bcade 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/job/WFSExecutor.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/job/WFSExecutor.scala @@ -15,7 +15,7 @@ object WFSExecutor extends Application { JobLogger.start("Started executing WFSExecutor", Option(Map("config" -> config, "model" -> model, "fromPartition" -> fromPartition, "toPartition" -> toPartition))) val con = JSONUtils.deserialize[JobConfig](config) - val sc = CommonUtil.getSparkContext(JobContext.parallelization, con.appName.getOrElse(con.model)); + val sc = CommonUtil.getSparkContext(con.parallelization.getOrElse(200), con.appName.getOrElse(con.model)); try { val result = CommonUtil.time({ execute(model, fromPartition.toInt, toPartition.toInt, config)(sc);