Skip to content

Commit

Permalink
Update the mobility session algorithm to recognize UserRoam msg.
Browse files Browse the repository at this point in the history
  • Loading branch information
caesar0301 committed Dec 25, 2015
1 parent 8f2c1fd commit eddf4c8
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 106 deletions.
2 changes: 2 additions & 0 deletions etlers/WifiToolkit/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ libraryDependencies += "joda-time" % "joda-time" % "2.6"

libraryDependencies += "org.apache.commons" % "commons-math3" % "3.5"

libraryDependencies += "org.apache.commons" % "commons-lang3" % "3.4"

libraryDependencies += "org.yaml" % "snakeyaml" % "1.15"
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package cn.edu.sjtu.omnilab.odh.spark

import cn.edu.sjtu.omnilab.odh.rawfilter.{WIFILogFilter}
import cn.edu.sjtu.omnilab.odh.utils.Utils
import org.apache.spark.{SparkConf, SparkContext}


/**
* Cleanse raw WIFI syslog into movement data.
*/
Expand All @@ -27,28 +25,8 @@ object CleanseWifiLogs {
val spark = new SparkContext(conf)

val inRDD = spark.textFile(input)
.map { m => {

val filtered = WIFILogFilter.filterData(m)
var cleanLog: CleanWIFILog = null

if (filtered != null) {
val parts = filtered.split(',')

cleanLog = CleanWIFILog(MAC = parts(0),
time = Utils.ISOToUnix(parts(1)),
code = parts(2).toInt,
payload = parts(3))

}

cleanLog

}}
.map { m => WIFILogFilter.filterData(m)}
.filter(_ != null)

.map(m => "%s,%d,%d,%s".format(m.MAC, m.time, m.code, m.payload))

.saveAsTextFile(output)

spark.stop()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cn.edu.sjtu.omnilab.odh.spark

import cn.edu.sjtu.omnilab.odh.rawfilter.{APToBuilding, WIFICode}
import org.apache.commons.lang.StringUtils
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._ // to use join etc.

Expand Down Expand Up @@ -37,18 +38,28 @@ object MergeWifiSession {
val inRDD = spark.textFile(input)

// extract sessions
val validSessionCodes = List(
val mobilityCodes = List(
WIFICode.AuthRequest,
WIFICode.AssocRequest,
WIFICode.Deauth,
WIFICode.Disassoc
WIFICode.Disassoc,
WIFICode.UserRoam,
WIFICode.NewDev
)

val ipAllocCodes = List(
WIFICode.UserAuth,
WIFICode.IPAllocation,
WIFICode.IPRecycle,
WIFICode.UserRoam
)

// Load clean Wifi syslog
val inputRDD = spark.textFile(input)

.map{ filtered => {
val parts = filtered.split(',')
// NOTE: UserAuth and UserRoam payloads contain commas themselves, so split into at most four fields and keep the payload intact
val parts = StringUtils.split(filtered, ",", 4)

if (parts.length < 4) {
null
Expand All @@ -65,7 +76,7 @@ object MergeWifiSession {

// Filter IP sessions from input
val ipSession = inputRDD
.filter( m => m.code == WIFICode.IPAllocation || m.code == WIFICode.IPRecycle)
.filter( m => ipAllocCodes.contains(m.code))
.groupBy(_.MAC)
.flatMap { case (key, logs) => { collectIPSession(logs) }}
.keyBy(_.MAC)
Expand All @@ -76,7 +87,10 @@ object MergeWifiSession {
// associate to that MAC for the whole day.
// Otherwise, a session-based association is performed after the join action.
val authRDD = inputRDD.filter(m => m.code == WIFICode.UserAuth)
.map( m => CleanWIFILog(MAC=m.MAC, time=m.time, code=m.code, payload=m.payload))
.map( m => {
val accname = m.payload.split(',')(0)
CleanWIFILog(MAC=m.MAC, time=m.time, code=m.code, payload=accname)
})
.distinct.groupBy(_.MAC)
.mapValues{ case (authinfos) => {
var accounts: List[String] = List[String]()
Expand All @@ -90,8 +104,9 @@ object MergeWifiSession {

val sessionRDD = inputRDD
// extract mobility sessions
.filter(m => validSessionCodes.contains(m.code)).groupBy(_.MAC)
.flatMap { case (key, logs) => { extractSessions(logs) }}
.filter(m => mobilityCodes.contains(m.code))
.groupBy(_.MAC)
.flatMap { case (key, logs) => { extractSessions_ex(logs) }}

// append IP info. to mobility sessions
.keyBy(_.MAC)
Expand Down Expand Up @@ -149,11 +164,11 @@ object MergeWifiSession {
.map { case (wsession, ipsession, authinfo) =>
val buildInfo = apToBuild.parse(wsession.AP)

var bname: String = null
var btype: String = null
var bschool: String = null
var blat: String = null
var blon: String = null
var bname: String = ""
var btype: String = ""
var bschool: String = ""
var blat: String = ""
var blon: String = ""
if ( buildInfo != null ) {
bname = buildInfo.get(0)
btype = buildInfo.get(1)
Expand All @@ -162,12 +177,12 @@ object MergeWifiSession {
blon = buildInfo.get(4)
}

var ipaddr: String = null
var ipaddr: String = ""
if ( ipsession != null ) {
ipaddr = ipsession.IP
}

var account: String = null
var account: String = ""
if ( authinfo != null ) {
account = authinfo.payload
}
Expand All @@ -183,70 +198,76 @@ object MergeWifiSession {
/**
* Extract WIFI association sessions from certain individual's logs
*/
def extractSessions(logs: Iterable[CleanWIFILog]): Iterable[WIFISession] = {
def extractSessions_ex(logs: Iterable[CleanWIFILog]): Iterable[WIFISession] = {

var sessions = new Array[WIFISession](0)
var APMap = new HashMap[String, Long]
var preAP: String = null
var preSession: WIFISession = null
var curSession: WIFISession = null

/*
* Use algorithm iterated from previous GWJ's version of SyslogCleanser project.
* Based on previous algorithm used in SyslogCleanser project.
*/
logs.toSeq.sortBy(_.time)
var sessions = new Array[WIFISession](0)
var APMap = new HashMap[String, List[Long]]

logs.toSeq.sortBy(_.time)
.foreach( log => {
/*
* construct network sessions roughly
* currently, we end a session when AUTH and DEAUTH pair is detected
*/

val mac = log.MAC
val time = log.time
val code = log.code
val ap = log.payload

if ( code == WIFICode.AuthRequest || code == WIFICode.AssocRequest) {
val ap = log.payload.split(',')(0)

// create ip mapping
if ( code == WIFICode.AuthRequest || code == WIFICode.AssocRequest ||
code == WIFICode.UserRoam || code == WIFICode.NewDev) {
// start a new mobility session with these msgs.
if ( ! APMap.contains(ap) )
APMap.put(ap, List(time, time))
else {
val value = APMap.get(ap).get
if ( time >= value(1) ) {
APMap.put(ap, List(value(0), time))
}
}
} else if (code == WIFICode.Deauth || code == WIFICode.Disassoc) {
if ( APMap.contains(ap)) {
val value = APMap.get(ap).get
sessions = sessions :+ WIFISession(mac, value(0), time, ap)
APMap.remove(ap)
}
}

if ( ! APMap.contains(ap) || (APMap.contains(ap) && ap != preAP))
APMap.put(ap, time)
APMap.foreach { case (key, value) => {
sessions = sessions :+ WIFISession(mac, value(0), value(1), key)
}}

} else if (code == WIFICode.Deauth || code == WIFICode.Disassoc) {
})

if ( APMap.contains(ap) ) {
// record this new session and remove it from APMap
val stime = APMap.get(ap).get
curSession = WIFISession(mac, stime, time, ap)
APMap.remove(ap)
// adjust session timestamps
var preSession: WIFISession = null
var ajustedSessions = new Array[WIFISession](0)

// adjust session timestamps
if ( preSession != null ) {
val tdiff = curSession.stime - preSession.etime
if (tdiff < 0)
preSession = preSession.copy(etime = curSession.stime)

// merge adjacent sessions under the same AP
if ( preSession.AP == curSession.AP && tdiff < mergeSessionThreshold )
preSession = preSession.copy(etime = curSession.etime)
else {
sessions = sessions :+ preSession
preSession = curSession
}
sessions.sortBy(m => m.stime).foreach { m => {
if ( preSession != null ) {
val tdiff = m.stime - m.etime
if (tdiff < 0)
preSession = preSession.copy(etime = m.stime)

} else {
preSession = curSession
}
}
// merge adjacent sessions under the same AP
if ( preSession.AP == m.AP && tdiff < mergeSessionThreshold )
preSession = preSession.copy(etime = m.etime)
else {
ajustedSessions = ajustedSessions :+ preSession
preSession = m
}
} else {
preSession = m
}

preAP = ap

})
}}

if (preSession != null)
sessions = sessions :+ preSession
ajustedSessions = ajustedSessions :+ preSession

sessions.sortBy(_.stime).toIterable
ajustedSessions.sortBy(m => m.stime).toIterable

}

Expand All @@ -262,34 +283,46 @@ object MergeWifiSession {
var mac: String = null

logs.foreach( log => {
// extract ip info
mac = log.MAC
val time = log.time
val code = log.code
val ip = log.payload

if ( code == WIFICode.IPAllocation ) {

if ( ! IPMap.contains(ip) )
IPMap.put(ip, List(time, time))
else {
val value = IPMap.get(ip).get
if ( time >= value(1) ) {
// update session ending time
IPMap.put(ip, List(value(0), time))
}
val ip = code match {
case WIFICode.UserAuth | WIFICode.UserRoam => {
val parts = log.payload.split(',')
val part_ip = parts(1)
if ( ! List("null", "0.0.0.0", "255.255.255.255").contains(part_ip))
part_ip
else
null
}
case WIFICode.IPAllocation | WIFICode.IPRecycle => log.payload
case _ => null
}

} else if (code == WIFICode.IPRecycle) {

if ( IPMap.contains(ip)) {
val value = IPMap.get(ip).get
// recycle certain IP
sessions = sessions :+ IPSession(mac, value(0), time, ip)
IPMap.remove(ip)
} else {
// omit recycling messages without allocation first
// create ip mapping
if (ip != null){
if ( code == WIFICode.IPAllocation ||
code == WIFICode.UserAuth ||
code == WIFICode.UserRoam) {
// start an IP allocation session with these msgs.
if ( ! IPMap.contains(ip) )
IPMap.put(ip, List(time, time))
else {
val value = IPMap.get(ip).get
if ( time >= value(1) ) {
// update session ending time
IPMap.put(ip, List(value(0), time))
}
}
} else if (code == WIFICode.IPRecycle) {
// terminate an IP allocation session with recycle msg.
if ( IPMap.contains(ip)) {
val value = IPMap.get(ip).get
sessions = sessions :+ IPSession(mac, value(0), time, ip)
IPMap.remove(ip)
}
}

}
})

Expand Down Expand Up @@ -325,6 +358,6 @@ object MergeWifiSession {
if (preSession != null)
ajustedSessions = ajustedSessions :+ preSession

ajustedSessions.toIterable
ajustedSessions.sortBy(m => m.stime).toIterable
}
}
15 changes: 14 additions & 1 deletion etlers/WifiToolkit/test500010
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,17 @@
1449963448803 <141>Dec 13 07:29:21 2015 SJTU-Local3 mobileip[2209]: <500010> <NOTI> <SJTU-Local3 10.190.3.1> Station 24:24:0e:8e:3b:f1, 10.188.112.151: Mobility trail, on switch 10.190.3.1, VLAN 1003, AP LXZL-2F-06, SJTU/6c:f3:7f:36:32:e8/a
1449963449337 <141>Dec 13 07:18:46 2015 SJTU-Local5 mobileip[2161]: <500010> <NOTI> <SJTU-Local5 10.190.5.1> Station 58:44:98:e8:75:ef, : Mobility trail, on switch 10.190.5.1, VLAN 1005, AP DZY-1-1F-08, SJTU/6c:f3:7f:5a:df:c0/g
1449963449358 <141>Dec 13 07:20:32 2015 SJTU-Local2 mobileip[2151]: <500010> <NOTI> <SJTU-Local2 10.190.2.1> Station e4:ce:8f:8f:91:71, : Mobility trail, on switch 10.190.2.1, VLAN 1002, AP JXDLXY-A-Z-8F-06, SJTU/d8:c7:c8:28:f6:10/g
1449963449564 <141>Dec 13 07:18:46 2015 SJTU-Local5 mobileip[2161]: <500010> <NOTI> <SJTU-Local5 10.190.5.1> Station d8:1d:72:b4:07:b6, 10.184.88.74: Mobility trail, on switch 10.190.5.1, VLAN 1005, AP XSFWZX-1F-01, SJTU-Web/6c:f3:7f:34:9a:41/g
1449963449564 <141>Dec 13 07:18:46 2015 SJTU-Local5 mobileip[2161]: <500010> <NOTI> <SJTU-Local5 10.190.5.1> Station d8:1d:72:b4:07:b6, 10.184.88.74: Mobility trail, on switch 10.190.5.1, VLAN 1005, AP XSFWZX-1F-01, SJTU-Web/6c:f3:7f:34:9a:41/g
1449976452638 <141>Dec 13 10:55:28 2015 SJTU-Local5 mobileip[2161]: <500010> <NOTI> <SJTU-Local5 10.190.5.1> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g
1449976479731 <141>Dec 13 10:55:56 2015 SJTU-Local5 mobileip[2161]: <500010> <NOTI> <SJTU-Local5 10.190.5.1> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g
1449976506720 <141>Dec 13 10:56:23 2015 SJTU-Local5 mobileip[2161]: <500010> <NOTI> <SJTU-Local5 10.190.5.1> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g
1449976533751 <141>Dec 13 10:56:50 2015 SJTU-Local5 mobileip[2161]: <500010> <NOTI> <SJTU-Local5 10.190.5.1> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g
1449976560893 <141>Dec 13 10:57:17 2015 SJTU-Local5 mobileip[2161]: <500010> <NOTI> <SJTU-Local5 10.190.5.1> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g
1449976587973 <141>Dec 13 10:57:44 2015 SJTU-Local5 mobileip[2161]: <500010> <NOTI> <SJTU-Local5 10.190.5.1> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g
1449976615320 <141>Dec 13 10:58:11 2015 SJTU-Local5 mobileip[2161]: <500010> <NOTI> <SJTU-Local5 10.190.5.1> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g
1449976642410 <141>Dec 13 10:58:38 2015 SJTU-Local5 mobileip[2161]: <500010> <NOTI> <SJTU-Local5 10.190.5.1> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g
1449976669443 <141>Dec 13 10:59:05 2015 SJTU-Local5 mobileip[2161]: <500010> <NOTI> <SJTU-Local5 10.190.5.1> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g
1449976696496 <141>Dec 13 10:59:32 2015 SJTU-Local5 mobileip[2161]: <500010> <NOTI> <SJTU-Local5 10.190.5.1> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g
1449976723678 <141>Dec 13 10:59:59 2015 SJTU-Local5 mobileip[2161]: <500010> <NOTI> <SJTU-Local5 10.190.5.1> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g
1449976750778 <141>Dec 13 11:00:27 2015 SJTU-Local5 mobileip[2161]: <500010> <NOTI> <SJTU-Local5 10.190.5.1> Station 38:71:de:2e:b8:87, 10.184.183.85: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:20/g
1449976859947 <141>Dec 13 11:02:16 2015 SJTU-Local5 mobileip[2161]: <500010> <NOTI> <SJTU-Local5 10.190.5.1> Station d0:25:98:3d:99:a9, 10.189.66.125: Mobility trail, on switch 10.190.5.1, VLAN 1004, AP WLXXZX-1F-01, SJTU/d8:c7:c8:28:46:28/a
7 changes: 7 additions & 0 deletions etlers/WifiToolkit/test_http
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
10.187.226.191 3323 42.62.61.12 80 unique 1401552002.408494 1401552002.522564 0.031966 0.008017 1401552002.448477 0.000000 1401552002.482449 0.033972 0.000000 0.040115 509 687 "POST" "/sxsvr/requestDownloadXL/" "HTTP/1.1" "ufaapi.kuaipan.cn" "N/A" "N/A" "N/A" "N/A" "HTTP/1.1" "200" "nginx" "N/A" "text/html; charset=utf-8" "N/A" "N/A" "N/A" "N/A" "N/A" "N/A" "keep-alive" "N/A"
10.187.226.191 3325 42.62.61.12 80 unique 1401552002.408525 1401552002.522653 0.031956 0.008469 1401552002.448950 0.000000 1401552002.483935 0.034985 0.000000 0.038718 509 687 "POST" "/sxsvr/requestDownloadXL/" "HTTP/1.1" "ufaapi.kuaipan.cn" "N/A" "N/A" "N/A" "N/A" "HTTP/1.1" "200" "nginx" "N/A" "text/html; charset=utf-8" "N/A" "N/A" "N/A" "N/A" "N/A" "N/A" "keep-alive" "N/A"
10.184.177.211 57955 14.17.43.149 80 unique 1401552002.465187 1401552002.545713 0.034112 0.000785 1401552002.500084 0.000000 1401552002.539457 0.039373 0.000000 0.006256 212 837 "GET" "/cw.html" "HTTP/1.1" "tools.3g.qq.com" "Dalvik/1.6.0 (Linux; U; Android 4.2.2; GT-I9158 Build/JDQ39)" "N/A" "Keep-Alive" "N/A" "HTTP/1.1" "200" "HTTP Load Balancer/2.0" "609" "text/html; charset=UTF-8" "gzip" "N/A" "no-cache" "N/A" "N/A" "N/A" "N/A" "N/A"
10.187.226.191 3327 42.62.61.12 80 unique 1401552002.447871 1401552002.555424 0.032126 0.000486 1401552002.480483 0.000000 1401552002.514625 0.034142 0.000000 0.040799 509 687 "POST" "/sxsvr/requestDownloadXL/" "HTTP/1.1" "ufaapi.kuaipan.cn" "N/A" "N/A" "N/A" "N/A" "HTTP/1.1" "200" "nginx" "N/A" "text/html; charset=utf-8" "N/A" "N/A" "N/A" "N/A" "N/A" "N/A" "keep-alive" "N/A"
10.184.56.209 62759 114.112.68.137 80 unique 1401552002.511238 1401552002.621942 0.044496 0.000081 1401552002.555815 0.000000 1401552002.583079 0.027264 0.000000 0.038863 201 213 "GET" "/safe/msg.pack?pid=h_home&ver=4.7.0.4087&oem=h_home" "HTTP/1.1" "cs.weishi.ijinshan.com" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)" "N/A" "Keep-Alive" "5" "HTTP/1.1" "302" "Apache-Coyote/1.1" "0" "N/A" "N/A" "N/A" "N/A" "N/A" "N/A" "N/A" "close" "N/A"
10.187.226.191 3329 42.62.61.12 80 unique 1401552002.547993 1401552002.650357 0.032397 0.000679 1401552002.581069 0.000000 1401552002.615512 0.034443 0.000000 0.034845 509 687 "POST" "/sxsvr/requestDownloadXL/" "HTTP/1.1" "ufaapi.kuaipan.cn" "N/A" "N/A" "N/A" "N/A" "HTTP/1.1" "200" "nginx" "N/A" "text/html; charset=utf-8" "N/A" "N/A" "N/A" "N/A" "N/A" "N/A" "keep-alive" "N/A"
10.184.57.233 60377 222.199.191.49 80 unique 1401552002.584645 1401552002.688671 0.029316 0.012646 1401552002.626607 0.000000 1401552002.654168 0.027561 0.000000 0.034503 935 245 "GET" "/cpro/ui/cm.js" "HTTP/1.1" "cpro.baidustatic.com" "Mozilla/5.0 (iPhone; CPU iPhone OS 6_1_4 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10B350 Safari/8536.25" "http://m.baidu.com/from=1099b/bd_page_type=1/ssid=0/uid=0/pu=usm%400%2Csz%401320_2001%2Cta%40iphone_1_6.1_3_536/baiduid=1FF55BD373B9DAA836BD4FC0A71282BB/w=0_10_757179796/t=iphone/l=3/tc?ref=www_iphone&lid=3239258061022554973&order=2&vit=osres&tj=www_normal_2_0_10&m=8&srd=1&cltj=cloud_title&dict=21&sec=38919&di=f5be0290a7977b4f&bdenc=1&nsrc=IlPT2AEptyoA_yixCFOxXnANedT62v3IEQGG_8sGRmngyY39hLWxBcJpUnKhVmaH0F4suHO0vt6NgGGcWSZ8jcVOrhhmsn9h8T7buralrMLLCBVdag2lCAeHGnU0tqmg7RlLg2Z9F2MoB7ssov0xwtg_rsXZ82As8sbyeGS6rMe2VY4y2luZnF76PZlNTjKuO3jEa_" "keep-alive" "N/A" "HTTP/1.1" "304" "JSP2/1.0.27" "N/A" "N/A" "N/A" ""5388580e-14ca5"" "max-age=3600" "Fri, 30 May 2014 10:06:06 GMT" "N/A" "Sat, 31 May 2014 16:08:42 GMT" "close" "N/A"

0 comments on commit eddf4c8

Please sign in to comment.