Skip to content

Commit

Permalink
gnatsd: -health and -rank options
Browse files Browse the repository at this point in the history
Leader election and priority
assignment are made available
with -health and -rank.

InternalClient interface offers
plugin capability for running
internal clients.

Fixes nats-io#433
  • Loading branch information
glycerine committed Feb 11, 2017
1 parent 37ce250 commit 30f4930
Show file tree
Hide file tree
Showing 23 changed files with 2,167 additions and 11 deletions.
66 changes: 66 additions & 0 deletions health/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package health

import (
"github.com/nats-io/gnatsd/server"
"github.com/nats-io/gnatsd/server/lcon"
"net"
"time"
)

type Agent struct {
opts *server.Options
mship *Membership
}

func NewAgent(opts *server.Options) *Agent {
return &Agent{
opts: opts,
}
}

func (h *Agent) Name() string {
return "health-agent"
}

// Start makes an internal
// entirely in-process client that monitors
// cluster health and manages group
// membership functions.
//
func (h *Agent) Start(
info server.Info,
opts server.Options,
lsnReady chan struct{},
accept func(nc net.Conn),
logger server.Logger,
) error {

// To keep the health client fast and its traffic
// internal-only, we use an bi-directional,
// in-memory version of a TCP stream.
cli, srv := lcon.NewBidir(info.MaxPayload * 2)

rank := info.ServerRank
beat := time.Second

cfg := &MembershipCfg{
MaxClockSkew: time.Second,
BeatDur: beat,
MyRank: rank,
CliConn: cli,
Logger: logger,
}
h.mship = NewMembership(cfg)

go func() {
select {
case <-lsnReady:
accept(srv)
}
}()
return h.mship.Start()
}

func (h *Agent) Stop() {
h.mship.Stop()
}
130 changes: 130 additions & 0 deletions health/bchan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package health

import (
"sync"
)

// Bchan is an 1:M non-blocking value-loadable channel.
// The client needs to only know about one
// rule: after a receive on Ch, you must call Bchan.BcastAck().
//
type Bchan struct {
Ch chan interface{}
mu sync.Mutex
on bool
cur interface{}
}

// NewBchan constructor should be told
// how many recipients are expected in
// expectedDiameter. If the expectedDiameter
// is wrong the Bchan will still function,
// but you may get slower concurrency
// than if the number is accurate. It
// is fine to overestimate the diameter by
// a little or even be off completely,
// but the extra slots in the buffered channel
// take up some memory and will add
// linearly to the service time
// as they are maintained.
func NewBchan(expectedDiameter int) *Bchan {
if expectedDiameter <= 0 {
expectedDiameter = 1
}
return &Bchan{
Ch: make(chan interface{}, expectedDiameter+1),
}
}

// On turns on the broadcast channel without
// changing the value to be transmitted.
//
func (b *Bchan) On() {
b.mu.Lock()
defer b.mu.Unlock()
b.on = true
b.fill()
}

// Set stores a value to be broadcast
// and clears any prior queued up
// old values. Call On() after set
// to activate the new value.
// See also Bcast() that does Set()
// followed by On() in one call.
//
func (b *Bchan) Set(val interface{}) {
b.mu.Lock()
defer b.mu.Unlock()
b.cur = val
b.drain()
}

// Get returns the currently set
// broadcast value.
func (b *Bchan) Get() interface{} {
b.mu.Lock()
defer b.mu.Unlock()
return b.cur
}

// Bcast is the common case of doing
// both Set() and then On() together
// to start broadcasting a new value.
//
func (b *Bchan) Bcast(val interface{}) {
b.mu.Lock()
defer b.mu.Unlock()
b.cur = val
b.drain()
b.on = true
b.fill()
}

// Clear turns off broadcasting and
// empties the channel of any old values.
func (b *Bchan) Clear() {
b.mu.Lock()
defer b.mu.Unlock()
b.on = false
b.drain()
b.cur = nil
}

// drain all messages, leaving b.Ch empty.
// Users typically want Clear() instead.
func (b *Bchan) drain() {
// empty chan
for {
select {
case <-b.Ch:
default:
return
}
}
}

// BcastAck must be called immediately after
// a client receives on Ch. All
// clients on every channel receive must call BcastAck after receiving
// on the channel Ch. This makes such channels
// self-servicing, as BcastAck will re-fill the
// async channel with the current value.
func (b *Bchan) BcastAck() {
b.mu.Lock()
defer b.mu.Unlock()
if b.on {
b.fill()
}
}

// fill up the channel
func (b *Bchan) fill() {
for {
select {
case b.Ch <- b.cur:
default:
return
}
}
}
63 changes: 63 additions & 0 deletions health/bchan_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package health

import (
"testing"
)

func TestBchan(t *testing.T) {

sz := 3
bc := NewBchan(sz)

select {
case <-bc.Ch:
t.Fatal("bc starts open; it should have blocked")
default:
// ok, good.
}

aligator := "bill"
bc.Bcast(aligator)

for j := 0; j < 3; j++ {
for i := 0; i < sz; i++ {
select {
case b := <-bc.Ch:
bc.BcastAck()
if b != aligator {
t.Fatal("Bcast(aligator) means aligator should always be read on the bc")
}
default:
t.Fatal("bc is now closed, should have read back aligator. Refresh should have restocked us.")
}
}
}

// multiple Set are fine:
crocadile := "lyle"
bc.Bcast(crocadile)

for j := 0; j < 3; j++ {
for i := 0; i < sz; i++ {
select {
case b := <-bc.Ch:
bc.BcastAck()
if b != crocadile {
t.Fatal("Bcast(crocadile) means crocadile should always be read on the bc")
}
default:
t.Fatal("bc is now closed, should have read back crocadile. Refresh should have restocked us.")
}
}
}

// and after Off, we should block
bc.Clear()
select {
case <-bc.Ch:
t.Fatal("Clear() means recevie should have blocked.")
default:
// ok, good.
}

}
59 changes: 59 additions & 0 deletions health/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package health

import (
"github.com/nats-io/gnatsd/server"
"net"
"time"
)

type MembershipCfg struct {

// max we allow for clocks to be out of sync.
// default to 1 second if not set.
MaxClockSkew time.Duration

// how often we heartbeat. defaults to 100msec
// if not set.
BeatDur time.Duration

// NatsUrl example "nats://127.0.0.1:4222"
NatsUrl string

// defaults to "_nats.cluster.members."
SysMemberPrefix string

// LeaseTime is the minimum time the
// leader is elected for. Defaults to 10 sec.
LeaseTime time.Duration

// provide a default until the server gives us rank
MyRank int

// optional, if provided we will use this connection on
// the client side.
CliConn net.Conn

Logger server.Logger
}

func (cfg *MembershipCfg) SetDefaults() {
if cfg.LeaseTime == 0 {
cfg.LeaseTime = time.Second * 10
}
if cfg.SysMemberPrefix == "" {
cfg.SysMemberPrefix = "_nats.cluster.members."
}
if cfg.BeatDur == 0 {
cfg.BeatDur = 100 * time.Millisecond
}
if cfg.MaxClockSkew == 0 {
cfg.MaxClockSkew = time.Second
}
if cfg.NatsUrl == "" {
cfg.NatsUrl = "nats://127.0.0.1:4222"
}
}

func (cfg *MembershipCfg) Dial(network, address string) (net.Conn, error) {
return cfg.CliConn, nil
}
40 changes: 40 additions & 0 deletions health/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package health

import (
"flag"
"fmt"
"log"
"os"
"strconv"
"time"
)

func Example() {

log.SetFlags(0)
flag.Usage = usage
flag.Parse()

args := flag.Args()
if len(args) < 1 {
usage()
}

rank := 0
var err error
if len(args) >= 2 {
rank, err = strconv.Atoi(args[1])
if err != nil {
fmt.Fprintf(os.Stderr, "2nd arg should be our numeric rank")
}
}

cfg := &MembershipCfg{
MaxClockSkew: time.Second,
BeatDur: 100 * time.Millisecond,
NatsUrl: "nats://" + args[0], // "nats://127.0.0.1:4222"
MyRank: rank,
}
m := NewMembership(cfg)
panicOn(m.Start())
}
Loading

0 comments on commit 30f4930

Please sign in to comment.