forked from nats-io/nats-server
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
gnatsd: -health, -rank, -lease, -beat
options to control health monitoring. Leader election among gnatsd instances, cluster health update topics, and priority rank assignment from the command line are made available. InternalClient interface offers a general plugin interface for running internal clients. Fixes nats-io#433
- Loading branch information
Showing
18 changed files
with
1,998 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
# The ALLCALL leased-leader election algorithm. | ||
|
||
Jason E. Aten | ||
|
||
February 2017 | ||
|
||
constants (examples) | ||
----------- | ||
|
||
Let heartBeat = 1 sec. | ||
|
||
Let leaseTime = 10 sec. | ||
|
||
Let maxClockSkew = 1 sec, where maxClockSkew | ||
is a bound on how far out of sync our local | ||
clocks may drift. | ||
|
||
givens | ||
-------- | ||
|
||
* Given: Let each server have a numeric integer rank, that is distinct | ||
and unique to that server. If necessary an extremely long | ||
true random number is used to break ties between server ranks, so | ||
that we may assert, with probability 1, that all ranks are distinct integers, | ||
and that each server, absent an in-force lease, can be put into a strict total order. | ||
|
||
* Rule: The lower the rank is preferred for being the leader. | ||
|
||
* Ordering by lease time then rank: we order the pair (leaseExpires, rank) first | ||
by largest leaseExpires time, then by lower rank. If | ||
both leases have expired, then lease time is not considered as | ||
a part of the ordering, and rank alone determines the | ||
new leader. | ||
|
||
ALLCALL Algorithm phases | ||
=========================== | ||
|
||
### I. Init phase | ||
|
||
When a server joins the cluster, | ||
it does not issue allcalls (a ping of all members) | ||
until after leaseTime + maxClockSkew time has elapsed. | ||
|
||
During init, the server does, however, accept and respond to allcalls() from | ||
other cluster members. The allcall() ping will contain | ||
the current (lease, leader-rank) and leader-id | ||
according to the issuer of the allcall(). Every | ||
recipient of the allcall updates her local information | ||
of who she thinks the leader is, so long as the | ||
received information is monotone in the (lease, leader-rank) | ||
ordering; so a later unexpired lease will replace an | ||
earlier unexpired lease, and if both are expired then | ||
the lower rank will replace the larger rank as winner | ||
of the current leader role. | ||
|
||
### II. regular ping phase | ||
|
||
After a server has finished its Init phase, it | ||
enters its ping phase, where is stays until it | ||
is shut down. | ||
|
||
During ping phase, the server continues to accept and respond | ||
to allcall requests from other servers. Now in addition, | ||
the server also issues its own allcall() pings every | ||
heartBeat seconds. | ||
|
||
### III. Election and Lease determination | ||
|
||
Election and leasing are computed locally, by each | ||
node, one heartBeat after receiving any replies | ||
from the allcall(). The localnode who issued | ||
the allcall sorts the respondents, including | ||
itself, and if all leases have expired, it | ||
determines who is the new leader and marks | ||
their lease as starting from now. Lease | ||
and leader computation is done locally | ||
and independently on each server. | ||
Once in ping phase, this new determination | ||
is broadcast at the next heartbeat when | ||
the local node issues an allcall(). | ||
|
||
If a node receives an allcall with a leader | ||
claim, and that leader claim has a shorter | ||
lease expirtaion time than the existing | ||
leader, the new proposed leader is rejected | ||
in favor of the current leader. This | ||
favors continuity of leadership until | ||
the end of the current leaders term, and | ||
is a product of the sort order | ||
described above where we sort | ||
candidates by lease time then rank. | ||
|
||
## Properties of the allcall | ||
|
||
The allcalls() are heard by all active cluster members, and | ||
contain the sender's computed result of who the current leader is, | ||
and replies answer back with the recipient's own rank and id. Each | ||
recipient of an allcall() replies to all cluster members. | ||
Both the sending and the replying to the allcall are | ||
broadcasts that are published to a well known topic. | ||
|
||
## Safety/Convergence: ALLCALL converges to one leader | ||
|
||
Suppose two nodes are partitioned and so both are leaders on | ||
their own side of the network. Then suppose the network | ||
is joined again, so the two leaders are brought together | ||
by a healing of the network, or by adding a new link | ||
between the networks. The two nodes exchange Ids and lease | ||
times, and the node with the shorter valid lease time | ||
adopts the node with the longer lease as leader, | ||
since that is the sort order. The adoption takes | ||
effect as soon as the loser is informed of the | ||
other leader's lease via its allcall. | ||
|
||
## Liveness: a leader will be chosen | ||
|
||
Given the total order among nodes, exactly one | ||
will be lowest rank and thus be the preferred | ||
leader at the end of the any current leader's | ||
lease, even if the current lease holder | ||
has failed. Hence, with at least one live | ||
node (that lives beyond one lease term), | ||
the system can run for at most one | ||
lease term before electing a leader. | ||
|
||
|
||
## commentary | ||
|
||
ALLCALL does not guarantee that there will | ||
never be more than one leader. Availability | ||
in the face of network partition is | ||
desirable in many cases, and ALLCALL is | ||
appropriate for these. ALLCALL does | ||
not guarantee that a leader will always | ||
be present, but with live nodes it | ||
does provide that the cluster will | ||
have a leader after one lease term + | ||
maxClockSkew has expired. | ||
|
||
By design, ALLCALL functions well | ||
in cluster with only one and two | ||
nodes, or with any number, including | ||
an even number, of nodes. Compared to quorum | ||
based elections like raft and paxos, | ||
where an odd number of at least three | ||
nodes is required, this can be very desirable. | ||
ALLCALL is appropriate for AP, | ||
rather than CP, style systems, where | ||
availability is more important | ||
than having a single writer. When | ||
writes are idempotent or deduplicated | ||
downstream, it is better for uptime | ||
to run an always-on leadership system. | ||
|
||
implementation | ||
------------ | ||
|
||
ALLCALL is implented on top of | ||
the Nats system here, currently | ||
on a branch. | ||
|
||
https://github.com/glycerine/gnatsd/blob/minhealth/health/health.go | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
package health | ||
|
||
import ( | ||
"fmt" | ||
"net" | ||
"time" | ||
|
||
"github.com/nats-io/gnatsd/server" | ||
//"github.com/nats-io/gnatsd/server/lcon" | ||
) | ||
|
||
// Agent implements the InternalClient interface. | ||
// It provides health status checks and | ||
// leader election from among the candidate | ||
// gnatsd instances in a cluster. | ||
type Agent struct { | ||
opts *server.Options | ||
mship *Membership | ||
} | ||
|
||
// NewAgent makes a new Agent. | ||
func NewAgent(opts *server.Options) *Agent { | ||
return &Agent{ | ||
opts: opts, | ||
} | ||
} | ||
|
||
// Name should identify the internal client for logging. | ||
func (h *Agent) Name() string { | ||
return "health-agent" | ||
} | ||
|
||
// Start makes an internal | ||
// entirely in-process client that monitors | ||
// cluster health and manages group | ||
// membership functions. | ||
// | ||
func (h *Agent) Start( | ||
info server.Info, | ||
opts server.Options, | ||
logger server.Logger, | ||
|
||
) (net.Conn, error) { | ||
|
||
// To keep the health client fast and its traffic | ||
// internal-only, we use an bi-directional, | ||
// in-memory version of a TCP stream. | ||
// | ||
// The buffers really do have to be of | ||
// sufficient size, or we will | ||
// deadlock/livelock the system. | ||
// | ||
cli, srv, err := NewInternalClientPair() | ||
if err != nil { | ||
return nil, fmt.Errorf("NewInternalClientPair() returned error: %s", err) | ||
} | ||
|
||
rank := opts.HealthRank | ||
beat := opts.HealthBeat | ||
lease := opts.HealthLease | ||
|
||
cfg := &MembershipCfg{ | ||
MaxClockSkew: time.Second, | ||
BeatDur: beat, | ||
LeaseTime: lease, | ||
MyRank: rank, | ||
CliConn: cli, | ||
Log: logger, | ||
} | ||
h.mship = NewMembership(cfg) | ||
go h.mship.Start() | ||
return srv, nil | ||
} | ||
|
||
// Stop halts the background goroutine. | ||
func (h *Agent) Stop() { | ||
h.mship.Stop() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package health | ||
|
||
import "sync" | ||
|
||
// atomic map from string to *ServerLoc | ||
|
||
type AtomicServerLocMap struct { | ||
U map[string]*ServerLoc `json:"U"` | ||
tex sync.RWMutex | ||
} | ||
|
||
func NewAtomicServerLocMap() *AtomicServerLocMap { | ||
return &AtomicServerLocMap{ | ||
U: make(map[string]*ServerLoc), | ||
} | ||
} | ||
|
||
func (m *AtomicServerLocMap) Len() int { | ||
m.tex.RLock() | ||
n := len(m.U) | ||
m.tex.RUnlock() | ||
return n | ||
} | ||
|
||
func (m *AtomicServerLocMap) Get(key string) *ServerLoc { | ||
m.tex.RLock() | ||
s := m.U[key] | ||
m.tex.RUnlock() | ||
return s | ||
} | ||
|
||
func (m *AtomicServerLocMap) Get2(key string) (*ServerLoc, bool) { | ||
m.tex.RLock() | ||
v, ok := m.U[key] | ||
m.tex.RUnlock() | ||
return v, ok | ||
} | ||
|
||
func (m *AtomicServerLocMap) Set(key string, val *ServerLoc) { | ||
m.tex.Lock() | ||
m.U[key] = val | ||
m.tex.Unlock() | ||
} | ||
|
||
func (m *AtomicServerLocMap) Del(key string) { | ||
m.tex.Lock() | ||
delete(m.U, key) | ||
m.tex.Unlock() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
package health | ||
|
||
import ( | ||
"net" | ||
"time" | ||
|
||
"github.com/nats-io/gnatsd/logger" | ||
"github.com/nats-io/gnatsd/server" | ||
) | ||
|
||
const DEAF_TRUE = 1 | ||
const DEAF_FALSE = 0 | ||
|
||
type MembershipCfg struct { | ||
|
||
// max we allow for clocks to be out of sync. | ||
// default to 1 second if not set. | ||
MaxClockSkew time.Duration | ||
|
||
// how often we heartbeat. defaults to 100msec | ||
// if not set. | ||
BeatDur time.Duration | ||
|
||
// NatsUrl example "nats://127.0.0.1:4222" | ||
NatsUrl string | ||
|
||
// defaults to "_nats.cluster.members." | ||
SysMemberPrefix string | ||
|
||
// LeaseTime is the minimum time the | ||
// leader is elected for. Defaults to 10 sec. | ||
LeaseTime time.Duration | ||
|
||
// provide a default until the server gives us rank | ||
MyRank int | ||
|
||
// optional, if provided we will use this connection on | ||
// the client side. | ||
CliConn net.Conn | ||
|
||
// where we log stuff. | ||
Log server.Logger | ||
|
||
// for testing under network partition | ||
deaf int64 | ||
|
||
// how much history to save | ||
historyCount int | ||
} | ||
|
||
func (cfg *MembershipCfg) SetDefaults() { | ||
if cfg.LeaseTime == 0 { | ||
cfg.LeaseTime = time.Second * 10 | ||
} | ||
if cfg.SysMemberPrefix == "" { | ||
cfg.SysMemberPrefix = "_nats.cluster.members." | ||
} | ||
if cfg.BeatDur == 0 { | ||
cfg.BeatDur = 100 * time.Millisecond | ||
} | ||
if cfg.MaxClockSkew == 0 { | ||
cfg.MaxClockSkew = time.Second | ||
} | ||
if cfg.NatsUrl == "" { | ||
cfg.NatsUrl = "nats://127.0.0.1:4222" | ||
} | ||
if cfg.Log == nil { | ||
// stderr | ||
cfg.Log = logger.NewStdLogger(micros, debug, trace, colors, pid) | ||
} | ||
} | ||
|
||
const colors = false | ||
const micros, pid = true, true | ||
const trace = false | ||
|
||
//const debug = true | ||
const debug = false | ||
|
||
func (cfg *MembershipCfg) Dial(network, address string) (net.Conn, error) { | ||
return cfg.CliConn, nil | ||
} |
Oops, something went wrong.