diff --git a/etlers/WifiToolkit/build.sbt b/etlers/WifiToolkit/build.sbt index 34250a5..552586c 100644 --- a/etlers/WifiToolkit/build.sbt +++ b/etlers/WifiToolkit/build.sbt @@ -12,4 +12,6 @@ libraryDependencies += "joda-time" % "joda-time" % "2.6" libraryDependencies += "org.apache.commons" % "commons-math3" % "3.5" +libraryDependencies += "org.apache.commons" % "commons-lang3" % "3.4" + libraryDependencies += "org.yaml" % "snakeyaml" % "1.15" diff --git a/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/CleanseWifiLogs.scala b/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/CleanseWifiLogs.scala index 929931f..bc3112a 100644 --- a/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/CleanseWifiLogs.scala +++ b/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/CleanseWifiLogs.scala @@ -1,10 +1,8 @@ package cn.edu.sjtu.omnilab.odh.spark import cn.edu.sjtu.omnilab.odh.rawfilter.{WIFILogFilter} -import cn.edu.sjtu.omnilab.odh.utils.Utils import org.apache.spark.{SparkConf, SparkContext} - /** * Cleanse raw WIFI syslog into movement data. */ @@ -27,28 +25,8 @@ object CleanseWifiLogs { val spark = new SparkContext(conf) val inRDD = spark.textFile(input) - .map { m => { - - val filtered = WIFILogFilter.filterData(m) - var cleanLog: CleanWIFILog = null - - if (filtered != null) { - val parts = filtered.split(',') - - cleanLog = CleanWIFILog(MAC = parts(0), - time = Utils.ISOToUnix(parts(1)), - code = parts(2).toInt, - payload = parts(3)) - - } - - cleanLog - - }} + .map { m => WIFILogFilter.filterData(m)} .filter(_ != null) - - .map(m => "%s,%d,%d,%s".format(m.MAC, m.time, m.code, m.payload)) - .saveAsTextFile(output) spark.stop() diff --git a/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/MergeWifiSession.scala b/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/MergeWifiSession.scala index 87ce1b8..aa38b06 100644 --- a/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/MergeWifiSession.scala +++ b/etlers/WifiToolkit/src/main/scala/cn/edu/sjtu/omnilab/odh/spark/MergeWifiSession.scala @@ -1,6 +1,7 @@ package cn.edu.sjtu.omnilab.odh.spark import cn.edu.sjtu.omnilab.odh.rawfilter.{APToBuilding, WIFICode} +import org.apache.commons.lang.StringUtils import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.SparkContext._ // to use join etc. @@ -37,18 +38,28 @@ object MergeWifiSession { val inRDD = spark.textFile(input) // extract sessions - val validSessionCodes = List( + val mobilityCodes = List( WIFICode.AuthRequest, WIFICode.AssocRequest, WIFICode.Deauth, - WIFICode.Disassoc + WIFICode.Disassoc, + WIFICode.UserRoam, + WIFICode.NewDev + ) + + val ipAllocCodes = List( + WIFICode.UserAuth, + WIFICode.IPAllocation, + WIFICode.IPRecycle, + WIFICode.UserRoam ) // Load clean Wifi syslog val inputRDD = spark.textFile(input) .map{ filtered => { - val parts = filtered.split(',') + // NOTE: there are more than four fields for UserAuth and UserRoam + val parts = StringUtils.split(filtered, ",", 4) if (parts.length < 4) { null @@ -65,7 +76,7 @@ object MergeWifiSession { // Filter IP sessions from input val ipSession = inputRDD - .filter( m => m.code == WIFICode.IPAllocation || m.code == WIFICode.IPRecycle) + .filter( m => ipAllocCodes.contains(m.code)) .groupBy(_.MAC) .flatMap { case (key, logs) => { collectIPSession(logs) }} .keyBy(_.MAC) @@ -76,7 +87,10 @@ object MergeWifiSession { // associate to that MAC for the whole day. // Otherwise, a session-based association is performed after the join action. val authRDD = inputRDD.filter(m => m.code == WIFICode.UserAuth) - .map( m => CleanWIFILog(MAC=m.MAC, time=m.time, code=m.code, payload=m.payload)) + .map( m => { + val accname = m.payload.split(',')(0) + CleanWIFILog(MAC=m.MAC, time=m.time, code=m.code, payload=accname) + }) .distinct.groupBy(_.MAC) .mapValues{ case (authinfos) => { var accounts: List[String] = List[String]() @@ -90,8 +104,9 @@ object MergeWifiSession { val sessionRDD = inputRDD // extract mobility sessions - .filter(m => validSessionCodes.contains(m.code)).groupBy(_.MAC) - .flatMap { case (key, logs) => { extractSessions(logs) }} + .filter(m => mobilityCodes.contains(m.code)) + .groupBy(_.MAC) + .flatMap { case (key, logs) => { extractSessions_ex(logs) }} // append IP info. to mobility sessions .keyBy(_.MAC) @@ -149,11 +164,11 @@ object MergeWifiSession { .map { case (wsession, ipsession, authinfo) => val buildInfo = apToBuild.parse(wsession.AP) - var bname: String = null - var btype: String = null - var bschool: String = null - var blat: String = null - var blon: String = null + var bname: String = "" + var btype: String = "" + var bschool: String = "" + var blat: String = "" + var blon: String = "" if ( buildInfo != null ) { bname = buildInfo.get(0) btype = buildInfo.get(1) @@ -162,12 +177,12 @@ object MergeWifiSession { blon = buildInfo.get(4) } - var ipaddr: String = null + var ipaddr: String = "" if ( ipsession != null ) { ipaddr = ipsession.IP } - var account: String = null + var account: String = "" if ( authinfo != null ) { account = authinfo.payload } @@ -183,70 +198,76 @@ object MergeWifiSession { /** * Extract WIFI association sessions from certain individual's logs */ - def extractSessions(logs: Iterable[CleanWIFILog]): Iterable[WIFISession] = { + def extractSessions_ex(logs: Iterable[CleanWIFILog]): Iterable[WIFISession] = { - var sessions = new Array[WIFISession](0) - var APMap = new HashMap[String, Long] - var preAP: String = null - var preSession: WIFISession = null - var curSession: WIFISession = null /* - * Use algorithm iterated from previous GWJ's version of SyslogCleanser project. + * Based on previous algorithm used in SyslogCleanser project. */ - logs.toSeq.sortBy(_.time) + var sessions = new Array[WIFISession](0) + var APMap = new HashMap[String, List[Long]] + logs.toSeq.sortBy(_.time) .foreach( log => { - /* - * construct network sessions roughly - * currently, we end a session when AUTH and DEAUTH pair is detected - */ + val mac = log.MAC val time = log.time val code = log.code - val ap = log.payload - - if ( code == WIFICode.AuthRequest || code == WIFICode.AssocRequest) { + val ap = log.payload.split(',')(0) + + // create ip mapping + if ( code == WIFICode.AuthRequest || code == WIFICode.AssocRequest || + code == WIFICode.UserRoam || code == WIFICode.NewDev) { + // start a new mobility session with these msgs. + if ( ! APMap.contains(ap) ) + APMap.put(ap, List(time, time)) + else { + val value = APMap.get(ap).get + if ( time >= value(1) ) { + APMap.put(ap, List(value(0), time)) + } + } + } else if (code == WIFICode.Deauth || code == WIFICode.Disassoc) { + if ( APMap.contains(ap)) { + val value = APMap.get(ap).get + sessions = sessions :+ WIFISession(mac, value(0), time, ap) + APMap.remove(ap) + } + } - if ( ! APMap.contains(ap) || (APMap.contains(ap) && ap != preAP)) - APMap.put(ap, time) + APMap.foreach { case (key, value) => { + sessions = sessions :+ WIFISession(mac, value(0), value(1), key) + }} - } else if (code == WIFICode.Deauth || code == WIFICode.Disassoc) { + }) - if ( APMap.contains(ap) ) { - // record this new session and remove it from APMap - val stime = APMap.get(ap).get - curSession = WIFISession(mac, stime, time, ap) - APMap.remove(ap) + // adjust session timestamps + var preSession: WIFISession = null + var ajustedSessions = new Array[WIFISession](0) - // adjust session timestamps - if ( preSession != null ) { - val tdiff = curSession.stime - preSession.etime - if (tdiff < 0) - preSession = preSession.copy(etime = curSession.stime) - - // merge adjacent sessions under the same AP - if ( preSession.AP == curSession.AP && tdiff < mergeSessionThreshold ) - preSession = preSession.copy(etime = curSession.etime) - else { - sessions = sessions :+ preSession - preSession = curSession - } + sessions.sortBy(m => m.stime).foreach { m => { + if ( preSession != null ) { + val tdiff = m.stime - m.etime + if (tdiff < 0) + preSession = preSession.copy(etime = m.stime) - } else { - preSession = curSession - } - } + // merge adjacent sessions with the same IP + if ( preSession.AP == m.AP && tdiff < mergeSessionThreshold ) + preSession = preSession.copy(etime = m.etime) + else { + ajustedSessions = ajustedSessions :+ preSession + preSession = m } + } else { + preSession = m + } - preAP = ap - - }) + }} if (preSession != null) - sessions = sessions :+ preSession + ajustedSessions = ajustedSessions :+ preSession - sessions.sortBy(_.stime).toIterable + ajustedSessions.sortBy(m => m.stime).toIterable } @@ -262,34 +283,46 @@ object MergeWifiSession { var mac: String = null logs.foreach( log => { + // extract ip info mac = log.MAC val time = log.time val code = log.code - val ip = log.payload - - if ( code == WIFICode.IPAllocation ) { - - if ( ! IPMap.contains(ip) ) - IPMap.put(ip, List(time, time)) - else { - val value = IPMap.get(ip).get - if ( time >= value(1) ) { - // update session ending time - IPMap.put(ip, List(value(0), time)) - } + val ip = code match { + case WIFICode.UserAuth | WIFICode.UserRoam => { + val parts = log.payload.split(',') + val part_ip = parts(1) + if ( ! List("null", "0.0.0.0", "255.255.255.255").contains(part_ip)) + part_ip + else + null } + case WIFICode.IPAllocation | WIFICode.IPRecycle => log.payload + case _ => null + } - } else if (code == WIFICode.IPRecycle) { - - if ( IPMap.contains(ip)) { - val value = IPMap.get(ip).get - // recycle certain IP - sessions = sessions :+ IPSession(mac, value(0), time, ip) - IPMap.remove(ip) - } else { - // omit recycling messages without allocation first + // create ip mapping + if (ip != null){ + if ( code == WIFICode.IPAllocation || + code == WIFICode.UserAuth || + code == WIFICode.UserRoam) { + // start an IP allocation session with these msgs. + if ( ! IPMap.contains(ip) ) + IPMap.put(ip, List(time, time)) + else { + val value = IPMap.get(ip).get + if ( time >= value(1) ) { + // update session ending time + IPMap.put(ip, List(value(0), time)) + } + } + } else if (code == WIFICode.IPRecycle) { + // terminate an IP allocation session with recycle msg. + if ( IPMap.contains(ip)) { + val value = IPMap.get(ip).get + sessions = sessions :+ IPSession(mac, value(0), time, ip) + IPMap.remove(ip) + } } - } }) @@ -325,6 +358,6 @@ object MergeWifiSession { if (preSession != null) ajustedSessions = ajustedSessions :+ preSession - ajustedSessions.toIterable + ajustedSessions.sortBy(m => m.stime).toIterable } } diff --git a/etlers/WifiToolkit/test500010 b/etlers/WifiToolkit/test500010 index 5b682d1..18b9a93 100644 --- a/etlers/WifiToolkit/test500010 +++ b/etlers/WifiToolkit/test500010 @@ -13,4 +13,17 @@ 1449963448803 <141>Dec 13 07:29:21 2015 SJTU-Local3 mobileip[2209]: <500010> Station 24:24:0e:8e:3b:f1, 10.188.112.151: Mobility trail, on switch 10.190.3.1, VLAN 1003, AP LXZL-2F-06, SJTU/6c:f3:7f:36:32:e8/a 1449963449337 <141>Dec 13 07:18:46 2015 SJTU-Local5 mobileip[2161]: <500010> Station 58:44:98:e8:75:ef, : Mobility trail, on switch 10.190.5.1, VLAN 1005, AP DZY-1-1F-08, SJTU/6c:f3:7f:5a:df:c0/g 1449963449358 <141>Dec 13 07:20:32 2015 SJTU-Local2 mobileip[2151]: <500010> Station e4:ce:8f:8f:91:71, : Mobility trail, on switch 10.190.2.1, VLAN 1002, AP JXDLXY-A-Z-8F-06, SJTU/d8:c7:c8:28:f6:10/g -1449963449564 <141>Dec 13 07:18:46 2015 SJTU-Local5 mobileip[2161]: <500010> Station d8:1d:72:b4:07:b6, 10.184.88.74: Mobility trail, on switch 10.190.5.1, VLAN 1005, AP XSFWZX-1F-01, SJTU-Web/6c:f3:7f:34:9a:41/g \ No newline at end of file +1449963449564 <141>Dec 13 07:18:46 2015 SJTU-Local5 mobileip[2161]: <500010> Station d8:1d:72:b4:07:b6, 10.184.88.74: Mobility trail, on switch 10.190.5.1, VLAN 1005, AP XSFWZX-1F-01, SJTU-Web/6c:f3:7f:34:9a:41/g +1449976452638 <141>Dec 13 10:55:28 2015 SJTU-Local5 mobileip[2161]: <500010> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g +1449976479731 <141>Dec 13 10:55:56 2015 SJTU-Local5 mobileip[2161]: <500010> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g +1449976506720 <141>Dec 13 10:56:23 2015 SJTU-Local5 mobileip[2161]: <500010> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g +1449976533751 <141>Dec 13 10:56:50 2015 SJTU-Local5 mobileip[2161]: <500010> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g +1449976560893 <141>Dec 13 10:57:17 2015 SJTU-Local5 mobileip[2161]: <500010> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g +1449976587973 <141>Dec 13 10:57:44 2015 SJTU-Local5 mobileip[2161]: <500010> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g +1449976615320 <141>Dec 13 10:58:11 2015 SJTU-Local5 mobileip[2161]: <500010> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g +1449976642410 <141>Dec 13 10:58:38 2015 SJTU-Local5 mobileip[2161]: <500010> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g +1449976669443 <141>Dec 13 10:59:05 2015 SJTU-Local5 mobileip[2161]: <500010> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g +1449976696496 <141>Dec 13 10:59:32 2015 SJTU-Local5 mobileip[2161]: <500010> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g +1449976723678 <141>Dec 13 10:59:59 2015 SJTU-Local5 mobileip[2161]: <500010> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g +1449976750778 <141>Dec 13 11:00:27 2015 SJTU-Local5 mobileip[2161]: <500010> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g +1449976859947 <141>Dec 13 11:02:16 2015 SJTU-Local5 mobileip[2161]: <500010> Station d0:25:98:3d:99:a9, 10.189.66.125: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:28/a \ No newline at end of file diff --git a/etlers/WifiToolkit/test_http b/etlers/WifiToolkit/test_http new file mode 100644 index 0000000..5117cdc --- /dev/null +++ b/etlers/WifiToolkit/test_http @@ -0,0 +1,7 @@ +10.187.226.191 3323 42.62.61.12 80 unique 1401552002.408494 1401552002.522564 0.031966 0.008017 1401552002.448477 0.000000 1401552002.482449 0.033972 0.000000 0.040115 509 687 "POST" "/sxsvr/requestDownloadXL/" "HTTP/1.1" "ufaapi.kuaipan.cn" "N/A" "N/A" "N/A" "N/A" "HTTP/1.1" "200" "nginx" "N/A" "text/html; charset=utf-8" "N/A" "N/A" "N/A" "N/A" "N/A" "N/A" "keep-alive" "N/A" +10.187.226.191 3325 42.62.61.12 80 unique 1401552002.408525 1401552002.522653 0.031956 0.008469 1401552002.448950 0.000000 1401552002.483935 0.034985 0.000000 0.038718 509 687 "POST" "/sxsvr/requestDownloadXL/" "HTTP/1.1" "ufaapi.kuaipan.cn" "N/A" "N/A" "N/A" "N/A" "HTTP/1.1" "200" "nginx" "N/A" "text/html; charset=utf-8" "N/A" "N/A" "N/A" "N/A" "N/A" "N/A" "keep-alive" "N/A" +10.184.177.211 57955 14.17.43.149 80 unique 1401552002.465187 1401552002.545713 0.034112 0.000785 1401552002.500084 0.000000 1401552002.539457 0.039373 0.000000 0.006256 212 837 "GET" "/cw.html" "HTTP/1.1" "tools.3g.qq.com" "Dalvik/1.6.0 (Linux; U; Android 4.2.2; GT-I9158 Build/JDQ39)" "N/A" "Keep-Alive" "N/A" "HTTP/1.1" "200" "HTTP Load Balancer/2.0" "609" "text/html; charset=UTF-8" "gzip" "N/A" "no-cache" "N/A" "N/A" "N/A" "N/A" "N/A" +10.187.226.191 3327 42.62.61.12 80 unique 1401552002.447871 1401552002.555424 0.032126 0.000486 1401552002.480483 0.000000 1401552002.514625 0.034142 0.000000 0.040799 509 687 "POST" "/sxsvr/requestDownloadXL/" "HTTP/1.1" "ufaapi.kuaipan.cn" "N/A" "N/A" "N/A" "N/A" "HTTP/1.1" "200" "nginx" "N/A" "text/html; charset=utf-8" "N/A" "N/A" "N/A" "N/A" "N/A" "N/A" "keep-alive" "N/A" +10.184.56.209 62759 114.112.68.137 80 unique 1401552002.511238 1401552002.621942 0.044496 0.000081 1401552002.555815 0.000000 1401552002.583079 0.027264 0.000000 0.038863 201 213 "GET" "/safe/msg.pack?pid=h_home&ver=4.7.0.4087&oem=h_home" "HTTP/1.1" "cs.weishi.ijinshan.com" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)" "N/A" "Keep-Alive" "5" "HTTP/1.1" "302" "Apache-Coyote/1.1" "0" "N/A" "N/A" "N/A" "N/A" "N/A" "N/A" "N/A" "close" "N/A" +10.187.226.191 3329 42.62.61.12 80 unique 1401552002.547993 1401552002.650357 0.032397 0.000679 1401552002.581069 0.000000 1401552002.615512 0.034443 0.000000 0.034845 509 687 "POST" "/sxsvr/requestDownloadXL/" "HTTP/1.1" "ufaapi.kuaipan.cn" "N/A" "N/A" "N/A" "N/A" "HTTP/1.1" "200" "nginx" "N/A" "text/html; charset=utf-8" "N/A" "N/A" "N/A" "N/A" "N/A" "N/A" "keep-alive" "N/A" +10.184.57.233 60377 222.199.191.49 80 unique 1401552002.584645 1401552002.688671 0.029316 0.012646 1401552002.626607 0.000000 1401552002.654168 0.027561 0.000000 0.034503 935 245 "GET" "/cpro/ui/cm.js" "HTTP/1.1" "cpro.baidustatic.com" "Mozilla/5.0 (iPhone; CPU iPhone OS 6_1_4 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10B350 Safari/8536.25" "http://m.baidu.com/from=1099b/bd_page_type=1/ssid=0/uid=0/pu=usm%400%2Csz%401320_2001%2Cta%40iphone_1_6.1_3_536/baiduid=1FF55BD373B9DAA836BD4FC0A71282BB/w=0_10_757179796/t=iphone/l=3/tc?ref=www_iphone&lid=3239258061022554973&order=2&vit=osres&tj=www_normal_2_0_10&m=8&srd=1&cltj=cloud_title&dict=21&sec=38919&di=f5be0290a7977b4f&bdenc=1&nsrc=IlPT2AEptyoA_yixCFOxXnANedT62v3IEQGG_8sGRmngyY39hLWxBcJpUnKhVmaH0F4suHO0vt6NgGGcWSZ8jcVOrhhmsn9h8T7buralrMLLCBVdag2lCAeHGnU0tqmg7RlLg2Z9F2MoB7ssov0xwtg_rsXZ82As8sbyeGS6rMe2VY4y2luZnF76PZlNTjKuO3jEa_" "keep-alive" "N/A" "HTTP/1.1" "304" "JSP2/1.0.27" "N/A" "N/A" "N/A" ""5388580e-14ca5"" "max-age=3600" "Fri, 30 May 2014 10:06:06 GMT" "N/A" "Sat, 31 May 2014 16:08:42 GMT" "close" "N/A" \ No newline at end of file