Skip to content

Commit

Permalink
Extract meshconn from raft-on-mesh experiment
Browse files Browse the repository at this point in the history
This is independently useful, and will have a separate lifecycle.
  • Loading branch information
peterbourgon committed Feb 18, 2016
1 parent 240359e commit 781daff
Show file tree
Hide file tree
Showing 4 changed files with 267 additions and 0 deletions.
15 changes: 15 additions & 0 deletions examples/meshconn/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# meshconn

meshconn implements [net.PacketConn](https://golang.org/pkg/net/#PacketConn) on top of mesh.
Think of it as UDP with benefits:
NAT and bastion host (DMZ) traversal,
broadcast/multicast in networks where this is normally not possible e.g. EC2,
and an up-to-date, queryable memberlist.

meshconn supports [net.Addr](https://golang.org/pkg/net/#Addr) of the form `weavemesh://<PeerName>`.
By default, `<PeerName>` is a hardware address of the form `01:02:03:FD:FE:FF`.
Other forms of PeerName e.g. hashes are supported.

meshconn itself is largely stateless and has best-effort delivery semantics.
As a future experiment, it could easily be amended to have basic resiliency guarantees.
Also, at the moment, PacketConn read and write deadlines are not supported.
21 changes: 21 additions & 0 deletions examples/meshconn/mesh_addr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package meshconn

import (
"fmt"
"net"

"github.com/weaveworks/mesh"
)

// MeshAddr implements net.Addr for mesh peers.
type MeshAddr struct {
mesh.PeerName
}

var _ net.Addr = MeshAddr{}

// Network returns weavemesh.
func (a MeshAddr) Network() string { return "weavemesh" }

// String returns weavemesh://<PeerName>.
func (a MeshAddr) String() string { return fmt.Sprintf("%s://%s", a.Network(), a.PeerName.String()) }
181 changes: 181 additions & 0 deletions examples/meshconn/peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package meshconn

import (
"errors"
"log"
"net"
"time"

"github.com/weaveworks/mesh"
)

var (
// ErrShortRead is returned by ReadFrom when the
// passed buffer is too small for the packet.
ErrShortRead = errors.New("short read")

// ErrPeerClosed is returned by ReadFrom and WriteTo
// when the peer is closed during the operation.
ErrPeerClosed = errors.New("peer closed")

// ErrGossipNotRegistered is returned by Write to when attempting
// to write before a mesh.Gossip has been registered in the peer.
ErrGossipNotRegistered = errors.New("gossip not registered")

// ErrNotMeshAddr is returned by WriteTo when attempting
// to write to a non-mesh address.
ErrNotMeshAddr = errors.New("not a mesh addr")

// ErrNotSupported is returned by methods that are not supported.
ErrNotSupported = errors.New("not supported")
)

// Peer implements mesh.Gossiper and net.PacketConn.
type Peer struct {
name mesh.PeerName
gossip mesh.Gossip
recv chan pkt
actions chan func()
quit chan struct{}
logger *log.Logger
}

// NewPeer returns a Peer, which can be used as a net.PacketConn.
// Clients must Register a mesh.Gossip before calling ReadFrom or WriteTo.
// Clients should aggressively consume from ReadFrom.
func NewPeer(name mesh.PeerName, logger *log.Logger) *Peer {
p := &Peer{
name: name,
gossip: nil, // initially no gossip
recv: make(chan pkt),
actions: make(chan func()),
quit: make(chan struct{}),
logger: logger,
}
go p.loop()
return p
}

func (p *Peer) loop() {
for {
select {
case f := <-p.actions:
f()
case <-p.quit:
return
}
}
}

// Register injects the mesh.Gossip and enables full-duplex communication.
// Clients should consume from ReadFrom without blocking.
func (p *Peer) Register(gossip mesh.Gossip) {
p.actions <- func() { p.gossip = gossip }
}

// ReadFrom implements net.PacketConn.
// Clients should consume from ReadFrom without blocking.
func (p *Peer) ReadFrom(b []byte) (n int, remote net.Addr, err error) {
c := make(chan struct{})
p.actions <- func() {
go func() { // so as not to block loop
defer close(c)
select {
case pkt := <-p.recv:
n = copy(b, pkt.Buf)
remote = MeshAddr{pkt.Src}
if n < len(pkt.Buf) {
err = ErrShortRead
}
case <-p.quit:
err = ErrPeerClosed
}
}()
}
<-c
return n, remote, err
}

// WriteTo implements net.PacketConn.
func (p *Peer) WriteTo(b []byte, dst net.Addr) (n int, err error) {
c := make(chan struct{})
p.actions <- func() {
defer close(c)
if p.gossip == nil {
err = ErrGossipNotRegistered
return
}
meshAddr, ok := dst.(MeshAddr)
if !ok {
err = ErrNotMeshAddr
return
}
pkt := pkt{Src: p.name, Buf: b}
if meshAddr.PeerName == p.name {
p.recv <- pkt
return
}
// TODO(pb): detect and support broadcast
buf := pkt.encode()
n = len(buf)
err = p.gossip.GossipUnicast(meshAddr.PeerName, buf)
}
<-c
return n, err
}

// Close implements net.PacketConn.
func (p *Peer) Close() error {
close(p.quit)
return nil
}

// LocalAddr implements net.PacketConn.
func (p *Peer) LocalAddr() net.Addr {
return MeshAddr{p.name}
}

// SetDeadline implements net.PacketConn.
// SetDeadline is not supported.
func (p *Peer) SetDeadline(time.Time) error {
return ErrNotSupported
}

// SetReadDeadline implements net.PacketConn.
// SetReadDeadline is not supported.
func (p *Peer) SetReadDeadline(time.Time) error {
return ErrNotSupported
}

// SetWriteDeadline implements net.PacketConn.
// SetWriteDeadline is not supported.
func (p *Peer) SetWriteDeadline(time.Time) error {
return ErrNotSupported
}

// Gossip implements mesh.Gossiper.
func (p *Peer) Gossip() (complete mesh.GossipData) {
return pktSlice{} // we're stateless
}

// OnGossip implements mesh.Gossiper.
// The buf is a single pkt.
func (p *Peer) OnGossip(buf []byte) (delta mesh.GossipData, err error) {
return pktSlice{makePkt(buf)}, nil
}

// OnGossipBroadcast implements mesh.Gossiper.
// The buf is a single pkt
func (p *Peer) OnGossipBroadcast(_ mesh.PeerName, buf []byte) (received mesh.GossipData, err error) {
pkt := makePkt(buf)
p.recv <- pkt // to ReadFrom
return pktSlice{pkt}, nil
}

// OnGossipUnicast implements mesh.Gossiper.
// The buf is a single pkt.
func (p *Peer) OnGossipUnicast(_ mesh.PeerName, buf []byte) error {
pkt := makePkt(buf)
p.recv <- pkt // to ReadFrom
return nil
}
50 changes: 50 additions & 0 deletions examples/meshconn/pkt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package meshconn

import (
"bytes"
"encoding/gob"

"github.com/weaveworks/mesh"
)

type pkt struct {
Src mesh.PeerName
Buf []byte
}

func makePkt(buf []byte) pkt {
var p pkt
if err := gob.NewDecoder(bytes.NewBuffer(buf)).Decode(&p); err != nil {
panic(err)
}
return p
}

func (p pkt) encode() []byte {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(p); err != nil {
panic(err)
}
return buf.Bytes()
}

// pktSlice implements mesh.GossipData.
type pktSlice []pkt

var _ mesh.GossipData = &pktSlice{}

func (s pktSlice) Encode() [][]byte {
bufs := make([][]byte, len(s))
for i, pkt := range s {
bufs[i] = pkt.encode()
}
return bufs
}

func (s pktSlice) Merge(other mesh.GossipData) mesh.GossipData {
o := other.(pktSlice)
merged := make(pktSlice, 0, len(s)+len(o))
merged = append(merged, s...)
merged = append(merged, o...)
return merged
}

0 comments on commit 781daff

Please sign in to comment.