Skip to content

Commit

Permalink
gnatsd: -health and -rank options
Browse files Browse the repository at this point in the history
Leader election and priority
assignment are made available
with -health and -rank.

InternalClient interface offers
plugin capability for running
internal clients.

Fixes nats-io#433
  • Loading branch information
glycerine committed Feb 11, 2017
1 parent 37ce250 commit b44d1ad
Show file tree
Hide file tree
Showing 21 changed files with 1,976 additions and 11 deletions.
73 changes: 73 additions & 0 deletions health/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package health

import (
"github.com/nats-io/gnatsd/server"
"github.com/nats-io/gnatsd/server/lcon"
"net"
"time"
)

// Agent implements the InternalClient interface.
// It provides health status checks and
// leader election from among the candidate
// gnatsd instances in a cluster.
type Agent struct {
opts *server.Options
mship *Membership
}

// NewAgent makes a new Agent.
func NewAgent(opts *server.Options) *Agent {
return &Agent{
opts: opts,
}
}

// Name should identify the internal client for logging.
func (h *Agent) Name() string {
return "health-agent"
}

// Start makes an internal
// entirely in-process client that monitors
// cluster health and manages group
// membership functions.
//
func (h *Agent) Start(
info server.Info,
opts server.Options,
lsnReady chan struct{},
accept func(nc net.Conn),
logger server.Logger,
) error {

// To keep the health client fast and its traffic
// internal-only, we use an bi-directional,
// in-memory version of a TCP stream.
cli, srv := lcon.NewBidir(info.MaxPayload * 2)

rank := info.ServerRank
beat := time.Second

cfg := &MembershipCfg{
MaxClockSkew: time.Second,
BeatDur: beat,
MyRank: rank,
CliConn: cli,
Logger: logger,
}
h.mship = NewMembership(cfg)

go func() {
select {
case <-lsnReady:
accept(srv)
}
}()
return h.mship.Start()
}

// Stop halts the background goroutine.
func (h *Agent) Stop() {
h.mship.Stop()
}
59 changes: 59 additions & 0 deletions health/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package health

import (
"github.com/nats-io/gnatsd/server"
"net"
"time"
)

type MembershipCfg struct {

// max we allow for clocks to be out of sync.
// default to 1 second if not set.
MaxClockSkew time.Duration

// how often we heartbeat. defaults to 100msec
// if not set.
BeatDur time.Duration

// NatsUrl example "nats://127.0.0.1:4222"
NatsUrl string

// defaults to "_nats.cluster.members."
SysMemberPrefix string

// LeaseTime is the minimum time the
// leader is elected for. Defaults to 10 sec.
LeaseTime time.Duration

// provide a default until the server gives us rank
MyRank int

// optional, if provided we will use this connection on
// the client side.
CliConn net.Conn

Logger server.Logger
}

func (cfg *MembershipCfg) SetDefaults() {
if cfg.LeaseTime == 0 {
cfg.LeaseTime = time.Second * 10
}
if cfg.SysMemberPrefix == "" {
cfg.SysMemberPrefix = "_nats.cluster.members."
}
if cfg.BeatDur == 0 {
cfg.BeatDur = 100 * time.Millisecond
}
if cfg.MaxClockSkew == 0 {
cfg.MaxClockSkew = time.Second
}
if cfg.NatsUrl == "" {
cfg.NatsUrl = "nats://127.0.0.1:4222"
}
}

func (cfg *MembershipCfg) Dial(network, address string) (net.Conn, error) {
return cfg.CliConn, nil
}
40 changes: 40 additions & 0 deletions health/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package health

import (
"flag"
"fmt"
"log"
"os"
"strconv"
"time"
)

func Example() {

log.SetFlags(0)
flag.Usage = usage
flag.Parse()

args := flag.Args()
if len(args) < 1 {
usage()
}

rank := 0
var err error
if len(args) >= 2 {
rank, err = strconv.Atoi(args[1])
if err != nil {
fmt.Fprintf(os.Stderr, "2nd arg should be our numeric rank")
}
}

cfg := &MembershipCfg{
MaxClockSkew: time.Second,
BeatDur: 100 * time.Millisecond,
NatsUrl: "nats://" + args[0], // "nats://127.0.0.1:4222"
MyRank: rank,
}
m := NewMembership(cfg)
panicOn(m.Start())
}
Loading

0 comments on commit b44d1ad

Please sign in to comment.