-
Notifications
You must be signed in to change notification settings - Fork 107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extract meshconn from raft-on-mesh experiment #11
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
# meshconn | ||
|
||
meshconn implements [net.PacketConn](https://golang.org/pkg/net/#PacketConn) on top of mesh. | ||
Think of it as UDP with benefits: | ||
NAT and bastion host (DMZ) traversal, | ||
broadcast/multicast in networks where this is normally not possible e.g. EC2, | ||
and an up-to-date, queryable memberlist. | ||
|
||
meshconn supports [net.Addr](https://golang.org/pkg/net/#Addr) of the form `weavemesh://<PeerName>`. | ||
By default, `<PeerName>` is a hardware address of the form `01:02:03:FD:FE:FF`. | ||
Other forms of PeerName e.g. hashes are supported. | ||
|
||
meshconn itself is largely stateless and has best-effort delivery semantics. | ||
As a future experiment, it could easily be amended to have basic resiliency guarantees. | ||
Also, at the moment, PacketConn read and write deadlines are not supported. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package meshconn | ||
|
||
import ( | ||
"fmt" | ||
"net" | ||
|
||
"github.com/weaveworks/mesh" | ||
) | ||
|
||
// MeshAddr implements net.Addr for mesh peers. | ||
type MeshAddr struct { | ||
mesh.PeerName | ||
} | ||
|
||
var _ net.Addr = MeshAddr{} | ||
|
||
// Network returns weavemesh. | ||
func (a MeshAddr) Network() string { return "weavemesh" } | ||
|
||
// String returns weavemesh://<PeerName>. | ||
func (a MeshAddr) String() string { return fmt.Sprintf("%s://%s", a.Network(), a.PeerName.String()) } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
package meshconn | ||
|
||
import ( | ||
"errors" | ||
"log" | ||
"net" | ||
"time" | ||
|
||
"github.com/weaveworks/mesh" | ||
) | ||
|
||
var ( | ||
// ErrShortRead is returned by ReadFrom when the | ||
// passed buffer is too small for the packet. | ||
ErrShortRead = errors.New("short read") | ||
|
||
// ErrPeerClosed is returned by ReadFrom and WriteTo | ||
// when the peer is closed during the operation. | ||
ErrPeerClosed = errors.New("peer closed") | ||
|
||
// ErrGossipNotRegistered is returned by Write to when attempting | ||
// to write before a mesh.Gossip has been registered in the peer. | ||
ErrGossipNotRegistered = errors.New("gossip not registered") | ||
|
||
// ErrNotMeshAddr is returned by WriteTo when attempting | ||
// to write to a non-mesh address. | ||
ErrNotMeshAddr = errors.New("not a mesh addr") | ||
|
||
// ErrNotSupported is returned by methods that are not supported. | ||
ErrNotSupported = errors.New("not supported") | ||
) | ||
|
||
// Peer implements mesh.Gossiper and net.PacketConn. | ||
type Peer struct { | ||
name mesh.PeerName | ||
gossip mesh.Gossip | ||
recv chan pkt | ||
actions chan func() | ||
quit chan struct{} | ||
logger *log.Logger | ||
} | ||
|
||
// NewPeer returns a Peer, which can be used as a net.PacketConn. | ||
// Clients must Register a mesh.Gossip before calling ReadFrom or WriteTo. | ||
// Clients should aggressively consume from ReadFrom. | ||
func NewPeer(name mesh.PeerName, logger *log.Logger) *Peer { | ||
p := &Peer{ | ||
name: name, | ||
gossip: nil, // initially no gossip | ||
recv: make(chan pkt), | ||
actions: make(chan func()), | ||
quit: make(chan struct{}), | ||
logger: logger, | ||
} | ||
go p.loop() | ||
return p | ||
} | ||
|
||
func (p *Peer) loop() { | ||
for { | ||
select { | ||
case f := <-p.actions: | ||
f() | ||
case <-p.quit: | ||
return | ||
} | ||
} | ||
} | ||
|
||
// Register injects the mesh.Gossip and enables full-duplex communication. | ||
// Clients should consume from ReadFrom without blocking. | ||
func (p *Peer) Register(gossip mesh.Gossip) { | ||
p.actions <- func() { p.gossip = gossip } | ||
} | ||
|
||
// ReadFrom implements net.PacketConn. | ||
// Clients should consume from ReadFrom without blocking. | ||
func (p *Peer) ReadFrom(b []byte) (n int, remote net.Addr, err error) { | ||
c := make(chan struct{}) | ||
p.actions <- func() { | ||
go func() { // so as not to block loop | ||
defer close(c) | ||
select { | ||
case pkt := <-p.recv: | ||
n = copy(b, pkt.Buf) | ||
remote = MeshAddr{pkt.Src} | ||
if n < len(pkt.Buf) { | ||
err = ErrShortRead | ||
} | ||
case <-p.quit: | ||
err = ErrPeerClosed | ||
} | ||
}() | ||
} | ||
<-c | ||
return n, remote, err | ||
} | ||
|
||
// WriteTo implements net.PacketConn. | ||
func (p *Peer) WriteTo(b []byte, dst net.Addr) (n int, err error) { | ||
c := make(chan struct{}) | ||
p.actions <- func() { | ||
defer close(c) | ||
if p.gossip == nil { | ||
err = ErrGossipNotRegistered | ||
return | ||
} | ||
meshAddr, ok := dst.(MeshAddr) | ||
if !ok { | ||
err = ErrNotMeshAddr | ||
return | ||
} | ||
pkt := pkt{Src: p.name, Buf: b} | ||
if meshAddr.PeerName == p.name { | ||
p.recv <- pkt | ||
return | ||
} | ||
// TODO(pb): detect and support broadcast | ||
buf := pkt.encode() | ||
n = len(buf) | ||
err = p.gossip.GossipUnicast(meshAddr.PeerName, buf) | ||
} | ||
<-c | ||
return n, err | ||
} | ||
|
||
// Close implements net.PacketConn. | ||
func (p *Peer) Close() error { | ||
close(p.quit) | ||
return nil | ||
} | ||
|
||
// LocalAddr implements net.PacketConn. | ||
func (p *Peer) LocalAddr() net.Addr { | ||
return MeshAddr{p.name} | ||
} | ||
|
||
// SetDeadline implements net.PacketConn. | ||
// SetDeadline is not supported. | ||
func (p *Peer) SetDeadline(time.Time) error { | ||
return ErrNotSupported | ||
} | ||
|
||
// SetReadDeadline implements net.PacketConn. | ||
// SetReadDeadline is not supported. | ||
func (p *Peer) SetReadDeadline(time.Time) error { | ||
return ErrNotSupported | ||
} | ||
|
||
// SetWriteDeadline implements net.PacketConn. | ||
// SetWriteDeadline is not supported. | ||
func (p *Peer) SetWriteDeadline(time.Time) error { | ||
return ErrNotSupported | ||
} | ||
|
||
// Gossip implements mesh.Gossiper. | ||
func (p *Peer) Gossip() (complete mesh.GossipData) { | ||
return pktSlice{} // we're stateless | ||
} | ||
|
||
// OnGossip implements mesh.Gossiper. | ||
// The buf is a single pkt. | ||
func (p *Peer) OnGossip(buf []byte) (delta mesh.GossipData, err error) { | ||
return pktSlice{makePkt(buf)}, nil | ||
} | ||
|
||
// OnGossipBroadcast implements mesh.Gossiper. | ||
// The buf is a single pkt | ||
func (p *Peer) OnGossipBroadcast(_ mesh.PeerName, buf []byte) (received mesh.GossipData, err error) { | ||
pkt := makePkt(buf) | ||
p.recv <- pkt // to ReadFrom | ||
return pktSlice{pkt}, nil | ||
} | ||
|
||
// OnGossipUnicast implements mesh.Gossiper. | ||
// The buf is a single pkt. | ||
func (p *Peer) OnGossipUnicast(_ mesh.PeerName, buf []byte) error { | ||
pkt := makePkt(buf) | ||
p.recv <- pkt // to ReadFrom | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package meshconn | ||
|
||
import ( | ||
"bytes" | ||
"encoding/gob" | ||
|
||
"github.com/weaveworks/mesh" | ||
) | ||
|
||
type pkt struct { | ||
Src mesh.PeerName | ||
Buf []byte | ||
} | ||
|
||
func makePkt(buf []byte) pkt { | ||
var p pkt | ||
if err := gob.NewDecoder(bytes.NewBuffer(buf)).Decode(&p); err != nil { | ||
panic(err) | ||
} | ||
return p | ||
} | ||
|
||
func (p pkt) encode() []byte { | ||
var buf bytes.Buffer | ||
if err := gob.NewEncoder(&buf).Encode(p); err != nil { | ||
panic(err) | ||
} | ||
return buf.Bytes() | ||
} | ||
|
||
// pktSlice implements mesh.GossipData. | ||
type pktSlice []pkt | ||
|
||
var _ mesh.GossipData = &pktSlice{} | ||
|
||
func (s pktSlice) Encode() [][]byte { | ||
bufs := make([][]byte, len(s)) | ||
for i, pkt := range s { | ||
bufs[i] = pkt.encode() | ||
} | ||
return bufs | ||
} | ||
|
||
func (s pktSlice) Merge(other mesh.GossipData) mesh.GossipData { | ||
o := other.(pktSlice) | ||
merged := make(pktSlice, 0, len(s)+len(o)) | ||
merged = append(merged, s...) | ||
merged = append(merged, o...) | ||
return merged | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This comment was marked as abuse.
Sorry, something went wrong.
This comment was marked as abuse.
Sorry, something went wrong.