From 45967e135651541b3f23729aea65367df62b9be2 Mon Sep 17 00:00:00 2001 From: Asko Oja Date: Mon, 10 Dec 2012 12:42:27 +0000 Subject: [PATCH 1/2] Made it possible to use websockets over Brubeck and created echo demo that can be used to start building your own websocekt handler. In connections.py - Removed many unneccessary imports (ujson should also be removed - From connection class removed lots of stuff that was left hanging there after wsgi support was added. Seems that there are only two shared methods in current implementation and all the rest of the code was just complicating code and creating confusion - Did some cleanup in Mongrel2Connection but there is probably more room to make it consisten and nice to use - Removed hardcoding for only http responses from process_message so now it can be used also for websocket responses In request_handling.py - Removed some unneccessary imports - Removed uncommented and unneccessary function render() from module level. All calls to this replaced with appropriate http_response calls Also cleaned up ungly cross importing between modules that was in the way for websockets In m2reader.py - Set connect string to match mongrel2 conf in demos dir. In demo_websocket.py - Quite simplistic impleetation of websockets echo server. I plan to write decent support but i would like to get the fixed common code first incorporated into brubeck --- brubeck/connections.py | 99 +++++++-------------------------- brubeck/request_handling.py | 39 ++++--------- demos/demo_websocket.py | 108 ++++++++++++++++++++++++++++++++++++ demos/m2reader.py | 2 +- 4 files changed, 139 insertions(+), 109 deletions(-) create mode 100755 demos/demo_websocket.py diff --git a/brubeck/connections.py b/brubeck/connections.py index 6a44942..ec59c31 100644 --- a/brubeck/connections.py +++ b/brubeck/connections.py @@ -1,12 +1,10 @@ import ujson as json from uuid import uuid4 import cgi -import re import logging -import Cookie -from request import to_bytes, to_unicode, parse_netstring, Request -from request_handling import http_response, coro_spawn +from request import to_bytes, Request +from request_handling import coro_spawn ### @@ -21,29 +19,13 @@ class Connection(object): response is necessary. """ - def __init__(self, incoming=None, outgoing=None): - """The base `__init__()` function configures a unique ID and assigns - the incoming and outgoing mechanisms to a name. - - `in_sock` and `out_sock` feel like misnomers at this time but they are - preserved for a transition period. - """ - self.sender_id = uuid4().hex - self.in_sock = incoming - self.out_sock = outgoing - - def _unsupported(self, name): - """Simple function that raises an exception. - """ - error_msg = 'Subclass of Connection has not implemented `%s()`' % name - raise NotImplementedError(error_msg) - - - def recv(self): - """Receives a raw mongrel2.handler.Request object that you - can then work with. + def process_message(self, application, message): + """This coroutine looks at the message, determines which handler will + be used to process it, and then begins processing. + + The application is responsible for handling misconfigured routes. """ - self._unsupported('recv') + pass def _recv_forever_ever(self, fun_forever): """Calls a handler function that runs forever. The handler can be @@ -55,37 +37,6 @@ def _recv_forever_ever(self, fun_forever): # Put a newline after ^C print '\nBrubeck going down...' - def send(self, uuid, conn_id, msg): - """Function for sending a single message. - """ - self._unsupported('send') - - def reply(self, req, msg): - """Does a reply based on the given Request object and message. - """ - self.send(req.sender, req.conn_id, msg) - - def reply_bulk(self, uuid, idents, data): - """This lets you send a single message to many currently - connected clients. There's a MAX_IDENTS that you should - not exceed, so chunk your targets as needed. Each target - will receive the message once by Mongrel2, but you don't have - to loop which cuts down on reply volume. - """ - self._unsupported('reply_bulk') - self.send(uuid, ' '.join(idents), data) - - def close(self): - """Close the connection. - """ - self._unsupported('close') - - def close_bulk(self, uuid, idents): - """Same as close but does it to a whole bunch of idents at a time. - """ - self._unsupported('close_bulk') - self.reply_bulk(uuid, idents, "") - ### ### ZeroMQ @@ -141,17 +92,17 @@ def __init__(self, pull_addr, pub_addr): """ zmq = load_zmq() ctx = load_zmq_ctx() - - in_sock = ctx.socket(zmq.PULL) - out_sock = ctx.socket(zmq.PUB) - super(Mongrel2Connection, self).__init__(in_sock, out_sock) + self.sender_id = uuid4().hex + self.in_sock = ctx.socket(zmq.PULL) + self.out_sock = ctx.socket(zmq.PUB) + self.in_addr = pull_addr self.out_addr = pub_addr - in_sock.connect(pull_addr) - out_sock.setsockopt(zmq.IDENTITY, self.sender_id) - out_sock.connect(pub_addr) + self.in_sock.connect(pull_addr) + self.out_sock.setsockopt(zmq.IDENTITY, self.sender_id) + self.out_sock.connect(pub_addr) def process_message(self, application, message): """This coroutine looks at the message, determines which handler will @@ -161,21 +112,10 @@ def process_message(self, application, message): """ request = Request.parse_msg(message) if request.is_disconnect(): - return # Ignore disconnect msgs. Dont have areason to do otherwise + return # Ignore disconnect msgs. Dont have a reason to do otherwise handler = application.route_message(request) result = handler() - - http_content = http_response(result['body'], result['status_code'], - result['status_msg'], result['headers']) - - application.msg_conn.reply(request, http_content) - - def recv(self): - """Receives a raw mongrel2.handler.Request object that you from the - zeromq socket and return whatever is found. - """ - zmq_msg = self.in_sock.recv() - return zmq_msg + self.reply(request, result) def recv_forever_ever(self, application): """Defines a function that will run the primary connection Brubeck uses @@ -184,7 +124,7 @@ def recv_forever_ever(self, application): """ def fun_forever(): while True: - request = self.recv() + request = self.in_sock.recv() coro_spawn(self.process_message, application, request) self._recv_forever_ever(fun_forever) @@ -212,7 +152,7 @@ def reply_bulk(self, uuid, idents, data): def close(self): """Tells mongrel2 to explicitly close the HTTP connection. """ - pass + self.reply(req.sender, "") def close_bulk(self, uuid, idents): """Same as close but does it to a whole bunch of idents at a time. @@ -229,7 +169,6 @@ class WSGIConnection(Connection): """ def __init__(self, port=6767): - super(WSGIConnection, self).__init__() self.port = port def process_message(self, application, environ, callback): diff --git a/brubeck/request_handling.py b/brubeck/request_handling.py index 4c2b237..fc21d08 100755 --- a/brubeck/request_handling.py +++ b/brubeck/request_handling.py @@ -53,17 +53,16 @@ def coro_spawn(function, app, message, *a, **kw): import cPickle as pickle from itertools import chain import os, sys -from dictshield.base import ShieldException -from request import Request, to_bytes, to_unicode - +from request import Request, to_bytes import ujson as json ### ### Common helpers ### -HTTP_METHODS = ['get', 'post', 'put', 'delete', - 'head', 'options', 'trace', 'connect'] +METHODS = ['get', 'post', 'put', 'delete', + 'head', 'options', 'trace', 'connect', + 'websocket', 'websocket_handshake'] HTTP_FORMAT = "HTTP/1.1 %(code)s %(status)s\r\n%(headers)s\r\n\r\n%(body)s" @@ -76,16 +75,6 @@ class FourOhFourException(Exception): ### Result Processing ### -def render(body, status_code, status_msg, headers): - payload = { - 'body': body, - 'status_code': status_code, - 'status_msg': status_msg, - 'headers': headers, - } - return payload - - def http_response(body, code, status, headers): """Renders arguments into an HTTP response. """ @@ -221,7 +210,7 @@ def supported_methods(self): """List all the HTTP methods you have defined. """ supported_methods = [] - for mef in HTTP_METHODS: + for mef in METHODS: if callable(getattr(self, mef, False)): supported_methods.append(mef) return supported_methods @@ -318,7 +307,7 @@ def __call__(self): mef = self.message.method.lower() # M-E-T-H-O-D man! # Find function mapped to method on self - if mef in HTTP_METHODS: + if mef in METHODS: fun = getattr(self, mef, self.unsupported) else: fun = self.unsupported @@ -548,12 +537,11 @@ def render(self, status_code=None, http_200=False, **kwargs): self.convert_cookies() - response = render(self.body, status_code, self.status_msg, self.headers) - logging.info('%s %s %s (%s)' % (status_code, self.message.method, self.message.path, self.message.remote_addr)) - return response + + return http_response(self.body, status_code, self.status_msg, self.headers) class JSONMessageHandler(WebMessageHandler): @@ -576,13 +564,11 @@ def render(self, status_code=None, hide_status=False, **kwargs): else: body = json.dumps(self._payload) - response = render(body, self.status_code, self.status_msg, - self.headers) - logging.info('%s %s %s (%s)' % (self.status_code, self.message.method, self.message.path, self.message.remote_addr)) - return response + + return http_response(body, self.status_code, self.status_msg, self.headers) class JsonSchemaMessageHandler(WebMessageHandler): @@ -603,10 +589,7 @@ def render(self, status_code=None, **kwargs): self.convert_cookies() self.headers['Content-Type'] = "application/schema+json" - response = render(self.body, status_code, self.status_msg, - self.headers) - - return response + return http_response(self.body, status_code, self.status_msg, self.headers) ### ### Application logic diff --git a/demos/demo_websocket.py b/demos/demo_websocket.py new file mode 100755 index 0000000..aa152fd --- /dev/null +++ b/demos/demo_websocket.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python + +""" Exmaple of websocket message handler for brubeck +Demo implements echo server that works over websockets. + +Experiment with +http://isr.nu/ws/WebSocketTest.htm +or +http://www.websocket.org/echo.html +""" + +import logging + +from brubeck.request_handling import http_response, MessageHandler, Brubeck +from brubeck.connections import Mongrel2Connection + +def websocket_response(data, opcode=1, rsvd=0): + """Renders arguments into WebSocket response + """ + header = '' + header += chr(0x80|opcode|rsvd<<4) + realLength = len(data) + if realLength < 126: + dummyLength = realLength + elif realLength < 2**16: + dummyLength = 126 + else: + dummyLength = 127 + header += chr(dummyLength) + if dummyLength == 127: + header += chr(realLength >> 56 & 0xff) + header += chr(realLength >> 48 & 0xff) + header += chr(realLength >> 40 & 0xff) + header += chr(realLength >> 32 & 0xff) + header += chr(realLength >> 24 & 0xff) + header += chr(realLength >> 16 & 0xff) + if dummyLength == 126 or dummyLength == 127: + header += chr(realLength >> 8 & 0xff) + header += chr(realLength & 0xff) + return header + data + + +class WSMessageHandler(MessageHandler): + """ Class implements websocket with as little dependancies as possible + """ + OPCODE_CONTINUATION = 0x0 + OPCODE_TEXT = 0x1 + OPCODE_BINARY = 0x2 + OPCODE_CLOSE = 0x8 + OPCODE_PING = 0x9 + OPCODE_PONG = 0xa + + def initialize(self): + """Just pack out some flags + """ + ### WebSocket message parsing + if self.message.method == 'WEBSOCKET': + flags = int(self.message.headers.get('FLAGS'),16) + self.ws_final = flags & 0x80 == 0x80 + self.ws_rsvd = flags & 0x70 + self.ws_opcode = flags & 0xf + + def websocket_handshake(self): + """Does initial handshake. This is a good place to register the websocket + connection in some memory database + """ + headers = {} + headers['Content-Type'] = 'text/plain' + headers['Upgrade'] = 'websocket' + headers['Connection'] = 'Upgrade' + headers['Sec-WebSocket-Accept'] = self.message.body + + logging.info('%s %s %s (%s)' % ( + '101', self.message.method, self.message.path, self.message.remote_addr)) + return http_response('', '101', 'Switching Protocols', headers) + + def render(self, msg = None, opcode = 1, rsvd = 0, **kwargs): + """Renders payload and prepares the payload for a successful WS + response. + """ + logging.info('%s %s %s (%s)' % ( + opcode, self.message.method, self.message.path, self.message.remote_addr)) + return websocket_response(msg, opcode, rsvd) + +class EchoHandler(WSMessageHandler): + def websocket(self): + if self.ws_opcode == self.OPCODE_CLOSE: + return self.render('', self.OPCODE_CLOSE) + + elif self.ws_opcode == self.OPCODE_PING: + return self.render(self.message.body, self.OPCODE_PONG) + + elif self.ws_opcode == self.OPCODE_TEXT: + msg = self.message.body.decode('utf-8') + return self.render(msg.encode('utf-8'), self.OPCODE_TEXT) + else: + raise Exception('Unhandled opcode in WebsocketHandler') + +urls = [(r'^/echo', EchoHandler)] + +config = { + 'msg_conn': Mongrel2Connection('tcp://127.0.0.1:9999', 'tcp://127.0.0.1:9998'), + 'handler_tuples': urls, +} + +app = Brubeck(**config) + +app.run() diff --git a/demos/m2reader.py b/demos/m2reader.py index 99d4ea1..5ce2f0d 100755 --- a/demos/m2reader.py +++ b/demos/m2reader.py @@ -4,7 +4,7 @@ ctx = zmq.Context() s = ctx.socket(zmq.PULL) -s.connect("ipc://127.0.0.1:9999") +s.connect("tcp://127.0.0.1:9999") while True: msg = s.recv() From a3763cb21e8c3ffcd5fac67632e1809d82995883 Mon Sep 17 00:00:00 2001 From: Asko Oja Date: Thu, 20 Dec 2012 13:05:38 +0000 Subject: [PATCH 2/2] Long poll handler can return None now to declare itself --- brubeck/connections.py | 20 +++++++++++--------- brubeck/request_handling.py | 10 +++------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/brubeck/connections.py b/brubeck/connections.py index ec59c31..3cbf6c1 100644 --- a/brubeck/connections.py +++ b/brubeck/connections.py @@ -22,7 +22,7 @@ class Connection(object): def process_message(self, application, message): """This coroutine looks at the message, determines which handler will be used to process it, and then begins processing. - + The application is responsible for handling misconfigured routes. """ pass @@ -67,7 +67,7 @@ def load_zmq_ctx(): zmq = load_zmq() zmq_ctx = zmq.Context() load_zmq_ctx._zmq_ctx = zmq_ctx - + return load_zmq_ctx._zmq_ctx @@ -107,7 +107,7 @@ def __init__(self, pull_addr, pub_addr): def process_message(self, application, message): """This coroutine looks at the message, determines which handler will be used to process it, and then begins processing. - + The application is responsible for handling misconfigured routes. """ request = Request.parse_msg(message) @@ -115,7 +115,9 @@ def process_message(self, application, message): return # Ignore disconnect msgs. Dont have a reason to do otherwise handler = application.route_message(request) result = handler() - self.reply(request, result) + # in case of long poll we do not want to send reply + if result is not None: + self.reply(request, result) def recv_forever_ever(self, application): """Defines a function that will run the primary connection Brubeck uses @@ -161,7 +163,7 @@ def close_bulk(self, uuid, idents): ### -### WSGI +### WSGI ### class WSGIConnection(Connection): @@ -175,7 +177,7 @@ def process_message(self, application, environ, callback): request = Request.parse_wsgi_request(environ) handler = application.route_message(request) result = handler() - + wsgi_status = ' '.join([str(result['status_code']), result['status_msg']]) headers = [(k, v) for k,v in result['headers'].items()] callback(str(wsgi_status), headers) @@ -193,15 +195,15 @@ def fun_forever(): def proc_msg(environ, callback): return self.process_message(application, environ, callback) - + if CORO_LIBRARY == 'gevent': from gevent import wsgi server = wsgi.WSGIServer(('', self.port), proc_msg) server.serve_forever() - + elif CORO_LIBRARY == 'eventlet': import eventlet.wsgi server = eventlet.wsgi.server(eventlet.listen(('', self.port)), proc_msg) - + self._recv_forever_ever(fun_forever) diff --git a/brubeck/request_handling.py b/brubeck/request_handling.py index fc21d08..93b1cd2 100755 --- a/brubeck/request_handling.py +++ b/brubeck/request_handling.py @@ -61,7 +61,7 @@ def coro_spawn(function, app, message, *a, **kw): ### METHODS = ['get', 'post', 'put', 'delete', - 'head', 'options', 'trace', 'connect', + 'head', 'options', 'trace', 'connect', 'websocket', 'websocket_handshake'] HTTP_FORMAT = "HTTP/1.1 %(code)s %(status)s\r\n%(headers)s\r\n\r\n%(body)s" @@ -325,10 +325,6 @@ def __call__(self): rendered = fun(**kwargs) else: rendered = fun(*self._url_args) - - if rendered is None: - logging.debug('Handler had no return value: %s' % fun) - return '' except Exception, e: logging.error(e, exc_info=True) rendered = self.error(e) @@ -540,7 +536,7 @@ def render(self, status_code=None, http_200=False, **kwargs): logging.info('%s %s %s (%s)' % (status_code, self.message.method, self.message.path, self.message.remote_addr)) - + return http_response(self.body, status_code, self.status_msg, self.headers) @@ -567,7 +563,7 @@ def render(self, status_code=None, hide_status=False, **kwargs): logging.info('%s %s %s (%s)' % (self.status_code, self.message.method, self.message.path, self.message.remote_addr)) - + return http_response(body, self.status_code, self.status_msg, self.headers)