Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract meshconn from raft-on-mesh experiment #11

Merged
merged 1 commit into from
Feb 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions examples/meshconn/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# meshconn

meshconn implements [net.PacketConn](https://golang.org/pkg/net/#PacketConn) on top of mesh.
Think of it as UDP with benefits:
NAT and bastion host (DMZ) traversal,
broadcast/multicast in networks where this is normally not possible e.g. EC2,
and an up-to-date, queryable memberlist.

meshconn supports [net.Addr](https://golang.org/pkg/net/#Addr) of the form `weavemesh://<PeerName>`.
By default, `<PeerName>` is a hardware address of the form `01:02:03:FD:FE:FF`.
Other forms of PeerName e.g. hashes are supported.

meshconn itself is largely stateless and has best-effort delivery semantics.
As a future experiment, it could easily be amended to have basic resiliency guarantees.
Also, at the moment, PacketConn read and write deadlines are not supported.
21 changes: 21 additions & 0 deletions examples/meshconn/mesh_addr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package meshconn

import (
"fmt"
"net"

"github.com/weaveworks/mesh"
)

// MeshAddr implements net.Addr for mesh peers.
type MeshAddr struct {
mesh.PeerName
}

var _ net.Addr = MeshAddr{}

// Network returns weavemesh.
func (a MeshAddr) Network() string { return "weavemesh" }

// String returns weavemesh://<PeerName>.
func (a MeshAddr) String() string { return fmt.Sprintf("%s://%s", a.Network(), a.PeerName.String()) }
181 changes: 181 additions & 0 deletions examples/meshconn/peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package meshconn

import (
"errors"
"log"
"net"
"time"

"github.com/weaveworks/mesh"
)

var (
// ErrShortRead is returned by ReadFrom when the
// passed buffer is too small for the packet.
ErrShortRead = errors.New("short read")

// ErrPeerClosed is returned by ReadFrom and WriteTo
// when the peer is closed during the operation.
ErrPeerClosed = errors.New("peer closed")

// ErrGossipNotRegistered is returned by Write to when attempting
// to write before a mesh.Gossip has been registered in the peer.
ErrGossipNotRegistered = errors.New("gossip not registered")

// ErrNotMeshAddr is returned by WriteTo when attempting
// to write to a non-mesh address.
ErrNotMeshAddr = errors.New("not a mesh addr")

// ErrNotSupported is returned by methods that are not supported.
ErrNotSupported = errors.New("not supported")
)

// Peer implements mesh.Gossiper and net.PacketConn.
type Peer struct {
name mesh.PeerName
gossip mesh.Gossip
recv chan pkt
actions chan func()
quit chan struct{}
logger *log.Logger
}

// NewPeer returns a Peer, which can be used as a net.PacketConn.
// Clients must Register a mesh.Gossip before calling ReadFrom or WriteTo.
// Clients should aggressively consume from ReadFrom.
func NewPeer(name mesh.PeerName, logger *log.Logger) *Peer {
p := &Peer{
name: name,
gossip: nil, // initially no gossip
recv: make(chan pkt),
actions: make(chan func()),
quit: make(chan struct{}),
logger: logger,
}
go p.loop()
return p
}

func (p *Peer) loop() {
for {
select {
case f := <-p.actions:
f()
case <-p.quit:
return
}
}
}

// Register injects the mesh.Gossip and enables full-duplex communication.
// Clients should consume from ReadFrom without blocking.
func (p *Peer) Register(gossip mesh.Gossip) {
p.actions <- func() { p.gossip = gossip }
}

// ReadFrom implements net.PacketConn.
// Clients should consume from ReadFrom without blocking.
func (p *Peer) ReadFrom(b []byte) (n int, remote net.Addr, err error) {
c := make(chan struct{})
p.actions <- func() {
go func() { // so as not to block loop
defer close(c)
select {
case pkt := <-p.recv:
n = copy(b, pkt.Buf)
remote = MeshAddr{pkt.Src}
if n < len(pkt.Buf) {
err = ErrShortRead
}
case <-p.quit:
err = ErrPeerClosed
}
}()
}
<-c
return n, remote, err
}

// WriteTo implements net.PacketConn.
func (p *Peer) WriteTo(b []byte, dst net.Addr) (n int, err error) {
c := make(chan struct{})
p.actions <- func() {
defer close(c)
if p.gossip == nil {
err = ErrGossipNotRegistered
return
}
meshAddr, ok := dst.(MeshAddr)
if !ok {
err = ErrNotMeshAddr
return
}
pkt := pkt{Src: p.name, Buf: b}
if meshAddr.PeerName == p.name {
p.recv <- pkt
return
}
// TODO(pb): detect and support broadcast

This comment was marked as abuse.

This comment was marked as abuse.

buf := pkt.encode()
n = len(buf)
err = p.gossip.GossipUnicast(meshAddr.PeerName, buf)
}
<-c
return n, err
}

// Close implements net.PacketConn.
func (p *Peer) Close() error {
close(p.quit)
return nil
}

// LocalAddr implements net.PacketConn.
func (p *Peer) LocalAddr() net.Addr {
return MeshAddr{p.name}
}

// SetDeadline implements net.PacketConn.
// SetDeadline is not supported.
func (p *Peer) SetDeadline(time.Time) error {
return ErrNotSupported
}

// SetReadDeadline implements net.PacketConn.
// SetReadDeadline is not supported.
func (p *Peer) SetReadDeadline(time.Time) error {
return ErrNotSupported
}

// SetWriteDeadline implements net.PacketConn.
// SetWriteDeadline is not supported.
func (p *Peer) SetWriteDeadline(time.Time) error {
return ErrNotSupported
}

// Gossip implements mesh.Gossiper.
func (p *Peer) Gossip() (complete mesh.GossipData) {
return pktSlice{} // we're stateless
}

// OnGossip implements mesh.Gossiper.
// The buf is a single pkt.
func (p *Peer) OnGossip(buf []byte) (delta mesh.GossipData, err error) {
return pktSlice{makePkt(buf)}, nil
}

// OnGossipBroadcast implements mesh.Gossiper.
// The buf is a single pkt
func (p *Peer) OnGossipBroadcast(_ mesh.PeerName, buf []byte) (received mesh.GossipData, err error) {
pkt := makePkt(buf)
p.recv <- pkt // to ReadFrom
return pktSlice{pkt}, nil
}

// OnGossipUnicast implements mesh.Gossiper.
// The buf is a single pkt.
func (p *Peer) OnGossipUnicast(_ mesh.PeerName, buf []byte) error {
pkt := makePkt(buf)
p.recv <- pkt // to ReadFrom
return nil
}
50 changes: 50 additions & 0 deletions examples/meshconn/pkt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package meshconn

import (
"bytes"
"encoding/gob"

"github.com/weaveworks/mesh"
)

type pkt struct {
Src mesh.PeerName
Buf []byte
}

func makePkt(buf []byte) pkt {
var p pkt
if err := gob.NewDecoder(bytes.NewBuffer(buf)).Decode(&p); err != nil {
panic(err)
}
return p
}

func (p pkt) encode() []byte {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(p); err != nil {
panic(err)
}
return buf.Bytes()
}

// pktSlice implements mesh.GossipData.
type pktSlice []pkt

var _ mesh.GossipData = &pktSlice{}

func (s pktSlice) Encode() [][]byte {
bufs := make([][]byte, len(s))
for i, pkt := range s {
bufs[i] = pkt.encode()
}
return bufs
}

func (s pktSlice) Merge(other mesh.GossipData) mesh.GossipData {
o := other.(pktSlice)
merged := make(pktSlice, 0, len(s)+len(o))
merged = append(merged, s...)
merged = append(merged, o...)
return merged
}