Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websockets echo demo over Brubeck #98

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 27 additions & 86 deletions brubeck/connections.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import ujson as json
from uuid import uuid4
import cgi
import re
import logging
import Cookie

from request import to_bytes, to_unicode, parse_netstring, Request
from request_handling import http_response, coro_spawn
from request import to_bytes, Request
from request_handling import coro_spawn


###
Expand All @@ -21,29 +19,13 @@ class Connection(object):
response is necessary.
"""

def __init__(self, incoming=None, outgoing=None):
"""The base `__init__()` function configures a unique ID and assigns
the incoming and outgoing mechanisms to a name.

`in_sock` and `out_sock` feel like misnomers at this time but they are
preserved for a transition period.
"""
self.sender_id = uuid4().hex
self.in_sock = incoming
self.out_sock = outgoing

def _unsupported(self, name):
"""Simple function that raises an exception.
"""
error_msg = 'Subclass of Connection has not implemented `%s()`' % name
raise NotImplementedError(error_msg)

def process_message(self, application, message):
"""This coroutine looks at the message, determines which handler will
be used to process it, and then begins processing.

def recv(self):
"""Receives a raw mongrel2.handler.Request object that you
can then work with.
The application is responsible for handling misconfigured routes.
"""
self._unsupported('recv')
pass

def _recv_forever_ever(self, fun_forever):
"""Calls a handler function that runs forever. The handler can be
Expand All @@ -55,37 +37,6 @@ def _recv_forever_ever(self, fun_forever):
# Put a newline after ^C
print '\nBrubeck going down...'

def send(self, uuid, conn_id, msg):
"""Function for sending a single message.
"""
self._unsupported('send')

def reply(self, req, msg):
"""Does a reply based on the given Request object and message.
"""
self.send(req.sender, req.conn_id, msg)

def reply_bulk(self, uuid, idents, data):
"""This lets you send a single message to many currently
connected clients. There's a MAX_IDENTS that you should
not exceed, so chunk your targets as needed. Each target
will receive the message once by Mongrel2, but you don't have
to loop which cuts down on reply volume.
"""
self._unsupported('reply_bulk')
self.send(uuid, ' '.join(idents), data)

def close(self):
"""Close the connection.
"""
self._unsupported('close')

def close_bulk(self, uuid, idents):
"""Same as close but does it to a whole bunch of idents at a time.
"""
self._unsupported('close_bulk')
self.reply_bulk(uuid, idents, "")


###
### ZeroMQ
Expand Down Expand Up @@ -116,7 +67,7 @@ def load_zmq_ctx():
zmq = load_zmq()
zmq_ctx = zmq.Context()
load_zmq_ctx._zmq_ctx = zmq_ctx

return load_zmq_ctx._zmq_ctx


Expand All @@ -141,41 +92,32 @@ def __init__(self, pull_addr, pub_addr):
"""
zmq = load_zmq()
ctx = load_zmq_ctx()

in_sock = ctx.socket(zmq.PULL)
out_sock = ctx.socket(zmq.PUB)

super(Mongrel2Connection, self).__init__(in_sock, out_sock)
self.sender_id = uuid4().hex
self.in_sock = ctx.socket(zmq.PULL)
self.out_sock = ctx.socket(zmq.PUB)

self.in_addr = pull_addr
self.out_addr = pub_addr

in_sock.connect(pull_addr)
out_sock.setsockopt(zmq.IDENTITY, self.sender_id)
out_sock.connect(pub_addr)
self.in_sock.connect(pull_addr)
self.out_sock.setsockopt(zmq.IDENTITY, self.sender_id)
self.out_sock.connect(pub_addr)

def process_message(self, application, message):
"""This coroutine looks at the message, determines which handler will
be used to process it, and then begins processing.

The application is responsible for handling misconfigured routes.
"""
request = Request.parse_msg(message)
if request.is_disconnect():
return # Ignore disconnect msgs. Dont have areason to do otherwise
return # Ignore disconnect msgs. Dont have a reason to do otherwise
handler = application.route_message(request)
result = handler()

http_content = http_response(result['body'], result['status_code'],
result['status_msg'], result['headers'])

application.msg_conn.reply(request, http_content)

def recv(self):
"""Receives a raw mongrel2.handler.Request object that you from the
zeromq socket and return whatever is found.
"""
zmq_msg = self.in_sock.recv()
return zmq_msg
# in case of long poll we do not want to send reply
if result is not None:
self.reply(request, result)

def recv_forever_ever(self, application):
"""Defines a function that will run the primary connection Brubeck uses
Expand All @@ -184,7 +126,7 @@ def recv_forever_ever(self, application):
"""
def fun_forever():
while True:
request = self.recv()
request = self.in_sock.recv()
coro_spawn(self.process_message, application, request)
self._recv_forever_ever(fun_forever)

Expand Down Expand Up @@ -212,7 +154,7 @@ def reply_bulk(self, uuid, idents, data):
def close(self):
"""Tells mongrel2 to explicitly close the HTTP connection.
"""
pass
self.reply(req.sender, "")

def close_bulk(self, uuid, idents):
"""Same as close but does it to a whole bunch of idents at a time.
Expand All @@ -221,22 +163,21 @@ def close_bulk(self, uuid, idents):


###
### WSGI
### WSGI
###

class WSGIConnection(Connection):
"""
"""

def __init__(self, port=6767):
super(WSGIConnection, self).__init__()
self.port = port

def process_message(self, application, environ, callback):
request = Request.parse_wsgi_request(environ)
handler = application.route_message(request)
result = handler()

wsgi_status = ' '.join([str(result['status_code']), result['status_msg']])
headers = [(k, v) for k,v in result['headers'].items()]
callback(str(wsgi_status), headers)
Expand All @@ -254,15 +195,15 @@ def fun_forever():

def proc_msg(environ, callback):
return self.process_message(application, environ, callback)

if CORO_LIBRARY == 'gevent':
from gevent import wsgi
server = wsgi.WSGIServer(('', self.port), proc_msg)
server.serve_forever()

elif CORO_LIBRARY == 'eventlet':
import eventlet.wsgi
server = eventlet.wsgi.server(eventlet.listen(('', self.port)),
proc_msg)

self._recv_forever_ever(fun_forever)
43 changes: 11 additions & 32 deletions brubeck/request_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,16 @@ def coro_spawn(function, app, message, *a, **kw):
import cPickle as pickle
from itertools import chain
import os, sys
from dictshield.base import ShieldException
from request import Request, to_bytes, to_unicode

from request import Request, to_bytes
import ujson as json

###
### Common helpers
###

HTTP_METHODS = ['get', 'post', 'put', 'delete',
'head', 'options', 'trace', 'connect']
METHODS = ['get', 'post', 'put', 'delete',
'head', 'options', 'trace', 'connect',
'websocket', 'websocket_handshake']

HTTP_FORMAT = "HTTP/1.1 %(code)s %(status)s\r\n%(headers)s\r\n\r\n%(body)s"

Expand All @@ -76,16 +75,6 @@ class FourOhFourException(Exception):
### Result Processing
###

def render(body, status_code, status_msg, headers):
payload = {
'body': body,
'status_code': status_code,
'status_msg': status_msg,
'headers': headers,
}
return payload


def http_response(body, code, status, headers):
"""Renders arguments into an HTTP response.
"""
Expand Down Expand Up @@ -221,7 +210,7 @@ def supported_methods(self):
"""List all the HTTP methods you have defined.
"""
supported_methods = []
for mef in HTTP_METHODS:
for mef in METHODS:
if callable(getattr(self, mef, False)):
supported_methods.append(mef)
return supported_methods
Expand Down Expand Up @@ -318,7 +307,7 @@ def __call__(self):
mef = self.message.method.lower() # M-E-T-H-O-D man!

# Find function mapped to method on self
if mef in HTTP_METHODS:
if mef in METHODS:
fun = getattr(self, mef, self.unsupported)
else:
fun = self.unsupported
Expand All @@ -336,10 +325,6 @@ def __call__(self):
rendered = fun(**kwargs)
else:
rendered = fun(*self._url_args)

if rendered is None:
logging.debug('Handler had no return value: %s' % fun)
return ''
except Exception, e:
logging.error(e, exc_info=True)
rendered = self.error(e)
Expand Down Expand Up @@ -548,12 +533,11 @@ def render(self, status_code=None, http_200=False, **kwargs):

self.convert_cookies()

response = render(self.body, status_code, self.status_msg, self.headers)

logging.info('%s %s %s (%s)' % (status_code, self.message.method,
self.message.path,
self.message.remote_addr))
return response

return http_response(self.body, status_code, self.status_msg, self.headers)


class JSONMessageHandler(WebMessageHandler):
Expand All @@ -576,13 +560,11 @@ def render(self, status_code=None, hide_status=False, **kwargs):
else:
body = json.dumps(self._payload)

response = render(body, self.status_code, self.status_msg,
self.headers)

logging.info('%s %s %s (%s)' % (self.status_code, self.message.method,
self.message.path,
self.message.remote_addr))
return response

return http_response(body, self.status_code, self.status_msg, self.headers)


class JsonSchemaMessageHandler(WebMessageHandler):
Expand All @@ -603,10 +585,7 @@ def render(self, status_code=None, **kwargs):
self.convert_cookies()
self.headers['Content-Type'] = "application/schema+json"

response = render(self.body, status_code, self.status_msg,
self.headers)

return response
return http_response(self.body, status_code, self.status_msg, self.headers)

###
### Application logic
Expand Down
Loading