Skip to content

Commit

Permalink
Added gevent and threading support, added disconnect function
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg committed Aug 9, 2015
1 parent 8591ef4 commit 5a350c7
Show file tree
Hide file tree
Showing 6 changed files with 348 additions and 199 deletions.
243 changes: 190 additions & 53 deletions docs/index.rst

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions example/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@

import time
from threading import Thread
from flask import Flask, render_template, session, request
from flask import Flask, render_template, session
from flask_socketio import SocketIO, emit, join_room, leave_room, \
close_room, rooms, disconnect

app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, logger=True)
socketio = SocketIO(app)
thread = None


Expand Down
110 changes: 53 additions & 57 deletions flask_socketio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
import os
import sys

import eventlet
import socketio
import flask
from werkzeug.debug import DebuggedApplication
from werkzeug.serving import run_with_reloader
from werkzeug._internal import _log

from .test_client import SocketIOTestClient

Expand All @@ -23,6 +18,8 @@ class SocketIO(object):
def __init__(self, app=None, **kwargs):
self.app = None
self.server = None
self.server_options = None
self.handlers = {}
self.exception_handlers = {}
self.default_exception_handler = None
if app is not None:
Expand All @@ -35,12 +32,9 @@ def init_app(self, app, **kwargs):
if not hasattr(app, 'extensions'):
app.extensions = {}
app.extensions['socketio'] = self
self.server = socketio.Server(**kwargs)
self.server_options = kwargs
self.app = app

def _on_message(self, message, handler, namespace='/'):
self.server.on(message, handler, namespace=namespace)

def on(self, message, namespace=None):
"""Decorator to register a SocketIO event handler.
Expand Down Expand Up @@ -70,6 +64,7 @@ def _handler(sid, *args):
flask.session)
flask.request.sid = sid
flask.request.namespace = namespace
flask.request.event = {'message': message, 'args': args}
try:
if message == 'connect':
ret = handler()
Expand All @@ -87,7 +82,9 @@ def _handler(sid, *args):
flask.session,
self.server.environ[sid]['saved_session'])
return ret
self.server.on(message, _handler, namespace=namespace)
if namespace not in self.handlers:
self.handlers[namespace] = {}
self.handlers[namespace][message] = _handler
return decorator

def on_error(self, namespace=None):
Expand Down Expand Up @@ -188,7 +185,7 @@ def send(self, data, json=False, namespace=None, room=None, callback=None):
self.emit('message', data, namespace=namespace, room=room,
callback=callback)

def close_room(self, room, namespace='/'):
def close_room(self, room, namespace=None):
"""Close a room.
This function removes any users that are in the given room and then
Expand All @@ -201,7 +198,7 @@ def close_room(self, room, namespace='/'):
"""
self.server.close_room(room, namespace)

def run(self, app, host=None, port=None, **kwargs):
def run(self, app=None, host=None, port=None, **kwargs):
"""Run the SocketIO web server.
:param app: The Flask application instance.
Expand All @@ -211,33 +208,14 @@ def run(self, app, host=None, port=None, **kwargs):
5000.
:param use_reloader: ``True`` to enable the Flask reloader, ``False``
to disable it.
:param log_output: If ``True``, the server logs all incomming
connections. If ``False`` logging is disabled.
Defaults to ``True`` in debug mode, ``False``
otherwise.
:param resource: The SocketIO resource name. Defaults to
``'socket.io'``. Leave this as is unless you know what
you are doing.
:param transports: Optional list of transports to allow. List of
strings, each string should be one of
handler.SocketIOHandler.handler_types.
:param policy_server: Boolean describing whether or not to use the
Flash policy server. Defaults to ``True``.
:param policy_listener: A tuple containing (host, port) for the
policy server. This is optional and used only
if policy server is set to true. Defaults to
0.0.0.0:843.
:param heartbeat_interval: The timeout for the server, we should
receive a heartbeat from the client within
this interval. This should be less than the
``heartbeat_timeout``.
:param heartbeat_timeout: The timeout for the client when it should
send a new heartbeat to the server. This
value is sent to the client after a
successful handshake.
:param close_timeout: The timeout for the client, when it closes the
connection it still X amounts of seconds to do
re-open of the connection. This value is sent to
the client after a successful handshake.
:param log_file: The file in which you want the PyWSGI server to write
its access log. If not specified, it is sent to
``stderr`` (with gevent 0.13).
:param kwargs: additional options passed to the eventlet WSGI server.
"""
if host is None:
host = '127.0.0.1'
Expand All @@ -248,23 +226,41 @@ def run(self, app, host=None, port=None, **kwargs):
else:
port = 5000

resource = kwargs.pop('resource', 'socket.io')
# use_reloader = kwargs.pop('use_reloader', app.debug)
# if use_reloader:
# # monkey patching is required by the reloader
# from gevent import monkey
# monkey.patch_all()
#
# def run_server():
# self.server.serve_forever()
# if os.environ.get('WERKZEUG_RUN_MAIN') != 'true':
# _log('info', ' * Running on http://%s:%d/' % (host, port))
# run_with_reloader(run_server)
# else:
# _log('info', ' * Running on http://%s:%d/' % (host, port))
# self.server.serve_forever()
app = socketio.Middleware(self.server, app, socketio_path=resource)
eventlet.wsgi.server(eventlet.listen((host, port)), app)
if app is None:
app = self.app
else:
self.app = app

self.server_options.update(kwargs)
log_output = self.server_options.pop('log_output', app.debug)
use_reloader = self.server_options.pop('use_reloader', app.debug)
resource = self.server_options.pop('resource', 'socket.io')
if resource.startswith('/'):
resource = resource[1:]
if app.debug:
self.server_options['async_mode'] = 'threading'

self.server = socketio.Server(**self.server_options)
for namespace in self.handlers.keys():
for message, handler in self.handlers[namespace].items():
self.server.on(message, handler, namespace=namespace)

app.wsgi_app = socketio.Middleware(self.server, app.wsgi_app,
socketio_path=resource)

if self.server.eio.async_mode == 'threading':
app.run(host=host, port=port, threaded=True,
use_reloader=use_reloader)
elif self.server.eio.async_mode == 'eventlet':
import eventlet
eventlet.wsgi.server(eventlet.listen((host, port)), app,
log_output=log_output, **kwargs)
elif self.server.eio.async_mode == 'gevent':
from gevent import pywsgi
log = 'default'
if not log_output:
log = None
pywsgi.WSGIServer((host, port), app, log=log).serve_forever()

def test_client(self, app, namespace=None):
"""Return a simple SocketIO client that can be used for unit tests."""
Expand Down Expand Up @@ -420,8 +416,8 @@ def receive_message(msg):
disconnect()
# ...
:param silent: close the connection, but do not actually send a disconnect
packet to the client.
:param silent: this option is deprecated.
"""
#return flask.request.namespace.disconnect(silent)
raise NotImplementedError()
socketio = flask.current_app.extensions['socketio']
return socketio.server.disconnect(flask.request.sid,
namespace=flask.request.namespace)
75 changes: 45 additions & 30 deletions flask_socketio/test_client.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,82 @@
"""
This module contains a collection of auxiliary mock objects used by
unit tests.
"""

import uuid

from socketio import packet
from werkzeug.test import EnvironBuilder


_queue = {}


def _mock_send_packet(sid, pkt):
global _queue
if sid not in _queue:
_queue[sid] = []
if pkt.packet_type == packet.EVENT or \
pkt.packet_type == packet.BINARY_EVENT:
if pkt.data[0] == 'message' or pkt.data[0] == 'json':
_queue[sid].append({'name': pkt.data[0], 'args': pkt.data[1],
'namespace': pkt.namespace or '/'})
else:
_queue[sid].append({'name': pkt.data[0], 'args': pkt.data[1:],
'namespace': pkt.namespace or '/'})


class SocketIOTestClient(object):
"""Fake client useful for testing of a Flask-SocketIO server."""
queue = {}
ack = None

def __init__(self, app, socketio, namespace=None):
def _mock_send_packet(sid, pkt):
if pkt.packet_type == packet.EVENT or \
pkt.packet_type == packet.BINARY_EVENT:
if sid not in self.queue:
self.queue[sid] = []
if pkt.data[0] == 'message' or pkt.data[0] == 'json':
self.queue[sid].append({'name': pkt.data[0],
'args': pkt.data[1],
'namespace': pkt.namespace or '/'})
else:
self.queue[sid].append({'name': pkt.data[0],
'args': pkt.data[1:],
'namespace': pkt.namespace or '/'})
elif pkt.packet_type == packet.ACK or \
pkt.packet_type == packet.BINARY_ACK:
self.ack = {'args': pkt.data,
'namespace': pkt.namespace or '/'}

self.sid = uuid.uuid4().hex
self.callback_counter = 0
self.socketio = socketio
socketio.server._send_packet = _mock_send_packet
socketio.server.environ[self.sid] = {}
self.connect(namespace)

def connect(self, namespace=None):
"""Connect the client."""
environ = EnvironBuilder('/socket.io').get_environ()
self.socketio.server._handle_eio_connect(self.sid, environ)
if namespace is not None and namespace != '/':
pkt = packet.Packet(packet.CONNECT, namespace=namespace)
self.socketio.server._handle_eio_message(self.sid, pkt.encode())

def disconnect(self, namespace=None):
"""Disconnect the client."""
pkt = packet.Packet(packet.DISCONNECT, namespace=namespace)
self.socketio.server._handle_eio_message(self.sid, pkt.encode())

def emit(self, event, *args, **kwargs):
"""Emit an event to the server."""
namespace = kwargs.pop('namespace', None)
callback = kwargs.pop('callback', False)
id = None
if callback:
self.callback_counter += 1
id = self.callback_counter
pkt = packet.Packet(packet.EVENT, data=[event] + list(args),
namespace=namespace, binary=False)
namespace=namespace, id=id, binary=False)
self.ack = None
self.socketio.server._handle_eio_message(self.sid, pkt.encode())
if self.ack is not None:
return self.ack['args'][0] if len(self.ack['args']) == 1 \
else self.ack['args']

def send(self, data, json=False, namespace=None):
def send(self, data, json=False, callback=False, namespace=None):
"""Send a message to the server."""
if json:
msg = 'json'
else:
msg = 'message'
return self.emit(msg, data, namespace=namespace)
return self.emit(msg, data, callback=callback, namespace=namespace)

def get_received(self, namespace=None):
if self.sid not in _queue:
return []
"""Return the list of messages received from the server."""
namespace = namespace or '/'
r = [pkt for pkt in _queue[self.sid] if pkt['namespace'] == namespace]
_queue[self.sid] = [pkt for pkt in _queue[self.sid] if pkt not in r]
return r
r = [pkt for pkt in self.queue[self.sid]
if pkt['namespace'] == namespace]
self.queue[self.sid] = [pkt for pkt in self.queue[self.sid]
if pkt not in r]
return r
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
platforms='any',
install_requires=[
'Flask>=0.9',
'python-socketio>=0.2.0'
'python-socketio>=0.4.0'
],
tests_require=[
'coverage'
Expand Down
Loading

0 comments on commit 5a350c7

Please sign in to comment.