Skip to content

Commit

Permalink
Implement basic RPC
Browse files Browse the repository at this point in the history
  - issue #32
  • Loading branch information
jcelliott committed Aug 7, 2014
1 parent 06f8510 commit bf62f8c
Show file tree
Hide file tree
Showing 7 changed files with 344 additions and 47 deletions.
149 changes: 149 additions & 0 deletions wampv2/dealer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package wampv2

type Callee interface {
ErrorHandler
// Acknowledge that the endpoint was succesfully registered
SendRegistered(*Registered)
// Acknowledge that the endpoint was succesfully unregistered
SendUnregistered(*Unregistered)
// Dealer requests fulfillment of a procedure call
SendInvocation(*Invocation)
}

type Caller interface {
ErrorHandler
// Dealer sends the returned result from the procedure call
SendResult(*Result)
}

type Dealer interface {
// Register a procedure on an endpoint
Register(Callee, *Register)
// Unregister a procedure on an endpoint
Unregister(Callee, *Unregister)
// Call a procedure on an endpoint
Call(Caller, *Call)
// Return the result of a procedure call
Yield(Callee, *Yield)
}

type RemoteProcedure struct {
Endpoint Callee
Procedure URI
}

type DefaultDealer struct {
// map registration IDs to procedures
procedures map[ID]RemoteProcedure
// map procedure URIs to registration IDs
// TODO: this will eventually need to be `map[URI][]ID` to support
// multiple callees for the same procedure
registrations map[URI]ID
// keep track of call IDs so we can send the response to the caller
calls map[ID]Caller
// link the invocation ID to the call ID
invocations map[ID]ID
}

func NewDefaultDealer() *DefaultDealer {
return &DefaultDealer{
procedures: make(map[ID]RemoteProcedure),
registrations: make(map[URI]ID),
calls: make(map[ID]Caller),
invocations: make(map[ID]ID),
}
}

func (d *DefaultDealer) Register(callee Callee, msg *Register) {
if _, ok := d.registrations[msg.Procedure]; ok {
callee.SendError(&Error{
Type: msg.MessageType(),
Request: msg.Request,
Error: WAMP_ERROR_PROCEDURE_ALREADY_EXISTS,
})
return
}
reg := NewID()
d.procedures[reg] = RemoteProcedure{callee, msg.Procedure}
d.registrations[msg.Procedure] = reg
callee.SendRegistered(&Registered{
Request: msg.Request,
Registration: reg,
})
}

func (d *DefaultDealer) Unregister(callee Callee, msg *Unregister) {
if procedure, ok := d.procedures[msg.Registration]; !ok {
// the registration doesn't exist
callee.SendError(&Error{
Type: msg.MessageType(),
Request: msg.Request,
Error: WAMP_ERROR_NO_SUCH_REGISTRATION,
})
} else {
delete(d.registrations, procedure.Procedure)
delete(d.procedures, msg.Registration)
callee.SendUnregistered(&Unregistered{
Request: msg.Request,
})
}
}

func (d *DefaultDealer) Call(caller Caller, msg *Call) {
if reg, ok := d.registrations[msg.Procedure]; !ok {
caller.SendError(&Error{
Type: msg.MessageType(),
Request: msg.Request,
Error: WAMP_ERROR_NO_SUCH_PROCEDURE,
})
} else {
if rproc, ok := d.procedures[reg]; !ok {
// found a registration id, but doesn't match any remote procedure
caller.SendError(&Error{
Type: msg.MessageType(),
Request: msg.Request,
// TODO: what should this error be?
Error: URI("wamp.error.internal_error"),
})
} else {
// everything checks out, make the invocation request
d.calls[msg.Request] = caller
invocationID := NewID()
d.invocations[invocationID] = msg.Request
rproc.Endpoint.SendInvocation(&Invocation{
Request: invocationID,
Registration: reg,
Arguments: msg.Arguments,
ArgumentsKw: msg.ArgumentsKw,
})
}
}
}

func (d *DefaultDealer) Yield(callee Callee, msg *Yield) {
if callID, ok := d.invocations[msg.Request]; !ok {
callee.SendError(&Error{
Type: msg.MessageType(),
Request: msg.Request,
// TODO: what should this error be?
Error: URI("wamp.error.no_such_invocation"),
})
} else {
if caller, ok := d.calls[callID]; !ok {
// found the invocation id, but doesn't match any call id
callee.SendError(&Error{
Type: msg.MessageType(),
Request: msg.Request,
// TODO: what should this error be?
Error: URI("wamp.error.no_such_call"),
})
} else {
// return the result to the caller
caller.SendResult(&Result{
Request: callID,
Arguments: msg.Arguments,
ArgumentsKw: msg.ArgumentsKw,
})
}
}
}
46 changes: 46 additions & 0 deletions wampv2/dealer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package wampv2

import (
. "github.com/smartystreets/goconvey/convey"
"testing"
)

type TestCallee struct {
received Message
}

func (c *TestCallee) SendError(msg *Error) { c.received = msg }
func (c *TestCallee) SendRegistered(msg *Registered) { c.received = msg }
func (c *TestCallee) SendUnregistered(msg *Unregistered) { c.received = msg }
func (c *TestCallee) SendInvocation(msg *Invocation) { c.received = msg }

func TestRegister(t *testing.T) {
Convey("Registering a procedure", t, func() {
dealer := NewDefaultDealer()
callee := &TestCallee{}
testProcedure := URI("turnpike.test.endpoint")
msg := &Register{Request: 123, Procedure: testProcedure}
dealer.Register(callee, msg)

Convey("The callee should have received a REGISTERED message", func() {
reg := callee.received.(*Registered).Registration
So(reg, ShouldNotEqual, 0)
})

Convey("The dealer should have the endpoint registered", func() {
reg := callee.received.(*Registered).Registration
reg2, ok := dealer.registrations[testProcedure]
So(ok, ShouldBeTrue)
So(reg, ShouldEqual, reg2)
proc, ok := dealer.procedures[reg]
So(ok, ShouldBeTrue)
So(proc.Procedure, ShouldEqual, testProcedure)
})

Convey("The same procedure cannot be registered more than once", func() {
msg := &Register{Request: 321, Procedure: testProcedure}
dealer.Register(callee, msg)
So(callee.received, ShouldHaveSameTypeAs, &Error{})
})
})
}
7 changes: 7 additions & 0 deletions wampv2/realm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ type Realm interface {
// Broker returns a custom broker for this realm.
// If this is nil, the default broker will be used.
Broker() Broker
// Dealer returns a custom dealer for this realm.
// If this is nil, the default dealer will be used.
Dealer() Dealer
}

type DefaultRealm struct {
Expand All @@ -17,3 +20,7 @@ func NewDefaultRealm() *DefaultRealm {
func (realm *DefaultRealm) Broker() Broker {
return nil
}

func (realm *DefaultRealm) Dealer() Dealer {
return nil
}
64 changes: 46 additions & 18 deletions wampv2/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Router interface {
// DefaultRouter is a very basic WAMP router.
type DefaultRouter struct {
*DefaultBroker
*DefaultDealer

realms map[URI]Realm
clients map[URI][]Session
Expand All @@ -41,6 +42,7 @@ type DefaultRouter struct {
func NewDefaultRouter() *DefaultRouter {
return &DefaultRouter{
DefaultBroker: NewDefaultBroker(),
DefaultDealer: NewDefaultDealer(),
realms: make(map[URI]Realm),
clients: make(map[URI][]Session),
}
Expand Down Expand Up @@ -71,6 +73,13 @@ func (r *DefaultRouter) broker(realm URI) Broker {
return r
}

func (r *DefaultRouter) dealer(realm URI) Dealer {
if d := r.realms[realm].Dealer(); d != nil {
return d
}
return r
}

func (r *DefaultRouter) handleSession(sess Session, realm URI) {
defer sess.Close()

Expand Down Expand Up @@ -100,34 +109,45 @@ func (r *DefaultRouter) handleSession(sess Session, realm URI) {
if pub, ok := sess.Endpoint.(Publisher); ok {
r.broker(realm).Publish(pub, v)
} else {
err := &Error{
Type: v.MessageType(),
Request: v.Request,
Error: WAMP_ERROR_NOT_AUTHORIZED,
}
sess.Send(err)
r.invalidSessionError(sess, v, v.Request)
}
case *Subscribe:
if sub, ok := sess.Endpoint.(Subscriber); ok {
r.broker(realm).Subscribe(sub, v)
} else {
err := &Error{
Type: v.MessageType(),
Request: v.Request,
Error: WAMP_ERROR_NOT_AUTHORIZED,
}
sess.Send(err)
r.invalidSessionError(sess, v, v.Request)
}
case *Unsubscribe:
if sub, ok := sess.Endpoint.(Subscriber); ok {
r.broker(realm).Unsubscribe(sub, v)
} else {
err := &Error{
Type: v.MessageType(),
Request: v.Request,
Error: WAMP_ERROR_NOT_AUTHORIZED,
}
sess.Send(err)
r.invalidSessionError(sess, v, v.Request)
}

// Dealer messages
case *Register:
if callee, ok := sess.Endpoint.(Callee); ok {
r.dealer(realm).Register(callee, v)
} else {
r.invalidSessionError(sess, v, v.Request)
}
case *Unregister:
if callee, ok := sess.Endpoint.(Callee); ok {
r.dealer(realm).Unregister(callee, v)
} else {
r.invalidSessionError(sess, v, v.Request)
}
case *Call:
if caller, ok := sess.Endpoint.(Caller); ok {
r.dealer(realm).Call(caller, v)
} else {
r.invalidSessionError(sess, v, v.Request)
}
case *Yield:
if callee, ok := sess.Endpoint.(Callee); ok {
r.dealer(realm).Yield(callee, v)
} else {
r.invalidSessionError(sess, v, v.Request)
}

default:
Expand All @@ -136,6 +156,14 @@ func (r *DefaultRouter) handleSession(sess Session, realm URI) {
}
}

func (r *DefaultRouter) invalidSessionError(sess Session, msg Message, req ID) {
sess.Send(&Error{
Type: msg.MessageType(),
Request: req,
Error: WAMP_ERROR_NOT_AUTHORIZED,
})
}

func (r *DefaultRouter) Accept(ep Endpoint) error {
if r.closing {
ep.Send(&Abort{Reason: WAMP_ERROR_SYSTEM_SHUTDOWN})
Expand Down
Loading

0 comments on commit bf62f8c

Please sign in to comment.