Skip to content

Commit

Permalink
Merge pull request #101 from murali-reddy/gossip-interval
Browse files Browse the repository at this point in the history
make gossipInterval configurable through router config
  • Loading branch information
bboreham authored Jul 31, 2019
2 parents 512bdb7 + ad2f946 commit b7aea39
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 5 deletions.
4 changes: 4 additions & 0 deletions local_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ func (peer *localPeer) encode(enc *gob.Encoder) {
// ACTOR server

func (peer *localPeer) actorLoop(actionChan <-chan localPeerAction) {
gossipInterval := defaultGossipInterval
if peer.router != nil {
gossipInterval = peer.router.gossipInterval()
}
gossipTimer := time.Tick(gossipInterval)
for {
select {
Expand Down
14 changes: 12 additions & 2 deletions router.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ var (
// ChannelSize is the buffer size used by so-called actor goroutines
// throughout mesh.
ChannelSize = 16

defaultGossipInterval = 30 * time.Second
)

const (
tcpHeartbeat = 30 * time.Second
gossipInterval = 30 * time.Second
maxDuration = time.Duration(math.MaxInt64)
acceptMaxTokens = 100
acceptTokenDelay = 100 * time.Millisecond // [2]
Expand All @@ -37,6 +38,7 @@ type Config struct {
ProtocolMinVersion byte
PeerDiscovery bool
TrustedSubnets []*net.IPNet
GossipInterval *time.Duration
}

// Router manages communication between this peer and the rest of the mesh.
Expand Down Expand Up @@ -154,7 +156,7 @@ func (router *Router) gossipChannel(channelName string) *gossipChannel {
if channel, found = router.gossipChannels[channelName]; found {
return channel
}
channel = newGossipChannel(channelName, router.Ourself, router.Routes, &surrogateGossiper{}, router.logger)
channel = newGossipChannel(channelName, router.Ourself, router.Routes, &surrogateGossiper{router: router}, router.logger)
channel.logf("created surrogate channel")
router.gossipChannels[channelName] = channel
return channel
Expand All @@ -170,6 +172,14 @@ func (router *Router) gossipChannelSet() map[*gossipChannel]struct{} {
return channels
}

func (router *Router) gossipInterval() time.Duration {
if router.Config.GossipInterval != nil {
return *router.Config.GossipInterval
} else {
return defaultGossipInterval
}
}

func (router *Router) handleGossip(tag protocolTag, payload []byte) error {
decoder := gob.NewDecoder(bytes.NewReader(payload))
var channelName string
Expand Down
5 changes: 5 additions & 0 deletions surrogate_gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
type surrogateGossiper struct {
sync.Mutex
prevUpdates []prevUpdate
router *Router
}

type prevUpdate struct {
Expand Down Expand Up @@ -56,6 +57,10 @@ func (s *surrogateGossiper) OnGossip(update []byte) (GossipData, error) {
// (this time limit is arbitrary; surrogateGossiper should pass on new gossip immediately
// so there should be no reason for a duplicate to show up after a long time)
updateTime := now()
gossipInterval := defaultGossipInterval
if s.router != nil {
gossipInterval = s.router.gossipInterval()
}
deleteBefore := updateTime.Add(-gossipInterval)
keepFrom := len(s.prevUpdates)
for i, p := range s.prevUpdates {
Expand Down
6 changes: 3 additions & 3 deletions surrogate_gossiper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ func TestSurrogateGossiperOnGossip(t *testing.T) {
checkOnGossip(t, s, msg[1], msg[1])
checkOnGossip(t, s, msg[0], nil)
checkOnGossip(t, s, msg[1], nil)
myTime = myTime.Add(gossipInterval / 2) // Should not trigger cleardown
checkOnGossip(t, s, msg[2], msg[2]) // Only clears out old ones on new entry
myTime = myTime.Add(defaultGossipInterval / 2) // Should not trigger cleardown
checkOnGossip(t, s, msg[2], msg[2]) // Only clears out old ones on new entry
checkOnGossip(t, s, msg[0], nil)
checkOnGossip(t, s, msg[1], nil)
myTime = myTime.Add(gossipInterval)
myTime = myTime.Add(defaultGossipInterval)
checkOnGossip(t, s, msg[0], nil)
checkOnGossip(t, s, msg[3], msg[3]) // Only clears out old ones on new entry
checkOnGossip(t, s, msg[0], msg[0])
Expand Down

0 comments on commit b7aea39

Please sign in to comment.