Initial commit of socket interface. Tested and working with proxy
englehardt committed Mar 31, 2014
1 parent 4f12fcb commit f2379dd
Showing 8 changed files with 148 additions and 34 deletions.
9 changes: 9 additions & 0 deletions .gitignore
@@ -0,0 +1,9 @@
*.pyc
*~
.*.sw*

# A bug in selenium creates this on unix systems
C:\\nppdf32Log\\debuglog.txt

#MAC filesystem explorer drops this
.DS_Store
8 changes: 4 additions & 4 deletions automation/BrowserManager.py
@@ -11,14 +11,14 @@

# <command_queue> is the queue through which the browser sends command tuples
# <status_queue> is a queue through which the BrowserManager either signals command failure or success
# <db_query_queue> is the queue through which to send data to the DataAggregator to manipulate and write
# <db_address> is the socket address through which to send data to the DataAggregator to manipulate and write
# <browser_params> are browser parameter settings (e.g. whether we're using a proxy, headless, etc.)

def BrowserManager(command_queue, status_queue, db_query_queue, browser_params):
def BrowserManager(command_queue, status_queue, db_address, browser_params):
# sets up the proxy (for now, mitmproxy) if necessary
proxy_site_queue = None # used to pass the current site down to the proxy
if browser_params['proxy']:
(local_port, proxy_site_queue) = deploy_mitm_proxy.init_proxy(db_query_queue, browser_params['crawl_id'])
(local_port, proxy_site_queue) = deploy_mitm_proxy.init_proxy(db_address, browser_params['crawl_id'])
browser_params['proxy'] = local_port

# Gets the WebDriver, profile folder (i.e. where history/cookies are stored) and display pid (None if not headless)
@@ -47,4 +47,4 @@ def BrowserManager(command_queue, status_queue, db_query_queue, browser_params):
except Exception as ex:
print "CRASH IN DRIVER ORACLE:" + str(ex) + " RESTARTING BROWSER MANAGER"
status_queue.put("FAILED")
break
break
13 changes: 10 additions & 3 deletions automation/DataAggregator/DataAggregator.py
@@ -1,22 +1,29 @@
from ..SocketManager import serversocket
import sqlite3
import time

# Receives SQL queries from other processes and writes them to the central database
# Executes queries until being told to die (then it will finish work and shut down)
# This process should never be terminated un-gracefully
# Currently uses SQLite but may move to different platform
# TODO: add killing of the serversocket once supported

# <crawl_id> is the id of the current crawl w.r.t a particular database (TODO: re-add this feature)
# <db_loc> is the absolute path of the DB's current location
# <query_queue> is data input queue: for now, passed query strings (TODO: more involved data manipulations)
# <status_queue> is a queue connect to the TaskManager used for
# <commit_loop> is the number of execution statements that should be made before a commit (used for speedup)

def DataAggregator(crawl_id, db_loc, query_queue, status_queue, commit_loop=1000):
def DataAggregator(crawl_id, db_loc, status_queue, commit_loop=1000):
# sets up DB connection
db = sqlite3.connect(db_loc, check_same_thread=False)
curr = db.cursor()

# sets up the serversocket to start accepting connections
sock = serversocket()
status_queue.put(sock.sock.getsockname()) #let TM know location
sock.start_accepting()

counter = 0 # number of executions made since last commit
while True:
# received KILL command from TaskManager
@@ -25,13 +32,13 @@ def DataAggregator(crawl_id, db_loc, query_queue, status_queue, commit_loop=1000
break

# no command for now -> sleep to avoid pegging CPU on blocking get
if query_queue.empty():
if sock.queue.empty():
time.sleep(0.001)
continue

# executes a query of form (template_string, arguments)
# query is of form (template_string, arguments)
query = query_queue.get()
query = sock.queue.get()
curr.execute(query[0], query[1])

# batch commit if necessary
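The comment block above describes the new input path: instead of reading from a shared multiprocessing queue, the DataAggregator now owns a serversocket, reports its (host, port) on status_queue, and drains queries from sock.queue. A minimal sketch of a producer on the other side of that socket (the address tuple and import path are assumptions for illustration; in the real flow the address comes off the TaskManager's aggregator status queue):

from automation.SocketManager import clientsocket

aggregator_address = ('localhost', 55555)  # hypothetical; really taken from the aggregator's status_queue

sock = clientsocket()
sock.connect(*aggregator_address)

# Queries travel as (template_string, arguments) tuples -- exactly what the
# aggregator hands to curr.execute(query[0], query[1]) above.
sock.send_pickled(("INSERT INTO http_requests (crawl_id, url, method, referrer, top_url) "
                   "VALUES (?,?,?,?,?)",
                   (1, 'http://example.com', 'GET', '', 'http://example.com')))
sock.close()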
14 changes: 8 additions & 6 deletions automation/Proxy/MITMProxy.py
@@ -1,6 +1,7 @@
# Customized MITMProxy
# Extends the proxy controller to add some additional
# functionality to handling requests and responses
from ..SocketManager import clientsocket
from libmproxy import controller
import sys
import Queue
@@ -10,7 +11,7 @@
# Inspired by the following example. Note the gist has a lot of bugs.
# https://gist.github.com/dannvix/5285924
class InterceptingMaster (controller.Master):
def __init__(self, server, crawl_id, url_queue, db_command_queue):
def __init__(self, server, crawl_id, url_queue, db_socket_address):
self.crawl_id = crawl_id

# Attributes used to flag the first-party domain
@@ -19,8 +20,9 @@ def __init__(self, server, crawl_id, url_queue, db_command_queue):
self.new_top_url = None # first-party url that BrowserManager just claimed to have visited
self.changed = False # used to flag that new site has been visited so proxy looks for new site's traffic

# Queue to communicate with DataAggregator
self.db_queue = db_command_queue
# Open a socket to communicate with DataAggregator
self.db_socket = clientsocket()
self.db_socket.connect(*db_socket_address)

controller.Master.__init__(self, server)

@@ -36,7 +38,7 @@ def tick(self, q):
# try to load/process message as usual
try:
msg = q.get(timeout=0.01)
controller.Master.handle(self, *msg)
controller.Master.handle(self, msg)
except Queue.Empty:
pass

@@ -65,10 +67,10 @@ def handle_request(self, msg):
self.curr_top_url = self.new_top_url
self.changed = False

mitm_commands.process_general_mitm_request(self.db_queue, self.crawl_id, self.curr_top_url, msg)
mitm_commands.process_general_mitm_request(self.db_socket, self.crawl_id, self.curr_top_url, msg)

# Record data from HTTP responses
def handle_response(self, msg):
msg.reply()

mitm_commands.process_general_mitm_response(self.db_queue, self.crawl_id, self.curr_top_url, msg)
mitm_commands.process_general_mitm_response(self.db_socket, self.crawl_id, self.curr_top_url, msg)
6 changes: 3 additions & 3 deletions automation/Proxy/deploy_mitm_proxy.py
@@ -10,7 +10,7 @@
# <db_query_query> is the queue used to pass down query strings and arguments to the DataAggregator
# <crawl_id> is the id set by the TaskManager

def init_proxy(db_command_queue, crawl_id):
def init_proxy(db_socket_address, crawl_id):
proxy_site_queue = Queue.Queue() # queue for crawler to communicate with proxy

# gets local port from one of the free ports
@@ -22,6 +22,6 @@ def init_proxy(db_command_queue, crawl_id):
config = proxy.ProxyConfig(cacert=os.path.join(os.path.dirname(__file__), 'mitmproxy.pem'),)
server = proxy.ProxyServer(config, proxy_port)
print 'Intercepting Proxy listening on ' + str(proxy_port)
m = MITMProxy.InterceptingMaster(server, crawl_id, proxy_site_queue, db_command_queue)
m = MITMProxy.InterceptingMaster(server, crawl_id, proxy_site_queue, db_socket_address)
thread.start_new_thread(m.run, ())
return proxy_port, proxy_site_queue
return proxy_port, proxy_site_queue
16 changes: 8 additions & 8 deletions automation/Proxy/mitm_commands.py
@@ -7,30 +7,30 @@

# msg is the message object given by MITM
# (crawl_id, url, method, referrer, top_url)
def process_general_mitm_request(db_queue, crawl_id, top_url, msg):
def process_general_mitm_request(db_socket, crawl_id, top_url, msg):
if len(msg.headers['referer']) > 0:
referrer = msg.headers['referer'][0]
else:
referrer = ''

data = (crawl_id, msg.get_url(), msg.method, referrer, top_url)
db_queue.put(("INSERT INTO http_requests (crawl_id, url, method, referrer, top_url) VALUES (?,?,?,?,?)", data))
db_socket.send_pickled(("INSERT INTO http_requests (crawl_id, url, method, referrer, top_url) VALUES (?,?,?,?,?)", data))


# msg is the message object given by MITM
# (crawl_id, url, method, referrer, response_status, response_status_text, top_url)
def process_general_mitm_response(db_queue, crawl_id, top_url, msg):
def process_general_mitm_response(db_socket, crawl_id, top_url, msg):
if len(msg.request.headers['referer']) > 0:
referrer = msg.request.headers['referer'][0]
else:
referrer = ''

if msg.get_cookies() is not None:
process_cookies(db_queue, crawl_id, top_url, referrer, msg.get_cookies())
process_cookies(db_socket, crawl_id, top_url, referrer, msg.get_cookies())

else:
data = (crawl_id, msg.request.get_url(), msg.request.method, referrer, msg.code, msg.msg, top_url)
db_queue.put(("INSERT INTO http_responses (crawl_id, url, method, referrer, response_status, "
db_socket.send_pickled(("INSERT INTO http_responses (crawl_id, url, method, referrer, response_status, "
"response_status_text, top_url) VALUES (?,?,?,?,?,?,?)", data))

# returns canonical date-time string
@@ -41,7 +41,7 @@ def parse_date(date):
return str(datetime.datetime.now())

# add an entry for a cookie to the table
def process_cookies(db_queue, crawl_id, top_url, referrer, cookies):
def process_cookies(db_socket, crawl_id, top_url, referrer, cookies):
for name in cookies:
value, attr_dict = cookies[name]
domain = '' if 'domain' not in attr_dict else unicode(attr_dict['domain'], errors='ignore')
@@ -50,5 +50,5 @@ def process_cookies(db_queue, crawl_id, top_url, referrer, cookies):

data = (crawl_id, domain, unicode(name, errors='ignore'), unicode(value, errors='ignore'),
expiry, accessed, referrer, top_url)
db_queue.put(("INSERT INTO cookies (crawl_id, domain, name, value, expiry, accessed, referrer, top_url) "
"VALUES (?,?,?,?,?,?,?,?)", data))
db_socket.send_pickled(("INSERT INTO cookies (crawl_id, domain, name, value, expiry, accessed, referrer, top_url) "
"VALUES (?,?,?,?,?,?,?,?)", data))
97 changes: 97 additions & 0 deletions automation/SocketManager.py
@@ -0,0 +1,97 @@
import Queue
import threading
import socket
import struct
import cPickle

#TODO - Implement a kill command for the server socket
#TODO - Add in thread joins and make sure everything exits cleanly

class serversocket:
'''
A server socket to recieve and process string messages
from client sockets to a central queue
'''
def __init__(self, verbose=False):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.bind(('localhost',0))
self.sock.listen(10) #queue a max of n connect requests
self.verbose = verbose
self.queue = Queue.Queue()
if self.verbose:
print "Server bound to: " + str(self.sock.getsockname())

def start_accepting(self):
''' Start the listener thread '''
thread = threading.Thread(target=self._accept, args=())
thread.start()

def _accept(self):
''' Listen for connections and pass handling to a new thread '''
while True:
(client, address) = self.sock.accept()
thread = threading.Thread(target=self._handle_conn, args=(client,address))
thread.start()

def _handle_conn(self, client, address):
'''
Recieve messages and pass to queue. Messages are prefixed with
a 4-byte integer to specify the message length.
'''
if self.verbose:
print "Thread: " + str(threading.current_thread()) + " connected to: " + str(address)
try:
while True:
msg = self.receive_msg(client, 5)
msglen, is_pickled = struct.unpack('>I?', msg)
msg = self.receive_msg(client, msglen)
if is_pickled:
msg = cPickle.loads(msg)
self.queue.put(msg)
except RuntimeError:
if self.verbose:
print "Client socket: " + str(address) + " closed"

def receive_msg(self, client, msglen):
msg = ''
while len(msg) < msglen:
chunk = client.recv(msglen-len(msg))
if chunk == '':
raise RuntimeError("socket connection broken")
msg = msg + chunk
return msg

class clientsocket:
def __init__(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

def connect(self, host, port):
self.sock.connect((host,port))

def send(self, msg, is_pickled=False):
#prepend with message length
msg = struct.pack('>I?', len(msg), is_pickled) + msg
totalsent = 0
while totalsent < len(msg):
sent = self.sock.send(msg[totalsent:])
if sent == 0:
raise RuntimeError("socket connection broken")
totalsent = totalsent + sent

def send_pickled(self, obj):
pickled = cPickle.dumps(obj)
self.send(pickled,is_pickled=True)

def close(self):
self.sock.close()

if __name__ == '__main__':
import sys

#Just for testing
if sys.argv[1] == 's':
sock = serversocket(verbose=True)
sock.start_accepting()
elif sys.argv[1] == 'c':
sock = clientsocket()
import ipdb; ipdb.set_trace()
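A quick loopback sketch of the two classes above, wiring a clientsocket to a serversocket in a single process (assuming the automation package is importable from the working directory). The wire format is a 5-byte '>I?' header -- a 4-byte big-endian payload length plus a 1-byte pickled flag -- followed by the payload. Because the server socket has no kill command yet (see the TODO), the listener thread keeps the script alive after the demo finishes.

from automation.SocketManager import serversocket, clientsocket

server = serversocket(verbose=True)
server.start_accepting()                     # listener thread; no way to stop it yet

client = clientsocket()
client.connect(*server.sock.getsockname())   # ('localhost', <ephemeral port>)
client.send_pickled(("INSERT INTO cookies (name) VALUES (?)", ('demo',)))
client.send("plain string message")          # unpickled path, is_pickled=False

print server.queue.get()                     # -> the (template, args) tuple
print server.queue.get()                     # -> 'plain string message'
client.close()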
19 changes: 9 additions & 10 deletions automation/TaskManager.py
@@ -37,6 +37,7 @@ def __init__(self, db_location, db_name, browser='firefox', timeout=30, headless
self.db_name = db_name

# sets up the crawl data database and extracts crawl id
#TODO update this -- locks likely aren't needed
if self.mp_lock is not None:
self.mp_lock.acquire()
self.db = sqlite3.connect(db_location + db_name)
@@ -63,10 +64,11 @@ def __init__(self, db_location, db_name, browser='firefox', timeout=30, headless
self.timeout = timeout
self.browser_command_queue = None # queue for passing command tuples to BrowserManager
self.browser_status_queue = None # queue for receiving command execution status from BrowserManager
self.aggregator_query_queue = None # queue for sending data/queries to DataAggregator
#self.aggregator_query_queue = None # queue for sending data/queries to DataAggregator #TODO remove
self.aggregator_status_queue = None # queue used for sending graceful KILL command to DataAggregator
self.browser_manager = self.launch_browser_manager()
self.data_aggregator = self.launch_data_aggregator()
self.aggregator_address = self.aggregator_status_queue.get() #socket location: (address, port)
self.browser_manager = self.launch_browser_manager()

# CRAWLER SETUP / KILL CODE

@@ -97,13 +99,12 @@ def launch_browser_manager(self, spawn_timeout=300):
successful_spawn = False
while not successful_spawn:
# Resets the command/status queues
(self.browser_command_queue, self.browser_status_queue, self.aggregator_query_queue) \
= (Queue(), Queue(), Queue())
(self.browser_command_queue, self.browser_status_queue) = (Queue(), Queue())

# builds and launches the browser_manager
browser_manager = Process(target=BrowserManager.BrowserManager,
args=(self.browser_command_queue, self.browser_status_queue,
self.aggregator_query_queue, self.browser_params, ))
self.aggregator_address, self.browser_params, ))
browser_manager.start()

# waits for BrowserManager to send success tuple i.e. (profile_path, browser pid, display pid)
@@ -135,8 +136,8 @@ def launch_browser_manager(self, spawn_timeout=300):
def launch_data_aggregator(self):
self.aggregator_status_queue = Queue()
aggregator = Process(target=DataAggregator.DataAggregator,
args=(self.crawl_id, self.db_loc + self.db_name,
self.aggregator_query_queue, self.aggregator_status_queue, ))
args=(self.crawl_id, self.db_loc + self.db_name,
self.aggregator_status_queue, ))
aggregator.start()
return aggregator

@@ -156,15 +157,13 @@ def kill_data_aggregator(self):
# <reset> marks whether we want to wipe the old profile
def restart_workers(self, reset=False):
self.kill_browser_manager()
self.kill_data_aggregator()

# in case of reset, hard-deletes old profile
if reset and self.profile_path is not None:
subprocess.call(["rm", "-r", self.profile_path])
self.profile_path = None

self.browser_manager = self.launch_browser_manager()
self.data_aggregator = self.launch_data_aggregator()

# closes the TaskManager for good and frees up memory
def close(self):
@@ -213,4 +212,4 @@ def dump_profile(self, dump_folder, overwrite_timeout=None):
# resets the worker processes with profile to a clean state
def reset(self):
if not self.is_fresh: # optimization in case resetting after a relaunch
self.restart_workers(reset=True)
self.restart_workers(reset=True)
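The net effect of the TaskManager changes above is an ordering change: the DataAggregator is launched first, reports its socket address back over aggregator_status_queue, and only then is the BrowserManager started with that address, so restart_workers no longer needs to recycle the aggregator. Below is a self-contained toy of that hand-off pattern, using bare sockets and a multiprocessing Queue as stand-ins for the real DataAggregator and BrowserManager (all names here are illustrative, not from the repository).

from multiprocessing import Process, Queue
import socket

def toy_aggregator(status_queue):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(('localhost', 0))
    sock.listen(1)
    status_queue.put(sock.getsockname())   # like DataAggregator's status_queue.put(sock.sock.getsockname())
    conn, _ = sock.accept()
    print 'aggregator got:', conn.recv(1024)
    conn.close()
    sock.close()

if __name__ == '__main__':
    status_queue = Queue()
    aggregator = Process(target=toy_aggregator, args=(status_queue,))
    aggregator.start()

    aggregator_address = status_queue.get()   # what TaskManager stores as aggregator_address

    # stand-in for the BrowserManager / proxy connecting back with a clientsocket
    worker = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    worker.connect(aggregator_address)
    worker.send('hello from a worker')
    worker.close()
    aggregator.join()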
