Skip to content

Commit

Permalink
Merge pull request #38 from beatgammit/wamp-2
Browse files Browse the repository at this point in the history
MessagePack serializer and change to gorilla/websocket
  • Loading branch information
jcelliott committed Aug 3, 2014
2 parents 46575aa + d314dfa commit 691eeac
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 121 deletions.
20 changes: 10 additions & 10 deletions wampv2/broker.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package wampv2

func spliceSubscribers(subs []Subscriber, i int) []Subscriber {
if i == len(subs) - 1 {
if i == len(subs)-1 {
return subs[:i]
}
return append(subs[:i], subs[i+1:]...)
Expand Down Expand Up @@ -45,13 +45,13 @@ type Broker interface {

// A super simple broker that matches URIs to Subscribers.
type BasicBroker struct {
routes map[URI]map[ID]Subscriber
routes map[URI]map[ID]Subscriber
subscriptions map[ID]URI
}

func NewBasicBroker() *BasicBroker {
return &BasicBroker{
routes: make(map[URI]map[ID]Subscriber),
routes: make(map[URI]map[ID]Subscriber),
subscriptions: make(map[ID]URI),
}
}
Expand All @@ -60,7 +60,7 @@ func (br *BasicBroker) Publish(pub Publisher, msg *Publish) {
pubId := NewID()
evtTemplate := Event{
Publication: pubId,
Arguments: msg.Arguments,
Arguments: msg.Arguments,
ArgumentsKw: msg.ArgumentsKw,
}
for id, sub := range br.routes[msg.Topic] {
Expand Down Expand Up @@ -90,26 +90,26 @@ func (br *BasicBroker) Unsubscribe(sub Subscriber, msg *Unsubscribe) {
topic, ok := br.subscriptions[msg.Subscription]
if !ok {
err := &Error{
Type: msg.MessageType(),
Type: msg.MessageType(),
Request: msg.Request,
Error: WAMP_ERROR_NO_SUCH_SUBSCRIPTION,
Error: WAMP_ERROR_NO_SUCH_SUBSCRIPTION,
}
sub.SendError(err)
return
}

if r, ok := br.routes[topic]; !ok {
err := &Error{
Type: msg.MessageType(),
Type: msg.MessageType(),
Request: msg.Request,
Error: URI("wamp.error.internal_error"),
Error: URI("wamp.error.internal_error"),
}
sub.SendError(err)
} else if _, ok := r[msg.Subscription]; !ok {
err := &Error{
Type: msg.MessageType(),
Type: msg.MessageType(),
Request: msg.Request,
Error: URI("wamp.error.internal_error"),
Error: URI("wamp.error.internal_error"),
}
sub.SendError(err)
} else {
Expand Down
1 change: 1 addition & 0 deletions wampv2/realm.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type Realm interface {

type BasicRealm struct {
}

func NewBasicRealm() *BasicRealm {
return &BasicRealm{}
}
Expand Down
79 changes: 79 additions & 0 deletions wampv2/serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,91 @@ import (
"encoding/json"
"fmt"
"reflect"

"github.com/ugorji/go/codec"
)

type Serializer interface {
Serialize(Message) ([]byte, error)
Deserialize([]byte) (Message, error)
}

type MessagePackSerializer struct {
}

func (s *MessagePackSerializer) Serialize(msg Message) ([]byte, error) {
arr := []interface{}{int(msg.MessageType())}
val := reflect.ValueOf(msg)
if val.Kind() == reflect.Ptr {
val = val.Elem()
}
for i := 0; i < val.NumField(); i++ {
arr = append(arr, val.Field(i).Interface())
}

var b []byte
return b, codec.NewEncoderBytes(&b, new(codec.MsgpackHandle)).Encode(arr)
}

func (s *MessagePackSerializer) Deserialize(data []byte) (Message, error) {
var arr []interface{}
if err := codec.NewDecoderBytes(data, new(codec.MsgpackHandle)).Decode(&arr); err != nil {
return nil, err
} else if len(arr) == 0 {
return nil, fmt.Errorf("Invalid message")
}

var msgType MessageType
if typ, ok := arr[0].(int64); ok {
msgType = MessageType(typ)
} else {
return nil, fmt.Errorf("Unsupported message format")
}

msg := msgType.New()
if msg == nil {
return nil, fmt.Errorf("Unsupported message type")
}
val := reflect.ValueOf(msg)
if val.Kind() == reflect.Ptr {
val = val.Elem()
}
for i := 0; i < val.NumField() && i < len(arr)-1; i++ {
f := val.Field(i)
if arr[i+1] == nil {
continue
}
arg := reflect.ValueOf(arr[i+1])
if arg.Kind() == reflect.Ptr {
arg = arg.Elem()
}
if !arg.Type().ConvertibleTo(f.Type()) {
// special-case map maps
if arg.Type().Kind() == reflect.Map && f.Type().Kind() == reflect.Map {
keyType := f.Type().Key()
valType := f.Type().Elem()
m := reflect.MakeMap(f.Type())
for _, k := range arg.MapKeys() {
if !k.Type().ConvertibleTo(keyType) {
return nil, fmt.Errorf("Message format error: %dth field not recognizable")
}
v := arg.MapIndex(k)
if !v.Type().ConvertibleTo(valType) {
return nil, fmt.Errorf("Message format error: %dth field not recognizable")
}
m.SetMapIndex(k.Convert(keyType), v.Convert(valType))
}
f.Set(m)
} else {
return nil, fmt.Errorf("Message format error: %dth field not recognizable", i+1)
}
} else {
f.Set(arg.Convert(f.Type()))
}
}
return msg, nil
}

type JSONSerializer struct {
}

Expand All @@ -25,6 +103,7 @@ func (s *JSONSerializer) Serialize(msg Message) ([]byte, error) {
}
return json.Marshal(arr)
}

func (s *JSONSerializer) Deserialize(data []byte) (Message, error) {
var arr []interface{}
if err := json.Unmarshal(data, &arr); err != nil {
Expand Down
96 changes: 39 additions & 57 deletions wampv2/websocket.go
Original file line number Diff line number Diff line change
@@ -1,82 +1,64 @@
package wampv2

import "code.google.com/p/go.net/websocket"
import (
"github.com/gorilla/websocket"
)

type websocketEndpoint struct {
conn *websocket.Conn
serializer Serializer
messages chan Message
payloadType byte
conn *websocket.Conn
serializer Serializer
messages chan Message
payloadType int
}

func NewJSONWebsocketClient(url, origin string) (Endpoint, error) {
return newWebsocketClient(url, jsonWebsocketProtocol, origin, &JSONSerializer{}, websocket.TextFrame)
return newWebsocketClient(url, jsonWebsocketProtocol, origin, new(JSONSerializer), websocket.TextMessage)
}

func newWebsocketClient(url, protocol, origin string, serializer Serializer, payloadType byte) (Endpoint, error) {
conn, err := websocket.Dial(url, protocol, origin)
func NewMessagePackWebsocketClient(url, origin string) (Endpoint, error) {
return newWebsocketClient(url, msgpackWebsocketProtocol, origin, new(MessagePackSerializer), websocket.BinaryMessage)
}

func newWebsocketClient(url, protocol, origin string, serializer Serializer, payloadType int) (Endpoint, error) {
dialer := websocket.Dialer{
Subprotocols: []string{protocol},
}
conn, _, err := dialer.Dial(url, nil)
if err != nil {
return nil, err
}
ep := &websocketEndpoint{
conn: conn,
messages: make(chan Message, 10),
serializer: serializer,
payloadType: payloadType,
}
switch payloadType {
case websocket.TextFrame:
go ep.receiveTextFrames()
case websocket.BinaryFrame:
go ep.receiveBinaryFrames()
}
return ep, nil
}

func (ep *websocketEndpoint) receiveTextFrames() error {
for {
var str string
if err := websocket.Message.Receive(ep.conn, &str); err != nil {
// TODO: check if it's a closing err
return err
}
if msg, err := ep.serializer.Deserialize([]byte(str)); err != nil {
// TODO: handle error
} else {
ep.messages <- msg
}
conn: conn,
messages: make(chan Message, 10),
serializer: serializer,
payloadType: payloadType,
}
return nil
}

func (ep *websocketEndpoint) receiveBinaryFrames() (err error) {
for {
var b []byte
if err := websocket.Message.Receive(ep.conn, &b); err != nil {
// TODO: check if it's a closing err
return err
go func() {
for {
// TODO: use conn.NextMessage() and stream
// TODO: do something different based on binary/text frames
if _, b, err := conn.ReadMessage(); err != nil {
conn.Close()
break
} else {
msg, err := serializer.Deserialize(b)
if err != nil {
// TODO: handle error
} else {
ep.messages <- msg
}
}
}
if msg, err := ep.serializer.Deserialize(b); err != nil {
// TODO: handle error
} else {
ep.messages <- msg
}
}
return nil
}()
return ep, nil
}

func (ep *websocketEndpoint) Send(msg Message) error {
b, err := ep.serializer.Serialize(msg)
if err != nil {
return err
}
switch ep.payloadType {
case websocket.TextFrame:
return websocket.Message.Send(ep.conn, string(b))
case websocket.BinaryFrame:
return websocket.Message.Send(ep.conn, b)
}
return nil
return ep.conn.WriteMessage(ep.payloadType, b)
}
func (ep *websocketEndpoint) Receive() <-chan Message {
return ep.messages
Expand Down
Loading

0 comments on commit 691eeac

Please sign in to comment.