diff --git a/health/ALGORITHM.md b/health/ALGORITHM.md new file mode 100644 index 0000000000..f19259aba0 --- /dev/null +++ b/health/ALGORITHM.md @@ -0,0 +1,177 @@ +# The ALLCALL leased-leader election algorithm. + +Jason E. Aten + +February 2017 + +definitions, with example values. +----------- + +Let heartBeat = 1 sec. This is how frequently +we will assess cluster health by +sending out an allcall ping. + +Let leaseTime = 10 sec. This is how long a leader's lease +lasts for. A failed leader won't be replaced until its +lease (+maxClockSkew) has expired. The lease +also lets leaders efficiently serve reads +without doing quorum checks. + +Let maxClockSkew = 1 sec. The maxClockSkew +is a bound on how far out of sync our local +clocks may drift. + +givens +-------- + +* Given: Let each server have a numeric integer rank, that is distinct +and unique to that server. If necessary an extremely long +true random number is used to break ties between server ranks, so +that we may assert, with probability 1, that all ranks are distinct integers, +and that each server, absent an in-force lease, can be put into a strict total order. + +* Rule: The lower the rank is preferred for being the leader. + +* Ordering by lease time then rank: we order the pair (leaseExpires, rank) first +by largest leaseExpires time, then by lower rank. If +both leases have expired, then lease time is not considered as +a part of the ordering, and rank alone determines the +new leader. + +ALLCALL Algorithm phases +=========================== + +### I. Init phase + +When a server joins the cluster, +it does not issue allcalls (a ping of all members) +until after leaseTime + maxClockSkew time has elapsed. + +During init, the server does, however, accept and respond to allcalls() from +other cluster members. The allcall() ping will contain +the current (lease, leader-rank) and leader-id +according to the issuer of the allcall(). Every +recipient of the allcall updates her local information +of who she thinks the leader is, so long as the +received information is monotone in the (lease, leader-rank) +ordering; so a later unexpired lease will replace an +earlier unexpired lease, and if both are expired then +the lower rank will replace the larger rank as winner +of the current leader role. + +### II. regular ping phase + +After a server has finished its Init phase, it +enters its ping phase, where is stays until it +is shut down. + +During ping phase, the server continues to accept and respond +to allcall requests from other servers. Now in addition, +the server also issues its own allcall() pings every +heartBeat seconds. + +### III. Election and Lease determination + +Election and leasing are computed locally, by each +node, one heartBeat after receiving any replies +from the allcall(). The localnode who issued +the allcall sorts the respondents, including +itself, and if all leases have expired, it +determines who is the new leader and marks +their lease as starting from now. Lease +and leader computation is done locally +and independently on each server. +Once in ping phase, this new determination +is broadcast at the next heartbeat when +the local node issues an allcall(). + +If a node receives an allcall with a leader +claim, and that leader claim has a shorter +lease expirtaion time than the existing +leader, the new proposed leader is rejected +in favor of the current leader. This +favors continuity of leadership until +the end of the current leaders term, and +is a product of the sort order +described above where we sort +candidates by lease time then rank. + +## Properties of the allcall + +The allcalls() are heard by all active cluster members, and +contain the sender's computed result of who the current leader is, +and replies answer back with the recipient's own rank and id. Each +recipient of an allcall() replies to all cluster members. +Both the sending and the replying to the allcall are +broadcasts that are published to a well known topic. + +## Safety/Convergence: ALLCALL converges to one leader + +Suppose two nodes are partitioned and so both are leaders on +their own side of the network. Then suppose the network +is joined again, so the two leaders are brought together +by a healing of the network, or by adding a new link +between the networks. The two nodes exchange Ids and lease +times, and the node with the shorter valid lease time +adopts the node with the longer lease as leader, +since that is the sort order. The adoption takes +effect as soon as the loser's current lease expires. +Hence the two leader situation persists for at +most one lease term after the network join. + +## Liveness: a leader will be chosen + +Given the total order among nodes, exactly one +will be lowest rank and thus be the preferred +leader at the end of the any current leader's +lease, even if the current lease holder +has failed. Hence, with at least one live +node, the system can run for at most one +lease term before electing a leader. + + +## commentary + +ALLCALL does not guarantee that there will +never be more than one leader. Availability +in the face of network partition is +desirable in many cases, and ALLCALL is +appropriate for these. This is congruent +with Nats design as an always-on system. +ALLCALL does not guarantee that a +leader will always be present, but +with live nodes it does provide +that the cluster will have a leader +after one lease term + maxClockSkew +has expired. + +By design, ALLCALL functions well +in a cluster with any number of nodes. +One and two nodes, or an even number +of nodes, will work just fine. + +Compared to quorum based elections +like raft and paxos, where an odd +number of at least three +nodes is required to make progress, +this can be very desirable. + +ALLCALL is appropriate for AP, +rather than CP, style systems, where +availability is more important +than having a single writer. When +writes are idempotent or deduplicated +downstream, this is typically preferred. +It is better for availability and +uptime to run an always-on leadership +system. + +implementation +------------ + +ALLCALL is implented on top of +the Nats system, see the health/ +subdirectory of + +https://github.com/nats-io/gnatsd + diff --git a/health/agent.go b/health/agent.go new file mode 100644 index 0000000000..899c3eb5ee --- /dev/null +++ b/health/agent.go @@ -0,0 +1,78 @@ +package health + +import ( + "fmt" + "net" + "time" + + "github.com/nats-io/gnatsd/server" + //"github.com/nats-io/gnatsd/server/lcon" +) + +// Agent implements the InternalClient interface. +// It provides health status checks and +// leader election from among the candidate +// gnatsd instances in a cluster. +type Agent struct { + opts *server.Options + mship *Membership +} + +// NewAgent makes a new Agent. +func NewAgent(opts *server.Options) *Agent { + return &Agent{ + opts: opts, + } +} + +// Name should identify the internal client for logging. +func (h *Agent) Name() string { + return "health-agent" +} + +// Start makes an internal +// entirely in-process client that monitors +// cluster health and manages group +// membership functions. +// +func (h *Agent) Start( + info server.Info, + opts server.Options, + logger server.Logger, + +) (net.Conn, error) { + + // To keep the health client fast and its traffic + // internal-only, we use an bi-directional, + // in-memory version of a TCP stream. + // + // The buffers really do have to be of + // sufficient size, or we will + // deadlock/livelock the system. + // + cli, srv, err := NewInternalClientPair() + if err != nil { + return nil, fmt.Errorf("NewInternalClientPair() returned error: %s", err) + } + + rank := opts.HealthRank + beat := opts.HealthBeat + lease := opts.HealthLease + + cfg := &MembershipCfg{ + MaxClockSkew: time.Second, + BeatDur: beat, + LeaseTime: lease, + MyRank: rank, + CliConn: cli, + Log: logger, + } + h.mship = NewMembership(cfg) + go h.mship.Start() + return srv, nil +} + +// Stop halts the background goroutine. +func (h *Agent) Stop() { + h.mship.Stop() +} diff --git a/health/amap.go b/health/amap.go new file mode 100644 index 0000000000..8315dac6ec --- /dev/null +++ b/health/amap.go @@ -0,0 +1,49 @@ +package health + +import "sync" + +// atomic map from string to *ServerLoc + +type AtomicServerLocMap struct { + U map[string]*ServerLoc `json:"U"` + tex sync.RWMutex +} + +func NewAtomicServerLocMap() *AtomicServerLocMap { + return &AtomicServerLocMap{ + U: make(map[string]*ServerLoc), + } +} + +func (m *AtomicServerLocMap) Len() int { + m.tex.RLock() + n := len(m.U) + m.tex.RUnlock() + return n +} + +func (m *AtomicServerLocMap) Get(key string) *ServerLoc { + m.tex.RLock() + s := m.U[key] + m.tex.RUnlock() + return s +} + +func (m *AtomicServerLocMap) Get2(key string) (*ServerLoc, bool) { + m.tex.RLock() + v, ok := m.U[key] + m.tex.RUnlock() + return v, ok +} + +func (m *AtomicServerLocMap) Set(key string, val *ServerLoc) { + m.tex.Lock() + m.U[key] = val + m.tex.Unlock() +} + +func (m *AtomicServerLocMap) Del(key string) { + m.tex.Lock() + delete(m.U, key) + m.tex.Unlock() +} diff --git a/health/config.go b/health/config.go new file mode 100644 index 0000000000..8394b21861 --- /dev/null +++ b/health/config.go @@ -0,0 +1,82 @@ +package health + +import ( + "net" + "time" + + "github.com/nats-io/gnatsd/logger" + "github.com/nats-io/gnatsd/server" +) + +const DEAF_TRUE = 1 +const DEAF_FALSE = 0 + +type MembershipCfg struct { + + // max we allow for clocks to be out of sync. + // default to 1 second if not set. + MaxClockSkew time.Duration + + // how often we heartbeat. defaults to 100msec + // if not set. + BeatDur time.Duration + + // NatsUrl example "nats://127.0.0.1:4222" + NatsUrl string + + // defaults to "_nats.cluster.members." + SysMemberPrefix string + + // LeaseTime is the minimum time the + // leader is elected for. Defaults to 10 sec. + LeaseTime time.Duration + + // provide a default until the server gives us rank + MyRank int + + // optional, if provided we will use this connection on + // the client side. + CliConn net.Conn + + // where we log stuff. + Log server.Logger + + // for testing under network partition + deaf int64 + + // how much history to save + historyCount int +} + +func (cfg *MembershipCfg) SetDefaults() { + if cfg.LeaseTime == 0 { + cfg.LeaseTime = time.Second * 10 + } + if cfg.SysMemberPrefix == "" { + cfg.SysMemberPrefix = "_nats.cluster.members." + } + if cfg.BeatDur == 0 { + cfg.BeatDur = 100 * time.Millisecond + } + if cfg.MaxClockSkew == 0 { + cfg.MaxClockSkew = time.Second + } + if cfg.NatsUrl == "" { + cfg.NatsUrl = "nats://127.0.0.1:4222" + } + if cfg.Log == nil { + // stderr + cfg.Log = logger.NewStdLogger(micros, debug, trace, colors, pid) + } +} + +const colors = false +const micros, pid = true, true +const trace = false + +//const debug = true +const debug = false + +func (cfg *MembershipCfg) Dial(network, address string) (net.Conn, error) { + return cfg.CliConn, nil +} diff --git a/health/health.go b/health/health.go new file mode 100644 index 0000000000..3532a3c809 --- /dev/null +++ b/health/health.go @@ -0,0 +1,803 @@ +package health + +import ( + "encoding/json" + "fmt" + "log" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/nats-io/go-nats" +) + +// sysMemberPrefix creates a namespace +// for system cluster membership communication. +// This prefix aims to avoid collisions +// with user-level topics. Only system +// processes / internal clients should +// write to these topics, but everyone +// is welcome to listen on them. +// +// note: `_nats` is for now, can easily +// changed to be `_SYS` later once +// we're sure everything is working. +// +const sysMemberPrefix = "_nats.cluster.members." + +// ServerLoc conveys to interested parties +// the Id and location of one gnatsd +// server in the cluster. +type ServerLoc struct { + Id string `json:"serverId"` + Host string `json:"host"` + Port int `json:"port"` + + // Are we the leader? + IsLeader bool `json:"leader"` + + // LeaseExpires is zero for any + // non-leader. For the leader, + // LeaseExpires tells you when + // the leaders lease expires. + LeaseExpires time.Time `json:"leaseExpires"` + + // lower rank is leader until lease + // expires. Ties are broken by Id. + // Rank should be assignable on the + // gnatsd command line with -rank to + // let the operator prioritize + // leadership for certain hosts. + Rank int `json:"rank"` +} + +func (s *ServerLoc) String() string { + by, err := json.Marshal(s) + panicOn(err) + return string(by) +} + +func (s *ServerLoc) fromBytes(by []byte) error { + return json.Unmarshal(by, s) +} + +// Membership tracks the nats server cluster +// membership, issuing health checks and +// choosing a leader. +type Membership struct { + Cfg MembershipCfg + + elec *leadHolder + nc *nats.Conn + myLoc ServerLoc + + subjAllCall string + subjAllReply string + subjMemberLost string + subjMemberAdded string + subjMembership string + + halt *halter + mu sync.Mutex + stopping bool +} + +func (m *Membership) deaf() bool { + v := atomic.LoadInt64(&m.Cfg.deaf) + return v == DEAF_TRUE +} + +func (m *Membership) setDeaf() { + atomic.StoreInt64(&m.Cfg.deaf, DEAF_TRUE) +} + +func (m *Membership) unDeaf() { + atomic.StoreInt64(&m.Cfg.deaf, DEAF_FALSE) +} + +func NewMembership(cfg *MembershipCfg) *Membership { + m := &Membership{ + Cfg: *cfg, + halt: newHalter(), + } + m.elec = m.newLeadHolder(cfg.historyCount) + return m +} + +// leadHolder holds who is the current leader, +// and what their lease is. Used to synchronize +// access between various goroutines. +type leadHolder struct { + mu sync.Mutex + sloc ServerLoc + + myId string + myRank int + myLocHasBeenSet bool + + history *RingBuf + histsz int + + m *Membership +} + +func (m *Membership) newLeadHolder(histsz int) *leadHolder { + if histsz == 0 { + histsz = 100 + } + return &leadHolder{ + history: NewRingBuf(histsz), + histsz: histsz, + m: m, + } +} + +func (e *leadHolder) setMyLoc(myLoc *ServerLoc) { + e.mu.Lock() + if e.myLocHasBeenSet { + panic("no double set!") + } + e.myLocHasBeenSet = true + e.myId = myLoc.Id + e.myRank = myLoc.Rank + e.mu.Unlock() +} + +// getLeader retreives the stored e.sloc value. +func (e *leadHolder) getLeader() ServerLoc { + e.mu.Lock() + defer e.mu.Unlock() + return e.sloc +} + +// setLeader aims to copy sloc and store it +// for future getLeader() calls to access. +// +// However we reject any attempt to replace +// a leader with a one that doesn't rank lower, where rank +// includes the LeaseExpires time +// (see the ServerLocLessThan() function). +// +// If we accept sloc +// we return slocWon true. If we reject sloc then +// we return slocWon false. In short, we will only +// accept sloc if ServerLocLessThan(sloc, e.sloc), +// and we return ServerLocLessThan(sloc, e.sloc). +// +// If we return slocWon false, alt contains the +// value we favored, which is the current value +// of our retained e.sloc. If we return true, +// then alt contains a copy of sloc. We +// return a value in alt to avoid data races. +// +func (e *leadHolder) setLeader(sloc *ServerLoc) (slocWon bool, alt ServerLoc) { + e.mu.Lock() + defer e.mu.Unlock() + + if sloc == nil || sloc.Id == "" { + return false, e.sloc + } + + slocWon = ServerLocLessThan(sloc, &e.sloc, time.Now()) + if !slocWon { + return false, e.sloc + } + + e.sloc = *sloc + histcp := *sloc + e.history.Append(&histcp) + return true, e.sloc +} + +func (e *leadHolder) getLeaderAsBytes() []byte { + lead := e.getLeader() + by, err := json.Marshal(&lead) + panicOn(err) + return by +} + +// Stop blocks until the Membership goroutine +// acknowledges the shutdown request. +func (m *Membership) Stop() { + m.mu.Lock() + if m.stopping { + m.mu.Unlock() + return + } + m.stopping = true + m.mu.Unlock() + m.halt.ReqStop.Close() + <-m.halt.Done.Chan +} + +func (m *Membership) Start() error { + + m.Cfg.SetDefaults() + + pc := newPongCollector() + nc, err := m.setupNatsClient(pc) + if err != nil { + m.halt.Done.Close() + return err + } + m.nc = nc + go m.start(nc, pc) + return nil +} + +func (m *Membership) start(nc *nats.Conn, pc *pongCollector) { + + defer func() { + m.halt.Done.Close() + }() + + m.Cfg.Log.Debugf("health-agent: Listening on [%s]\n", m.subjAllCall) + log.SetFlags(log.LstdFlags) + + prevCount, curCount := 0, 0 + var curMember, prevMember *members + var curLead *ServerLoc + + // do an initial allcall() to discover any + // current leader. + m.Cfg.Log.Tracef("health-agent: "+ + "init: doing initial allcall "+ + "to discover any existing leader...") + + err := m.allcall() + if err != nil { + m.Cfg.Log.Debugf("health-agent: "+ + "error back from allcall, "+ + "terminating on: %s", err) + return + } + + select { + case <-time.After(m.Cfg.BeatDur): + case <-m.halt.ReqStop.Chan: + return + } + + prevCount, prevMember = pc.getSetAndClear(m.myLoc) + + now := time.Now() + + firstSeenLead := m.elec.getLeader() + xpire := firstSeenLead.LeaseExpires + + limit := xpire.Add(m.Cfg.MaxClockSkew) + if !xpire.IsZero() && limit.After(now) { + + m.Cfg.Log.Tracef("health-agent: init: "+ + "after one heartbeat, "+ + "we detect current leader '%s'"+ + " of rank %v with lease good "+ + "for %v until expiration + "+ + "maxClockSkew=='%v'", + firstSeenLead.Id, + firstSeenLead.Rank, + limit.Sub(now), + limit, + ) + } else { + m.Cfg.Log.Tracef("health-agent: "+ + "init: after one heartbeat,"+ + " no leader found. waiting "+ + "for a full leader lease "+ + "term of %s to expire...", + m.Cfg.LeaseTime) + + select { + case <-time.After(m.Cfg.LeaseTime): + case <-m.halt.ReqStop.Chan: + return + } + } + + // prev responses should be back by now. + var expired bool + var prevLead *ServerLoc + var nextLeadReportTm time.Time + + for { + // NB: replies to an + // allcall can/will change + // what the current leader + // is in elec. + err = m.allcall() + if err != nil { + // err could be: "write on closed buffer" + // typically means we are shutting down. + + m.Cfg.Log.Tracef("health-agent: "+ + "error on allcall, "+ + "shutting down the "+ + "health-agent: %s", + err) + return + } + + select { + case <-time.After(m.Cfg.BeatDur): + case <-m.halt.ReqStop.Chan: + return + } + lastSeenLead := m.elec.getLeader() + + // cur responses should be back by now + // and we can compare prev and cur. + curCount, curMember = pc.getSetAndClear(m.myLoc) + + now = time.Now() + expired, curLead = curMember.leaderLeaseExpired( + now, + m.Cfg.LeaseTime, + &lastSeenLead, + m.Cfg.MaxClockSkew, + ) + + // tell pong + won, alt := m.elec.setLeader(curLead) + if !won { + curLead = &alt + } + + loc, _ := nc.ServerLocation() + if loc != nil { + loc.Rank = m.Cfg.MyRank + if loc.Id == curLead.Id { + + if now.After(nextLeadReportTm) || + prevLead == nil || + prevLead.Id != curLead.Id { + + left := curLead.LeaseExpires.Sub(now) + m.Cfg.Log.Debugf("health-agent: "+ + "I am LEAD, my Id: '%s', "+ + "rank %v. lease expires "+ + "in %s", + loc.Id, + loc.Rank, + left) + + nextLeadReportTm = now.Add(left).Add(time.Second) + } + } else { + if prevLead != nil && + prevLead.Id == loc.Id { + + m.Cfg.Log.Debugf("health-agent: "+ + "I am no longer lead, "+ + "new LEAD is '%s', rank %v. "+ + "lease expires in %s", + curLead.Id, + curLead.Rank, + curLead.LeaseExpires.Sub(now)) + + } else { + if curLead != nil && + (nextLeadReportTm.IsZero() || now.After(nextLeadReportTm)) { + + left := curLead.LeaseExpires.Sub(now) + if curLead.Id == "" { + m.Cfg.Log.Debugf("health-agent: "+ + "I am '%s'/rank=%v. "+ + "lead is unknown.", + m.myLoc.Id, + m.myLoc.Rank) + + } else { + m.Cfg.Log.Debugf("health-agent: "+ + "I am not lead. lead is '%s', "+ + "rank %v, for %v", + curLead.Id, + curLead.Rank, + left) + + } + nextLeadReportTm = now.Add(left).Add(time.Second) + } + } + } + } + + lost := setDiff(prevMember, curMember, curLead) + gained := setDiff(curMember, prevMember, curLead) + same := setsEqual(prevMember, curMember) + + if same { + // nothing more to do. + // This is the common case when nothing changes. + } else { + lostBytes := lost.mustJsonBytes() + if !lost.setEmpty() { + if !m.deaf() { + nc.Publish(m.subjMemberLost, lostBytes) + // ignore errors on purpose; + // don't crash mid-health-report + // if at all possible. + } + } + gainedBytes := gained.mustJsonBytes() + if !gained.setEmpty() { + if !m.deaf() { + nc.Publish(m.subjMemberAdded, gainedBytes) + // same error approach as above. + } + } + } + if curCount < prevCount { + m.Cfg.Log.Debugf("health-agent: ---- "+ + "PAGE PAGE PAGE!! we went "+ + "down a server, from %v -> %v."+ + "lost: '%s'", + prevCount, + curCount, + lost) + + } else if curCount > prevCount && curCount > 0 { + m.Cfg.Log.Debugf("health-agent: ++++ "+ + "MORE ROBUSTNESS GAINED; "+ + "we went from %v -> %v. "+ + "gained: '%s'", + prevCount, + curCount, + gained) + + } + + if expired { + curBytes := curMember.mustJsonBytes() + if !m.deaf() { + nc.Publish(m.subjMembership, curBytes) + } + } + + // done with compare, now loop + prevCount = curCount + prevMember = curMember + prevLead = curLead + } +} + +func pong(nc *nats.Conn, subj string, msg []byte) { + err := nc.Publish(subj, msg) + panicOn(err) + nc.FlushTimeout(2 * time.Second) + // ignore error on nc.Flush(). + // might be: nats: connection closed on shutdown. +} + +// allcall sends out a health ping on the +// subjAllCall topic. +// +// The ping consists of sending the ServerLoc +// forf the current leader, which provides lease +// and full contact info for the leader. +// +// This gives a round-trip connectivity check. +// +func (m *Membership) allcall() error { + // allcall broadcasts the current leader + lease + leadby := m.elec.getLeaderAsBytes() + return m.nc.PublishRequest(m.subjAllCall, m.subjAllReply, leadby) +} + +// pongCollector collects the responses +// from an allcall request. +type pongCollector struct { + replies int + from *members + mu sync.Mutex +} + +func newPongCollector() *pongCollector { + return &pongCollector{ + from: newMembers(), + } +} + +// acumulate pong responses +func (pc *pongCollector) receivePong(msg *nats.Msg) { + pc.mu.Lock() + + pc.replies++ + + var loc ServerLoc + err := loc.fromBytes(msg.Data) + if err != nil { + pc.from.Amap.Set(loc.Id, &loc) + } + pc.mu.Unlock() +} + +func (pc *pongCollector) clear() { + pc.mu.Lock() + pc.from.clear() + pc.mu.Unlock() +} + +// getSet returns the count and set so far, then +// clears the set, emptying it, and then adding +// back just myLoc +func (pc *pongCollector) getSetAndClear(myLoc ServerLoc) (int, *members) { + + mem := pc.from.clone() + mem.clearLeaderAndLease() + pc.clear() + pc.from.Amap.Set(myLoc.Id, &myLoc) + return mem.Amap.Len(), mem +} + +// leaderLeaseExpired evaluates the lease as of now, +// and returns the leader or best candiate. Returns +// expired == true if any prior leader lease has +// lapsed. In this case we return the best new +// leader with its IsLeader bit set and its +// LeaseExpires set to now + lease. +// +// If expired == false then the we return +// the current leader in lead. +// +// PRE: there are only 0 or 1 leaders in m.Amap +// who have a non-zero LeaseExpires field. +// +// If m.Amap is empty, we return (true, nil). +// +// This method is where the actual "election" +// happens. See the ServerLocLessThan() +// function below for exactly how +// we rank candidates. +// +func (m *members) leaderLeaseExpired( + now time.Time, + leaseLen time.Duration, + prevLead *ServerLoc, + maxClockSkew time.Duration, + +) (expired bool, lead *ServerLoc) { + + if prevLead.LeaseExpires.Add(maxClockSkew).After(now) { + // honor the leases until they expire + return false, prevLead + } + + if m.Amap.Len() == 0 { + return false, prevLead + } + + // INVAR: any lease has expired. + + var sortme []*ServerLoc + m.Amap.tex.Lock() + for _, v := range m.Amap.U { + sortme = append(sortme, v) + } + m.Amap.tex.Unlock() + m.clearLeaderAndLease() + + sort.Sort(&byRankThenId{s: sortme, now: now}) + lead = sortme[0] + lead.IsLeader = true + lead.LeaseExpires = now.Add(leaseLen).UTC() + + return true, lead +} + +func (m *members) clearLeaderAndLease() { + m.Amap.tex.Lock() + for _, v := range m.Amap.U { + v.IsLeader = false + v.LeaseExpires = time.Time{} + } + m.Amap.tex.Unlock() +} + +type byRankThenId struct { + s []*ServerLoc + now time.Time +} + +func (p byRankThenId) Len() int { return len(p.s) } +func (p byRankThenId) Swap(i, j int) { p.s[i], p.s[j] = p.s[j], p.s[i] } + +// Less must be stable and computable locally yet +// applicable globally: it is how we choose a leader +// in a stable fashion. +func (p byRankThenId) Less(i, j int) bool { + return ServerLocLessThan(p.s[i], p.s[j], p.now) +} + +// ServerLocLessThan returns true iff i < j, in terms of rank. +// Lower rank is more electable. We order first by LeaseExpires, +// then by Rank, Id, Host, and Port; in that order. The +// longer leaseExpires wins (is less than). +func ServerLocLessThan(i, j *ServerLoc, now time.Time) bool { + nowu := now.UnixNano() + itm := i.LeaseExpires.UnixNano() + jtm := j.LeaseExpires.UnixNano() + + // if both are expired, then its a tie. + if jtm <= nowu { + jtm = 0 + } + if itm <= nowu { + itm = 0 + } + if itm != jtm { + return itm > jtm // we want an actual time to sort before a zero-time. + } + if i.Rank != j.Rank { + return i.Rank < j.Rank + } + if i.Id != j.Id { + return i.Id < j.Id + } + if i.Host != j.Host { + return i.Host < j.Host + } + return i.Port < j.Port +} + +func (m *Membership) setupNatsClient(pc *pongCollector) (*nats.Conn, error) { + discon := func(nc *nats.Conn) { + m.Cfg.Log.Tracef("health-agent: Disconnected from nats!") + } + optdis := nats.DisconnectHandler(discon) + norand := nats.DontRandomize() + + recon := func(nc *nats.Conn) { + loc, err := nc.ServerLocation() + panicOn(err) + m.Cfg.Log.Tracef("health-agent: Reconnect to nats!: loc = '%s'", loc) + } + optrecon := nats.ReconnectHandler(recon) + + opts := []nats.Option{optdis, optrecon, norand} + if m.Cfg.CliConn != nil { + opts = append(opts, nats.Dialer(&m.Cfg)) + } + + nc, err := nats.Connect(m.Cfg.NatsUrl, opts...) + if err != nil { + log.Fatalf("Can't connect: %v\n", err) + } + + loc, err := nc.ServerLocation() + if err != nil { + return nil, err + } + m.setLoc(loc) + m.Cfg.Log.Debugf("health-agent: HELLOWORLD: "+ + "I am '%s' at '%v:%v'. "+ + "rank %v", + m.myLoc.Id, + m.myLoc.Host, + m.myLoc.Port, + m.myLoc.Rank) + + m.subjAllCall = sysMemberPrefix + "allcall" + m.subjAllReply = sysMemberPrefix + "allreply" + m.subjMemberLost = sysMemberPrefix + "lost" + m.subjMemberAdded = sysMemberPrefix + "added" + m.subjMembership = sysMemberPrefix + "list" + + nc.Subscribe(m.subjAllReply, func(msg *nats.Msg) { + if m.deaf() { + return + } + pc.receivePong(msg) + }) + + nc.Subscribe(m.subjAllCall, func(msg *nats.Msg) { + if m.deaf() { + return + } + loc, err := nc.ServerLocation() + if err != nil { + return // try again next time. + } + + // did we accidentally change + // server locacations? + // Yikes, we don't want to do that! + // We are supposed to be monitoring + // just our own server. + if m.locDifferent(loc) { + panic(fmt.Sprintf("very bad! health-agent "+ + "changed locations! "+ + "first: '%s', now:'%s'", + m.myLoc, + loc)) + } + + // allcall broadcasts the leader + var lead ServerLoc + err = lead.fromBytes(msg.Data) + panicOn(err) + + if lead.Id != "" && !lead.LeaseExpires.IsZero() { + won, alt := m.elec.setLeader(&lead) + if !won { + // if we rejected, get our preferred leader. + lead = alt + } + + if loc.Id == lead.Id { + loc.IsLeader = true + loc.LeaseExpires = lead.LeaseExpires + } else { + loc.IsLeader = false + loc.LeaseExpires = time.Time{} + } + } + + hp, err := json.Marshal(loc) + panicOn(err) + if !m.deaf() { + pong(nc, msg.Reply, hp) + } + }) + + // reporting + nc.Subscribe(m.subjMemberLost, func(msg *nats.Msg) { + if m.deaf() { + return + } + m.Cfg.Log.Tracef("health-agent: "+ + "Received on [%s]: '%s'", + msg.Subject, + string(msg.Data)) + }) + + // reporting + nc.Subscribe(m.subjMemberAdded, func(msg *nats.Msg) { + if m.deaf() { + return + } + m.Cfg.Log.Tracef("health-agent: Received on [%s]: '%s'", + msg.Subject, string(msg.Data)) + }) + + // reporting + nc.Subscribe(m.subjMembership, func(msg *nats.Msg) { + if m.deaf() { + return + } + m.Cfg.Log.Tracef("health-agent: "+ + "Received on [%s]: '%s'", + msg.Subject, + string(msg.Data)) + }) + + return nc, nil +} + +func (m *Membership) locDifferent(b *nats.ServerLoc) bool { + m.mu.Lock() + defer m.mu.Unlock() + if b.Id != m.myLoc.Id { + return true + } + if b.Rank != m.myLoc.Rank { + return true + } + if b.Host != m.myLoc.Host { + return true + } + if b.Port != m.myLoc.Port { + return true + } + return false +} + +func (m *Membership) setLoc(b *nats.ServerLoc) { + m.mu.Lock() + m.myLoc.Id = b.Id + m.myLoc.Rank = b.Rank + m.myLoc.Host = b.Host + m.myLoc.Port = b.Port + m.mu.Unlock() + m.elec.setMyLoc(&m.myLoc) +} diff --git a/health/health_test.go b/health/health_test.go new file mode 100644 index 0000000000..36ac823b03 --- /dev/null +++ b/health/health_test.go @@ -0,0 +1,195 @@ +package health + +import ( + "fmt" + "math/rand" + "testing" + "time" + + cv "github.com/glycerine/goconvey/convey" + "github.com/nats-io/gnatsd/logger" + "github.com/nats-io/gnatsd/server" + gnatsd "github.com/nats-io/gnatsd/test" + "github.com/nats-io/go-nats" +) + +const TEST_PORT = 8392 +const DefaultTimeout = 2 * time.Second + +var cliOpts = nats.Options{ + Url: fmt.Sprintf("nats://localhost:%d", TEST_PORT), + AllowReconnect: true, + MaxReconnect: 10, + ReconnectWait: 10 * time.Millisecond, + Timeout: DefaultTimeout, +} + +// DefaultTestOptions are default options for the unit tests. +var serverOpts = server.Options{ + Host: "localhost", + Port: TEST_PORT, + NoLog: true, + NoSigs: true, + MaxControlLine: 256, +} + +func Test101StressTestManyClients(t *testing.T) { + + cv.Convey("when stress testing with 50 clients coming up and shutting down, we should survive and prosper", t, func() { + + s := RunServerOnPort(TEST_PORT) + defer s.Shutdown() + + n := 50 + var ms []*Membership + for i := 0; i < n; i++ { + cli, srv, err := NewInternalClientPair() + panicOn(err) + + s.InternalCliRegisterCallback(srv) + cfg := &MembershipCfg{ + CliConn: cli, + MaxClockSkew: 1 * time.Nanosecond, + LeaseTime: 30 * time.Millisecond, + BeatDur: 10 * time.Millisecond, + NatsUrl: fmt.Sprintf("nats://localhost:%v", TEST_PORT), + MyRank: i, // ranks 0..n-1 + } + + m := NewMembership(cfg) + err = m.Start() + if err != nil { + panic(err) + } + ms = append(ms, m) + defer m.Stop() + } + // the test here is basically that we didn't crash + // or hang. So if we got here, success. + cv.So(true, cv.ShouldBeTrue) + }) +} + +func Test102ConvergenceToOneLowRankLeaderAndLiveness(t *testing.T) { + + cv.Convey("Given a cluster of one server with rank 0, no matter what other servers arrive thinking they are the leader (say, after a partition is healed), as long as those other nodes have rank 1, our rank 0 process will persist in leading and all other arrivals will give up their leadership claims (after their leases expire). In addition to safety, this is also a liveness check: After a single lease term + clockskew, a leader will have been chosen.", t, func() { + + const maxPayload = 1024 * 1024 + s := RunServerOnPort(TEST_PORT) + defer func() { + p("starting gnatsd shutdown...") + s.Shutdown() + }() + + n := 50 + tot := 50 + pause := make([]int, n) + for i := 0; i < n; i++ { + pause[i] = 20 + rand.Intn(50) + tot += pause[i] + } + + var ms []*Membership + for i := 0; i < n; i++ { + + cfg := &MembershipCfg{ + MaxClockSkew: 1 * time.Nanosecond, + LeaseTime: 150 * time.Millisecond, + BeatDur: 50 * time.Millisecond, + NatsUrl: fmt.Sprintf("nats://localhost:%v", TEST_PORT), + MyRank: i, //min(1, i), // ranks 0,1,1,1,1,1,... + deaf: DEAF_TRUE, // don't ping or pong + historyCount: 10000, + } + + cli, srv, err := NewInternalClientPair() + panicOn(err) + + s.InternalCliRegisterCallback(srv) + cfg.CliConn = cli + + if i == 0 { + cfg.deaf = DEAF_FALSE + aLogger := logger.NewStdLogger(micros, true, trace, colors, pid) + _ = aLogger + // to follow the prints, uncomment: + //cfg.Log = aLogger + } + + m := NewMembership(cfg) + err = m.Start() + if err != nil { + panic(err) + } + ms = append(ms, m) + defer m.Stop() + } + + // let them all get past init phase. + time.Sleep(2 * (ms[0].Cfg.LeaseTime + ms[0].Cfg.MaxClockSkew)) + + // verify liveness, a leader exists. + p("verifying everyone thinks there is a leader:") + for i := 0; i < n; i++ { + //fmt.Printf("verifying %v thinks there is a leader\n", i) + cv.So(ms[i].elec.history.Avail(), cv.ShouldBeGreaterThan, 0) + } + + // bring in jobs after their random pause time + for i := 0; i < n; i++ { + dur := time.Duration(pause[i]) * time.Millisecond + //p("%v on i = %v/dur=%v ", time.Now().UTC(), i, dur) + time.Sleep(dur) + ms[i].unDeaf() + } + + // check that the history from rank 0 + // always shows rank 0 as lead. + h := ms[0].elec.history + av := h.Avail() + //p("ms[0].myLoc.Id = %v", ms[0].myLoc.Id) + cv.So(ms[0].myLoc.Id, cv.ShouldNotEqual, "") + cv.So(av, cv.ShouldBeGreaterThan, 10) + p("av: available history len = %v", av) + + // prints first: + /* + for i := 0; i < av; i++ { + sloc := h.A[h.Kth(i)].(*ServerLoc) + fmt.Printf("history print i = %v. sloc.Id=%v / sloc.Rank=%v\n", i, sloc.Id, sloc.Rank) + } + */ + // checks second: + for i := 0; i < av; i++ { + sloc := h.A[h.Kth(i)].(*ServerLoc) + //fmt.Printf("history check Id at i = %v. sloc.Id=%v\n", i, sloc.Id) + cv.So(sloc.Id, cv.ShouldEqual, ms[0].myLoc.Id) + } + + for i := 0; i < av; i++ { + sloc := h.A[h.Kth(i)].(*ServerLoc) + //p("history check Rank at i = %v. sloc.Rank=%v", i, sloc.Rank) + cv.So(sloc.Rank, cv.ShouldEqual, 0) + } + }) +} + +func RunServerOnPort(port int) *server.Server { + opts := serverOpts + opts.Port = port + return gnatsd.RunServer(&opts) +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} + +func max(a, b int) int { + if a > b { + return a + } + return b +} diff --git a/health/healthcmd/main.go b/health/healthcmd/main.go new file mode 100644 index 0000000000..4cec1950cb --- /dev/null +++ b/health/healthcmd/main.go @@ -0,0 +1,56 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + "strconv" + "time" + + "github.com/nats-io/gnatsd/health" +) + +// healthcmd runs an allcall election from a standalone +// command line nats client. It exercises the same +// gnatsd/health library code that runs as an internal client +// in process with gnatsd. + +func usage() { + log.Fatalf("use: healthcmd {host}:port {rank}\n") +} + +func main() { + + log.SetFlags(0) + flag.Usage = usage + flag.Parse() + + args := flag.Args() + if len(args) < 1 { + usage() + } + + rank := 0 + var err error + if len(args) >= 2 { + rank, err = strconv.Atoi(args[1]) + if err != nil { + fmt.Fprintf(os.Stderr, "2nd arg should be our numeric rank") + } + } + + cfg := &health.MembershipCfg{ + MaxClockSkew: time.Second, + BeatDur: 100 * time.Millisecond, + NatsUrl: "nats://" + args[0], // "nats://127.0.0.1:4222" + MyRank: rank, + } + m := health.NewMembership(cfg) + err = m.Start() + if err != nil { + panic(err) + } + + select {} +} diff --git a/health/icc.go b/health/icc.go new file mode 100644 index 0000000000..60bc5b4a63 --- /dev/null +++ b/health/icc.go @@ -0,0 +1,64 @@ +package health + +import ( + "net" +) + +// Icc allows the server to +// detect a net.Conn as +// an internal client connection +// by checking if it implements the +// LocalInternalClient interface. +// +type Icc struct { + *net.TCPConn +} + +// IsInternal satisfy LocalInternalClient interface +func (c *Icc) IsInternal() {} + +// NewInternalClientPair() constructs a client/server +// pair that wrap tcp endpoints in Icc to let +// the server recognized them as internal. +// +func NewInternalClientPair() (cli, srv *Icc, err error) { + + lsn, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return + } + + srvDone := make(chan struct{}) + go func() { + s, err2 := lsn.Accept() + if err2 == nil { + srv = &Icc{TCPConn: s.(*net.TCPConn)} + } else { + err = err2 + } + lsn.Close() + close(srvDone) + }() + + addr := lsn.Addr() + c, err3 := net.Dial(addr.Network(), addr.String()) + <-srvDone + + if err3 != nil { + err = err3 + if srv != nil { + srv.Close() + srv = nil + } + return + } + cli = &Icc{TCPConn: c.(*net.TCPConn)} + // INVAR: cli ok. + if err != nil { + cli.Close() + cli = nil + return + } + // INVAR: srv ok. + return +} diff --git a/health/icc_test.go b/health/icc_test.go new file mode 100644 index 0000000000..4ffae665c4 --- /dev/null +++ b/health/icc_test.go @@ -0,0 +1,48 @@ +package health + +import ( + "net" + "testing" + + "github.com/nats-io/gnatsd/server" +) + +func TestIccTypeSwitchWorks(t *testing.T) { + var nc net.Conn = &Icc{} + _, isIcc := nc.(server.LocalInternalClient) + if !isIcc { + t.Fatalf("nc was not LocalInternalClient, as it should be!") + } +} + +func TestIccAsNetConn(t *testing.T) { + + // write to a, read from b + a, b, err := NewInternalClientPair() + if err != nil { + panic(err) + } + + msg := "hello-world" + + n, err := a.Write([]byte(msg)) + if err != nil { + t.Errorf("err = %v", err) + } + if n != len(msg) { + t.Errorf("Write truncated at %v < %v", n, len(msg)) + } + + readbuf := make([]byte, len(msg)) + m, err := b.Read(readbuf) + if err != nil { + t.Errorf("err = %v", err) + } + if m != n { + t.Errorf("Read truncated at %v !=n %v", m, n) + } + back := string(readbuf[:m]) + if back != msg { + t.Errorf("msg corrupted, wrote '%v', read '%v'", msg, back) + } +} diff --git a/health/idem.go b/health/idem.go new file mode 100644 index 0000000000..107399d321 --- /dev/null +++ b/health/idem.go @@ -0,0 +1,106 @@ +package health + +import ( + "fmt" + "sync" +) + +// Copyright (c) 2017 Jason E. Aten, Ph.D. +// https://github.com/glycerine/idem +// MIT license. + +// idemCloseChan can have Close() called on it +// multiple times, and it will only close +// Chan once. +type idemCloseChan struct { + Chan chan bool + closed bool + mut sync.Mutex +} + +// Reinit re-allocates the Chan, assinging +// a new channel and reseting the state +// as if brand new. +func (c *idemCloseChan) Reinit() { + c.mut.Lock() + defer c.mut.Unlock() + c.Chan = make(chan bool) + c.closed = false +} + +// NewIdemCloseChan makes a new idemCloseChan. +func NewIdemCloseChan() *idemCloseChan { + return &idemCloseChan{ + Chan: make(chan bool), + } +} + +var ErrAlreadyClosed = fmt.Errorf("Chan already closed") + +// Close returns ErrAlreadyClosed if it has been +// called before. It never closes IdemClose.Chan more +// than once, so it is safe to ignore the returned +// error value. Close() is safe for concurrent access by multiple +// goroutines. Close returns nil after the first time +// it is called. +func (c *idemCloseChan) Close() error { + c.mut.Lock() + defer c.mut.Unlock() + if !c.closed { + close(c.Chan) + c.closed = true + return nil + } + return ErrAlreadyClosed +} + +// IsClosed tells you if Chan is already closed or not. +func (c *idemCloseChan) IsClosed() bool { + c.mut.Lock() + defer c.mut.Unlock() + return c.closed +} + +// halter helps shutdown a goroutine +type halter struct { + // The owning goutine should call Done.Close() as its last + // actual once it has received the ReqStop() signal. + Done idemCloseChan + + // Other goroutines call ReqStop.Close() in order + // to request that the owning goroutine stop immediately. + // The owning goroutine should select on ReqStop.Chan + // in order to recognize shutdown requests. + ReqStop idemCloseChan +} + +func newHalter() *halter { + return &halter{ + Done: *NewIdemCloseChan(), + ReqStop: *NewIdemCloseChan(), + } +} + +// RequestStop closes the h.ReqStop channel +// if it has not already done so. Safe for +// multiple goroutine access. +func (h *halter) RequestStop() { + h.ReqStop.Close() +} + +// MarkDone closes the h.Done channel +// if it has not already done so. Safe for +// multiple goroutine access. +func (h *halter) MarkDone() { + h.Done.Close() +} + +// IsStopRequested returns true iff h.ReqStop has been Closed(). +func (h *halter) IsStopRequested() bool { + return h.ReqStop.IsClosed() +} + +// IsDone returns true iff h.Done has been Closed(). +func (h *halter) IsDone() bool { + return h.Done.IsClosed() +} diff --git a/health/rbuf.go b/health/rbuf.go new file mode 100644 index 0000000000..1f8aaa1379 --- /dev/null +++ b/health/rbuf.go @@ -0,0 +1,128 @@ +package health + +// https://github.com/glycerine/rbuf +// copyright (c) 2014, Jason E. Aten +// license: MIT + +import "io" + +// RingBuf: +// +// a fixed-size circular ring buffer. Just what it says. +// +type RingBuf struct { + A []interface{} + N int // MaxViewInBytes, the size of A + Beg int // start of data in A + Readable int // number of bytes available to read in A +} + +// NewRingBuf constructs a new RingBuf. +func NewRingBuf(maxViewInBytes int) *RingBuf { + n := maxViewInBytes + r := &RingBuf{ + N: n, + Beg: 0, + Readable: 0, + } + r.A = make([]interface{}, n, n) + + return r +} + +// Reset quickly forgets any data stored in the ring buffer. The +// data is still there, but the ring buffer will ignore it and +// overwrite those buffers as new data comes in. +func (b *RingBuf) Reset() { + b.Beg = 0 + b.Readable = 0 +} + +// Advance(): non-standard, but better than Next(), +// because we don't have to unwrap our buffer and pay the cpu time +// for the copy that unwrapping may need. +// Useful in conjuction/after ReadWithoutAdvance() above. +func (b *RingBuf) Advance(n int) { + if n <= 0 { + return + } + if n > b.Readable { + n = b.Readable + } + b.Readable -= n + b.Beg = (b.Beg + n) % b.N +} + +func intMin(a, b int) int { + if a < b { + return a + } else { + return b + } +} + +func (f *RingBuf) Avail() int { + return f.Readable +} + +// returns the earliest index, or -1 if +// the ring is empty +func (f *RingBuf) First() int { + if f.Readable == 0 { + return -1 + } + return f.Beg +} + +// returns the index of the last element, +// or -1 if the ring is empty. +func (f *RingBuf) Last() int { + if f.Readable == 0 { + return -1 + } + + last := f.Beg + f.Readable - 1 + if last < f.N { + // we fit without wrapping + return last + } + + return last % f.N +} + +// Kth presents the contents of the +// ring as a strictly linear sequence, +// so the user doesn't need to think +// about modular arithmetic. Here k indexes from +// [0, f.Readable-1], assuming f.Avail() +// is greater than 0. Kth() returns an +// actual index where the logical k-th +// element, starting from f.Beg, resides. +// f.Beg itself lives at k = 0. If k is +// out of bounds, or the ring is empty, +// -1 is returned. +func (f *RingBuf) Kth(k int) int { + if f.Readable == 0 || k < 0 || k >= f.Readable { + return -1 + } + return (f.Beg + k) % f.N +} + +// +// Append returns an error if there is no more +// space in the ring. Otherwise it returns nil +// and writes p into the ring in last position. +// +func (b *RingBuf) Append(p interface{}) error { + writeCapacity := b.N - b.Readable + if writeCapacity <= 0 { + // we are all full up already. + return io.ErrShortWrite + } + + writeStart := (b.Beg + b.Readable) % b.N + b.A[writeStart] = p + + b.Readable += 1 + return nil +} diff --git a/health/setutil.go b/health/setutil.go new file mode 100644 index 0000000000..8e72401a9b --- /dev/null +++ b/health/setutil.go @@ -0,0 +1,112 @@ +package health + +import ( + "encoding/json" + "fmt" +) + +// utilities and sets stuff + +// p is a shortcut for a call to fmt.Printf that implicitly starts +// and ends its message with a newline. +func p(format string, stuff ...interface{}) { + fmt.Printf("\n "+format+"\n", stuff...) +} + +func panicOn(err error) { + if err != nil { + panic(err) + } +} + +// return a minus b, where a and b are sets. +func setDiff(a, b *members, curLead *ServerLoc) *members { + + res := newMembers() + a.Amap.tex.Lock() + for k, v := range a.Amap.U { + + if curLead != nil { + // annotate leader as we go... + if v.Id == curLead.Id { + v.IsLeader = true + v.LeaseExpires = curLead.LeaseExpires + } + } + + if _, found := b.Amap.U[k]; !found { // data race + res.Amap.U[k] = v + } + } + a.Amap.tex.Unlock() + return res +} + +func setsEqual(a, b *members) bool { + a.Amap.tex.Lock() + b.Amap.tex.Lock() + defer b.Amap.tex.Unlock() + defer a.Amap.tex.Unlock() + + alen := len(a.Amap.U) + if alen != len(b.Amap.U) { + return false + } + // INVAR: len(a) == len(b) + if alen == 0 { + return true + } + for k := range a.Amap.U { + if _, found := b.Amap.U[k]; !found { + return false + } + } + // INVAR: all of a was found in b, and they + // are the same size + return true +} + +type members struct { + GroupName string `json:"GroupName"` + Amap *AtomicServerLocMap `json:"Mem"` +} + +func (m *members) clear() { + m.Amap = NewAtomicServerLocMap() +} + +func (m *members) clone() *members { + cp := newMembers() + cp.GroupName = m.GroupName + if m.Amap == nil { + return cp + } + m.Amap.tex.Lock() + cp.Amap.tex.Lock() + for k, v := range m.Amap.U { + cp.Amap.U[k] = v + } + cp.Amap.tex.Unlock() + m.Amap.tex.Unlock() + return cp +} + +func (m *members) setEmpty() bool { + return m.Amap.Len() == 0 +} + +func (m *members) String() string { + return string(m.mustJsonBytes()) +} + +func newMembers() *members { + return &members{ + Amap: NewAtomicServerLocMap(), + } +} + +func (m members) mustJsonBytes() []byte { + by, err := json.Marshal(m) + panicOn(err) + return by +} diff --git a/main.go b/main.go index 425372390f..8ec3a63eb1 100644 --- a/main.go +++ b/main.go @@ -8,8 +8,10 @@ import ( "net" "net/url" "os" + "time" "github.com/nats-io/gnatsd/auth" + "github.com/nats-io/gnatsd/health" "github.com/nats-io/gnatsd/logger" "github.com/nats-io/gnatsd/server" ) @@ -52,6 +54,11 @@ Cluster Options: --no_advertise Advertise known cluster IPs to clients --connect_retries For implicit routes, number of connect retries +Cluster Health Monitor: + --health Run the health monitoring/leader election agent + --lease Duration of leader leases. default: 12s + --beat Time between heartbeats (want 3-4/lease). default: 3s + --rank Smaller rank gives priority in leader election Common Options: -h, --help Show this message @@ -118,6 +125,10 @@ func main() { flag.StringVar(&opts.TLSCert, "tlscert", "", "Server certificate file.") flag.StringVar(&opts.TLSKey, "tlskey", "", "Private key for server certificate.") flag.StringVar(&opts.TLSCaCert, "tlscacert", "", "Client certificate CA for verification.") + flag.BoolVar(&opts.HealthAgent, "health", false, "Run the health agent, elect a leader.") + flag.IntVar(&opts.HealthRank, "rank", 7, "leader election priority: the smaller the rank, the more preferred the server is as a leader. Negative ranks are allowed. Ties are broken by the random ServerId.") + flag.DurationVar(&opts.HealthLease, "lease", time.Second*12, "leader lease duration (should allow 3-4 beats within a lease)") + flag.DurationVar(&opts.HealthBeat, "beat", time.Second*3, "heart beat every this often (should get 3-4 beats within a lease)") flag.Usage = func() { fmt.Printf("%s\n", usageStr) @@ -175,6 +186,10 @@ func main() { server.PrintAndDie(err.Error()) } + if opts.HealthAgent { + opts.InternalCli = append(opts.InternalCli, health.NewAgent(&opts)) + } + // Create the server with appropriate options. s := server.New(&opts) diff --git a/server/client.go b/server/client.go index 5a64a79c35..901749d0ef 100644 --- a/server/client.go +++ b/server/client.go @@ -20,6 +20,9 @@ const ( CLIENT = iota // ROUTER is another router in the cluster. ROUTER + // INTERNALCLI is an internal client. + // An example is the health-agent. + INTERNALCLI ) const ( @@ -216,6 +219,8 @@ func (c *client) initClient() { c.ncs = fmt.Sprintf("%s - cid:%d", conn, c.cid) case ROUTER: c.ncs = fmt.Sprintf("%s - rid:%d", conn, c.cid) + case INTERNALCLI: + c.ncs = fmt.Sprintf("internal:0 - hid:%d", c.cid) } } @@ -407,6 +412,8 @@ func (c *client) processErr(errStr string) { c.Errorf("Client Error %s", errStr) case ROUTER: c.Errorf("Route Error %s", errStr) + case INTERNALCLI: + c.Errorf("InternalClient Error %s", errStr) } c.closeConnection() } diff --git a/server/icli.go b/server/icli.go new file mode 100644 index 0000000000..3b5374b543 --- /dev/null +++ b/server/icli.go @@ -0,0 +1,89 @@ +package server + +import ( + "net" + "sync" +) + +// LocalInternalClient is a trait interface. +// The net.Conn implementations of +// internal clients provided over +// the accept() callback (see Start below) +// should implement it to tell the server to ignore +// TLS and auth for internal clients. +// +type LocalInternalClient interface { + IsInternal() +} + +// iCli tracks the internal +// clients. +// +type iCli struct { + configured []InternalClient + mu sync.Mutex +} + +// InternalClient provides +// a plugin-like interface, +// supporting internal clients that live +// in-process with the Server +// on their own goroutines. +// +// An example of an internal client +// is the health monitoring client. +// In order to be effective, its lifetime +// must exactly match that of the +// server it monitors. +// +type InternalClient interface { + + // Name should return a readable + // human name for the InternalClient; + // it will be invoked as a part of + // startup/shutdown/error logging. + // + Name() string + + // Start should run the client on + // a background goroutine. + // + // The Server s will invoke Start() + // as a part of its own init and setup. + // + // The info and opts pointers will be + // viewable from an already locked Server + // instance, and so can be read without + // worrying about data races. + // + // Any returned error will be logged. + // This will not prevent the Server + // from calling Stop() on termination, + // and Stop() must be expected (and + // not block) no matter what. + // + // By returning an net.Conn the client + // provides the server with the + // equivalent of a Listen/Accept created + // net.Conn for communication with + // the client. + // + // The iclient should log using logger. + // + Start(info Info, + opts Options, + logger Logger) (net.Conn, error) + + // Stop should shutdown the goroutine(s) + // of the internal client. + // The Server will invoke Stop() as a part + // of its own shutdown process, *even* if + // Start() failed to start the background + // goroutine. Authors should take care + // to allow Stop() to be called even + // on a failed start. + // + // Stop is expected not to block for long. + // + Stop() +} diff --git a/server/monitor.go b/server/monitor.go index 6cce144b7f..dc473cc204 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -473,10 +473,11 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { v.TotalConnections = s.totalClients v.Routes = len(s.routes) v.Remotes = len(s.remotes) - v.InMsgs = s.inMsgs - v.InBytes = s.inBytes - v.OutMsgs = s.outMsgs - v.OutBytes = s.outBytes + // atomic loads avoid data races with client.go:298,951 + v.InMsgs = atomic.LoadInt64(&s.inMsgs) + v.InBytes = atomic.LoadInt64(&s.inBytes) + v.OutMsgs = atomic.LoadInt64(&s.outMsgs) + v.OutBytes = atomic.LoadInt64(&s.outBytes) v.SlowConsumers = s.slowConsumers v.Subscriptions = s.sl.Count() s.httpReqStats[VarzPath]++ diff --git a/server/opts.go b/server/opts.go index 8b97fa451a..46dc7d13c5 100644 --- a/server/opts.go +++ b/server/opts.go @@ -71,10 +71,10 @@ type Options struct { ProfPort int `json:"-"` PidFile string `json:"-"` LogFile string `json:"-"` - Syslog bool `json:"-"` RemoteSyslog string `json:"-"` Routes []*url.URL `json:"-"` RoutesStr string `json:"-"` + Syslog bool `json:"-"` TLSTimeout float64 `json:"tls_timeout"` TLS bool `json:"-"` TLSVerify bool `json:"-"` @@ -83,6 +83,12 @@ type Options struct { TLSCaCert string `json:"-"` TLSConfig *tls.Config `json:"-"` WriteDeadline time.Duration `json:"-"` + + InternalCli []InternalClient `json:"-"` + HealthAgent bool `json:"health_agent"` + HealthRank int `json:"health_rank"` + HealthLease time.Duration `json:"health_lease"` + HealthBeat time.Duration `json:"health_beat"` } // Configuration file authorization section. @@ -225,6 +231,14 @@ func ProcessConfigFile(configFile string) (*Options, error) { opts.PingInterval = time.Duration(int(v.(int64))) * time.Second case "ping_max": opts.MaxPingsOut = int(v.(int64)) + case "health_rank": + opts.HealthRank = int(v.(int64)) + case "health_lease": + opts.HealthLease = v.(time.Duration) + case "health_beat": + opts.HealthBeat = v.(time.Duration) + case "health_agent": + opts.HealthAgent = v.(bool) case "tls": tlsm := v.(map[string]interface{}) tc, err := parseTLS(tlsm) diff --git a/server/server.go b/server/server.go index 7d616d2b0d..b8d4d808f2 100644 --- a/server/server.go +++ b/server/server.go @@ -39,6 +39,7 @@ type Info struct { MaxPayload int `json:"max_payload"` IP string `json:"ip,omitempty"` ClientConnectURLs []string `json:"connect_urls,omitempty"` // Contains URLs a client can connect to. + ServerRank int `json:"server_rank"` // lowest rank wins leader election. // Used internally for quick look-ups. clientConnectURLs map[string]struct{} @@ -77,6 +78,7 @@ type Server struct { grRunning bool grWG sync.WaitGroup // to wait on various go routines cproto int64 // number of clients supporting async INFO + icli iCli // in-process internal clients } // Make sure all are 64bits for atomic use @@ -118,6 +120,7 @@ func New(opts *Options) *Server { trace: opts.Trace, done: make(chan bool, 1), start: time.Now(), + icli: iCli{configured: opts.InternalCli}, } s.mu.Lock() @@ -267,10 +270,53 @@ func (s *Server) Start() { s.StartProfiler() } + // Run the internal clients in + // s.icli.configured. + // + // Retain only those started + // successfully in s.icli.running. + // + s.icli.mu.Lock() + go func(info Info, opts Options) { + defer s.icli.mu.Unlock() + + // wait for server to be accepting clients + select { + case <-s.rcQuit: + return + case <-clientListenReady: + } + + n := len(s.icli.configured) + if n == 0 { + return + } + Debugf("Starting the %v internal client(s).", n) + for _, ic := range s.icli.configured { + srv, err := ic.Start(info, opts, log.logger) + if err == nil { + if srv != nil { + s.InternalCliRegisterCallback(srv) + } + Noticef("InternalClient ['%s'] started.", ic.Name()) + } else { + Errorf("InternalClient ['%s'] failed to Start(): %s", ic.Name(), err) + } + } + }(s.info, *s.opts) + // Wait for clients. s.AcceptLoop(clientListenReady) } +// InternalCliRegisterCallback is public only for testing. +func (s *Server) InternalCliRegisterCallback(srv net.Conn) { + s.startGoRoutine(func() { + s.createClient(srv) + s.grWG.Done() + }) +} + // Shutdown will shutdown the server instance by kicking out the AcceptLoop // and closing all associated clients. func (s *Server) Shutdown() { @@ -287,6 +333,13 @@ func (s *Server) Shutdown() { s.grRunning = false s.grMu.Unlock() + // stop any internal clients + s.icli.mu.Lock() + for _, ic := range s.icli.configured { + ic.Stop() + } + s.icli.mu.Unlock() + conns := make(map[uint64]*client) // Copy off the clients @@ -531,6 +584,11 @@ func (s *Server) startMonitoring(secure bool) { func (s *Server) createClient(conn net.Conn) *client { c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: s.info.MaxPayload, start: time.Now()} + _, isInternal := conn.(LocalInternalClient) + if isInternal { + c.typ = INTERNALCLI + } + // Grab JSON info string s.mu.Lock() info := s.infoJSON @@ -548,7 +606,7 @@ func (s *Server) createClient(conn net.Conn) *client { c.Debugf("Client connection created") // Check for Auth - if authRequired { + if !isInternal && authRequired { c.setAuthTimer(secondsToDuration(s.opts.AuthTimeout)) } @@ -582,7 +640,7 @@ func (s *Server) createClient(conn net.Conn) *client { c.mu.Lock() // Check for TLS - if tlsRequired { + if !isInternal && tlsRequired { c.Debugf("Starting TLS client connection handshake") c.nc = tls.Server(c.nc, s.opts.TLSConfig) conn := c.nc.(*tls.Conn) @@ -613,7 +671,7 @@ func (s *Server) createClient(conn net.Conn) *client { return c } - if tlsRequired { + if !isInternal && tlsRequired { // Rewrap bw c.bw = bufio.NewWriterSize(c.nc, startBufSize) } @@ -621,12 +679,14 @@ func (s *Server) createClient(conn net.Conn) *client { // Do final client initialization // Set the Ping timer - c.setPingTimer() + if !isInternal { + c.setPingTimer() + } // Spin up the read loop. s.startGoRoutine(func() { c.readLoop() }) - if tlsRequired { + if !isInternal && tlsRequired { c.Debugf("TLS handshake complete") cs := c.nc.(*tls.Conn).ConnectionState() c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tlsCipher(cs.CipherSuite)) @@ -755,6 +815,8 @@ func (s *Server) checkAuth(c *client) bool { return s.checkClientAuth(c) case ROUTER: return s.checkRouterAuth(c) + case INTERNALCLI: + return true default: return false }