From f2379ddc16c5bd37d959201377c28279a7971e54 Mon Sep 17 00:00:00 2001 From: englehardt Date: Mon, 31 Mar 2014 02:19:27 -0400 Subject: [PATCH] Initial commit of socket interface. Tested and working with proxy --- .gitignore | 9 ++ automation/BrowserManager.py | 8 +- automation/DataAggregator/DataAggregator.py | 13 ++- automation/Proxy/MITMProxy.py | 14 +-- automation/Proxy/deploy_mitm_proxy.py | 6 +- automation/Proxy/mitm_commands.py | 16 ++-- automation/SocketManager.py | 97 +++++++++++++++++++++ automation/TaskManager.py | 19 ++-- 8 files changed, 148 insertions(+), 34 deletions(-) create mode 100644 .gitignore create mode 100644 automation/SocketManager.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..178a1297f --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +*.pyc +*~ +.*.sw* + +# A bug in selenium creates this on unix systems +C:\\nppdf32Log\\debuglog.txt + +#MAC filesystem explorer drops this +.DS_Store diff --git a/automation/BrowserManager.py b/automation/BrowserManager.py index bd7bb454f..6bb44446f 100644 --- a/automation/BrowserManager.py +++ b/automation/BrowserManager.py @@ -11,14 +11,14 @@ # is the queue through which the browser sends command tuples # is a queue through which the BrowserManager either signals command failure or success -# is the queue through which to send data to the DataAggregator to manipulate and write +# is the socket address through which to send data to the DataAggregator to manipulate and write # are browser parameter settings (e.g. whether we're using a proxy, headless, etc.) -def BrowserManager(command_queue, status_queue, db_query_queue, browser_params): +def BrowserManager(command_queue, status_queue, db_address, browser_params): # sets up the proxy (for now, mitmproxy) if necessary proxy_site_queue = None # used to pass the current site down to the proxy if browser_params['proxy']: - (local_port, proxy_site_queue) = deploy_mitm_proxy.init_proxy(db_query_queue, browser_params['crawl_id']) + (local_port, proxy_site_queue) = deploy_mitm_proxy.init_proxy(db_address, browser_params['crawl_id']) browser_params['proxy'] = local_port # Gets the WebDriver, profile folder (i.e. where history/cookies are stored) and display pid (None if not headless) @@ -47,4 +47,4 @@ def BrowserManager(command_queue, status_queue, db_query_queue, browser_params): except Exception as ex: print "CRASH IN DRIVER ORACLE:" + str(ex) + " RESTARTING BROWSER MANAGER" status_queue.put("FAILED") - break \ No newline at end of file + break diff --git a/automation/DataAggregator/DataAggregator.py b/automation/DataAggregator/DataAggregator.py index d3995ebb4..c6990870c 100644 --- a/automation/DataAggregator/DataAggregator.py +++ b/automation/DataAggregator/DataAggregator.py @@ -1,3 +1,4 @@ +from ..SocketManager import serversocket import sqlite3 import time @@ -5,6 +6,7 @@ # Executes queries until being told to die (then it will finish work and shut down) # This process should never be terminated un-gracefully # Currently uses SQLite but may move to different platform +# TODO: add killing of the serversocket once supported # is the id of the current crawl w.r.t a particular database (TODO: re-add this feature) # is the absolute path of the DB's current location @@ -12,11 +14,16 @@ # is a queue connect to the TaskManager used for # is the number of execution statements that should be made before a commit (used for speedup) -def DataAggregator(crawl_id, db_loc, query_queue, status_queue, commit_loop=1000): +def DataAggregator(crawl_id, db_loc, status_queue, commit_loop=1000): # sets up DB connection db = sqlite3.connect(db_loc, check_same_thread=False) curr = db.cursor() + # sets up the serversocket to start accepting connections + sock = serversocket() + status_queue.put(sock.sock.getsockname()) #let TM know location + sock.start_accepting() + counter = 0 # number of executions made since last commit while True: # received KILL command from TaskManager @@ -25,13 +32,13 @@ def DataAggregator(crawl_id, db_loc, query_queue, status_queue, commit_loop=1000 break # no command for now -> sleep to avoid pegging CPU on blocking get - if query_queue.empty(): + if sock.queue.empty(): time.sleep(0.001) continue # executes a query of form (template_string, arguments) # query is of form (template_string, arguments) - query = query_queue.get() + query = sock.queue.get() curr.execute(query[0], query[1]) # batch commit if necessary diff --git a/automation/Proxy/MITMProxy.py b/automation/Proxy/MITMProxy.py index 5f91a3eaa..3b5e56138 100644 --- a/automation/Proxy/MITMProxy.py +++ b/automation/Proxy/MITMProxy.py @@ -1,6 +1,7 @@ # Customized MITMProxy # Extends the proxy controller to add some additional # functionality to handling requests and responses +from ..SocketManager import clientsocket from libmproxy import controller import sys import Queue @@ -10,7 +11,7 @@ # Inspired by the following example. Note the gist has a lot of bugs. # https://gist.github.com/dannvix/5285924 class InterceptingMaster (controller.Master): - def __init__(self, server, crawl_id, url_queue, db_command_queue): + def __init__(self, server, crawl_id, url_queue, db_socket_address): self.crawl_id = crawl_id # Attributes used to flag the first-party domain @@ -19,8 +20,9 @@ def __init__(self, server, crawl_id, url_queue, db_command_queue): self.new_top_url = None # first-party url that BrowserManager just claimed to have visited self.changed = False # used to flag that new site has been visited so proxy looks for new site's traffic - # Queue to communicate with DataAggregator - self.db_queue = db_command_queue + # Open a socket to communicate with DataAggregator + self.db_socket = clientsocket() + self.db_socket.connect(*db_socket_address) controller.Master.__init__(self, server) @@ -36,7 +38,7 @@ def tick(self, q): # try to load/process message as usual try: msg = q.get(timeout=0.01) - controller.Master.handle(self, *msg) + controller.Master.handle(self, msg) except Queue.Empty: pass @@ -65,10 +67,10 @@ def handle_request(self, msg): self.curr_top_url = self.new_top_url self.changed = False - mitm_commands.process_general_mitm_request(self.db_queue, self.crawl_id, self.curr_top_url, msg) + mitm_commands.process_general_mitm_request(self.db_socket, self.crawl_id, self.curr_top_url, msg) # Record data from HTTP responses def handle_response(self, msg): msg.reply() - mitm_commands.process_general_mitm_response(self.db_queue, self.crawl_id, self.curr_top_url, msg) + mitm_commands.process_general_mitm_response(self.db_socket, self.crawl_id, self.curr_top_url, msg) diff --git a/automation/Proxy/deploy_mitm_proxy.py b/automation/Proxy/deploy_mitm_proxy.py index 5f45f78f4..77655edf2 100644 --- a/automation/Proxy/deploy_mitm_proxy.py +++ b/automation/Proxy/deploy_mitm_proxy.py @@ -10,7 +10,7 @@ # is the queue used to pass down query strings and arguments to the DataAggregator # is the id set by the TaskManager -def init_proxy(db_command_queue, crawl_id): +def init_proxy(db_socket_address, crawl_id): proxy_site_queue = Queue.Queue() # queue for crawler to communicate with proxy # gets local port from one of the free ports @@ -22,6 +22,6 @@ def init_proxy(db_command_queue, crawl_id): config = proxy.ProxyConfig(cacert=os.path.join(os.path.dirname(__file__), 'mitmproxy.pem'),) server = proxy.ProxyServer(config, proxy_port) print 'Intercepting Proxy listening on ' + str(proxy_port) - m = MITMProxy.InterceptingMaster(server, crawl_id, proxy_site_queue, db_command_queue) + m = MITMProxy.InterceptingMaster(server, crawl_id, proxy_site_queue, db_socket_address) thread.start_new_thread(m.run, ()) - return proxy_port, proxy_site_queue \ No newline at end of file + return proxy_port, proxy_site_queue diff --git a/automation/Proxy/mitm_commands.py b/automation/Proxy/mitm_commands.py index f57f19533..3ccc2ece0 100644 --- a/automation/Proxy/mitm_commands.py +++ b/automation/Proxy/mitm_commands.py @@ -7,30 +7,30 @@ # msg is the message object given by MITM # (crawl_id, url, method, referrer, top_url) -def process_general_mitm_request(db_queue, crawl_id, top_url, msg): +def process_general_mitm_request(db_socket, crawl_id, top_url, msg): if len(msg.headers['referer']) > 0: referrer = msg.headers['referer'][0] else: referrer = '' data = (crawl_id, msg.get_url(), msg.method, referrer, top_url) - db_queue.put(("INSERT INTO http_requests (crawl_id, url, method, referrer, top_url) VALUES (?,?,?,?,?)", data)) + db_socket.send_pickled(("INSERT INTO http_requests (crawl_id, url, method, referrer, top_url) VALUES (?,?,?,?,?)", data)) # msg is the message object given by MITM # (crawl_id, url, method, referrer, response_status, response_status_text, top_url) -def process_general_mitm_response(db_queue, crawl_id, top_url, msg): +def process_general_mitm_response(db_socket, crawl_id, top_url, msg): if len(msg.request.headers['referer']) > 0: referrer = msg.request.headers['referer'][0] else: referrer = '' if msg.get_cookies() is not None: - process_cookies(db_queue, crawl_id, top_url, referrer, msg.get_cookies()) + process_cookies(db_socket, crawl_id, top_url, referrer, msg.get_cookies()) else: data = (crawl_id, msg.request.get_url(), msg.request.method, referrer, msg.code, msg.msg, top_url) - db_queue.put(("INSERT INTO http_responses (crawl_id, url, method, referrer, response_status, " + db_socket.send_pickled(("INSERT INTO http_responses (crawl_id, url, method, referrer, response_status, " "response_status_text, top_url) VALUES (?,?,?,?,?,?,?)", data)) # returns canonical date-time string @@ -41,7 +41,7 @@ def parse_date(date): return str(datetime.datetime.now()) # add an entry for a cookie to the table -def process_cookies(db_queue, crawl_id, top_url, referrer, cookies): +def process_cookies(db_socket, crawl_id, top_url, referrer, cookies): for name in cookies: value, attr_dict = cookies[name] domain = '' if 'domain' not in attr_dict else unicode(attr_dict['domain'], errors='ignore') @@ -50,5 +50,5 @@ def process_cookies(db_queue, crawl_id, top_url, referrer, cookies): data = (crawl_id, domain, unicode(name, errors='ignore'), unicode(value, errors='ignore'), expiry, accessed, referrer, top_url) - db_queue.put(("INSERT INTO cookies (crawl_id, domain, name, value, expiry, accessed, referrer, top_url) " - "VALUES (?,?,?,?,?,?,?,?)", data)) \ No newline at end of file + db_socket.send_pickled(("INSERT INTO cookies (crawl_id, domain, name, value, expiry, accessed, referrer, top_url) " + "VALUES (?,?,?,?,?,?,?,?)", data)) diff --git a/automation/SocketManager.py b/automation/SocketManager.py new file mode 100644 index 000000000..e99ae0391 --- /dev/null +++ b/automation/SocketManager.py @@ -0,0 +1,97 @@ +import Queue +import threading +import socket +import struct +import cPickle + +#TODO - Implement a kill command for the server socket +#TODO - Add in thread joins and make sure everything exits cleanly + +class serversocket: + ''' + A server socket to recieve and process string messages + from client sockets to a central queue + ''' + def __init__(self, verbose=False): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.bind(('localhost',0)) + self.sock.listen(10) #queue a max of n connect requests + self.verbose = verbose + self.queue = Queue.Queue() + if self.verbose: + print "Server bound to: " + str(self.sock.getsockname()) + + def start_accepting(self): + ''' Start the listener thread ''' + thread = threading.Thread(target=self._accept, args=()) + thread.start() + + def _accept(self): + ''' Listen for connections and pass handling to a new thread ''' + while True: + (client, address) = self.sock.accept() + thread = threading.Thread(target=self._handle_conn, args=(client,address)) + thread.start() + + def _handle_conn(self, client, address): + ''' + Recieve messages and pass to queue. Messages are prefixed with + a 4-byte integer to specify the message length. + ''' + if self.verbose: + print "Thread: " + str(threading.current_thread()) + " connected to: " + str(address) + try: + while True: + msg = self.receive_msg(client, 5) + msglen, is_pickled = struct.unpack('>I?', msg) + msg = self.receive_msg(client, msglen) + if is_pickled: + msg = cPickle.loads(msg) + self.queue.put(msg) + except RuntimeError: + if self.verbose: + print "Client socket: " + str(address) + " closed" + + def receive_msg(self, client, msglen): + msg = '' + while len(msg) < msglen: + chunk = client.recv(msglen-len(msg)) + if chunk == '': + raise RuntimeError("socket connection broken") + msg = msg + chunk + return msg + +class clientsocket: + def __init__(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + def connect(self, host, port): + self.sock.connect((host,port)) + + def send(self, msg, is_pickled=False): + #prepend with message length + msg = struct.pack('>I?', len(msg), is_pickled) + msg + totalsent = 0 + while totalsent < len(msg): + sent = self.sock.send(msg[totalsent:]) + if sent == 0: + raise RuntimeError("socket connection broken") + totalsent = totalsent + sent + + def send_pickled(self, obj): + pickled = cPickle.dumps(obj) + self.send(pickled,is_pickled=True) + + def close(self): + self.sock.close() + +if __name__ == '__main__': + import sys + + #Just for testing + if sys.argv[1] == 's': + sock = serversocket(verbose=True) + sock.start_accepting() + elif sys.argv[1] == 'c': + sock = clientsocket() + import ipdb; ipdb.set_trace() diff --git a/automation/TaskManager.py b/automation/TaskManager.py index 18e081240..53b382218 100644 --- a/automation/TaskManager.py +++ b/automation/TaskManager.py @@ -37,6 +37,7 @@ def __init__(self, db_location, db_name, browser='firefox', timeout=30, headless self.db_name = db_name # sets up the crawl data database and extracts crawl id + #TODO update this -- locks likely aren't needed if self.mp_lock is not None: self.mp_lock.acquire() self.db = sqlite3.connect(db_location + db_name) @@ -63,10 +64,11 @@ def __init__(self, db_location, db_name, browser='firefox', timeout=30, headless self.timeout = timeout self.browser_command_queue = None # queue for passing command tuples to BrowserManager self.browser_status_queue = None # queue for receiving command execution status from BrowserManager - self.aggregator_query_queue = None # queue for sending data/queries to DataAggregator + #self.aggregator_query_queue = None # queue for sending data/queries to DataAggregator #TODO remove self.aggregator_status_queue = None # queue used for sending graceful KILL command to DataAggregator - self.browser_manager = self.launch_browser_manager() self.data_aggregator = self.launch_data_aggregator() + self.aggregator_address = self.aggregator_status_queue.get() #socket location: (address, port) + self.browser_manager = self.launch_browser_manager() # CRAWLER SETUP / KILL CODE @@ -97,13 +99,12 @@ def launch_browser_manager(self, spawn_timeout=300): successful_spawn = False while not successful_spawn: # Resets the command/status queues - (self.browser_command_queue, self.browser_status_queue, self.aggregator_query_queue) \ - = (Queue(), Queue(), Queue()) + (self.browser_command_queue, self.browser_status_queue) = (Queue(), Queue()) # builds and launches the browser_manager browser_manager = Process(target=BrowserManager.BrowserManager, args=(self.browser_command_queue, self.browser_status_queue, - self.aggregator_query_queue, self.browser_params, )) + self.aggregator_address, self.browser_params, )) browser_manager.start() # waits for BrowserManager to send success tuple i.e. (profile_path, browser pid, display pid) @@ -135,8 +136,8 @@ def launch_browser_manager(self, spawn_timeout=300): def launch_data_aggregator(self): self.aggregator_status_queue = Queue() aggregator = Process(target=DataAggregator.DataAggregator, - args=(self.crawl_id, self.db_loc + self.db_name, - self.aggregator_query_queue, self.aggregator_status_queue, )) + args=(self.crawl_id, self.db_loc + self.db_name, + self.aggregator_status_queue, )) aggregator.start() return aggregator @@ -156,7 +157,6 @@ def kill_data_aggregator(self): # marks whether we want to wipe the old profile def restart_workers(self, reset=False): self.kill_browser_manager() - self.kill_data_aggregator() # in case of reset, hard-deletes old profile if reset and self.profile_path is not None: @@ -164,7 +164,6 @@ def restart_workers(self, reset=False): self.profile_path = None self.browser_manager = self.launch_browser_manager() - self.data_aggregator = self.launch_data_aggregator() # closes the TaskManager for good and frees up memory def close(self): @@ -213,4 +212,4 @@ def dump_profile(self, dump_folder, overwrite_timeout=None): # resets the worker processes with profile to a clean state def reset(self): if not self.is_fresh: # optimization in case resetting after a relaunch - self.restart_workers(reset=True) \ No newline at end of file + self.restart_workers(reset=True)