Skip to content

Commit

Permalink
gnatsd: -health, -rank, -lease, -beat
Browse files Browse the repository at this point in the history
 options to control health monitoring.

Leader election and priority
assignment are made available.

InternalClient interface offers
plugin capability for running
internal clients.

Fixes nats-io#433
  • Loading branch information
glycerine committed Feb 12, 2017
1 parent 37ce250 commit a3c5adf
Show file tree
Hide file tree
Showing 25 changed files with 2,525 additions and 12 deletions.
163 changes: 163 additions & 0 deletions health/ALGORITHM.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# The ALLCALL leased-leader election algorithm.

Jason E. Aten

February 2017

constants (examples)
-----------

Let heartBeat = 1 sec.

Let leaseTime = 10 sec.

Let maxClockSkew = 1 sec, where maxClockSkew
is a bound on how far out of sync our local
clocks may drift.

givens
--------

* Given: Let each server have a numeric integer rank, that is distinct
and unique to that server. If necessary an extremely long
true random number is used to break ties between server ranks, so
that we may assert, with probability 1, that all ranks are distinct integers,
and that each server, absent an in-force lease, can be put into a strict total order.

* Rule: The lower the rank is preferred for being the leader.

* Ordering by lease time then rank: we order the pair (leaseExpires, rank) first
by largest leaseExpires time, then by lower rank. If
both leases have expired, then lease time is not considered as
a part of the ordering, and rank alone determines the
new leader.

ALLCALL Algorithm phases
===========================

### I. Init phase

When a server joins the cluster,
it does not issue allcalls (a ping of all members)
until after leaseTime + maxClockSkew time has elapsed.

During init, the server does, however, accept and respond to allcalls() from
other cluster members. The allcall() ping will contain
the current (lease, leader-rank) and leader-id
according to the issuer of the allcall(). Every
recipient of the allcall updates her local information
of who she thinks the leader is, so long as the
received information is monotone in the (lease, leader-rank)
ordering; so a later unexpired lease will replace an
earlier unexpired lease, and if both are expired then
the lower rank will replace the larger rank as winner
of the current leader role.

### II. regular ping phase

After a server has finished its Init phase, it
enters its ping phase, where is stays until it
is shut down.

During ping phase, the server continues to accept and respond
to allcall requests from other servers. Now in addition,
the server also issues its own allcall() pings every
heartBeat seconds.

### III. Election and Lease determination

Election and leasing are computed locally, by each
node, one heartBeat after receiving any replies
from the allcall(). The localnode who issued
the allcall sorts the respondents, including
itself, and if all leases have expired, it
determines who is the new leader and marks
their lease as starting from now. Lease
and leader computation is done locally
and independently on each server.
Once in ping phase, this new determination
is broadcast at the next heartbeat when
the local node issues an allcall().

If a node receives an allcall with a leader
claim, and that leader claim has a shorter
lease expirtaion time than the existing
leader, the new proposed leader is rejected
in favor of the current leader. This
favors continuity of leadership until
the end of the current leaders term, and
is a product of the sort order
described above where we sort
candidates by lease time then rank.

## Properties of the allcall

The allcalls() are heard by all active cluster members, and
contain the sender's computed result of who the current leader is,
and replies answer back with the recipient's own rank and id. Each
recipient of an allcall() replies to all cluster members.
Both the sending and the replying to the allcall are
broadcasts that are published to a well known topic.

## Safety/Convergence: ALLCALL converges to one leader

Suppose two nodes are partitioned and so both are leaders on
their own side of the network. Then suppose the network
is joined again, so the two leaders are brought together
by a healing of the network, or by adding a new link
between the networks. The two nodes exchange Ids and lease
times, and the node with the shorter valid lease time
adopts the node with the longer lease as leader,
since that is the sort order. The adoption takes
effect as soon as the loser is informed of the
other leader's lease via its allcall.

## Liveness: a leader will be chosen

Given the total order among nodes, exactly one
will be lowest rank and thus be the preferred
leader at the end of the any current leader's
lease, even if the current lease holder
has failed. Hence, with at least one live
node (that lives beyond one lease term),
the system can run for at most one
lease term before electing a leader.


## commentary

ALLCALL does not guarantee that there will
never be more than one leader. Availability
in the face of network partition is
desirable in many cases, and ALLCALL is
appropriate for these. ALLCALL does
not guarantee that a leader will always
be present, but with live nodes it
does provide that the cluster will
have a leader after one lease term +
maxClockSkew has expired.

By design, ALLCALL functions well
in cluster with only one and two
nodes, or with any number, including
an even number, of nodes. Compared to quorum
based elections like raft and paxos,
where an odd number of at least three
nodes is required, this can be very desirable.
ALLCALL is appropriate for AP,
rather than CP, style systems, where
availability is more important
than having a single writer. When
writes are idempotent or deduplicated
downstream, it is better for uptime
to run an always-on leadership system.

implementation
------------

ALLCALL is implented on top of
the Nats system here, currently
on a branch.

https://github.com/glycerine/gnatsd/blob/minhealth/health/health.go

74 changes: 74 additions & 0 deletions health/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package health

import (
"net"
"time"

"github.com/nats-io/gnatsd/server"
"github.com/nats-io/gnatsd/server/lcon"
)

// Agent implements the InternalClient interface.
// It provides health status checks and
// leader election from among the candidate
// gnatsd instances in a cluster.
type Agent struct {
opts *server.Options
mship *Membership
}

// NewAgent makes a new Agent.
func NewAgent(opts *server.Options) *Agent {
return &Agent{
opts: opts,
}
}

// Name should identify the internal client for logging.
func (h *Agent) Name() string {
return "health-agent"
}

// Start makes an internal
// entirely in-process client that monitors
// cluster health and manages group
// membership functions.
//
func (h *Agent) Start(
info server.Info,
opts server.Options,
logger server.Logger,

) (net.Conn, error) {

// To keep the health client fast and its traffic
// internal-only, we use an bi-directional,
// in-memory version of a TCP stream.
//
// The buffers really do have to be of
// sufficient size, or we will
// deadlock/livelock the system.
//
cli, srv := lcon.NewBidir(info.MaxPayload * 2)

rank := opts.HealthRank
beat := opts.HealthBeat
lease := opts.HealthLease

cfg := &MembershipCfg{
MaxClockSkew: time.Second,
BeatDur: beat,
LeaseTime: lease,
MyRank: rank,
CliConn: cli,
Log: logger,
}
h.mship = NewMembership(cfg)
go h.mship.Start()
return srv, nil
}

// Stop halts the background goroutine.
func (h *Agent) Stop() {
h.mship.Stop()
}
49 changes: 49 additions & 0 deletions health/amap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package health

import "sync"

// atomic map from string to *ServerLoc

type AtomicServerLocMap struct {
U map[string]*ServerLoc `json:"U"`
tex sync.RWMutex
}

func NewAtomicServerLocMap() *AtomicServerLocMap {
return &AtomicServerLocMap{
U: make(map[string]*ServerLoc),
}
}

func (m *AtomicServerLocMap) Len() int {
m.tex.RLock()
n := len(m.U)
m.tex.RUnlock()
return n
}

func (m *AtomicServerLocMap) Get(key string) *ServerLoc {
m.tex.RLock()
s := m.U[key]
m.tex.RUnlock()
return s
}

func (m *AtomicServerLocMap) Get2(key string) (*ServerLoc, bool) {
m.tex.RLock()
v, ok := m.U[key]
m.tex.RUnlock()
return v, ok
}

func (m *AtomicServerLocMap) Set(key string, val *ServerLoc) {
m.tex.Lock()
m.U[key] = val
m.tex.Unlock()
}

func (m *AtomicServerLocMap) Del(key string) {
m.tex.Lock()
delete(m.U, key)
m.tex.Unlock()
}
67 changes: 67 additions & 0 deletions health/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package health

import (
"net"
"time"

"github.com/nats-io/gnatsd/logger"
"github.com/nats-io/gnatsd/server"
)

type MembershipCfg struct {

// max we allow for clocks to be out of sync.
// default to 1 second if not set.
MaxClockSkew time.Duration

// how often we heartbeat. defaults to 100msec
// if not set.
BeatDur time.Duration

// NatsUrl example "nats://127.0.0.1:4222"
NatsUrl string

// defaults to "_nats.cluster.members."
SysMemberPrefix string

// LeaseTime is the minimum time the
// leader is elected for. Defaults to 10 sec.
LeaseTime time.Duration

// provide a default until the server gives us rank
MyRank int

// optional, if provided we will use this connection on
// the client side.
CliConn net.Conn

Log server.Logger
}

func (cfg *MembershipCfg) SetDefaults() {
if cfg.LeaseTime == 0 {
cfg.LeaseTime = time.Second * 10
}
if cfg.SysMemberPrefix == "" {
cfg.SysMemberPrefix = "_nats.cluster.members."
}
if cfg.BeatDur == 0 {
cfg.BeatDur = 100 * time.Millisecond
}
if cfg.MaxClockSkew == 0 {
cfg.MaxClockSkew = time.Second
}
if cfg.NatsUrl == "" {
cfg.NatsUrl = "nats://127.0.0.1:4222"
}
if cfg.Log == nil {
// stderr
const micros, pid = true, true
const colors, trace, debug = false, false, false
cfg.Log = logger.NewStdLogger(micros, debug, trace, colors, pid)
}
}

func (cfg *MembershipCfg) Dial(network, address string) (net.Conn, error) {
return cfg.CliConn, nil
}
Loading

0 comments on commit a3c5adf

Please sign in to comment.