From 23fdb55fe4c753ed5bb95883af60ccf8c0a6daa1 Mon Sep 17 00:00:00 2001 From: Xiaming Chen Date: Mon, 14 Dec 2015 22:21:47 +0800 Subject: [PATCH] Add account and ip info to WifiSyslogSession --- .../sjtu/omnilab/odh/spark/IPSession.scala | 4 + .../omnilab/odh/spark/MergeWifiSession.scala | 185 +++++++++++++++++- porters/wifi_syslog.sh | 2 +- porters/wifi_syslog_session.sh | 2 +- porters/wifi_traffic_http.sh | 2 +- 5 files changed, 183 insertions(+), 12 deletions(-) create mode 100644 etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/IPSession.scala diff --git a/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/IPSession.scala b/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/IPSession.scala new file mode 100644 index 0000000..be42118 --- /dev/null +++ b/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/IPSession.scala @@ -0,0 +1,4 @@ +package cn.edu.sjtu.omnilab.odh.spark + + +case class IPSession(MAC: String, stime: Long, etime: Long, IP: String) diff --git a/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/MergeWifiSession.scala b/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/MergeWifiSession.scala index 53bab81..87ce1b8 100644 --- a/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/MergeWifiSession.scala +++ b/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/MergeWifiSession.scala @@ -2,6 +2,7 @@ package cn.edu.sjtu.omnilab.odh.spark import cn.edu.sjtu.omnilab.odh.rawfilter.{APToBuilding, WIFICode} import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.SparkContext._ // to use join etc. 
import scala.collection.mutable.HashMap @@ -43,7 +44,8 @@ object MergeWifiSession { WIFICode.Disassoc ) - spark.textFile(input) + // Load clean Wifi syslog + val inputRDD = spark.textFile(input) .map{ filtered => { val parts = filtered.split(',') @@ -59,16 +61,93 @@ object MergeWifiSession { payload = parts(3) ) - }}} - - .filter(m => m != null && validSessionCodes.contains(m.code)) + }}}.filter(m => m != null) + // Filter IP sessions from input + val ipSession = inputRDD + .filter( m => m.code == WIFICode.IPAllocation || m.code == WIFICode.IPRecycle) .groupBy(_.MAC) + .flatMap { case (key, logs) => { collectIPSession(logs) }} + .keyBy(_.MAC) + .cache + + // Filter Auth info. from input + // If only one distinct account is seen for a MAC address, this account is + // associated with that MAC for the whole day. + // Otherwise, a session-based association is performed after the join action. + val authRDD = inputRDD.filter(m => m.code == WIFICode.UserAuth) + .map( m => CleanWIFILog(MAC=m.MAC, time=m.time, code=m.code, payload=m.payload)) + .distinct.groupBy(_.MAC) + .mapValues{ case (authinfos) => { + var accounts: List[String] = List[String]() + authinfos.foreach(accounts ::= _.payload) + + if (accounts.distinct.length == 1) { + authinfos.toList.take(1) + } else { + authinfos.toList + }}} + val sessionRDD = inputRDD + // extract mobility sessions + .filter(m => validSessionCodes.contains(m.code)).groupBy(_.MAC) .flatMap { case (key, logs) => { extractSessions(logs) }} - .map { m => - val buildInfo = apToBuild.parse(m.AP) + // append IP info. to mobility sessions + .keyBy(_.MAC) + .leftOuterJoin(ipSession) + .map { case (key, (wsession, ipsession)) => (wsession, ipsession.getOrElse(null))} + .groupBy(_._1) + .map{ case (wsession, ipsessions) => { + + // Determine IP by comparing the length of overlap for Wifi session and IP session. + // The maximum temporal overlap corresponds to the right IP address. 
+ var result: IPSession = null + var timeSpan: Long = 0 + + ipsessions.foreach{ case (wsession, ipsession) => { + var tspan: Long = 0 + + if (ipsession != null) { + val stime: Long = Math.max(ipsession.stime, wsession.stime) + // We ARTIFICIALLY add 15 mins more to tolerate data loss + val etime: Long = Math.min(ipsession.etime + 15 * 60 * 1000, wsession.etime) + tspan = etime - stime + } + + if ( tspan > timeSpan ) { + timeSpan = tspan + result = ipsession + } + }} + + (wsession, result) }} + + // append account info. to mobility sessions + .keyBy(_._1.MAC) + .leftOuterJoin(authRDD) + .map { case (key, ((wsession, ipsession), authinfo)) => { + val authinfos = authinfo.getOrElse(null) + + if ( authinfos != null ) { + if (authinfos.size == 1) { + (wsession, ipsession, authinfos(0)) + } else { + // A session-based association is performed + var result: CleanWIFILog = null + authinfos.foreach( auth => { + if ( auth.time >= wsession.stime && auth.time <= wsession.etime ) { + result = auth + } + }) + (wsession, ipsession, result) + } + } else { + (wsession, ipsession, null) + } }} + + .map { case (wsession, ipsession, authinfo) => + val buildInfo = apToBuild.parse(wsession.AP) var bname: String = null var btype: String = null @@ -83,10 +162,20 @@ object MergeWifiSession { blon = buildInfo.get(4) } - "%s,%d,%d,%s,%s,%s,%s,%s,%s" - .format(m.MAC, m.stime, m.etime, m.AP, bname, btype, bschool, blat, blon) - } + var ipaddr: String = null + if ( ipsession != null ) { + ipaddr = ipsession.IP + } + var account: String = null + if ( authinfo != null ) { + account = authinfo.payload + } + + "%s,%d,%d,%s,%s,%s,%s,%s,%s,%s,%s" + .format(wsession.MAC, wsession.stime, wsession.etime, wsession.AP, + bname, btype, bschool, blat, blon, ipaddr, account) + } .saveAsTextFile(output) } @@ -160,4 +249,82 @@ object MergeWifiSession { sessions.sortBy(_.stime).toIterable } + + + /** + * Construct IP sessions identified by IPALLOC and IPRECYCLE messages + * @param logs IPAllocation/IPRecycle log entries grouped for one MAC address + * @return IP sessions ordered by start time, with overlaps trimmed and adjacent same-IP sessions merged + */ + def 
collectIPSession(logs: Iterable[CleanWIFILog]): Iterable[IPSession] = { + var sessions = new Array[IPSession](0) + var IPMap = new HashMap[String, List[Long]] + var mac: String = null + + logs.foreach( log => { + mac = log.MAC + val time = log.time + val code = log.code + val ip = log.payload + + if ( code == WIFICode.IPAllocation ) { + + if ( ! IPMap.contains(ip) ) + IPMap.put(ip, List(time, time)) + else { + val value = IPMap.get(ip).get + if ( time >= value(1) ) { + // update session ending time + IPMap.put(ip, List(value(0), time)) + } + } + + } else if (code == WIFICode.IPRecycle) { + + if ( IPMap.contains(ip)) { + val value = IPMap.get(ip).get + // recycle certain IP + sessions = sessions :+ IPSession(mac, value(0), time, ip) + IPMap.remove(ip) + } else { + // omit recycling messages without allocation first + } + + } + }) + + if ( mac != null ){ + IPMap.foreach { case (key, value) => { + sessions = sessions :+ IPSession(mac, value(0), value(1), key) + }} + } + + // adjust session timestamps + var preSession: IPSession = null + var ajustedSessions = new Array[IPSession](0) + sessions.sortBy(m => m.stime).foreach { m => { + + if ( preSession != null ) { + val tdiff = m.stime - preSession.etime + if (tdiff < 0) + preSession = preSession.copy(etime = m.stime) + + // merge adjacent sessions with the same IP + if ( preSession.IP == m.IP && tdiff < mergeSessionThreshold ) + preSession = preSession.copy(etime = m.etime) + else { + ajustedSessions = ajustedSessions :+ preSession + preSession = m + } + } else { + preSession = m + } + + }} + + if (preSession != null) + ajustedSessions = ajustedSessions :+ preSession + + ajustedSessions.toIterable + } } diff --git a/porters/wifi_syslog.sh b/porters/wifi_syslog.sh index 5edf1e0..30cff52 100755 --- a/porters/wifi_syslog.sh +++ b/porters/wifi_syslog.sh @@ -41,7 +41,7 @@ hadoop fs -rm -r $TEMPWP hadoop fs -mkdir -p $TEMPWP TARGET=$(date -d "yesterday" '+%Y-%m-%d') -if [ $1 != "" ]; then +if [ -n "$1" ]; then TARGET=$1 fi 
diff --git a/porters/wifi_syslog_session.sh b/porters/wifi_syslog_session.sh index e481d95..7c76387 100755 --- a/porters/wifi_syslog_session.sh +++ b/porters/wifi_syslog_session.sh @@ -36,7 +36,7 @@ if ! hadoop fs -test -d $HDFS_WIFI_SYSLOG_SESSION; then fi TARGET=$(date -d "yesterday" '+%Y-%m-%d') -if [ $1 != "" ]; then +if [ -n "$1" ]; then TARGET=$1 fi diff --git a/porters/wifi_traffic_http.sh b/porters/wifi_traffic_http.sh index 64fd1b7..596a151 100755 --- a/porters/wifi_traffic_http.sh +++ b/porters/wifi_traffic_http.sh @@ -32,7 +32,7 @@ year=`date -d "yesterday" "+%Y"` month=`date -d "yesterday" "+%m"` day=`date -d "yesterday" "+%d"` -if [ $1 != "" ]; then +if [ -n "$1" ]; then year=`echo $1 | cut -d'-' -f1` month=`echo $1 | cut -d'-' -f2` day=`echo $1 | cut -d'-' -f3`