Skip to content

Commit

Permalink
fixes #28: implement websocket transport
Browse files Browse the repository at this point in the history
  • Loading branch information
beatgammit committed Mar 12, 2014
1 parent 638e1b3 commit 46575aa
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 0 deletions.
6 changes: 6 additions & 0 deletions wampv2/serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type JSONSerializer struct {
func (s *JSONSerializer) Serialize(msg Message) ([]byte, error) {
arr := []interface{}{int(msg.MessageType())}
val := reflect.ValueOf(msg)
if val.Kind() == reflect.Ptr {
val = val.Elem()
}
for i := 0; i < val.NumField(); i++ {
arr = append(arr, val.Field(i).Interface())
}
Expand Down Expand Up @@ -46,6 +49,9 @@ func (s *JSONSerializer) Deserialize(data []byte) (Message, error) {
}
for i := 0; i < val.NumField() && i < len(arr)-1; i++ {
f := val.Field(i)
if arr[i+1] == nil {
continue
}
arg := reflect.ValueOf(arr[i+1])
if arg.Kind() == reflect.Ptr {
arg = arg.Elem()
Expand Down
86 changes: 86 additions & 0 deletions wampv2/websocket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package wampv2

import "code.google.com/p/go.net/websocket"

type websocketEndpoint struct {
conn *websocket.Conn
serializer Serializer
messages chan Message
payloadType byte
}

func NewJSONWebsocketClient(url, origin string) (Endpoint, error) {
return newWebsocketClient(url, jsonWebsocketProtocol, origin, &JSONSerializer{}, websocket.TextFrame)
}

func newWebsocketClient(url, protocol, origin string, serializer Serializer, payloadType byte) (Endpoint, error) {
conn, err := websocket.Dial(url, protocol, origin)
if err != nil {
return nil, err
}
ep := &websocketEndpoint{
conn: conn,
messages: make(chan Message, 10),
serializer: serializer,
payloadType: payloadType,
}
switch payloadType {
case websocket.TextFrame:
go ep.receiveTextFrames()
case websocket.BinaryFrame:
go ep.receiveBinaryFrames()
}
return ep, nil
}

func (ep *websocketEndpoint) receiveTextFrames() error {
for {
var str string
if err := websocket.Message.Receive(ep.conn, &str); err != nil {
// TODO: check if it's a closing err
return err
}
if msg, err := ep.serializer.Deserialize([]byte(str)); err != nil {
// TODO: handle error
} else {
ep.messages <- msg
}
}
return nil
}

func (ep *websocketEndpoint) receiveBinaryFrames() (err error) {
for {
var b []byte
if err := websocket.Message.Receive(ep.conn, &b); err != nil {
// TODO: check if it's a closing err
return err
}
if msg, err := ep.serializer.Deserialize(b); err != nil {
// TODO: handle error
} else {
ep.messages <- msg
}
}
return nil
}

func (ep *websocketEndpoint) Send(msg Message) error {
b, err := ep.serializer.Serialize(msg)
if err != nil {
return err
}
switch ep.payloadType {
case websocket.TextFrame:
return websocket.Message.Send(ep.conn, string(b))
case websocket.BinaryFrame:
return websocket.Message.Send(ep.conn, b)
}
return nil
}
func (ep *websocketEndpoint) Receive() <-chan Message {
return ep.messages
}
func (ep *websocketEndpoint) Close() error {
return ep.conn.Close()
}
117 changes: 117 additions & 0 deletions wampv2/websocket_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package wampv2

import (
"code.google.com/p/go.net/websocket"
"fmt"
"net/http"
)

const (
jsonWebsocketProtocol = "wamp.2.json"
msgpackWebsocketProtocol = "wamp.2.msgpack"
)

type invalidPayload byte
func (e invalidPayload) Error() string {
return fmt.Sprintf("Invalid payloadType: %d", e)
}

type protocolExists string
func (e protocolExists) Error() string {
return "This protocol has already been registered: " + string(e)
}

type protocol struct {
payloadType byte
serializer Serializer
}

// WebsocketServer handles websocket connections.
type WebsocketServer struct {
server websocket.Server
router Router

protocols map[string]protocol

// The serializer to use for text frames. Defaults to JSONSerializer.
TextSerializer Serializer
// The serializer to use for binary frames. Defaults to JSONSerializer.
BinarySerializer Serializer
}

func NewWebsocketServer(r Router) *WebsocketServer {
s := &WebsocketServer{
router: r,
protocols: make(map[string]protocol),
}

s.server = websocket.Server{
Handshake: s.handshake,
Handler: websocket.Handler(s.handleWebsocket),
}
return s
}

func (s *WebsocketServer) RegisterProtocol(proto string, payloadType byte, serializer Serializer) error {
if payloadType != websocket.TextFrame && payloadType != websocket.BinaryFrame {
return invalidPayload(payloadType)
}
if _, ok := s.protocols[proto]; ok {
return protocolExists(proto)
}
s.protocols[proto] = protocol{payloadType, serializer}
return nil
}

func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.server.ServeHTTP(w, r)
}

func (s *WebsocketServer) handshake(config *websocket.Config, req *http.Request) error {
for _, protocol := range config.Protocol {
if _, ok := s.protocols[protocol]; ok {
config.Protocol = []string{protocol}
return nil
}
if protocol == jsonWebsocketProtocol || protocol == msgpackWebsocketProtocol {
config.Protocol = []string{protocol}
return nil
}
}
return websocket.ErrBadWebSocketProtocol
}

func (s *WebsocketServer) handleWebsocket(conn *websocket.Conn) {
var serializer Serializer
var payloadType byte
for _, proto := range conn.Config().Protocol {
if protocol, ok := s.protocols[proto]; ok {
serializer = protocol.serializer
payloadType = protocol.payloadType
break
} else if proto == "wamp.2.json" {
serializer = new(JSONSerializer)
payloadType = websocket.TextFrame
break
} else if proto == "wamp.2.msgpack" {
// TODO: implement msgpack
}
}
if serializer == nil {
conn.Close()
return
}

ep := websocketEndpoint{
conn: conn,
serializer: serializer,
messages: make(chan Message, 10),
payloadType: payloadType,
}
if payloadType == websocket.TextFrame {
go ep.receiveTextFrames()
} else if payloadType == websocket.BinaryFrame {
go ep.receiveBinaryFrames()
}
s.router.Accept(&ep)
}
45 changes: 45 additions & 0 deletions wampv2/websocket_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package wampv2

import (
"testing"
"net"
"net/http"
"io"
"fmt"
)

func newWebsocketServer(t *testing.T) (int, Router, io.Closer) {
r := NewBasicRouter()
r.RegisterRealm(test_realm, NewBasicRealm())
s := NewWebsocketServer(r)
server := &http.Server{
Handler: s,
}

var addr net.TCPAddr
l, err := net.ListenTCP("tcp", &addr)
if err != nil {
t.Fatal(err)
}
go server.Serve(l)
return l.Addr().(*net.TCPAddr).Port, r, l
}

func TestWSHandshake(t *testing.T) {
port, r, closer := newWebsocketServer(t)
defer closer.Close()

ep, err := NewJSONWebsocketClient(fmt.Sprintf("ws://localhost:%d/", port), "http://localhost")
if err != nil {
t.Fatal(err)
}

ep.Send(&Hello{Realm: test_realm})
go r.Accept(ep)

if msg, ok := <-ep.Receive(); !ok {
t.Fatal("Receive buffer closed")
} else if _, ok := msg.(*Welcome); !ok {
t.Errorf("Message not Welcome message: %T, %+v", msg, msg)
}
}

0 comments on commit 46575aa

Please sign in to comment.