From 781daffb818849a4dba32e1a38c636c7b1d8d97c Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Thu, 18 Feb 2016 09:40:44 +0100 Subject: [PATCH] Extract meshconn from raft-on-mesh experiment This is independently useful, and will have a separate lifecycle. --- examples/meshconn/README.md | 15 +++ examples/meshconn/mesh_addr.go | 21 ++++ examples/meshconn/peer.go | 181 +++++++++++++++++++++++++++++++++ examples/meshconn/pkt.go | 50 +++++++++ 4 files changed, 267 insertions(+) create mode 100644 examples/meshconn/README.md create mode 100644 examples/meshconn/mesh_addr.go create mode 100644 examples/meshconn/peer.go create mode 100644 examples/meshconn/pkt.go diff --git a/examples/meshconn/README.md b/examples/meshconn/README.md new file mode 100644 index 0000000..a4bb3b6 --- /dev/null +++ b/examples/meshconn/README.md @@ -0,0 +1,15 @@ +# meshconn + +meshconn implements [net.PacketConn](https://golang.org/pkg/net/#PacketConn) on top of mesh. +Think of it as UDP with benefits: + NAT and bastion host (DMZ) traversal, + broadcast/multicast in networks where this is normally not possible e.g. EC2, + and an up-to-date, queryable memberlist. + +meshconn supports [net.Addr](https://golang.org/pkg/net/#Addr) of the form `weavemesh://`. +By default, `` is a hardware address of the form `01:02:03:FD:FE:FF`. +Other forms of PeerName e.g. hashes are supported. + +meshconn itself is largely stateless and has best-effort delivery semantics. +As a future experiment, it could easily be amended to have basic resiliency guarantees. +Also, at the moment, PacketConn read and write deadlines are not supported. diff --git a/examples/meshconn/mesh_addr.go b/examples/meshconn/mesh_addr.go new file mode 100644 index 0000000..8e4eb87 --- /dev/null +++ b/examples/meshconn/mesh_addr.go @@ -0,0 +1,21 @@ +package meshconn + +import ( + "fmt" + "net" + + "github.com/weaveworks/mesh" +) + +// MeshAddr implements net.Addr for mesh peers. +type MeshAddr struct { + mesh.PeerName +} + +var _ net.Addr = MeshAddr{} + +// Network returns weavemesh. +func (a MeshAddr) Network() string { return "weavemesh" } + +// String returns weavemesh://. +func (a MeshAddr) String() string { return fmt.Sprintf("%s://%s", a.Network(), a.PeerName.String()) } diff --git a/examples/meshconn/peer.go b/examples/meshconn/peer.go new file mode 100644 index 0000000..fc90b89 --- /dev/null +++ b/examples/meshconn/peer.go @@ -0,0 +1,181 @@ +package meshconn + +import ( + "errors" + "log" + "net" + "time" + + "github.com/weaveworks/mesh" +) + +var ( + // ErrShortRead is returned by ReadFrom when the + // passed buffer is too small for the packet. + ErrShortRead = errors.New("short read") + + // ErrPeerClosed is returned by ReadFrom and WriteTo + // when the peer is closed during the operation. + ErrPeerClosed = errors.New("peer closed") + + // ErrGossipNotRegistered is returned by Write to when attempting + // to write before a mesh.Gossip has been registered in the peer. + ErrGossipNotRegistered = errors.New("gossip not registered") + + // ErrNotMeshAddr is returned by WriteTo when attempting + // to write to a non-mesh address. + ErrNotMeshAddr = errors.New("not a mesh addr") + + // ErrNotSupported is returned by methods that are not supported. + ErrNotSupported = errors.New("not supported") +) + +// Peer implements mesh.Gossiper and net.PacketConn. +type Peer struct { + name mesh.PeerName + gossip mesh.Gossip + recv chan pkt + actions chan func() + quit chan struct{} + logger *log.Logger +} + +// NewPeer returns a Peer, which can be used as a net.PacketConn. +// Clients must Register a mesh.Gossip before calling ReadFrom or WriteTo. +// Clients should aggressively consume from ReadFrom. +func NewPeer(name mesh.PeerName, logger *log.Logger) *Peer { + p := &Peer{ + name: name, + gossip: nil, // initially no gossip + recv: make(chan pkt), + actions: make(chan func()), + quit: make(chan struct{}), + logger: logger, + } + go p.loop() + return p +} + +func (p *Peer) loop() { + for { + select { + case f := <-p.actions: + f() + case <-p.quit: + return + } + } +} + +// Register injects the mesh.Gossip and enables full-duplex communication. +// Clients should consume from ReadFrom without blocking. +func (p *Peer) Register(gossip mesh.Gossip) { + p.actions <- func() { p.gossip = gossip } +} + +// ReadFrom implements net.PacketConn. +// Clients should consume from ReadFrom without blocking. +func (p *Peer) ReadFrom(b []byte) (n int, remote net.Addr, err error) { + c := make(chan struct{}) + p.actions <- func() { + go func() { // so as not to block loop + defer close(c) + select { + case pkt := <-p.recv: + n = copy(b, pkt.Buf) + remote = MeshAddr{pkt.Src} + if n < len(pkt.Buf) { + err = ErrShortRead + } + case <-p.quit: + err = ErrPeerClosed + } + }() + } + <-c + return n, remote, err +} + +// WriteTo implements net.PacketConn. +func (p *Peer) WriteTo(b []byte, dst net.Addr) (n int, err error) { + c := make(chan struct{}) + p.actions <- func() { + defer close(c) + if p.gossip == nil { + err = ErrGossipNotRegistered + return + } + meshAddr, ok := dst.(MeshAddr) + if !ok { + err = ErrNotMeshAddr + return + } + pkt := pkt{Src: p.name, Buf: b} + if meshAddr.PeerName == p.name { + p.recv <- pkt + return + } + // TODO(pb): detect and support broadcast + buf := pkt.encode() + n = len(buf) + err = p.gossip.GossipUnicast(meshAddr.PeerName, buf) + } + <-c + return n, err +} + +// Close implements net.PacketConn. +func (p *Peer) Close() error { + close(p.quit) + return nil +} + +// LocalAddr implements net.PacketConn. +func (p *Peer) LocalAddr() net.Addr { + return MeshAddr{p.name} +} + +// SetDeadline implements net.PacketConn. +// SetDeadline is not supported. +func (p *Peer) SetDeadline(time.Time) error { + return ErrNotSupported +} + +// SetReadDeadline implements net.PacketConn. +// SetReadDeadline is not supported. +func (p *Peer) SetReadDeadline(time.Time) error { + return ErrNotSupported +} + +// SetWriteDeadline implements net.PacketConn. +// SetWriteDeadline is not supported. +func (p *Peer) SetWriteDeadline(time.Time) error { + return ErrNotSupported +} + +// Gossip implements mesh.Gossiper. +func (p *Peer) Gossip() (complete mesh.GossipData) { + return pktSlice{} // we're stateless +} + +// OnGossip implements mesh.Gossiper. +// The buf is a single pkt. +func (p *Peer) OnGossip(buf []byte) (delta mesh.GossipData, err error) { + return pktSlice{makePkt(buf)}, nil +} + +// OnGossipBroadcast implements mesh.Gossiper. +// The buf is a single pkt +func (p *Peer) OnGossipBroadcast(_ mesh.PeerName, buf []byte) (received mesh.GossipData, err error) { + pkt := makePkt(buf) + p.recv <- pkt // to ReadFrom + return pktSlice{pkt}, nil +} + +// OnGossipUnicast implements mesh.Gossiper. +// The buf is a single pkt. +func (p *Peer) OnGossipUnicast(_ mesh.PeerName, buf []byte) error { + pkt := makePkt(buf) + p.recv <- pkt // to ReadFrom + return nil +} diff --git a/examples/meshconn/pkt.go b/examples/meshconn/pkt.go new file mode 100644 index 0000000..46e337d --- /dev/null +++ b/examples/meshconn/pkt.go @@ -0,0 +1,50 @@ +package meshconn + +import ( + "bytes" + "encoding/gob" + + "github.com/weaveworks/mesh" +) + +type pkt struct { + Src mesh.PeerName + Buf []byte +} + +func makePkt(buf []byte) pkt { + var p pkt + if err := gob.NewDecoder(bytes.NewBuffer(buf)).Decode(&p); err != nil { + panic(err) + } + return p +} + +func (p pkt) encode() []byte { + var buf bytes.Buffer + if err := gob.NewEncoder(&buf).Encode(p); err != nil { + panic(err) + } + return buf.Bytes() +} + +// pktSlice implements mesh.GossipData. +type pktSlice []pkt + +var _ mesh.GossipData = &pktSlice{} + +func (s pktSlice) Encode() [][]byte { + bufs := make([][]byte, len(s)) + for i, pkt := range s { + bufs[i] = pkt.encode() + } + return bufs +} + +func (s pktSlice) Merge(other mesh.GossipData) mesh.GossipData { + o := other.(pktSlice) + merged := make(pktSlice, 0, len(s)+len(o)) + merged = append(merged, s...) + merged = append(merged, o...) + return merged +}