diff --git a/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/MergeWifiSession.scala b/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/MergeWifiSession.scala index aa38b06..3d7c954 100644 --- a/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/MergeWifiSession.scala +++ b/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/MergeWifiSession.scala @@ -191,6 +191,7 @@ object MergeWifiSession { .format(wsession.MAC, wsession.stime, wsession.etime, wsession.AP, bname, btype, bschool, blat, blon, ipaddr, account) } + .sortBy(m => m) .saveAsTextFile(output) } @@ -206,11 +207,13 @@ object MergeWifiSession { */ var sessions = new Array[WIFISession](0) var APMap = new HashMap[String, List[Long]] + var preAP: String = null + var mac: String = null logs.toSeq.sortBy(_.time) .foreach( log => { - val mac = log.MAC + mac = log.MAC val time = log.time val code = log.code val ap = log.payload.split(',')(0) @@ -223,8 +226,15 @@ object MergeWifiSession { APMap.put(ap, List(time, time)) else { val value = APMap.get(ap).get - if ( time >= value(1) ) { - APMap.put(ap, List(value(0), time)) + if (preAP == ap) { + // update ending time + if ( time >= value(1) ) { + APMap.put(ap, List(value(0), time)) + } + } else { + // create new session + sessions = sessions :+ WIFISession(mac, value(0), value(1), ap) + APMap.put(ap, List(time, time)) } } } else if (code == WIFICode.Deauth || code == WIFICode.Disassoc) { @@ -235,19 +245,23 @@ object MergeWifiSession { } } - APMap.foreach { case (key, value) => { - sessions = sessions :+ WIFISession(mac, value(0), value(1), key) - }} + preAP = ap }) + if ( mac != null ){ + APMap.foreach { case (key, value) => { + sessions = sessions :+ WIFISession(mac, value(0), value(1), key) + }} + } + // adjust session timestamps var preSession: WIFISession = null var ajustedSessions = new Array[WIFISession](0) sessions.sortBy(m => m.stime).foreach { m => { if ( preSession != null ) { - val tdiff = m.stime - m.etime + val tdiff = m.stime - preSession.etime if (tdiff < 0) preSession = preSession.copy(etime = m.stime) @@ -338,7 +352,7 @@ object MergeWifiSession { sessions.sortBy(m => m.stime).foreach { m => { if ( preSession != null ) { - val tdiff = m.stime - m.etime + val tdiff = m.stime - preSession.etime if (tdiff < 0) preSession = preSession.copy(etime = m.stime)