From 98ea50ab39a59e95b74155818275225066cbb41b Mon Sep 17 00:00:00 2001 From: "Jason E. Aten" Date: Thu, 9 Feb 2017 15:11:19 -0600 Subject: [PATCH] gnatsd: -health, -rank, -lease, -beat options to control health monitoring. The InternalClient interface offers a general plugin interface for running internal clients within a gnatsd process. The -health flag to gnatsd starts an internal client that runs a leader election among the available gnatsd instances and publishes cluster membership changes to a set of cluster health topics. The -beat and -lease flags control how frequently health checks are run, and how long leader leases persist. The health agent can also be run standalone as healthcmd. See the main method in gnatsd/health/healthcmd. The -rank flag to gnatsd adds priority rank assignment from the command line. The lowest ranking gnatsd instance wins the lease on the current election. The election algorithm is described in gnatsd/health/ALGORITHM.md and is implemented in gnatsd/health/health.go. Fixes #433 --- health/ALGORITHM.md | 163 ++++++++ health/agent.go | 78 ++++ health/amap.go | 49 +++ health/config.go | 82 ++++ health/health.go | 788 +++++++++++++++++++++++++++++++++++++++ health/health_test.go | 195 ++++++++++ health/healthcmd/main.go | 56 +++ health/icc.go | 64 ++++ health/icc_test.go | 48 +++ health/idem.go | 106 ++++++ health/rbuf.go | 128 +++++++ health/setutil.go | 112 ++++++ main.go | 15 + server/client.go | 6 + server/icli.go | 89 +++++ server/monitor.go | 9 +- server/opts.go | 16 +- server/server.go | 72 +++- 18 files changed, 2066 insertions(+), 10 deletions(-) create mode 100644 health/ALGORITHM.md create mode 100644 health/agent.go create mode 100644 health/amap.go create mode 100644 health/config.go create mode 100644 health/health.go create mode 100644 health/health_test.go create mode 100644 health/healthcmd/main.go create mode 100644 health/icc.go create mode 100644 health/icc_test.go create mode 100644 health/idem.go create mode 100644 health/rbuf.go create mode 100644 health/setutil.go create mode 100644 server/icli.go diff --git a/health/ALGORITHM.md b/health/ALGORITHM.md new file mode 100644 index 00000000000..48d16a2ccc5 --- /dev/null +++ b/health/ALGORITHM.md @@ -0,0 +1,163 @@ +# The ALLCALL leased-leader election algorithm. + +Jason E. Aten + +February 2017 + +constants (examples) +----------- + +Let heartBeat = 1 sec. + +Let leaseTime = 10 sec. + +Let maxClockSkew = 1 sec, where maxClockSkew +is a bound on how far out of sync our local +clocks may drift. + +givens +-------- + +* Given: Let each server have a numeric integer rank, that is distinct +and unique to that server. If necessary an extremely long +true random number is used to break ties between server ranks, so +that we may assert, with probability 1, that all ranks are distinct integers, +and that each server, absent an in-force lease, can be put into a strict total order. + +* Rule: The lower the rank is preferred for being the leader. + +* Ordering by lease time then rank: we order the pair (leaseExpires, rank) first +by largest leaseExpires time, then by lower rank. If +both leases have expired, then lease time is not considered as +a part of the ordering, and rank alone determines the +new leader. + +ALLCALL Algorithm phases +=========================== + +### I. Init phase + +When a server joins the cluster, +it does not issue allcalls (a ping of all members) +until after leaseTime + maxClockSkew time has elapsed. 
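+
+With the example constants above, a freshly started node
+therefore stays quiet for leaseTime + maxClockSkew = 11 sec
+before issuing its first allcall(), which is long enough to
+hear about any lease already in force.
+
+For concreteness, the (leaseExpires, rank) ordering described
+above can be sketched as follows (a simplified sketch only;
+the authoritative version is ServerLocLessThan() in
+health/health.go, which also breaks ties by Id, Host, and Port):
+
+```go
+// preferred reports whether a should win leadership over b.
+// Expired leases are zeroed out before comparing, so a live
+// lease always beats an expired one.
+func preferred(a, b *ServerLoc, now time.Time) bool {
+	at, bt := a.LeaseExpires, b.LeaseExpires
+	if !at.After(now) {
+		at = time.Time{}
+	}
+	if !bt.After(now) {
+		bt = time.Time{}
+	}
+	if !at.Equal(bt) {
+		return at.After(bt) // the longer unexpired lease wins
+	}
+	return a.Rank < b.Rank // otherwise the lower rank wins
+}
+```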
+ +During init, the server does, however, accept and respond to allcalls() from +other cluster members. The allcall() ping will contain +the current (lease, leader-rank) and leader-id +according to the issuer of the allcall(). Every +recipient of the allcall updates her local information +of who she thinks the leader is, so long as the +received information is monotone in the (lease, leader-rank) +ordering; so a later unexpired lease will replace an +earlier unexpired lease, and if both are expired then +the lower rank will replace the larger rank as winner +of the current leader role. + +### II. regular ping phase + +After a server has finished its Init phase, it +enters its ping phase, where is stays until it +is shut down. + +During ping phase, the server continues to accept and respond +to allcall requests from other servers. Now in addition, +the server also issues its own allcall() pings every +heartBeat seconds. + +### III. Election and Lease determination + +Election and leasing are computed locally, by each +node, one heartBeat after receiving any replies +from the allcall(). The localnode who issued +the allcall sorts the respondents, including +itself, and if all leases have expired, it +determines who is the new leader and marks +their lease as starting from now. Lease +and leader computation is done locally +and independently on each server. +Once in ping phase, this new determination +is broadcast at the next heartbeat when +the local node issues an allcall(). + +If a node receives an allcall with a leader +claim, and that leader claim has a shorter +lease expirtaion time than the existing +leader, the new proposed leader is rejected +in favor of the current leader. This +favors continuity of leadership until +the end of the current leaders term, and +is a product of the sort order +described above where we sort +candidates by lease time then rank. + +## Properties of the allcall + +The allcalls() are heard by all active cluster members, and +contain the sender's computed result of who the current leader is, +and replies answer back with the recipient's own rank and id. Each +recipient of an allcall() replies to all cluster members. +Both the sending and the replying to the allcall are +broadcasts that are published to a well known topic. + +## Safety/Convergence: ALLCALL converges to one leader + +Suppose two nodes are partitioned and so both are leaders on +their own side of the network. Then suppose the network +is joined again, so the two leaders are brought together +by a healing of the network, or by adding a new link +between the networks. The two nodes exchange Ids and lease +times, and the node with the shorter valid lease time +adopts the node with the longer lease as leader, +since that is the sort order. The adoption takes +effect as soon as the loser is informed of the +other leader's lease via its allcall. + +## Liveness: a leader will be chosen + +Given the total order among nodes, exactly one +will be lowest rank and thus be the preferred +leader at the end of the any current leader's +lease, even if the current lease holder +has failed. Hence, with at least one live +node (that lives beyond one lease term), +the system can run for at most one +lease term before electing a leader. + + +## commentary + +ALLCALL does not guarantee that there will +never be more than one leader. Availability +in the face of network partition is +desirable in many cases, and ALLCALL is +appropriate for these. 
ALLCALL does +not guarantee that a leader will always +be present, but with live nodes it +does provide that the cluster will +have a leader after one lease term + +maxClockSkew has expired. + +By design, ALLCALL functions well +in cluster with only one and two +nodes, or with any number, including +an even number, of nodes. Compared to quorum +based elections like raft and paxos, +where an odd number of at least three +nodes is required, this can be very desirable. +ALLCALL is appropriate for AP, +rather than CP, style systems, where +availability is more important +than having a single writer. When +writes are idempotent or deduplicated +downstream, it is better for uptime +to run an always-on leadership system. + +implementation +------------ + +ALLCALL is implented on top of +the Nats system, see the health/ +subdirectory of + +https://github.com/nats-io/gnatsd + diff --git a/health/agent.go b/health/agent.go new file mode 100644 index 00000000000..899c3eb5eec --- /dev/null +++ b/health/agent.go @@ -0,0 +1,78 @@ +package health + +import ( + "fmt" + "net" + "time" + + "github.com/nats-io/gnatsd/server" + //"github.com/nats-io/gnatsd/server/lcon" +) + +// Agent implements the InternalClient interface. +// It provides health status checks and +// leader election from among the candidate +// gnatsd instances in a cluster. +type Agent struct { + opts *server.Options + mship *Membership +} + +// NewAgent makes a new Agent. +func NewAgent(opts *server.Options) *Agent { + return &Agent{ + opts: opts, + } +} + +// Name should identify the internal client for logging. +func (h *Agent) Name() string { + return "health-agent" +} + +// Start makes an internal +// entirely in-process client that monitors +// cluster health and manages group +// membership functions. +// +func (h *Agent) Start( + info server.Info, + opts server.Options, + logger server.Logger, + +) (net.Conn, error) { + + // To keep the health client fast and its traffic + // internal-only, we use an bi-directional, + // in-memory version of a TCP stream. + // + // The buffers really do have to be of + // sufficient size, or we will + // deadlock/livelock the system. + // + cli, srv, err := NewInternalClientPair() + if err != nil { + return nil, fmt.Errorf("NewInternalClientPair() returned error: %s", err) + } + + rank := opts.HealthRank + beat := opts.HealthBeat + lease := opts.HealthLease + + cfg := &MembershipCfg{ + MaxClockSkew: time.Second, + BeatDur: beat, + LeaseTime: lease, + MyRank: rank, + CliConn: cli, + Log: logger, + } + h.mship = NewMembership(cfg) + go h.mship.Start() + return srv, nil +} + +// Stop halts the background goroutine. 
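+// It blocks until the underlying Membership goroutine
+// has acknowledged the shutdown request.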
+func (h *Agent) Stop() { + h.mship.Stop() +} diff --git a/health/amap.go b/health/amap.go new file mode 100644 index 00000000000..8315dac6ec6 --- /dev/null +++ b/health/amap.go @@ -0,0 +1,49 @@ +package health + +import "sync" + +// atomic map from string to *ServerLoc + +type AtomicServerLocMap struct { + U map[string]*ServerLoc `json:"U"` + tex sync.RWMutex +} + +func NewAtomicServerLocMap() *AtomicServerLocMap { + return &AtomicServerLocMap{ + U: make(map[string]*ServerLoc), + } +} + +func (m *AtomicServerLocMap) Len() int { + m.tex.RLock() + n := len(m.U) + m.tex.RUnlock() + return n +} + +func (m *AtomicServerLocMap) Get(key string) *ServerLoc { + m.tex.RLock() + s := m.U[key] + m.tex.RUnlock() + return s +} + +func (m *AtomicServerLocMap) Get2(key string) (*ServerLoc, bool) { + m.tex.RLock() + v, ok := m.U[key] + m.tex.RUnlock() + return v, ok +} + +func (m *AtomicServerLocMap) Set(key string, val *ServerLoc) { + m.tex.Lock() + m.U[key] = val + m.tex.Unlock() +} + +func (m *AtomicServerLocMap) Del(key string) { + m.tex.Lock() + delete(m.U, key) + m.tex.Unlock() +} diff --git a/health/config.go b/health/config.go new file mode 100644 index 00000000000..8394b218619 --- /dev/null +++ b/health/config.go @@ -0,0 +1,82 @@ +package health + +import ( + "net" + "time" + + "github.com/nats-io/gnatsd/logger" + "github.com/nats-io/gnatsd/server" +) + +const DEAF_TRUE = 1 +const DEAF_FALSE = 0 + +type MembershipCfg struct { + + // max we allow for clocks to be out of sync. + // default to 1 second if not set. + MaxClockSkew time.Duration + + // how often we heartbeat. defaults to 100msec + // if not set. + BeatDur time.Duration + + // NatsUrl example "nats://127.0.0.1:4222" + NatsUrl string + + // defaults to "_nats.cluster.members." + SysMemberPrefix string + + // LeaseTime is the minimum time the + // leader is elected for. Defaults to 10 sec. + LeaseTime time.Duration + + // provide a default until the server gives us rank + MyRank int + + // optional, if provided we will use this connection on + // the client side. + CliConn net.Conn + + // where we log stuff. + Log server.Logger + + // for testing under network partition + deaf int64 + + // how much history to save + historyCount int +} + +func (cfg *MembershipCfg) SetDefaults() { + if cfg.LeaseTime == 0 { + cfg.LeaseTime = time.Second * 10 + } + if cfg.SysMemberPrefix == "" { + cfg.SysMemberPrefix = "_nats.cluster.members." + } + if cfg.BeatDur == 0 { + cfg.BeatDur = 100 * time.Millisecond + } + if cfg.MaxClockSkew == 0 { + cfg.MaxClockSkew = time.Second + } + if cfg.NatsUrl == "" { + cfg.NatsUrl = "nats://127.0.0.1:4222" + } + if cfg.Log == nil { + // stderr + cfg.Log = logger.NewStdLogger(micros, debug, trace, colors, pid) + } +} + +const colors = false +const micros, pid = true, true +const trace = false + +//const debug = true +const debug = false + +func (cfg *MembershipCfg) Dial(network, address string) (net.Conn, error) { + return cfg.CliConn, nil +} diff --git a/health/health.go b/health/health.go new file mode 100644 index 00000000000..56e11b3e2fe --- /dev/null +++ b/health/health.go @@ -0,0 +1,788 @@ +package health + +import ( + "encoding/json" + "fmt" + "log" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/nats-io/go-nats" +) + +// sysMemberPrefix creates a namespace +// for system cluster membership communication. +// This prefix aims to avoid collisions +// with user-level topics. Only system +// processes / internal clients should +// write to these topics, but everyone +// is welcome to listen on them. 
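+//
+// The topics used under this prefix are:
+//
+//   allcall  - health ping; carries the sender's current view of the leader.
+//   allreply - each member's reply to an allcall, giving its own ServerLoc.
+//   lost     - members that have dropped out since the previous heartbeat.
+//   added    - members that have appeared since the previous heartbeat.
+//   list     - the full membership, published when a leader lease expires.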
+// +// note: `_nats` is for now, can easily +// changed to be `_SYS` later once +// we're sure everything is working. +// +const sysMemberPrefix = "_nats.cluster.members." + +// ServerLoc conveys to interested parties +// the Id and location of one gnatsd +// server in the cluster. +type ServerLoc struct { + Id string `json:"serverId"` + Host string `json:"host"` + Port int `json:"port"` + + // Are we the leader? + IsLeader bool `json:"leader"` + + // LeaseExpires is zero for any + // non-leader. For the leader, + // LeaseExpires tells you when + // the leaders lease expires. + LeaseExpires time.Time `json:"leaseExpires"` + + // lower rank is leader until lease + // expires. Ties are broken by Id. + // Rank should be assignable on the + // gnatsd command line with -rank to + // let the operator prioritize + // leadership for certain hosts. + Rank int `json:"rank"` +} + +func (s *ServerLoc) String() string { + by, err := json.Marshal(s) + panicOn(err) + return string(by) +} + +func (s *ServerLoc) fromBytes(by []byte) error { + return json.Unmarshal(by, s) +} + +// Membership tracks the nats server cluster +// membership, issuing health checks and +// choosing a leader. +type Membership struct { + Cfg MembershipCfg + + elec *leadHolder + nc *nats.Conn + myLoc ServerLoc + + subjAllCall string + subjAllReply string + subjMemberLost string + subjMemberAdded string + subjMembership string + + halt *halter + mu sync.Mutex + stopping bool +} + +func (m *Membership) deaf() bool { + v := atomic.LoadInt64(&m.Cfg.deaf) + return v == DEAF_TRUE +} + +func (m *Membership) setDeaf() { + atomic.StoreInt64(&m.Cfg.deaf, DEAF_TRUE) +} + +func (m *Membership) unDeaf() { + atomic.StoreInt64(&m.Cfg.deaf, DEAF_FALSE) +} + +func NewMembership(cfg *MembershipCfg) *Membership { + m := &Membership{ + Cfg: *cfg, + halt: newHalter(), + } + m.elec = m.newLeadHolder(cfg.historyCount) + return m +} + +// leadHolder holds who is the current leader, +// and what their lease is. Used to synchronize +// access between various goroutines. +type leadHolder struct { + mu sync.Mutex + sloc ServerLoc + + myId string + myRank int + myLocHasBeenSet bool + + history *RingBuf + histsz int + + m *Membership +} + +func (m *Membership) newLeadHolder(histsz int) *leadHolder { + if histsz == 0 { + histsz = 100 + } + return &leadHolder{ + history: NewRingBuf(histsz), + histsz: histsz, + m: m, + } +} + +func (e *leadHolder) setMyLoc(myLoc *ServerLoc) { + e.mu.Lock() + if e.myLocHasBeenSet { + panic("no double set!") + } + e.myLocHasBeenSet = true + e.myId = myLoc.Id + e.myRank = myLoc.Rank + e.mu.Unlock() +} + +// getLeader retreives the stored e.sloc value. +func (e *leadHolder) getLeader() ServerLoc { + e.mu.Lock() + defer e.mu.Unlock() + return e.sloc +} + +// setLeader aims to copy sloc and store it +// for future getLeader() calls to access. +// +// However we reject any attempt to replace +// a leader with a one that doesn't rank lower, where rank +// includes the LeaseExpires time +// (see the ServerLocLessThan() function). +// +// If we accept sloc +// we return slocWon true. If we reject sloc then +// we return slocWon false. In short, we will only +// accept sloc if ServerLocLessThan(sloc, e.sloc), +// and we return ServerLocLessThan(sloc, e.sloc). +// +// If we return slocWon false, alt contains the +// value we favored, which is the current value +// of our retained e.sloc. If we return true, +// then alt contains a copy of sloc. We +// return a value in alt to avoid data races. 
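+//
+// In effect the stored leader can only move forward in the
+// (LeaseExpires, Rank) ordering: an unexpired lease is only
+// displaced by a longer one, and once all leases have expired
+// the lowest rank wins.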
+// +func (e *leadHolder) setLeader(sloc *ServerLoc) (slocWon bool, alt ServerLoc) { + e.mu.Lock() + defer e.mu.Unlock() + + if sloc == nil || sloc.Id == "" { + return false, e.sloc + } + + slocWon = ServerLocLessThan(sloc, &e.sloc, time.Now()) + if !slocWon { + return false, e.sloc + } + + e.sloc = *sloc + histcp := *sloc + e.history.Append(&histcp) + return true, e.sloc +} + +func (e *leadHolder) getLeaderAsBytes() []byte { + lead := e.getLeader() + by, err := json.Marshal(&lead) + panicOn(err) + return by +} + +func (m *Membership) Undeaf() { + +} + +// Stop blocks until the Membership goroutine +// acknowledges the shutdown request. +func (m *Membership) Stop() { + m.mu.Lock() + if m.stopping { + m.mu.Unlock() + return + } + m.stopping = true + m.mu.Unlock() + m.halt.ReqStop.Close() + <-m.halt.Done.Chan +} + +func (m *Membership) Start() error { + + m.Cfg.SetDefaults() + + pc := newPongCollector() + nc, err := m.setupNatsClient(pc) + if err != nil { + m.halt.Done.Close() + return err + } + m.nc = nc + go m.start(nc, pc) + return nil +} + +func (m *Membership) start(nc *nats.Conn, pc *pongCollector) { + + defer func() { + m.halt.Done.Close() + }() + + m.Cfg.Log.Debugf("health-agent: Listening on [%s]\n", m.subjAllCall) + log.SetFlags(log.LstdFlags) + + prevCount, curCount := 0, 0 + var curMember, prevMember *members + var curLead *ServerLoc + + // do an initial allcall() to discover any + // current leader. + m.Cfg.Log.Tracef("health-agent: "+ + "init: doing initial allcall "+ + "to discover any existing leader...") + + err := m.allcall() + if err != nil { + m.Cfg.Log.Debugf("health-agent: "+ + "error back from allcall, "+ + "terminating on: %s", err) + return + } + + select { + case <-time.After(m.Cfg.BeatDur): + case <-m.halt.ReqStop.Chan: + return + } + + prevCount, prevMember = pc.getSetAndClear(m.myLoc) + + now := time.Now() + + firstSeenLead := m.elec.getLeader() + xpire := firstSeenLead.LeaseExpires + + limit := xpire.Add(m.Cfg.MaxClockSkew) + if !xpire.IsZero() && limit.After(now) { + + m.Cfg.Log.Tracef("health-agent: init: "+ + "after one heartbeat, "+ + "we detect current leader '%s'"+ + " of rank %v with lease good "+ + "for %v until expiration + "+ + "maxClockSkew=='%v'", + firstSeenLead.Id, + firstSeenLead.Rank, + limit.Sub(now), + limit, + ) + } else { + m.Cfg.Log.Tracef("health-agent: "+ + "init: after one heartbeat,"+ + " no leader found. waiting "+ + "for a full leader lease "+ + "term of %s to expire...", + m.Cfg.LeaseTime) + + select { + case <-time.After(m.Cfg.LeaseTime): + case <-m.halt.ReqStop.Chan: + return + } + } + + // prev responses should be back by now. + var expired bool + var prevLead *ServerLoc + var nextLeadReportTm time.Time + + for { + // NB: replies to an + // allcall can/will change + // what the current leader + // is in elec. + err = m.allcall() + if err != nil { + // err could be: "write on closed buffer" + // typically means we are shutting down. + + m.Cfg.Log.Tracef("health-agent: "+ + "error on allcall, "+ + "shutting down the "+ + "health-agent: %s", + err) + return + } + + select { + case <-time.After(m.Cfg.BeatDur): + case <-m.halt.ReqStop.Chan: + return + } + lastSeenLead := m.elec.getLeader() + + // cur responses should be back by now + // and we can compare prev and cur. 
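+		// getSetAndClear also resets the collector, seeding it
+		// with just our own location for the next round.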
+ curCount, curMember = pc.getSetAndClear(m.myLoc) + + now = time.Now() + expired, curLead = curMember.leaderLeaseExpired( + now, + m.Cfg.LeaseTime, + &lastSeenLead, + m.Cfg.MaxClockSkew, + ) + + // tell pong + won, alt := m.elec.setLeader(curLead) + if !won { + curLead = &alt + } + + loc, _ := nc.ServerLocation() + if loc != nil { + loc.Rank = m.Cfg.MyRank + if loc.Id == curLead.Id { + + if now.After(nextLeadReportTm) || + prevLead == nil || + prevLead.Id != curLead.Id { + + left := curLead.LeaseExpires.Sub(now) + m.Cfg.Log.Debugf("health-agent: "+ + "I am LEAD, my Id: '%s', "+ + "rank %v. lease expires "+ + "in %s", + loc.Id, + loc.Rank, + left) + + nextLeadReportTm = now.Add(left).Add(time.Second) + } + } else { + if prevLead != nil && + prevLead.Id == loc.Id { + + m.Cfg.Log.Debugf("health-agent: "+ + "I am no longer lead, "+ + "new LEAD is '%s', rank %v. "+ + "lease expires in %s", + curLead.Id, + curLead.Rank, + curLead.LeaseExpires.Sub(now)) + + } else { + if curLead != nil && + (nextLeadReportTm.IsZero() || now.After(nextLeadReportTm)) { + + left := curLead.LeaseExpires.Sub(now) + if curLead.Id == "" { + m.Cfg.Log.Debugf("health-agent: "+ + "I am '%s'/rank=%v. "+ + "lead is unknown.", + m.myLoc.Id, + m.myLoc.Rank) + + } else { + m.Cfg.Log.Debugf("health-agent: "+ + "I am not lead. lead is '%s', "+ + "rank %v, for %v", + curLead.Id, + curLead.Rank, + left) + + } + nextLeadReportTm = now.Add(left).Add(time.Second) + } + } + } + } + + lost := setDiff(prevMember, curMember, curLead) + gained := setDiff(curMember, prevMember, curLead) + same := setsEqual(prevMember, curMember) + + if same { + // nothing more to do. + // This is the common case when nothing changes. + } else { + lostBytes := lost.mustJsonBytes() + if !lost.setEmpty() { + if !m.deaf() { + nc.Publish(m.subjMemberLost, lostBytes) + // ignore errors on purpose; + // don't crash mid-health-report + // if at all possible. + } + } + gainedBytes := gained.mustJsonBytes() + if !gained.setEmpty() { + if !m.deaf() { + nc.Publish(m.subjMemberAdded, gainedBytes) + // same error approach as above. + } + } + } + if curCount < prevCount { + m.Cfg.Log.Debugf("health-agent: ---- "+ + "PAGE PAGE PAGE!! we went "+ + "down a server, from %v -> %v."+ + "lost: '%s'", + prevCount, + curCount, + lost) + + } else if curCount > prevCount && curCount > 0 { + m.Cfg.Log.Debugf("health-agent: ++++ "+ + "MORE ROBUSTNESS GAINED; "+ + "we went from %v -> %v. "+ + "gained: '%s'", + prevCount, + curCount, + gained) + + } + + if expired { + curBytes := curMember.mustJsonBytes() + if !m.deaf() { + nc.Publish(m.subjMembership, curBytes) + } + } + + // done with compare, now loop + prevCount = curCount + prevMember = curMember + prevLead = curLead + } +} + +func pong(nc *nats.Conn, subj string, msg []byte) { + err := nc.Publish(subj, msg) + panicOn(err) + nc.FlushTimeout(2 * time.Second) + // ignore error on nc.Flush(). + // might be: nats: connection closed on shutdown. +} + +// allcall sends out a health ping on the +// subjAllCall topic. +// +// The ping consists of sending the ServerLoc +// forf the current leader, which provides lease +// and full contact info for the leader. +// +// This gives a round-trip connectivity check. +// +func (m *Membership) allcall() error { + // allcall broadcasts the current leader + lease + leadby := m.elec.getLeaderAsBytes() + return m.nc.PublishRequest(m.subjAllCall, m.subjAllReply, leadby) +} + +// pongCollector collects the responses +// from an allcall request. 
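+//
+// Replies accumulate under the mutex as they arrive and are
+// harvested once per heartbeat via getSetAndClear().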
+type pongCollector struct { + replies int + from *members + mu sync.Mutex +} + +func newPongCollector() *pongCollector { + return &pongCollector{ + from: newMembers(), + } +} + +// acumulate pong responses +func (pc *pongCollector) receivePong(msg *nats.Msg) { + pc.mu.Lock() + + pc.replies++ + + var loc ServerLoc + err := loc.fromBytes(msg.Data) + if err != nil { + pc.from.Amap.Set(loc.Id, &loc) + } + pc.mu.Unlock() +} + +func (pc *pongCollector) clear() { + pc.mu.Lock() + pc.from.clear() + pc.mu.Unlock() +} + +// getSet returns the count and set so far, then +// clears the set, emptying it, and then adding +// back just myLoc +func (pc *pongCollector) getSetAndClear(myLoc ServerLoc) (int, *members) { + + mem := pc.from.clone() + mem.clearLeaderAndLease() + pc.clear() + pc.from.Amap.Set(myLoc.Id, &myLoc) + return mem.Amap.Len(), mem +} + +// leaderLeaseExpired evaluates the lease as of now, +// and returns the leader or best candiate. Returns +// expired == true if any prior leader lease has +// lapsed. In this case we return the best new +// leader with its IsLeader bit set and its +// LeaseExpires set to now + lease. +// +// If expired == false then the we return +// the current leader in lead. +// +// PRE: there are only 0 or 1 leaders in m.Amap +// who have a non-zero LeaseExpires field. +// +// If m.Amap is empty, we return (true, nil). +// +// The method is where the actual "election" +// happens. See the ServerLocLessThan() +// function below for exactly how +// we rank candidates. +// +func (m *members) leaderLeaseExpired( + now time.Time, + leaseLen time.Duration, + prevLead *ServerLoc, + maxClockSkew time.Duration, + +) (expired bool, lead *ServerLoc) { + + if prevLead.LeaseExpires.Add(maxClockSkew).After(now) { + // honor the leases until they expire + return false, prevLead + } + + if m.Amap.Len() == 0 { + return false, prevLead + } + + // INVAR: any lease has expired. + + var sortme []*ServerLoc + m.Amap.tex.Lock() + for _, v := range m.Amap.U { + sortme = append(sortme, v) + } + m.Amap.tex.Unlock() + m.clearLeaderAndLease() + + sort.Sort(&byRankThenId{s: sortme, now: now}) + lead = sortme[0] + lead.IsLeader = true + lead.LeaseExpires = now.Add(leaseLen).UTC() + + return true, lead +} + +func (m *members) clearLeaderAndLease() { + m.Amap.tex.Lock() + for _, v := range m.Amap.U { + v.IsLeader = false + v.LeaseExpires = time.Time{} + } + m.Amap.tex.Unlock() +} + +type byRankThenId struct { + s []*ServerLoc + now time.Time +} + +func (p byRankThenId) Len() int { return len(p.s) } +func (p byRankThenId) Swap(i, j int) { p.s[i], p.s[j] = p.s[j], p.s[i] } + +// Less must be stable and computable locally yet +// applicable globally: it is how we choose a leader +// in a stable fashion. +func (p byRankThenId) Less(i, j int) bool { + return ServerLocLessThan(p.s[i], p.s[j], p.now) +} + +// ServerLocLessThan returns true iff i < j, in terms of rank. +// Lower rank is more electable. We order first by LeaseExpires, +// then by Rank, Id, Host, and Port; in that order. The +// longer leaseExpires wins (is less than). +func ServerLocLessThan(i, j *ServerLoc, now time.Time) bool { + nowu := now.UnixNano() + itm := i.LeaseExpires.UnixNano() + jtm := j.LeaseExpires.UnixNano() + + // if both are expired, then its a tie. + if jtm <= nowu { + jtm = 0 + } + if itm <= nowu { + itm = 0 + } + if itm != jtm { + return itm > jtm // we want an actual time to sort before a zero-time. 
+ } + if i.Rank != j.Rank { + return i.Rank < j.Rank + } + if i.Id != j.Id { + return i.Id < j.Id + } + if i.Host != j.Host { + return i.Host < j.Host + } + return i.Port < j.Port +} + +func (m *Membership) setupNatsClient(pc *pongCollector) (*nats.Conn, error) { + discon := func(nc *nats.Conn) { + m.Cfg.Log.Tracef("health-agent: Disconnected from nats!") + } + optdis := nats.DisconnectHandler(discon) + norand := nats.DontRandomize() + + recon := func(nc *nats.Conn) { + loc, err := nc.ServerLocation() + panicOn(err) + m.Cfg.Log.Tracef("health-agent: Reconnect to nats!: loc = '%s'", loc) + } + optrecon := nats.ReconnectHandler(recon) + + opts := []nats.Option{optdis, optrecon, norand} + if m.Cfg.CliConn != nil { + opts = append(opts, nats.Dialer(&m.Cfg)) + } + + nc, err := nats.Connect(m.Cfg.NatsUrl, opts...) + if err != nil { + log.Fatalf("Can't connect: %v\n", err) + } + + loc, err := nc.ServerLocation() + if err != nil { + return nil, err + } + m.setLoc(loc) + m.Cfg.Log.Debugf("health-agent: HELLOWORLD: I am '%s' at '%v:%v'. rank %v", m.myLoc.Id, m.myLoc.Host, m.myLoc.Port, m.myLoc.Rank) + + m.subjAllCall = sysMemberPrefix + "allcall" + m.subjAllReply = sysMemberPrefix + "allreply" + m.subjMemberLost = sysMemberPrefix + "lost" + m.subjMemberAdded = sysMemberPrefix + "added" + m.subjMembership = sysMemberPrefix + "list" + + nc.Subscribe(m.subjAllReply, func(msg *nats.Msg) { + if m.deaf() { + return + } + pc.receivePong(msg) + }) + + nc.Subscribe(m.subjAllCall, func(msg *nats.Msg) { + if m.deaf() { + return + } + loc, err := nc.ServerLocation() + if err != nil { + return // try again next time. + } + + // did we accidentally change server locacations? + // we don't want to do that! + if m.locDifferent(loc) { + panic(fmt.Sprintf("changed locations! first: '%s', now:'%s'", m.myLoc, loc)) + } + + // allcall broadcasts the leader + var lead ServerLoc + err = lead.fromBytes(msg.Data) + panicOn(err) + + if lead.Id != "" && !lead.LeaseExpires.IsZero() { + won, alt := m.elec.setLeader(&lead) + if !won { + // if we rejected, get our preferred leader. 
+ lead = alt + } + + if loc.Id == lead.Id { + loc.IsLeader = true + loc.LeaseExpires = lead.LeaseExpires + } else { + loc.IsLeader = false + loc.LeaseExpires = time.Time{} + } + } + + hp, err := json.Marshal(loc) + panicOn(err) + if !m.deaf() { + pong(nc, msg.Reply, hp) + } + }) + + /* reporting */ + nc.Subscribe(m.subjMemberLost, func(msg *nats.Msg) { + if m.deaf() { + return + } + m.Cfg.Log.Tracef("health-agent: Received on [%s]: '%s'", + msg.Subject, string(msg.Data)) + }) + + nc.Subscribe(m.subjMemberAdded, func(msg *nats.Msg) { + if m.deaf() { + return + } + m.Cfg.Log.Tracef("health-agent: Received on [%s]: '%s'", + msg.Subject, string(msg.Data)) + }) + + nc.Subscribe(m.subjMembership, func(msg *nats.Msg) { + if m.deaf() { + return + } + m.Cfg.Log.Tracef("health-agent: Received on [%s]: '%s'", + msg.Subject, string(msg.Data)) + }) + + return nc, nil +} + +func (m *Membership) locDifferent(b *nats.ServerLoc) bool { + m.mu.Lock() + defer m.mu.Unlock() + if b.Id != m.myLoc.Id { + return true + } + if b.Rank != m.myLoc.Rank { + return true + } + if b.Host != m.myLoc.Host { + return true + } + if b.Port != m.myLoc.Port { + return true + } + return false +} + +func (m *Membership) setLoc(b *nats.ServerLoc) { + m.mu.Lock() + m.myLoc.Id = b.Id + m.myLoc.Rank = b.Rank + m.myLoc.Host = b.Host + m.myLoc.Port = b.Port + m.mu.Unlock() + m.elec.setMyLoc(&m.myLoc) +} diff --git a/health/health_test.go b/health/health_test.go new file mode 100644 index 00000000000..36ac823b03f --- /dev/null +++ b/health/health_test.go @@ -0,0 +1,195 @@ +package health + +import ( + "fmt" + "math/rand" + "testing" + "time" + + cv "github.com/glycerine/goconvey/convey" + "github.com/nats-io/gnatsd/logger" + "github.com/nats-io/gnatsd/server" + gnatsd "github.com/nats-io/gnatsd/test" + "github.com/nats-io/go-nats" +) + +const TEST_PORT = 8392 +const DefaultTimeout = 2 * time.Second + +var cliOpts = nats.Options{ + Url: fmt.Sprintf("nats://localhost:%d", TEST_PORT), + AllowReconnect: true, + MaxReconnect: 10, + ReconnectWait: 10 * time.Millisecond, + Timeout: DefaultTimeout, +} + +// DefaultTestOptions are default options for the unit tests. +var serverOpts = server.Options{ + Host: "localhost", + Port: TEST_PORT, + NoLog: true, + NoSigs: true, + MaxControlLine: 256, +} + +func Test101StressTestManyClients(t *testing.T) { + + cv.Convey("when stress testing with 50 clients coming up and shutting down, we should survive and prosper", t, func() { + + s := RunServerOnPort(TEST_PORT) + defer s.Shutdown() + + n := 50 + var ms []*Membership + for i := 0; i < n; i++ { + cli, srv, err := NewInternalClientPair() + panicOn(err) + + s.InternalCliRegisterCallback(srv) + cfg := &MembershipCfg{ + CliConn: cli, + MaxClockSkew: 1 * time.Nanosecond, + LeaseTime: 30 * time.Millisecond, + BeatDur: 10 * time.Millisecond, + NatsUrl: fmt.Sprintf("nats://localhost:%v", TEST_PORT), + MyRank: i, // ranks 0..n-1 + } + + m := NewMembership(cfg) + err = m.Start() + if err != nil { + panic(err) + } + ms = append(ms, m) + defer m.Stop() + } + // the test here is basically that we didn't crash + // or hang. So if we got here, success. 
+ cv.So(true, cv.ShouldBeTrue) + }) +} + +func Test102ConvergenceToOneLowRankLeaderAndLiveness(t *testing.T) { + + cv.Convey("Given a cluster of one server with rank 0, no matter what other servers arrive thinking they are the leader (say, after a partition is healed), as long as those other nodes have rank 1, our rank 0 process will persist in leading and all other arrivals will give up their leadership claims (after their leases expire). In addition to safety, this is also a liveness check: After a single lease term + clockskew, a leader will have been chosen.", t, func() { + + const maxPayload = 1024 * 1024 + s := RunServerOnPort(TEST_PORT) + defer func() { + p("starting gnatsd shutdown...") + s.Shutdown() + }() + + n := 50 + tot := 50 + pause := make([]int, n) + for i := 0; i < n; i++ { + pause[i] = 20 + rand.Intn(50) + tot += pause[i] + } + + var ms []*Membership + for i := 0; i < n; i++ { + + cfg := &MembershipCfg{ + MaxClockSkew: 1 * time.Nanosecond, + LeaseTime: 150 * time.Millisecond, + BeatDur: 50 * time.Millisecond, + NatsUrl: fmt.Sprintf("nats://localhost:%v", TEST_PORT), + MyRank: i, //min(1, i), // ranks 0,1,1,1,1,1,... + deaf: DEAF_TRUE, // don't ping or pong + historyCount: 10000, + } + + cli, srv, err := NewInternalClientPair() + panicOn(err) + + s.InternalCliRegisterCallback(srv) + cfg.CliConn = cli + + if i == 0 { + cfg.deaf = DEAF_FALSE + aLogger := logger.NewStdLogger(micros, true, trace, colors, pid) + _ = aLogger + // to follow the prints, uncomment: + //cfg.Log = aLogger + } + + m := NewMembership(cfg) + err = m.Start() + if err != nil { + panic(err) + } + ms = append(ms, m) + defer m.Stop() + } + + // let them all get past init phase. + time.Sleep(2 * (ms[0].Cfg.LeaseTime + ms[0].Cfg.MaxClockSkew)) + + // verify liveness, a leader exists. + p("verifying everyone thinks there is a leader:") + for i := 0; i < n; i++ { + //fmt.Printf("verifying %v thinks there is a leader\n", i) + cv.So(ms[i].elec.history.Avail(), cv.ShouldBeGreaterThan, 0) + } + + // bring in jobs after their random pause time + for i := 0; i < n; i++ { + dur := time.Duration(pause[i]) * time.Millisecond + //p("%v on i = %v/dur=%v ", time.Now().UTC(), i, dur) + time.Sleep(dur) + ms[i].unDeaf() + } + + // check that the history from rank 0 + // always shows rank 0 as lead. + h := ms[0].elec.history + av := h.Avail() + //p("ms[0].myLoc.Id = %v", ms[0].myLoc.Id) + cv.So(ms[0].myLoc.Id, cv.ShouldNotEqual, "") + cv.So(av, cv.ShouldBeGreaterThan, 10) + p("av: available history len = %v", av) + + // prints first: + /* + for i := 0; i < av; i++ { + sloc := h.A[h.Kth(i)].(*ServerLoc) + fmt.Printf("history print i = %v. sloc.Id=%v / sloc.Rank=%v\n", i, sloc.Id, sloc.Rank) + } + */ + // checks second: + for i := 0; i < av; i++ { + sloc := h.A[h.Kth(i)].(*ServerLoc) + //fmt.Printf("history check Id at i = %v. sloc.Id=%v\n", i, sloc.Id) + cv.So(sloc.Id, cv.ShouldEqual, ms[0].myLoc.Id) + } + + for i := 0; i < av; i++ { + sloc := h.A[h.Kth(i)].(*ServerLoc) + //p("history check Rank at i = %v. 
sloc.Rank=%v", i, sloc.Rank) + cv.So(sloc.Rank, cv.ShouldEqual, 0) + } + }) +} + +func RunServerOnPort(port int) *server.Server { + opts := serverOpts + opts.Port = port + return gnatsd.RunServer(&opts) +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} + +func max(a, b int) int { + if a > b { + return a + } + return b +} diff --git a/health/healthcmd/main.go b/health/healthcmd/main.go new file mode 100644 index 00000000000..4cec1950cb2 --- /dev/null +++ b/health/healthcmd/main.go @@ -0,0 +1,56 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + "strconv" + "time" + + "github.com/nats-io/gnatsd/health" +) + +// healthcmd runs an allcall election from a standalone +// command line nats client. It exercises the same +// gnatsd/health library code that runs as an internal client +// in process with gnatsd. + +func usage() { + log.Fatalf("use: healthcmd {host}:port {rank}\n") +} + +func main() { + + log.SetFlags(0) + flag.Usage = usage + flag.Parse() + + args := flag.Args() + if len(args) < 1 { + usage() + } + + rank := 0 + var err error + if len(args) >= 2 { + rank, err = strconv.Atoi(args[1]) + if err != nil { + fmt.Fprintf(os.Stderr, "2nd arg should be our numeric rank") + } + } + + cfg := &health.MembershipCfg{ + MaxClockSkew: time.Second, + BeatDur: 100 * time.Millisecond, + NatsUrl: "nats://" + args[0], // "nats://127.0.0.1:4222" + MyRank: rank, + } + m := health.NewMembership(cfg) + err = m.Start() + if err != nil { + panic(err) + } + + select {} +} diff --git a/health/icc.go b/health/icc.go new file mode 100644 index 00000000000..60bc5b4a63f --- /dev/null +++ b/health/icc.go @@ -0,0 +1,64 @@ +package health + +import ( + "net" +) + +// Icc allows the server to +// detect a net.Conn as +// an internal client connection +// by checking if it implements the +// LocalInternalClient interface. +// +type Icc struct { + *net.TCPConn +} + +// IsInternal satisfy LocalInternalClient interface +func (c *Icc) IsInternal() {} + +// NewInternalClientPair() constructs a client/server +// pair that wrap tcp endpoints in Icc to let +// the server recognized them as internal. +// +func NewInternalClientPair() (cli, srv *Icc, err error) { + + lsn, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return + } + + srvDone := make(chan struct{}) + go func() { + s, err2 := lsn.Accept() + if err2 == nil { + srv = &Icc{TCPConn: s.(*net.TCPConn)} + } else { + err = err2 + } + lsn.Close() + close(srvDone) + }() + + addr := lsn.Addr() + c, err3 := net.Dial(addr.Network(), addr.String()) + <-srvDone + + if err3 != nil { + err = err3 + if srv != nil { + srv.Close() + srv = nil + } + return + } + cli = &Icc{TCPConn: c.(*net.TCPConn)} + // INVAR: cli ok. + if err != nil { + cli.Close() + cli = nil + return + } + // INVAR: srv ok. 
+ return +} diff --git a/health/icc_test.go b/health/icc_test.go new file mode 100644 index 00000000000..4ffae665c47 --- /dev/null +++ b/health/icc_test.go @@ -0,0 +1,48 @@ +package health + +import ( + "net" + "testing" + + "github.com/nats-io/gnatsd/server" +) + +func TestIccTypeSwitchWorks(t *testing.T) { + var nc net.Conn = &Icc{} + _, isIcc := nc.(server.LocalInternalClient) + if !isIcc { + t.Fatalf("nc was not LocalInternalClient, as it should be!") + } +} + +func TestIccAsNetConn(t *testing.T) { + + // write to a, read from b + a, b, err := NewInternalClientPair() + if err != nil { + panic(err) + } + + msg := "hello-world" + + n, err := a.Write([]byte(msg)) + if err != nil { + t.Errorf("err = %v", err) + } + if n != len(msg) { + t.Errorf("Write truncated at %v < %v", n, len(msg)) + } + + readbuf := make([]byte, len(msg)) + m, err := b.Read(readbuf) + if err != nil { + t.Errorf("err = %v", err) + } + if m != n { + t.Errorf("Read truncated at %v !=n %v", m, n) + } + back := string(readbuf[:m]) + if back != msg { + t.Errorf("msg corrupted, wrote '%v', read '%v'", msg, back) + } +} diff --git a/health/idem.go b/health/idem.go new file mode 100644 index 00000000000..107399d3215 --- /dev/null +++ b/health/idem.go @@ -0,0 +1,106 @@ +package health + +import ( + "fmt" + "sync" +) + +// Copyright (c) 2017 Jason E. Aten, Ph.D. +// https://github.com/glycerine/idem +// MIT license. + +// idemCloseChan can have Close() called on it +// multiple times, and it will only close +// Chan once. +type idemCloseChan struct { + Chan chan bool + closed bool + mut sync.Mutex +} + +// Reinit re-allocates the Chan, assinging +// a new channel and reseting the state +// as if brand new. +func (c *idemCloseChan) Reinit() { + c.mut.Lock() + defer c.mut.Unlock() + c.Chan = make(chan bool) + c.closed = false +} + +// NewIdemCloseChan makes a new idemCloseChan. +func NewIdemCloseChan() *idemCloseChan { + return &idemCloseChan{ + Chan: make(chan bool), + } +} + +var ErrAlreadyClosed = fmt.Errorf("Chan already closed") + +// Close returns ErrAlreadyClosed if it has been +// called before. It never closes IdemClose.Chan more +// than once, so it is safe to ignore the returned +// error value. Close() is safe for concurrent access by multiple +// goroutines. Close returns nil after the first time +// it is called. +func (c *idemCloseChan) Close() error { + c.mut.Lock() + defer c.mut.Unlock() + if !c.closed { + close(c.Chan) + c.closed = true + return nil + } + return ErrAlreadyClosed +} + +// IsClosed tells you if Chan is already closed or not. +func (c *idemCloseChan) IsClosed() bool { + c.mut.Lock() + defer c.mut.Unlock() + return c.closed +} + +// halter helps shutdown a goroutine +type halter struct { + // The owning goutine should call Done.Close() as its last + // actual once it has received the ReqStop() signal. + Done idemCloseChan + + // Other goroutines call ReqStop.Close() in order + // to request that the owning goroutine stop immediately. + // The owning goroutine should select on ReqStop.Chan + // in order to recognize shutdown requests. + ReqStop idemCloseChan +} + +func newHalter() *halter { + return &halter{ + Done: *NewIdemCloseChan(), + ReqStop: *NewIdemCloseChan(), + } +} + +// RequestStop closes the h.ReqStop channel +// if it has not already done so. Safe for +// multiple goroutine access. +func (h *halter) RequestStop() { + h.ReqStop.Close() +} + +// MarkDone closes the h.Done channel +// if it has not already done so. Safe for +// multiple goroutine access. 
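+// The owning goroutine should call MarkDone as its final
+// act, after it has observed ReqStop being closed.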
+func (h *halter) MarkDone() { + h.Done.Close() +} + +// IsStopRequested returns true iff h.ReqStop has been Closed(). +func (h *halter) IsStopRequested() bool { + return h.ReqStop.IsClosed() +} + +// IsDone returns true iff h.Done has been Closed(). +func (h *halter) IsDone() bool { + return h.Done.IsClosed() +} diff --git a/health/rbuf.go b/health/rbuf.go new file mode 100644 index 00000000000..1f8aaa13798 --- /dev/null +++ b/health/rbuf.go @@ -0,0 +1,128 @@ +package health + +// https://github.com/glycerine/rbuf +// copyright (c) 2014, Jason E. Aten +// license: MIT + +import "io" + +// RingBuf: +// +// a fixed-size circular ring buffer. Just what it says. +// +type RingBuf struct { + A []interface{} + N int // MaxViewInBytes, the size of A + Beg int // start of data in A + Readable int // number of bytes available to read in A +} + +// NewRingBuf constructs a new RingBuf. +func NewRingBuf(maxViewInBytes int) *RingBuf { + n := maxViewInBytes + r := &RingBuf{ + N: n, + Beg: 0, + Readable: 0, + } + r.A = make([]interface{}, n, n) + + return r +} + +// Reset quickly forgets any data stored in the ring buffer. The +// data is still there, but the ring buffer will ignore it and +// overwrite those buffers as new data comes in. +func (b *RingBuf) Reset() { + b.Beg = 0 + b.Readable = 0 +} + +// Advance(): non-standard, but better than Next(), +// because we don't have to unwrap our buffer and pay the cpu time +// for the copy that unwrapping may need. +// Useful in conjuction/after ReadWithoutAdvance() above. +func (b *RingBuf) Advance(n int) { + if n <= 0 { + return + } + if n > b.Readable { + n = b.Readable + } + b.Readable -= n + b.Beg = (b.Beg + n) % b.N +} + +func intMin(a, b int) int { + if a < b { + return a + } else { + return b + } +} + +func (f *RingBuf) Avail() int { + return f.Readable +} + +// returns the earliest index, or -1 if +// the ring is empty +func (f *RingBuf) First() int { + if f.Readable == 0 { + return -1 + } + return f.Beg +} + +// returns the index of the last element, +// or -1 if the ring is empty. +func (f *RingBuf) Last() int { + if f.Readable == 0 { + return -1 + } + + last := f.Beg + f.Readable - 1 + if last < f.N { + // we fit without wrapping + return last + } + + return last % f.N +} + +// Kth presents the contents of the +// ring as a strictly linear sequence, +// so the user doesn't need to think +// about modular arithmetic. Here k indexes from +// [0, f.Readable-1], assuming f.Avail() +// is greater than 0. Kth() returns an +// actual index where the logical k-th +// element, starting from f.Beg, resides. +// f.Beg itself lives at k = 0. If k is +// out of bounds, or the ring is empty, +// -1 is returned. +func (f *RingBuf) Kth(k int) int { + if f.Readable == 0 || k < 0 || k >= f.Readable { + return -1 + } + return (f.Beg + k) % f.N +} + +// +// Append returns an error if there is no more +// space in the ring. Otherwise it returns nil +// and writes p into the ring in last position. +// +func (b *RingBuf) Append(p interface{}) error { + writeCapacity := b.N - b.Readable + if writeCapacity <= 0 { + // we are all full up already. 
+ return io.ErrShortWrite + } + + writeStart := (b.Beg + b.Readable) % b.N + b.A[writeStart] = p + + b.Readable += 1 + return nil +} diff --git a/health/setutil.go b/health/setutil.go new file mode 100644 index 00000000000..8e72401a9bb --- /dev/null +++ b/health/setutil.go @@ -0,0 +1,112 @@ +package health + +import ( + "encoding/json" + "fmt" +) + +// utilities and sets stuff + +// p is a shortcut for a call to fmt.Printf that implicitly starts +// and ends its message with a newline. +func p(format string, stuff ...interface{}) { + fmt.Printf("\n "+format+"\n", stuff...) +} + +func panicOn(err error) { + if err != nil { + panic(err) + } +} + +// return a minus b, where a and b are sets. +func setDiff(a, b *members, curLead *ServerLoc) *members { + + res := newMembers() + a.Amap.tex.Lock() + for k, v := range a.Amap.U { + + if curLead != nil { + // annotate leader as we go... + if v.Id == curLead.Id { + v.IsLeader = true + v.LeaseExpires = curLead.LeaseExpires + } + } + + if _, found := b.Amap.U[k]; !found { // data race + res.Amap.U[k] = v + } + } + a.Amap.tex.Unlock() + return res +} + +func setsEqual(a, b *members) bool { + a.Amap.tex.Lock() + b.Amap.tex.Lock() + defer b.Amap.tex.Unlock() + defer a.Amap.tex.Unlock() + + alen := len(a.Amap.U) + if alen != len(b.Amap.U) { + return false + } + // INVAR: len(a) == len(b) + if alen == 0 { + return true + } + for k := range a.Amap.U { + if _, found := b.Amap.U[k]; !found { + return false + } + } + // INVAR: all of a was found in b, and they + // are the same size + return true +} + +type members struct { + GroupName string `json:"GroupName"` + Amap *AtomicServerLocMap `json:"Mem"` +} + +func (m *members) clear() { + m.Amap = NewAtomicServerLocMap() +} + +func (m *members) clone() *members { + cp := newMembers() + cp.GroupName = m.GroupName + if m.Amap == nil { + return cp + } + m.Amap.tex.Lock() + cp.Amap.tex.Lock() + for k, v := range m.Amap.U { + cp.Amap.U[k] = v + } + cp.Amap.tex.Unlock() + m.Amap.tex.Unlock() + return cp +} + +func (m *members) setEmpty() bool { + return m.Amap.Len() == 0 +} + +func (m *members) String() string { + return string(m.mustJsonBytes()) +} + +func newMembers() *members { + return &members{ + Amap: NewAtomicServerLocMap(), + } +} + +func (m members) mustJsonBytes() []byte { + by, err := json.Marshal(m) + panicOn(err) + return by +} diff --git a/main.go b/main.go index 425372390f2..8ec3a63eb13 100644 --- a/main.go +++ b/main.go @@ -8,8 +8,10 @@ import ( "net" "net/url" "os" + "time" "github.com/nats-io/gnatsd/auth" + "github.com/nats-io/gnatsd/health" "github.com/nats-io/gnatsd/logger" "github.com/nats-io/gnatsd/server" ) @@ -52,6 +54,11 @@ Cluster Options: --no_advertise Advertise known cluster IPs to clients --connect_retries For implicit routes, number of connect retries +Cluster Health Monitor: + --health Run the health monitoring/leader election agent + --lease Duration of leader leases. default: 12s + --beat Time between heartbeats (want 3-4/lease). 
default: 3s + --rank Smaller rank gives priority in leader election Common Options: -h, --help Show this message @@ -118,6 +125,10 @@ func main() { flag.StringVar(&opts.TLSCert, "tlscert", "", "Server certificate file.") flag.StringVar(&opts.TLSKey, "tlskey", "", "Private key for server certificate.") flag.StringVar(&opts.TLSCaCert, "tlscacert", "", "Client certificate CA for verification.") + flag.BoolVar(&opts.HealthAgent, "health", false, "Run the health agent, elect a leader.") + flag.IntVar(&opts.HealthRank, "rank", 7, "leader election priority: the smaller the rank, the more preferred the server is as a leader. Negative ranks are allowed. Ties are broken by the random ServerId.") + flag.DurationVar(&opts.HealthLease, "lease", time.Second*12, "leader lease duration (should allow 3-4 beats within a lease)") + flag.DurationVar(&opts.HealthBeat, "beat", time.Second*3, "heart beat every this often (should get 3-4 beats within a lease)") flag.Usage = func() { fmt.Printf("%s\n", usageStr) @@ -175,6 +186,10 @@ func main() { server.PrintAndDie(err.Error()) } + if opts.HealthAgent { + opts.InternalCli = append(opts.InternalCli, health.NewAgent(&opts)) + } + // Create the server with appropriate options. s := server.New(&opts) diff --git a/server/client.go b/server/client.go index 5a64a79c357..c5f6f2f5166 100644 --- a/server/client.go +++ b/server/client.go @@ -20,6 +20,8 @@ const ( CLIENT = iota // ROUTER is another router in the cluster. ROUTER + // HEALTH is the internal client that monitors cluster membership. + HEALTH ) const ( @@ -216,6 +218,8 @@ func (c *client) initClient() { c.ncs = fmt.Sprintf("%s - cid:%d", conn, c.cid) case ROUTER: c.ncs = fmt.Sprintf("%s - rid:%d", conn, c.cid) + case HEALTH: + c.ncs = fmt.Sprintf("internal:0 - hid:%d", c.cid) } } @@ -407,6 +411,8 @@ func (c *client) processErr(errStr string) { c.Errorf("Client Error %s", errStr) case ROUTER: c.Errorf("Route Error %s", errStr) + case HEALTH: + c.Errorf("Health Error %s", errStr) } c.closeConnection() } diff --git a/server/icli.go b/server/icli.go new file mode 100644 index 00000000000..3b5374b5432 --- /dev/null +++ b/server/icli.go @@ -0,0 +1,89 @@ +package server + +import ( + "net" + "sync" +) + +// LocalInternalClient is a trait interface. +// The net.Conn implementations of +// internal clients provided over +// the accept() callback (see Start below) +// should implement it to tell the server to ignore +// TLS and auth for internal clients. +// +type LocalInternalClient interface { + IsInternal() +} + +// iCli tracks the internal +// clients. +// +type iCli struct { + configured []InternalClient + mu sync.Mutex +} + +// InternalClient provides +// a plugin-like interface, +// supporting internal clients that live +// in-process with the Server +// on their own goroutines. +// +// An example of an internal client +// is the health monitoring client. +// In order to be effective, its lifetime +// must exactly match that of the +// server it monitors. +// +type InternalClient interface { + + // Name should return a readable + // human name for the InternalClient; + // it will be invoked as a part of + // startup/shutdown/error logging. + // + Name() string + + // Start should run the client on + // a background goroutine. + // + // The Server s will invoke Start() + // as a part of its own init and setup. + // + // The info and opts pointers will be + // viewable from an already locked Server + // instance, and so can be read without + // worrying about data races. + // + // Any returned error will be logged. 
+ // This will not prevent the Server + // from calling Stop() on termination, + // and Stop() must be expected (and + // not block) no matter what. + // + // By returning an net.Conn the client + // provides the server with the + // equivalent of a Listen/Accept created + // net.Conn for communication with + // the client. + // + // The iclient should log using logger. + // + Start(info Info, + opts Options, + logger Logger) (net.Conn, error) + + // Stop should shutdown the goroutine(s) + // of the internal client. + // The Server will invoke Stop() as a part + // of its own shutdown process, *even* if + // Start() failed to start the background + // goroutine. Authors should take care + // to allow Stop() to be called even + // on a failed start. + // + // Stop is expected not to block for long. + // + Stop() +} diff --git a/server/monitor.go b/server/monitor.go index 6cce144b7f3..dc473cc2047 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -473,10 +473,11 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { v.TotalConnections = s.totalClients v.Routes = len(s.routes) v.Remotes = len(s.remotes) - v.InMsgs = s.inMsgs - v.InBytes = s.inBytes - v.OutMsgs = s.outMsgs - v.OutBytes = s.outBytes + // atomic loads avoid data races with client.go:298,951 + v.InMsgs = atomic.LoadInt64(&s.inMsgs) + v.InBytes = atomic.LoadInt64(&s.inBytes) + v.OutMsgs = atomic.LoadInt64(&s.outMsgs) + v.OutBytes = atomic.LoadInt64(&s.outBytes) v.SlowConsumers = s.slowConsumers v.Subscriptions = s.sl.Count() s.httpReqStats[VarzPath]++ diff --git a/server/opts.go b/server/opts.go index 8b97fa451a6..46dc7d13c55 100644 --- a/server/opts.go +++ b/server/opts.go @@ -71,10 +71,10 @@ type Options struct { ProfPort int `json:"-"` PidFile string `json:"-"` LogFile string `json:"-"` - Syslog bool `json:"-"` RemoteSyslog string `json:"-"` Routes []*url.URL `json:"-"` RoutesStr string `json:"-"` + Syslog bool `json:"-"` TLSTimeout float64 `json:"tls_timeout"` TLS bool `json:"-"` TLSVerify bool `json:"-"` @@ -83,6 +83,12 @@ type Options struct { TLSCaCert string `json:"-"` TLSConfig *tls.Config `json:"-"` WriteDeadline time.Duration `json:"-"` + + InternalCli []InternalClient `json:"-"` + HealthAgent bool `json:"health_agent"` + HealthRank int `json:"health_rank"` + HealthLease time.Duration `json:"health_lease"` + HealthBeat time.Duration `json:"health_beat"` } // Configuration file authorization section. @@ -225,6 +231,14 @@ func ProcessConfigFile(configFile string) (*Options, error) { opts.PingInterval = time.Duration(int(v.(int64))) * time.Second case "ping_max": opts.MaxPingsOut = int(v.(int64)) + case "health_rank": + opts.HealthRank = int(v.(int64)) + case "health_lease": + opts.HealthLease = v.(time.Duration) + case "health_beat": + opts.HealthBeat = v.(time.Duration) + case "health_agent": + opts.HealthAgent = v.(bool) case "tls": tlsm := v.(map[string]interface{}) tc, err := parseTLS(tlsm) diff --git a/server/server.go b/server/server.go index 7d616d2b0dd..028d2c4dcc9 100644 --- a/server/server.go +++ b/server/server.go @@ -39,6 +39,7 @@ type Info struct { MaxPayload int `json:"max_payload"` IP string `json:"ip,omitempty"` ClientConnectURLs []string `json:"connect_urls,omitempty"` // Contains URLs a client can connect to. + ServerRank int `json:"server_rank"` // lowest rank wins leader election. // Used internally for quick look-ups. 
clientConnectURLs map[string]struct{} @@ -77,6 +78,7 @@ type Server struct { grRunning bool grWG sync.WaitGroup // to wait on various go routines cproto int64 // number of clients supporting async INFO + icli iCli // in-process internal clients } // Make sure all are 64bits for atomic use @@ -118,6 +120,7 @@ func New(opts *Options) *Server { trace: opts.Trace, done: make(chan bool, 1), start: time.Now(), + icli: iCli{configured: opts.InternalCli}, } s.mu.Lock() @@ -267,10 +270,53 @@ func (s *Server) Start() { s.StartProfiler() } + // Run the internal clients in + // s.icli.configured. + // + // Retain only those started + // successfully in s.icli.running. + // + s.icli.mu.Lock() + go func(info Info, opts Options) { + defer s.icli.mu.Unlock() + + // wait for server to be accepting clients + select { + case <-s.rcQuit: + return + case <-clientListenReady: + } + + n := len(s.icli.configured) + if n == 0 { + return + } + Debugf("Starting the %v internal client(s).", n) + for _, ic := range s.icli.configured { + srv, err := ic.Start(info, opts, log.logger) + if err == nil { + if srv != nil { + s.InternalCliRegisterCallback(srv) + } + Noticef("InternalClient ['%s'] started.", ic.Name()) + } else { + Errorf("InternalClient ['%s'] failed to Start(): %s", ic.Name(), err) + } + } + }(s.info, *s.opts) + // Wait for clients. s.AcceptLoop(clientListenReady) } +// InternalCliRegisterCallback is public only for testing. +func (s *Server) InternalCliRegisterCallback(srv net.Conn) { + s.startGoRoutine(func() { + s.createClient(srv) + s.grWG.Done() + }) +} + // Shutdown will shutdown the server instance by kicking out the AcceptLoop // and closing all associated clients. func (s *Server) Shutdown() { @@ -287,6 +333,13 @@ func (s *Server) Shutdown() { s.grRunning = false s.grMu.Unlock() + // stop any internal clients + s.icli.mu.Lock() + for _, ic := range s.icli.configured { + ic.Stop() + } + s.icli.mu.Unlock() + conns := make(map[uint64]*client) // Copy off the clients @@ -531,6 +584,11 @@ func (s *Server) startMonitoring(secure bool) { func (s *Server) createClient(conn net.Conn) *client { c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: s.info.MaxPayload, start: time.Now()} + _, isInternal := conn.(LocalInternalClient) + if isInternal { + c.typ = HEALTH + } + // Grab JSON info string s.mu.Lock() info := s.infoJSON @@ -548,7 +606,7 @@ func (s *Server) createClient(conn net.Conn) *client { c.Debugf("Client connection created") // Check for Auth - if authRequired { + if !isInternal && authRequired { c.setAuthTimer(secondsToDuration(s.opts.AuthTimeout)) } @@ -582,7 +640,7 @@ func (s *Server) createClient(conn net.Conn) *client { c.mu.Lock() // Check for TLS - if tlsRequired { + if !isInternal && tlsRequired { c.Debugf("Starting TLS client connection handshake") c.nc = tls.Server(c.nc, s.opts.TLSConfig) conn := c.nc.(*tls.Conn) @@ -613,7 +671,7 @@ func (s *Server) createClient(conn net.Conn) *client { return c } - if tlsRequired { + if !isInternal && tlsRequired { // Rewrap bw c.bw = bufio.NewWriterSize(c.nc, startBufSize) } @@ -621,12 +679,14 @@ func (s *Server) createClient(conn net.Conn) *client { // Do final client initialization // Set the Ping timer - c.setPingTimer() + if !isInternal { + c.setPingTimer() + } // Spin up the read loop. 
s.startGoRoutine(func() { c.readLoop() }) - if tlsRequired { + if !isInternal && tlsRequired { c.Debugf("TLS handshake complete") cs := c.nc.(*tls.Conn).ConnectionState() c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tlsCipher(cs.CipherSuite)) @@ -755,6 +815,8 @@ func (s *Server) checkAuth(c *client) bool { return s.checkClientAuth(c) case ROUTER: return s.checkRouterAuth(c) + case HEALTH: + return true default: return false }
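
The health Agent above is the only InternalClient shipped with this
patch. For illustration only, a hypothetical no-op client (all names
below are invented) that satisfies the server.InternalClient interface
would look like:

package main

import (
	"net"

	"github.com/nats-io/gnatsd/server"
)

// noopClient illustrates the shape of the InternalClient interface;
// it starts nothing and returns no connection.
type noopClient struct{}

func (c *noopClient) Name() string { return "noop" }

// Start returns a nil net.Conn, so the server registers no
// in-process connection for this client.
func (c *noopClient) Start(info server.Info, opts server.Options, logger server.Logger) (net.Conn, error) {
	logger.Debugf("noop internal client started")
	return nil, nil
}

func (c *noopClient) Stop() {}

It would be wired up the same way main.go wires the health agent:

	opts.InternalCli = append(opts.InternalCli, &noopClient{})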