gnatsd: -health, -rank, -lease, -beat
options to control health monitoring.

The InternalClient interface offers a general plugin interface for
running internal clients within a gnatsd process.

The -health flag to gnatsd starts an internal client that runs a
leader election among the available gnatsd instances and publishes
cluster membership changes to a set of cluster health topics.

The -beat and -lease flags control how frequently health checks are
run and how long leader leases persist.

The health agent can also be run standalone as healthcmd. See the
main function in gnatsd/health/healthcmd.

The -rank flag to gnatsd adds priority rank assignment from the
command line. The lowest-ranking gnatsd instance wins the lease in
the current election. The election algorithm is described in
gnatsd/health/ALGORITHM.md and is implemented in
gnatsd/health/health.go.

Fixes nats-io#433
glycerine committed Feb 12, 2017
1 parent 37ce250 commit c42837c
Showing 18 changed files with 2,095 additions and 10 deletions.
176 changes: 176 additions & 0 deletions health/ALGORITHM.md
@@ -0,0 +1,176 @@
# The ALLCALL leased-leader election algorithm.

Jason E. Aten

February 2017

definitions, with example values.
-----------

Let heartBeat = 1 sec. This is how frequently
we will assess cluster health by
sending out an allcall ping.

Let leaseTime = 10 sec. This is how long a leader's lease
lasts for. A failed leader won't be replaced until its
lease (+maxClockSkew) has expired. The lease
also lets leaders efficiently serve reads
without doing quorum checks.

Let maxClockSkew = 1 sec. The maxClockSkew
is a bound on how far out of sync our local
clocks may drift.
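
A minimal Go sketch of these example values (the constant and function names below are illustrative, not taken from the patch); note that a failed leader is only replaced once its lease plus the skew bound has run out:

```go
package sketch

import "time"

const (
	heartBeat    = 1 * time.Second  // how often an allcall ping is issued
	leaseTime    = 10 * time.Second // how long a leader's lease lasts
	maxClockSkew = 1 * time.Second  // bound on clock drift between nodes
)

// leaseFullyExpired reports whether a lease is over even after allowing
// for the maximum clock skew, i.e. when a failed leader may be replaced.
func leaseFullyExpired(leaseExpires, now time.Time) bool {
	return now.After(leaseExpires.Add(maxClockSkew))
}
```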

givens
--------

* Given: Let each server have a numeric integer rank that is distinct
and unique to that server. If necessary, an extremely long
true random number is used to break ties between server ranks, so
that we may assert, with probability 1, that all ranks are distinct integers,
and that the servers, absent an in-force lease, can be put into a strict total order.

* Rule: The lower rank is preferred for the leader role.

* Ordering by lease time then rank: we order the pair (leaseExpires, rank) first
by largest leaseExpires time, then by lower rank. If
both leases have expired, then lease time is not considered as
part of the ordering, and rank alone determines the
new leader. A sketch of this ordering follows the list.
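
A minimal Go sketch of this ordering, continuing the illustrative `package sketch` above. The `serverClaim` type and its field names stand in for the patch's real ServerLoc type, whose exact fields are not reproduced here:

```go
// serverClaim is an illustrative stand-in for the (leaseExpires, rank)
// data carried by an allcall; the patch's real type is ServerLoc.
type serverClaim struct {
	Rank         int
	LeaseExpires time.Time
}

// betterCandidate reports whether a is preferred over b as leader:
// if both leases have expired, the lower rank wins outright; otherwise
// the later lease expiration wins, with rank breaking exact ties.
func betterCandidate(a, b serverClaim, now time.Time) bool {
	aExpired := !a.LeaseExpires.After(now)
	bExpired := !b.LeaseExpires.After(now)
	if aExpired && bExpired {
		return a.Rank < b.Rank
	}
	if !a.LeaseExpires.Equal(b.LeaseExpires) {
		return a.LeaseExpires.After(b.LeaseExpires)
	}
	return a.Rank < b.Rank
}
```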

ALLCALL Algorithm phases
===========================

### I. Init phase

When a server joins the cluster,
it does not issue allcalls (a ping of all members)
until after leaseTime + maxClockSkew time has elapsed.

During init, the server does, however, accept and respond to allcalls() from
other cluster members. The allcall() ping will contain
the current (lease, leader-rank) and leader-id
according to the issuer of the allcall(). Every
recipient of the allcall updates her local information
of who she thinks the leader is, so long as the
received information is monotone in the (lease, leader-rank)
ordering; so a later unexpired lease will replace an
earlier unexpired lease, and if both are expired then
the lower rank will replace the larger rank as winner
of the current leader role.
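
As a sketch of that monotone update rule, reusing `serverClaim` and `betterCandidate` from the sketch above (the `memberView` type and handler name are invented for illustration; the real bookkeeping lives in health/health.go):

```go
// memberView holds one node's local belief about the current leader.
type memberView struct {
	leader *serverClaim
}

// onAllcall adopts the leader claim carried by an incoming allcall only
// when it is better under the (lease, rank) ordering, so the local view
// only ever moves forward in that ordering.
func (m *memberView) onAllcall(claim serverClaim, now time.Time) {
	if m.leader == nil || betterCandidate(claim, *m.leader, now) {
		c := claim
		m.leader = &c
	}
}
```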

### II. regular ping phase

After a server has finished its Init phase, it
enters its ping phase, where it stays until it
is shut down.

During ping phase, the server continues to accept and respond
to allcall requests from other servers. Now in addition,
the server also issues its own allcall() pings every
heartBeat seconds.

### III. Election and Lease determination

Election and leasing are computed locally, by each
node, one heartBeat after receiving any replies
from the allcall(). The local node that issued
the allcall sorts the respondents, including
itself, and if all leases have expired, it
determines who is the new leader and marks
their lease as starting from now. Lease
and leader computation is done locally
and independently on each server.
Once in ping phase, this new determination
is broadcast at the next heartbeat when
the local node issues an allcall().

If a node receives an allcall with a leader
claim, and that leader claim has a shorter
lease expiration time than the existing
leader's, the new proposed leader is rejected
in favor of the current leader. This
favors continuity of leadership until
the end of the current leader's term, and
is a product of the sort order
described above, where we sort
candidates by lease time and then by rank.
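
A rough sketch of this local election step, again reusing `serverClaim`, `betterCandidate`, and `leaseTime` from the sketches above (the function name and bookkeeping are illustrative, not lifted from health.go):

```go
// electLocally picks the winner among the respondents to an allcall
// (which always include the local node) under the (lease, rank)
// ordering. If every lease has expired, the winner is granted a fresh
// lease starting now. Each node runs this locally and independently.
func electLocally(respondents []serverClaim, now time.Time) serverClaim {
	winner := respondents[0]
	allExpired := !winner.LeaseExpires.After(now)
	for _, r := range respondents[1:] {
		if betterCandidate(r, winner, now) {
			winner = r
		}
		if r.LeaseExpires.After(now) {
			allExpired = false
		}
	}
	if allExpired {
		winner.LeaseExpires = now.Add(leaseTime)
	}
	return winner
}
```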

## Properties of the allcall

The allcalls() are heard by all active cluster members, and
contain the sender's computed result of who the current leader is,
and replies answer back with the recipient's own rank and id. Each
recipient of an allcall() replies to all cluster members.
Both the allcall and the replies to it are
broadcasts, published to a well-known topic.
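
In the patch the agent reaches gnatsd over an in-process connection, but the broadcast pattern is ordinary NATS publish/subscribe on a shared subject. Below is a standalone sketch using the public Go client; the import path is the client's current one, and the "allcall" subject suffix is an assumption here, with only the "_nats.cluster.members." prefix coming from MembershipCfg:

```go
package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	// Prefix from MembershipCfg.SysMemberPrefix; the suffix is invented
	// here for illustration.
	const allcall = "_nats.cluster.members.allcall"

	nc, err := nats.Connect("nats://127.0.0.1:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// Every member subscribes to the shared subject, so the allcall
	// itself and every reply to it are heard by all members.
	if _, err := nc.Subscribe(allcall, func(m *nats.Msg) {
		log.Printf("heard: %s", m.Data)
	}); err != nil {
		log.Fatal(err)
	}

	// Broadcast our own claim; replies are published the same way.
	if err := nc.Publish(allcall, []byte(`{"rank":1}`)); err != nil {
		log.Fatal(err)
	}
	nc.Flush()
}
```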

## Safety/Convergence: ALLCALL converges to one leader

Suppose two nodes are partitioned and so both are leaders on
their own side of the network. Then suppose the network
is joined again, so the two leaders are brought together
by a healing of the network, or by adding a new link
between the networks. The two nodes exchange Ids and lease
times, and the node with the shorter valid lease time
adopts the node with the longer lease as leader,
since that is the sort order. The adoption takes
effect as soon as the loser's current lease expires.
Hence the two-leader situation persists for at
most one lease term after the network join.

## Liveness: a leader will be chosen

Given the total order among nodes, exactly one
will be lowest rank and thus be the preferred
leader at the end of any current leader's
lease, even if the current lease holder
has failed. Hence, with at least one live
node, the system can run for at most one
lease term before electing a leader.


## commentary

ALLCALL does not guarantee that there will
never be more than one leader. Availability
in the face of network partition is
desirable in many cases, and ALLCALL is
appropriate for these. This is congruent
with the NATS design of an always-on system.
ALLCALL does not guarantee that a
leader will always be present, but
with live nodes it does provide
that the cluster will have a leader
after one lease term + maxClockSkew
has expired.

By design, ALLCALL functions well
in clusters with any number of nodes.
One or two nodes, or an even number
of nodes, will work just fine.

Compared to quorum based elections
like raft and paxos, where an odd
number of at least three
nodes is required, this can be very desirable.

ALLCALL is appropriate for AP,
rather than CP, style systems, where
availability is more important
than having a single writer. When
writes are idempotent or deduplicated
downstream, this is typically preferred.
It is better for availability and
uptime to run an always-on leadership
system.

implementation
------------

ALLCALL is implemented on top of
the NATS system; see the health/
subdirectory of

https://github.com/nats-io/gnatsd

78 changes: 78 additions & 0 deletions health/agent.go
@@ -0,0 +1,78 @@
package health

import (
	"fmt"
	"net"
	"time"

	"github.com/nats-io/gnatsd/server"
	//"github.com/nats-io/gnatsd/server/lcon"
)

// Agent implements the InternalClient interface.
// It provides health status checks and
// leader election from among the candidate
// gnatsd instances in a cluster.
type Agent struct {
	opts  *server.Options
	mship *Membership
}

// NewAgent makes a new Agent.
func NewAgent(opts *server.Options) *Agent {
	return &Agent{
		opts: opts,
	}
}

// Name should identify the internal client for logging.
func (h *Agent) Name() string {
	return "health-agent"
}

// Start makes an internal,
// entirely in-process client that monitors
// cluster health and manages group
// membership functions.
//
func (h *Agent) Start(
	info server.Info,
	opts server.Options,
	logger server.Logger,

) (net.Conn, error) {

	// To keep the health client fast and its traffic
	// internal-only, we use a bidirectional,
	// in-memory version of a TCP stream.
	//
	// The buffers really do have to be of
	// sufficient size, or we will
	// deadlock/livelock the system.
	//
	cli, srv, err := NewInternalClientPair()
	if err != nil {
		return nil, fmt.Errorf("NewInternalClientPair() returned error: %s", err)
	}

	rank := opts.HealthRank
	beat := opts.HealthBeat
	lease := opts.HealthLease

	cfg := &MembershipCfg{
		MaxClockSkew: time.Second,
		BeatDur:      beat,
		LeaseTime:    lease,
		MyRank:       rank,
		CliConn:      cli,
		Log:          logger,
	}
	h.mship = NewMembership(cfg)
	go h.mship.Start()
	return srv, nil
}

// Stop halts the background goroutine.
func (h *Agent) Stop() {
	h.mship.Stop()
}
49 changes: 49 additions & 0 deletions health/amap.go
@@ -0,0 +1,49 @@
package health

import "sync"

// AtomicServerLocMap is a mutex-guarded map
// from string to *ServerLoc.
type AtomicServerLocMap struct {
	U   map[string]*ServerLoc `json:"U"`
	tex sync.RWMutex
}

// NewAtomicServerLocMap makes a new AtomicServerLocMap.
func NewAtomicServerLocMap() *AtomicServerLocMap {
	return &AtomicServerLocMap{
		U: make(map[string]*ServerLoc),
	}
}

// Len returns the number of entries in the map.
func (m *AtomicServerLocMap) Len() int {
	m.tex.RLock()
	n := len(m.U)
	m.tex.RUnlock()
	return n
}

// Get returns the value stored under key, or nil if absent.
func (m *AtomicServerLocMap) Get(key string) *ServerLoc {
	m.tex.RLock()
	s := m.U[key]
	m.tex.RUnlock()
	return s
}

// Get2 returns the value stored under key and whether it was present.
func (m *AtomicServerLocMap) Get2(key string) (*ServerLoc, bool) {
	m.tex.RLock()
	v, ok := m.U[key]
	m.tex.RUnlock()
	return v, ok
}

// Set stores val under key.
func (m *AtomicServerLocMap) Set(key string, val *ServerLoc) {
	m.tex.Lock()
	m.U[key] = val
	m.tex.Unlock()
}

// Del removes key from the map.
func (m *AtomicServerLocMap) Del(key string) {
	m.tex.Lock()
	delete(m.U, key)
	m.tex.Unlock()
}
82 changes: 82 additions & 0 deletions health/config.go
@@ -0,0 +1,82 @@
package health

import (
	"net"
	"time"

	"github.com/nats-io/gnatsd/logger"
	"github.com/nats-io/gnatsd/server"
)

// DEAF_TRUE and DEAF_FALSE mark whether a node is deliberately
// ignoring traffic, for testing under network partition.
const DEAF_TRUE = 1
const DEAF_FALSE = 0

// MembershipCfg configures a Membership.
type MembershipCfg struct {

	// max we allow for clocks to be out of sync.
	// default to 1 second if not set.
	MaxClockSkew time.Duration

	// how often we heartbeat. defaults to 100msec
	// if not set.
	BeatDur time.Duration

	// NatsUrl example "nats://127.0.0.1:4222"
	NatsUrl string

	// defaults to "_nats.cluster.members."
	SysMemberPrefix string

	// LeaseTime is the minimum time the
	// leader is elected for. Defaults to 10 sec.
	LeaseTime time.Duration

	// provide a default until the server gives us rank
	MyRank int

	// optional, if provided we will use this connection on
	// the client side.
	CliConn net.Conn

	// where we log stuff.
	Log server.Logger

	// for testing under network partition
	deaf int64

	// how much history to save
	historyCount int
}

// SetDefaults fills in zero-valued fields with their defaults.
func (cfg *MembershipCfg) SetDefaults() {
	if cfg.LeaseTime == 0 {
		cfg.LeaseTime = time.Second * 10
	}
	if cfg.SysMemberPrefix == "" {
		cfg.SysMemberPrefix = "_nats.cluster.members."
	}
	if cfg.BeatDur == 0 {
		cfg.BeatDur = 100 * time.Millisecond
	}
	if cfg.MaxClockSkew == 0 {
		cfg.MaxClockSkew = time.Second
	}
	if cfg.NatsUrl == "" {
		cfg.NatsUrl = "nats://127.0.0.1:4222"
	}
	if cfg.Log == nil {
		// stderr
		cfg.Log = logger.NewStdLogger(micros, debug, trace, colors, pid)
	}
}

const colors = false
const micros, pid = true, true
const trace = false

//const debug = true
const debug = false

// Dial returns the pre-supplied client-side connection,
// ignoring the network and address arguments.
func (cfg *MembershipCfg) Dial(network, address string) (net.Conn, error) {
	return cfg.CliConn, nil
}