Skip to content

Commit

Permalink
Add account and ip info to WifiSyslogSession
Browse files Browse the repository at this point in the history
  • Loading branch information
caesar0301 committed Dec 14, 2015
1 parent 5cd0cb2 commit 23fdb55
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package cn.edu.sjtu.omnilab.odh.spark


/**
 * One IP lease observed for a device.
 *
 * @param MAC   MAC address of the device the lease belongs to
 * @param stime session start timestamp (presumably epoch milliseconds, like the
 *              Wifi log times it is compared against; confirm with the producer)
 * @param etime session end timestamp, same unit as `stime`
 * @param IP    IP address held during the session
 */
case class IPSession(MAC: String, stime: Long, etime: Long, IP: String)
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cn.edu.sjtu.omnilab.odh.spark

import cn.edu.sjtu.omnilab.odh.rawfilter.{APToBuilding, WIFICode}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._ // to use join etc.

import scala.collection.mutable.HashMap

Expand Down Expand Up @@ -43,7 +44,8 @@ object MergeWifiSession {
WIFICode.Disassoc
)

spark.textFile(input)
// Load clean Wifi syslog
val inputRDD = spark.textFile(input)

.map{ filtered => {
val parts = filtered.split(',')
Expand All @@ -59,16 +61,93 @@ object MergeWifiSession {
payload = parts(3)
)

}}}

.filter(m => m != null && validSessionCodes.contains(m.code))
}}}.filter(m => m != null)

// Filter IP sessions from input
val ipSession = inputRDD
.filter( m => m.code == WIFICode.IPAllocation || m.code == WIFICode.IPRecycle)
.groupBy(_.MAC)
.flatMap { case (key, logs) => { collectIPSession(logs) }}
.keyBy(_.MAC)
.cache

// Filter Auth info. from input
// If only one account is ever seen for a MAC address, that account is
// associated with the MAC for the whole day.
// Otherwise, a session-based association is performed after the join below.
val authRDD = inputRDD.filter(m => m.code == WIFICode.UserAuth)
.map( m => CleanWIFILog(MAC=m.MAC, time=m.time, code=m.code, payload=m.payload))
.distinct.groupBy(_.MAC)
.mapValues{ case (authinfos) => {
var accounts: List[String] = List[String]()
authinfos.foreach(accounts ::= _.payload)

if (accounts.distinct.length == 1) {
authinfos.toList.take(1)
} else {
authinfos.toList
}}}

val sessionRDD = inputRDD
// extract mobility sessions
.filter(m => validSessionCodes.contains(m.code)).groupBy(_.MAC)
.flatMap { case (key, logs) => { extractSessions(logs) }}

.map { m =>
val buildInfo = apToBuild.parse(m.AP)
// append IP info. to mobility sessions
.keyBy(_.MAC)
.leftOuterJoin(ipSession)
.map { case (key, (wsession, ipsession)) => (wsession, ipsession.getOrElse(null))}
.groupBy(_._1)
.map{ case (wsession, ipsessions) => {

// Determine IP by comparing the length of overlap for Wifi session and IP session.
// The maximum temporal overlap corresponds to the right IP address.
var result: IPSession = null
var timeSpan: Long = 0

ipsessions.foreach{ case (wsession, ipsession) => {
var tspan: Long = 0

if (ipsession != null) {
val stime: Long = Math.max(ipsession.stime, wsession.stime)
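// We ARTIFICIALLY extend the IP session end to tolerate data loss.
// NOTE(review): intended as 15 mins, but 15 * 3600 * 1000 is 15 hours if
// timestamps are in milliseconds (15 mins would be 15 * 60 * 1000); confirm.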
val etime: Long = Math.min(ipsession.etime + 15 * 3600 * 1000, wsession.etime)
tspan = etime - stime
}

if ( tspan > timeSpan ) {
timeSpan = tspan
result = ipsession
}
}}

(wsession, result) }}

// append account info. to mobility sessions
.keyBy(_._1.MAC)
.leftOuterJoin(authRDD)
.map { case (key, ((wsession, ipsession), authinfo)) => {
val authinfos = authinfo.getOrElse(null)

if ( authinfos != null ) {
if (authinfos.size == 1) {
(wsession, ipsession, authinfos(0))
} else {
// A session-based association is performed
var result: CleanWIFILog = null
authinfos.foreach( auth => {
if ( auth.time >= wsession.stime && auth.time <= wsession.etime ) {
result = auth
}
})
(wsession, ipsession, result)
}
} else {
(wsession, ipsession, null)
} }}

.map { case (wsession, ipsession, authinfo) =>
val buildInfo = apToBuild.parse(wsession.AP)

var bname: String = null
var btype: String = null
Expand All @@ -83,10 +162,20 @@ object MergeWifiSession {
blon = buildInfo.get(4)
}

"%s,%d,%d,%s,%s,%s,%s,%s,%s"
.format(m.MAC, m.stime, m.etime, m.AP, bname, btype, bschool, blat, blon)
}
var ipaddr: String = null
if ( ipsession != null ) {
ipaddr = ipsession.IP
}

var account: String = null
if ( authinfo != null ) {
account = authinfo.payload
}

"%s,%d,%d,%s,%s,%s,%s,%s,%s,%s,%s"
.format(wsession.MAC, wsession.stime, wsession.etime, wsession.AP,
bname, btype, bschool, blat, blon, ipaddr, account)
}
.saveAsTextFile(output)

}
Expand Down Expand Up @@ -160,4 +249,82 @@ object MergeWifiSession {
sessions.sortBy(_.stime).toIterable

}


/**
 * Construct IP sessions identified by IPALLOC and IPRECYCLE messages.
 *
 * A session for an IP opens at its first allocation message and closes at the
 * matching recycle message. Repeated allocations of the same IP only push the
 * tentative end forward. Leases that are never recycled are closed at their
 * last allocation time. The sessions are then ordered by start time, clipped so
 * that consecutive sessions do not overlap, and consecutive sessions with the
 * same IP whose gap is below `mergeSessionThreshold` are merged.
 *
 * @param logs IP allocation/recycle messages; the caller passes the messages
 *             of a single MAC address (it groups by MAC before calling)
 * @return the adjusted IP sessions, ordered by start time
 */
def collectIPSession(logs: Iterable[CleanWIFILog]): Iterable[IPSession] = {
  // IP -> (first allocation time, latest allocation time) for leases still open
  val openLeases = new HashMap[String, (Long, Long)]
  var sessions = Vector.empty[IPSession]
  var mac: String = null

  logs.foreach { log =>
    mac = log.MAC
    val time = log.time
    val ip = log.payload

    if (log.code == WIFICode.IPAllocation) {
      openLeases.get(ip) match {
        case None =>
          openLeases.put(ip, (time, time))
        case Some((first, last)) =>
          // update the tentative session end; older (out-of-order) messages are ignored
          if (time >= last) openLeases.put(ip, (first, time))
      }
    } else if (log.code == WIFICode.IPRecycle) {
      // recycle messages without a preceding allocation are omitted
      openLeases.remove(ip).foreach { case (first, _) =>
        sessions = sessions :+ IPSession(mac, first, time, ip)
      }
    }
  }

  // leases never recycled end at their last allocation message
  if (mac != null) {
    openLeases.foreach { case (ip, (first, last)) =>
      sessions = sessions :+ IPSession(mac, first, last, ip)
    }
  }

  // adjust session timestamps: walk the sessions in start order, keeping the
  // finished ones and the session that may still be extended by the next one
  val (finished, pending) =
    sessions.sortBy(_.stime).foldLeft((Vector.empty[IPSession], Option.empty[IPSession])) {
      case ((done, None), cur) =>
        (done, Some(cur))
      case ((done, Some(prev)), cur) =>
        // gap between the end of the previous session and the start of this one
        // (the original used cur.stime - cur.etime, which is never positive)
        val tdiff = cur.stime - prev.etime
        // overlapping sessions: cut the previous one where the next begins
        val clipped = if (tdiff < 0) prev.copy(etime = cur.stime) else prev

        // merge adjacent sessions with the same IP
        if (clipped.IP == cur.IP && tdiff < mergeSessionThreshold)
          (done, Some(clipped.copy(etime = cur.etime)))
        else
          (done :+ clipped, Some(cur))
    }

  finished ++ pending.toList
}
}
2 changes: 1 addition & 1 deletion porters/wifi_syslog.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ hadoop fs -rm -r $TEMPWP
hadoop fs -mkdir -p $TEMPWP

TARGET=$(date -d "yesterday" '+%Y-%m-%d')
if [ $1 != "" ]; then
if [ XXOO$1 != "XXOO" ]; then
TARGET=$1
fi

Expand Down
2 changes: 1 addition & 1 deletion porters/wifi_syslog_session.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ if ! hadoop fs -test -d $HDFS_WIFI_SYSLOG_SESSION; then
fi

TARGET=$(date -d "yesterday" '+%Y-%m-%d')
if [ $1 != "" ]; then
if [ XXOO$1 != "XXOO" ]; then
TARGET=$1
fi

Expand Down
2 changes: 1 addition & 1 deletion porters/wifi_traffic_http.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ year=`date -d "yesterday" "+%Y"`
month=`date -d "yesterday" "+%m"`
day=`date -d "yesterday" "+%d"`

if [ $1 != "" ]; then
if [ XXOO$1 != "XXOO" ]; then
year=`echo $1 | cut -d'-' -f1`
month=`echo $1 | cut -d'-' -f2`
day=`echo $1 | cut -d'-' -f3`
Expand Down

0 comments on commit 23fdb55

Please sign in to comment.