diff --git a/health/ALGORITHM.md b/health/ALGORITHM.md new file mode 100644 index 00000000000..a61583370f2 --- /dev/null +++ b/health/ALGORITHM.md @@ -0,0 +1,148 @@ +# The ALLCALL leader election algorithm. + +Jason E. Aten + +February 2017 + +definition, with example value. +----------- + +Let heartBeat = 3 sec. This is how frequently +we will assess cluster health by +sending out an allcall ping. + +givens +-------- + +* Given: Let each server have a numeric integer rank, that is distinct +and unique to that server. If necessary an extremely long +true random number is used to break ties between server ranks, so +that we may assert, with probability 1, that all ranks are distinct. +Servers can thus be put into a strict total order. + +* Rule: The server with the lower rank is preferred for being the leader. + + +ALLCALL Algorithm +=========================== + +### I. In a continuous loop + +The server always accepts and responds +to allcall broadcasts from other cluster members. + +The allcall() ping asks each member of +the server cluster to reply. Replies +provide the responder's own assigned rank +and identity. + +### II. Election + +Elections are computed locally, after accumulating +responses. + +After issuing an allcall, the server +listens for a heartbeat interval. + +At the end of the interval, it sorts +the respondents by rank. Since +the replies are on a broadcast +channel, more than one simultaneous +allcall reply may be incorporated into +the set of respondents. + +The lowest ranking server is elected +as leader. + +## Safety/Convergence: ALLCALL converges to one leader + +Suppose two nodes are partitioned and so both are leaders on +their own side of the network. Then suppose the network +is joined again, so the two leaders are brought together +by a healing of the network, or by adding a new link +between the networks. The two nodes exchange Ids and +ranks via responding to allcalls, and compute +who is the new leader.
+ +Hence the two leader situation persists for at +most one heartbeat term after the network join. + +## Liveness: a leader will be chosen + +Given the total order among nodes, exactly one +will be lowest rank and thus be the preferred +leader. If the leader fails, the next +heartbeat of allcall will omit that +server from the candidate list, and +the next ranking server will be chosen. + +Hence, with at least one live +node, the system can run for at most one +heartbeat term before electing a leader. +Since there is a total order on +all live (non-failed) servers, only +one will be chosen. + +## commentary + +ALLCALL does not guarantee that there will +never be more than one leader. Availability +in the face of network partition is +desirable in many cases, and ALLCALL is +appropriate for these. This is congruent +with Nats design as an always-on system. + +ALLCALL does not guarantee that a +leader will always be present, but +with live nodes it does provide +that the cluster will have a leader +after one heartbeat term has +been initiated and completed. + +By design, ALLCALL functions well +in a cluster with any number of nodes. +One and two nodes, or an even number +of nodes, will work just fine. + +Compared to quorum based elections +like raft and paxos, where an odd +number of at least three +nodes is required to make progress, +this can be very desirable. + +ALLCALL is appropriate for AP, +rather than CP, style systems, where +availability is more important +than having a single writer. When +writes are idempotent or deduplicated +downstream, this is typically preferred. + +prior art +---------- + +ALLCALL is a simplified version of +the well known Bully Algorithm[1][2] +for election in a distributed system +of arbitrary graph. + +ALLCALL does less bullying, and lets +nodes arrive at their own conclusions. +The essential broadcast and ranking +mechanism, however, is identical. 
+ +[1] Hector Garcia-Molina, Elections in a +Distributed Computing System, IEEE +Transactions on Computers, +Vol. C-31, No. 1, January (1982) 48–59 + +[2] https://en.wikipedia.org/wiki/Bully_algorithm + +implementation +------------ + +ALLCALL is implented on top of +the Nats (https://nats.io) system, see the health/ +subdirectory of + +https://github.com/nats-io/gnatsd + diff --git a/health/agent.go b/health/agent.go new file mode 100644 index 00000000000..899c3eb5eec --- /dev/null +++ b/health/agent.go @@ -0,0 +1,78 @@ +package health + +import ( + "fmt" + "net" + "time" + + "github.com/nats-io/gnatsd/server" + //"github.com/nats-io/gnatsd/server/lcon" +) + +// Agent implements the InternalClient interface. +// It provides health status checks and +// leader election from among the candidate +// gnatsd instances in a cluster. +type Agent struct { + opts *server.Options + mship *Membership +} + +// NewAgent makes a new Agent. +func NewAgent(opts *server.Options) *Agent { + return &Agent{ + opts: opts, + } +} + +// Name should identify the internal client for logging. +func (h *Agent) Name() string { + return "health-agent" +} + +// Start makes an internal +// entirely in-process client that monitors +// cluster health and manages group +// membership functions. +// +func (h *Agent) Start( + info server.Info, + opts server.Options, + logger server.Logger, + +) (net.Conn, error) { + + // To keep the health client fast and its traffic + // internal-only, we use an bi-directional, + // in-memory version of a TCP stream. + // + // The buffers really do have to be of + // sufficient size, or we will + // deadlock/livelock the system. 
+ // + cli, srv, err := NewInternalClientPair() + if err != nil { + return nil, fmt.Errorf("NewInternalClientPair() returned error: %s", err) + } + + rank := opts.HealthRank + beat := opts.HealthBeat + lease := opts.HealthLease + + cfg := &MembershipCfg{ + MaxClockSkew: time.Second, + BeatDur: beat, + LeaseTime: lease, + MyRank: rank, + CliConn: cli, + Log: logger, + } + h.mship = NewMembership(cfg) + go h.mship.Start() + return srv, nil +} + +// Stop halts the background goroutine. +func (h *Agent) Stop() { + h.mship.Stop() +} diff --git a/health/aloc.go b/health/aloc.go new file mode 100644 index 00000000000..0440639600f --- /dev/null +++ b/health/aloc.go @@ -0,0 +1,89 @@ +package health + +import ( + "encoding/json" + "os" + "time" + + "github.com/nats-io/go-nats" +) + +// AgentLoc conveys to interested parties +// the Id and location of one gnatsd +// server in the cluster. +type AgentLoc struct { + Id string `json:"serverId"` + Host string `json:"host"` + Port int `json:"port"` + + // Are we the leader? + IsLeader bool `json:"leader"` + + // LeaseExpires is zero for any + // non-leader. For the leader, + // LeaseExpires tells you when + // the leaders lease expires. + LeaseExpires time.Time `json:"leaseExpires"` + + // lower rank is leader until lease + // expires. Ties are broken by Id. + // Rank should be assignable on the + // gnatsd command line with -rank to + // let the operator prioritize + // leadership for certain hosts. + Rank int `json:"rank"` + + // Pid or process id is the only + // way to tell apart two processes + // sometimes, if they share the + // same nats server. + // + // Pid is the one difference between + // a nats.ServerLoc and a health.AgentLoc. 
+ // + Pid int `json:"pid"` +} + +func (s *AgentLoc) String() string { + by, err := json.Marshal(s) + panicOn(err) + return string(by) +} + +func (s *AgentLoc) fromBytes(by []byte) error { + return json.Unmarshal(by, s) +} + +func alocEqual(a, b *AgentLoc) bool { + aless := AgentLocLessThan(a, b) + bless := AgentLocLessThan(b, a) + return !aless && !bless +} + +func slocEqualIgnoreLease(a, b *AgentLoc) bool { + a0 := *a + b0 := *b + a0.LeaseExpires = time.Time{} + a0.IsLeader = false + b0.LeaseExpires = time.Time{} + b0.IsLeader = false + + aless := AgentLocLessThan(&a0, &b0) + bless := AgentLocLessThan(&b0, &a0) + return !aless && !bless +} + +// the 2 types should be kept in sync. +// We return a brand new &AgentLoc{} +// with contents filled from loc. +func natsLocConvert(loc *nats.ServerLoc) *AgentLoc { + return &AgentLoc{ + Id: loc.Id, + Host: loc.Host, + Port: loc.Port, + IsLeader: loc.IsLeader, + LeaseExpires: loc.LeaseExpires, + Rank: loc.Rank, + Pid: os.Getpid(), + } +} diff --git a/health/btree.go b/health/btree.go new file mode 100644 index 00000000000..68ad4109453 --- /dev/null +++ b/health/btree.go @@ -0,0 +1,157 @@ +package health + +import ( + "bytes" + + "fmt" + "sync" + + "github.com/google/btree" +) + +// ranktree is an in-memory, sorted, +// balanced tree that is implemented +// as a left-learning red-black tree. +// It holds AgentLoc +// from candidate servers in the cluster, +// sorting them based on +// AgentLocLessThan() so they are in +// priority order and deduplicated. +type ranktree struct { + *btree.BTree + tex sync.Mutex +} + +func (a AgentLoc) Less(than btree.Item) bool { + b := than.(AgentLoc) + return AgentLocLessThan(&a, &b) +} + +// insert is idemopotent so it is safe +// to insert the same sloc multiple times and +// duplicates will be ignored. +func (t *ranktree) insert(j AgentLoc) { + t.tex.Lock() + t.ReplaceOrInsert(j) + t.tex.Unlock() +} + +// present locks, Has does not. 
+func (t *ranktree) present(j AgentLoc) bool { + t.tex.Lock() + b := t.Has(j) + t.tex.Unlock() + return b +} + +func (t *ranktree) minrank() (min AgentLoc) { + t.tex.Lock() + min = t.Min().(AgentLoc) + t.tex.Unlock() + + return +} + +func (t *ranktree) deleteSloc(j AgentLoc) { + t.tex.Lock() + t.Delete(j) + t.tex.Unlock() +} + +func newRanktree() *ranktree { + return &ranktree{ + BTree: btree.New(2), + } +} + +func (t *ranktree) String() string { + t.tex.Lock() + + s := "[" + t.AscendLessThan(AgentLoc{}, func(item btree.Item) bool { + cur := item.(AgentLoc) + s += cur.String() + "," + return true + }) + t.tex.Unlock() + + // replace last comma with matching bracket + n := len(s) + if n > 1 { + s = s[:n-1] + } + return s + "]" +} + +func (t *ranktree) clone() *ranktree { + r := newRanktree() + t.tex.Lock() + + t.AscendLessThan(AgentLoc{}, func(item btree.Item) bool { + cur := item.(AgentLoc) + r.insert(cur) + return true + }) + t.tex.Unlock() + return r +} + +func (t *ranktree) MarshalJSON() ([]byte, error) { + var buf bytes.Buffer + fmt.Fprintf(&buf, "%s", t) + return buf.Bytes(), nil +} + +// Len() is inherted from the btree, +// but isn't protected by mutex. +func (t *ranktree) size() int { + t.tex.Lock() + n := t.Len() + t.tex.Unlock() + return n +} + +// return a minus b, where a and b are sets. 
+func setDiff(a, b *members) *members { + + res := a.DedupTree.clone() + a.DedupTree.tex.Lock() + b.DedupTree.tex.Lock() + + b.DedupTree.AscendLessThan(AgentLoc{}, func(item btree.Item) bool { + v := item.(AgentLoc) + res.deleteSloc(v) + return true // keep iterating + }) + + b.DedupTree.tex.Unlock() + a.DedupTree.tex.Unlock() + return &members{DedupTree: res} +} + +func setsEqual(a, b *members) bool { + a.DedupTree.tex.Lock() + b.DedupTree.tex.Lock() + defer b.DedupTree.tex.Unlock() + defer a.DedupTree.tex.Unlock() + + alen := a.DedupTree.Len() + if alen != b.DedupTree.Len() { + return false + } + // INVAR: len(a) == len(b) + if alen == 0 { + return true + } + + missing := false + a.DedupTree.AscendLessThan(AgentLoc{}, func(item btree.Item) bool { + v := item.(AgentLoc) + if !b.DedupTree.Has(v) { + missing = true + return false // stop iterating + } + return true // keep iterating + }) + return !missing +} diff --git a/health/btree_test.go b/health/btree_test.go new file mode 100644 index 00000000000..d583308f1c7 --- /dev/null +++ b/health/btree_test.go @@ -0,0 +1,66 @@ +package health + +import ( + "testing" +) + +func Test201BtreeInsertDisplay(t *testing.T) { + s1 := AgentLoc{Id: "abc"} + s2 := AgentLoc{Id: "xyz"} + r := newRanktree() + r.insert(s2) + r.insert(s1) + + sz := r.size() + if sz != 2 { + t.Fatalf("expected 2, saw sz=%v", sz) + } + s := r.String() + if s == "[]" { + t.Fatalf("missing serialization of set elements") + } + expect := `[{"serverId":"abc","host":"","port":0,"leader":false,"leaseExpires":"0001-01-01T00:00:00Z","rank":0,"pid":0},{"serverId":"xyz","host":"","port":0,"leader":false,"leaseExpires":"0001-01-01T00:00:00Z","rank":0,"pid":0}]` + if s != expect { + t.Fatalf("serial json didn't match expectations.\n expect:'%s'\n\n observe:'%s'", expect, s) + } +} + +func Test202BtreeEqual(t *testing.T) { + s1 := AgentLoc{Id: "abc"} + s2 := AgentLoc{Id: "xyz"} + r := newRanktree() + r.insert(s2) + r.insert(s1) + + s := r.clone() + same := 
// DEAF_TRUE and DEAF_FALSE are the values stored in
// MembershipCfg.deaf, which is read and written atomically
// by Membership.deaf(), setDeaf() and unDeaf().
const DEAF_TRUE = 1
const DEAF_FALSE = 0

// MembershipCfg configures a Membership. Zero-valued
// fields are filled in by SetDefaults.
type MembershipCfg struct {

	// max we allow for clocks to be out of sync.
	// default to 1 second if not set.
	MaxClockSkew time.Duration

	// how often we heartbeat. defaults to 3 seconds
	// (3000 msec) if not set.
	BeatDur time.Duration

	// NatsUrl example "nats://127.0.0.1:4222",
	// which is also the default.
	NatsUrl string

	// defaults to "_nats.cluster.members."
	// NOTE(review): setupNatsClient builds its subjects
	// from the package constant sysMemberPrefix rather
	// than from this field -- confirm that is intended.
	SysMemberPrefix string

	// LeaseTime is the minimum time the
	// leader is elected for. Defaults to 12 sec.
	LeaseTime time.Duration

	// provide a default until the server gives us rank
	MyRank int

	// optional, if provided we will use this connection on
	// the client side. It is what Dial returns.
	CliConn net.Conn

	// where we log stuff. Defaults to a logger built by
	// logger.NewStdLogger.
	Log server.Logger

	// for testing under network partition;
	// holds DEAF_TRUE or DEAF_FALSE.
	deaf int64

	// how much history to save; newLeadHolder
	// substitutes 100 when this is zero.
	historyCount int
}

// SetDefaults fills in every field that is still at its
// zero value: LeaseTime 12s, SysMemberPrefix
// "_nats.cluster.members.", BeatDur 3s, MaxClockSkew 1s,
// NatsUrl "nats://127.0.0.1:4222", and a standard logger.
// Fields that are already set are left untouched.
func (cfg *MembershipCfg) SetDefaults() {
	if cfg.LeaseTime == 0 {
		cfg.LeaseTime = time.Second * 12
	}
	if cfg.SysMemberPrefix == "" {
		cfg.SysMemberPrefix = "_nats.cluster.members."
	}
	if cfg.BeatDur == 0 {
		cfg.BeatDur = 3000 * time.Millisecond
	}
	if cfg.MaxClockSkew == 0 {
		cfg.MaxClockSkew = time.Second
	}
	if cfg.NatsUrl == "" {
		cfg.NatsUrl = "nats://127.0.0.1:4222"
	}
	if cfg.Log == nil {
		// stderr
		cfg.Log = logger.NewStdLogger(micros, debug, trace, colors, pid, log.LUTC)
	}
}

// Settings passed to logger.NewStdLogger for the
// default logger created in SetDefaults.
const colors = false
const micros, pid = true, true
const trace = false

//const debug = true
const debug = false

// Dial ignores network and address and returns the
// pre-established CliConn. setupNatsClient installs cfg
// as the nats dialer only when CliConn is non-nil.
func (cfg *MembershipCfg) Dial(network, address string) (net.Conn, error) {
	return cfg.CliConn, nil
}
+ pc *pongCollector + + // actually elected leaders, should + // change only after a lease term. + elec *leadHolder + nc *nats.Conn + myLoc AgentLoc + pid int + + subjAllCall string + subjAllReply string + subjMemberLost string + subjMemberAdded string + subjMembership string + + halt *halter + mu sync.Mutex + stopping bool + + needReconnect chan bool +} + +func (m *Membership) trace(f string, arg ...interface{}) { + m.Cfg.Log.Tracef(fmt.Sprintf("my.Port:%v. ", m.myLoc.Port)+f, arg...) +} + +func (m *Membership) dlog(f string, arg ...interface{}) { + m.Cfg.Log.Debugf(fmt.Sprintf("my.Port:%v. ", m.myLoc.Port)+f, arg...) +} + +func (m *Membership) getMyLocWithAnyLease() AgentLoc { + m.mu.Lock() + myLoc := m.myLoc + m.mu.Unlock() + + lead := m.elec.getLeader() + if slocEqualIgnoreLease(&lead, &myLoc) { + myLoc.LeaseExpires = lead.LeaseExpires + myLoc.IsLeader = true + } + return myLoc +} + +func (m *Membership) getMyLocWithZeroLease() AgentLoc { + m.mu.Lock() + myLoc := m.myLoc + m.mu.Unlock() + myLoc.LeaseExpires = time.Time{} + myLoc.IsLeader = false + return myLoc +} + +// deaf means we don't ping or pong. +// It is used to simulate network +// partition and healing. +// +func (m *Membership) deaf() bool { + v := atomic.LoadInt64(&m.Cfg.deaf) + return v == DEAF_TRUE +} + +func (m *Membership) setDeaf() { + atomic.StoreInt64(&m.Cfg.deaf, DEAF_TRUE) +} + +func (m *Membership) unDeaf() { + atomic.StoreInt64(&m.Cfg.deaf, DEAF_FALSE) +} + +func NewMembership(cfg *MembershipCfg) *Membership { + m := &Membership{ + Cfg: *cfg, + halt: newHalter(), + pid: os.Getpid(), + // needReconnect should be sent on, not closed. + needReconnect: make(chan bool), + } + m.pc = m.newPongCollector() + m.elec = m.newLeadHolder(cfg.historyCount) + return m +} + +// leadHolder holds who is the current leader, +// and what their lease is. Used to synchronize +// access between various goroutines. 
+type leadHolder struct { + mu sync.Mutex + sloc AgentLoc + + myId string + myRank int + myLocHasBeenSet bool + + history *ringBuf + histsz int + + m *Membership +} + +func (m *Membership) newLeadHolder(histsz int) *leadHolder { + if histsz == 0 { + histsz = 100 + } + return &leadHolder{ + history: newRingBuf(histsz), + histsz: histsz, + m: m, + } +} + +func (e *leadHolder) setMyLoc(myLoc *AgentLoc) { + e.mu.Lock() + if e.myLocHasBeenSet { + panic("no double set!") + } + e.myLocHasBeenSet = true + e.myId = myLoc.Id + e.myRank = myLoc.Rank + e.mu.Unlock() +} + +// getLeader retreives the stored e.sloc value. +func (e *leadHolder) getLeader() AgentLoc { + e.mu.Lock() + defer e.mu.Unlock() + return e.sloc +} + +// setLeader aims to copy sloc and store it +// for future getLeader() calls to access. +// +// However we reject any attempt to replace +// a leader with a one that doesn't rank lower, where rank +// includes the LeaseExpires time +// (see the AgentLocLessThan() function). +// +// If we accept sloc +// we return slocWon true. If we reject sloc then +// we return slocWon false. In short, we will only +// accept sloc if AgentLocLessThan(sloc, e.sloc), +// and we return AgentLocLessThan(sloc, e.sloc). +// +// If we return slocWon false, alt contains the +// value we favored, which is the current value +// of our retained e.sloc. If we return true, +// then alt contains a copy of sloc. We +// return a value in alt to avoid data races. +// +func (e *leadHolder) setLeader(sloc AgentLoc, now time.Time) (slocWon bool, alt AgentLoc) { + e.mu.Lock() + defer e.mu.Unlock() + + if sloc.Id == "" { + e.m.trace("setLeader returning false because sloc==nil or sloc.Id==\"\"") + return false, e.sloc + } + + // check on expired leases: any new leader must + // have a lease that is not expired. 
+ nowu := now.UnixNano() + cure := e.sloc.LeaseExpires.UnixNano() + newe := sloc.LeaseExpires.UnixNano() + + curExpired := cure <= nowu + newExpired := newe <= nowu + bothExpired := curExpired && newExpired + neitherExpired := !curExpired && !newExpired + + var newWon, oldWon bool + + switch { + case bothExpired: + e.m.trace("22222 setLeader finds both expired") + return false, e.sloc + + case neitherExpired: + newWon = AgentLocLessThan(&sloc, &e.sloc) + oldWon = AgentLocLessThan(&e.sloc, &sloc) + + case newExpired: + e.m.trace("55555 setLeader is returning false because new has expired lease.") + return false, e.sloc + + case curExpired: + newWon = true + oldWon = false + e.m.trace("44444 setLeader finds old lease expired") + } + + switch { + case !newWon && !oldWon: + // they are equal, pick the longer lease + // so we allow lease renewal + if sloc.LeaseExpires.After(e.sloc.LeaseExpires) { + slocWon = true + alt = sloc + e.sloc = sloc + + e.m.trace("999999 setLeader: same leader, > lease, renewing lease for %v\n", &e.sloc) + } else { + slocWon = false + alt = e.sloc + + e.m.trace("000000 setLeader is failing to update the leader, rejecting the new contendor.\n\nsloc='%s'\n >= \n prev:'%s'\n", sloc, &e.sloc) + } + case newWon: + slocWon = true + alt = sloc + e.sloc = sloc + + e.m.trace("11111 setLeader updated the leader, accepting new proposal.\n\nsloc='%s'\n < \n prev:'%s'\n", sloc, &e.sloc) + + default: + //oldWon + slocWon = false + alt = e.sloc + } + + // update history + if slocWon { + histcp := sloc + e.history.Append(&histcp) + } + + return +} + +func (e *leadHolder) copyLeadHistory() *ringBuf { + e.mu.Lock() + r := e.history.clone() + e.mu.Unlock() + return r +} + +func (e *leadHolder) getLeaderAsBytes() []byte { + lead := e.getLeader() + by, err := json.Marshal(&lead) + panicOn(err) + return by +} + +// Stop blocks until the Membership goroutine +// acknowledges the shutdown request. 
+func (m *Membership) Stop() { + m.mu.Lock() + if m.stopping { + m.mu.Unlock() + return + } + m.stopping = true + m.mu.Unlock() + m.halt.ReqStop.Close() + <-m.halt.Done.Chan +} + +func (m *Membership) Start() error { + + m.Cfg.SetDefaults() + + err := m.setupNatsClient() + if err != nil { + m.halt.Done.Close() + return err + } + go m.start() + return nil +} + +func (m *Membership) start() { + + var nc *nats.Conn = m.nc + var pc *pongCollector = m.pc + + defer func() { + m.halt.Done.Close() + }() + + m.Cfg.Log.Debugf("health-agent: Listening on [%s]\n", m.subjAllCall) + + prevCount, curCount := 0, 0 + prevMember := newMembers() + var curMember *members + var curLead AgentLoc + + var err error + var now time.Time + var expired bool + var prevLead AgentLoc + var nextLeadReportTm time.Time + + k := -1 + for { + k++ + // NB: replies to an + // allcall will only update + // the pongCollectors.from set, + // and won't change + // what the current leader + // is in elec. + m.trace("issuing k-th (k=%v) allcall", k) + err = m.allcall() + if err != nil { + // err could be: "write on closed buffer" + // typically means we are shutting down. + + m.trace("health-agent: "+ + "error on allcall, "+ + "shutting down the "+ + "health-agent: %s", + err) + return + } + + m.trace("SLEEPING for a heartbeat of %v", m.Cfg.BeatDur) + select { + case <-time.After(m.Cfg.BeatDur): + // continue below, latest heartbeat session done. + case <-m.needReconnect: + err := m.setupNatsClient() + if err != nil { + m.Cfg.Log.Debugf("health-agent: "+ + "fatal error: could not reconnect to, "+ + "our url '%s', error: %s", + m.Cfg.NatsUrl, err) + + m.halt.ReqStop.Close() + return + } + case <-m.halt.ReqStop.Chan: + return + } + lastSeenLead := m.elec.getLeader() + + // cur responses should be back by now + // and we can compare prev and cur. 
+ curCount, curMember = pc.getSetAndClear() + now = time.Now().UTC() + m.trace("k-th (k=%v) before doing leaderLeaseCheck, curMember='%s'", k, curMember) + + expired, curLead = curMember.leaderLeaseCheck( + now, + m.Cfg.LeaseTime, + lastSeenLead, + m.Cfg.MaxClockSkew, + m, + ) + + if expired { + // record in our history + won, alt := m.elec.setLeader(curLead, now) + if !won { + m.trace("k-th (k=%v) round "+ + "conclusion of trying to "+ + "setLeader: rejected '%s'"+ + " in favor of '%s'", + k, curLead, &alt) + curLead = alt + } else { + m.trace("k-th (k=%v) round "+ + "conclusion of trying to "+ + "setLeader: accepted as "+ + "new lead '%s'", + k, curLead) + } + } + + // logging + loc, _ := m.getNatsServerLocation() + if loc != nil { + if loc.Id == curLead.Id && + curLead.Pid == m.pid { + + if now.After(nextLeadReportTm) || + prevLead.Id == "" || + prevLead.Id != curLead.Id { + + left := curLead.LeaseExpires.Sub(now) + m.dlog("health-agent: "+ + "I am LEAD, my Id: '%s', "+ + "rank %v port %v host %v "+ + "pid %v. lease expires "+ + "in %s", + loc.Id, + loc.Rank, + loc.Port, + loc.Host, + m.pid, + left) + + nextLeadReportTm = now.Add(left).Add(m.Cfg.MaxClockSkew) + } + } else { + if prevLead.Id == loc.Id && + prevLead.Pid == m.pid { + + m.dlog("health-agent: "+ + "I am no longer lead, "+ + "new LEAD is '%s', rank %v. "+ + "port %v host %v pid %v lease expires in %s", + curLead.Id, + curLead.Rank, + curLead.Port, + curLead.Host, + curLead.Pid, + curLead.LeaseExpires.Sub(now)) + + } else { + if nextLeadReportTm.IsZero() || + now.After(nextLeadReportTm) { + + left := curLead.LeaseExpires.Sub(now) + if curLead.Id == "" { + m.dlog("health-agent: "+ + "I am '%s'/rank=%v. "+ + "port %v. lead is unknown.", + m.myLoc.Id, + m.myLoc.Rank, + m.myLoc.Port) + + } else { + m.dlog("health-agent: "+ + "I am not lead. 
lead is '%s', "+ + "rank %v host %v port %v pid %v for %v", + curLead.Id, + curLead.Rank, + curLead.Host, + curLead.Port, + curLead.Pid, + left) + + } + nextLeadReportTm = now.Add(left).Add(m.Cfg.MaxClockSkew) + } + } + } + } + + lost := setDiff(prevMember, curMember) + gained := setDiff(curMember, prevMember) + same := setsEqual(prevMember, curMember) + + if same { + // nothing more to do. + // This is the common case when nothing changes. + } else { + lostBytes := lost.mustJsonBytes() + if !lost.setEmpty() { + if !m.deaf() { + nc.Publish(m.subjMemberLost, lostBytes) + // ignore errors on purpose; + // don't crash mid-health-report + // if at all possible. + } + } + gainedBytes := gained.mustJsonBytes() + if !gained.setEmpty() { + if !m.deaf() { + nc.Publish(m.subjMemberAdded, gainedBytes) + // same error approach as above. + } + } + } + if curCount < prevCount { + m.dlog("health-agent: ---- "+ + "PAGE PAGE PAGE!! we went "+ + "down a server, from %v -> %v. "+ + "lost: '%s'", + prevCount, + curCount, + lost) + + } else if curCount > prevCount && prevCount > 0 { + m.dlog("health-agent: ++++ "+ + "MORE ROBUSTNESS GAINED; "+ + "we went from %v -> %v. "+ + "gained: '%s'", + prevCount, + curCount, + gained) + + } + + if expired { + curBytes := curMember.mustJsonBytes() + if !m.deaf() { + nc.Publish(m.subjMembership, curBytes) + } + } + + // done with compare, now loop + prevCount = curCount + prevMember = curMember + prevLead = curLead + } +} + +func pong(nc *nats.Conn, subj string, msg []byte) { + nc.Publish(subj, msg) + nc.Flush() + // ignore errors, probably shutting down. + // e.g. "nats: connection closed on shutdown." + // or "nats: connection closed" +} + +// allcall sends out a health ping on the +// subjAllCall topic. +// +// The ping consists of sending the AgentLoc +// forf the current leader, which provides lease +// and full contact info for the leader. +// +// This gives a round-trip connectivity check. 
+// +func (m *Membership) allcall() error { + lead := m.elec.getLeader() + m.trace("ISSUING ALLCALL on '%s' with leader '%s'\n", m.subjAllCall, &lead) + + leadby, err := json.Marshal(&lead) + panicOn(err) + + // allcall broadcasts the current leader + lease + return m.nc.PublishRequest(m.subjAllCall, m.subjAllReply, leadby) +} + +// pongCollector collects the responses +// from an allcall request. +type pongCollector struct { + replies int + + fromNoTime *members + + mu sync.Mutex + mship *Membership +} + +func (m *Membership) newPongCollector() *pongCollector { + return &pongCollector{ + fromNoTime: newMembers(), + mship: m, + } +} + +func (pc *pongCollector) insert(sloc AgentLoc) { + // insert into both our trees, one + // keeping the lease time, the other not. + cp := sloc + cp.LeaseExpires = time.Time{} + cp.IsLeader = false + pc.fromNoTime.insert(cp) +} + +// acumulate pong responses +func (pc *pongCollector) receivePong(msg *nats.Msg) { + pc.mu.Lock() + + pc.replies++ + + var loc AgentLoc + err := loc.fromBytes(msg.Data) + if err == nil { + pc.insert(loc) + } else { + panic(err) + } + pc.mship.trace("PONG COLLECTOR RECEIVED ALLCALL REPLY '%s'", &loc) + + pc.mu.Unlock() +} + +func (pc *pongCollector) clear() { + pc.mu.Lock() + pc.fromNoTime.clear() + pc.mu.Unlock() +} + +// getSet returns the count and set so far, then +// clears the set, emptying it, and then adding +// back just myLoc +func (pc *pongCollector) getSetAndClear() (int, *members) { + + mem := pc.fromNoTime.clone() + pc.clear() + + // we don't necessarily need to seed, + // since we'll hear our own allcall. + // But we may want to seed in case + // the connection to gnatsd is down + // This happens under test when we + // are deaf. Hence we seed so we + // can elect ourselves when the + // connection to gnatsd is not available. 
+ // + // add myLoc to pc.from as a part of the reset: + myLoc := pc.mship.getMyLocWithZeroLease() + pc.insert(myLoc) + + pc.mship.trace("in getSetAndClear, here are the contents of mem.DedupTree: '%s'", mem.DedupTree) + + // return the old member set + return mem.DedupTree.Len(), mem +} + +// leaderLeaseCheck evaluates the lease as of now, +// and returns the leader or best candiate. Returns +// expired == true if any prior leader lease has +// lapsed. In this case we return the best new +// leader with its IsLeader bit set and its +// LeaseExpires set to now + lease. +// +// If expired == false then the we return +// the current leader in lead. +// +// PRE: there are only 0 or 1 leaders in m.DedupTree +// who have a non-zero LeaseExpires field. +// +// If m.DedupTree is empty, we return (true, nil). +// +// This method is where the actual "election" +// happens. See the AgentLocLessThan() +// function below for exactly how +// we rank candidates. +// +func (mems *members) leaderLeaseCheck( + now time.Time, + leaseLen time.Duration, + prevLead AgentLoc, + maxClockSkew time.Duration, + m *Membership, + +) (expired bool, lead AgentLoc) { + + if prevLead.LeaseExpires.Add(maxClockSkew).After(now) { + // honor the leases until they expire + m.trace("leaderLeaseCheck: honoring outstanding lease") + return false, prevLead + } + + if mems.DedupTree.Len() == 0 { + m.trace("leaderLeaseCheck: m.DedupTree.Len is 0") + return false, prevLead + } + + // INVAR: any lease has expired. + expired = true + lead = mems.DedupTree.minrank() + lead.IsLeader = true + lead.LeaseExpires = now.Add(leaseLen).UTC() + + return +} + +type byRankThenId struct { + s []*AgentLoc + now time.Time +} + +func (p byRankThenId) Len() int { return len(p.s) } +func (p byRankThenId) Swap(i, j int) { p.s[i], p.s[j] = p.s[j], p.s[i] } + +// Less must be stable and computable locally yet +// applicable globally: it is how we choose a leader +// in a stable fashion. 
// lessThanString returns i < j where empty strings are
// big, not small: the empty string sorts after every
// non-empty string.
//
// The relation is a strict ordering, so two empty
// strings compare as equal (neither is less than the
// other).
func lessThanString(i, j string) bool {
	switch {
	case i == "":
		// "" is the maximum; it is never less than
		// anything, including another "".
		return false
	case j == "":
		return true // "123" < ""
	default:
		return i < j
	}
}
+ // + optrecon := nats.NoReconnect() + + opts := []nats.Option{optdis, optrecon, norand} + if m.Cfg.CliConn != nil { + opts = append(opts, nats.Dialer(&m.Cfg)) + } + + nc, err := nats.Connect(m.Cfg.NatsUrl, opts...) + if err != nil { + msg := fmt.Errorf("Can't connect to "+ + "nats on url '%s': %v", + m.Cfg.NatsUrl, + err) + m.Cfg.Log.Errorf(msg.Error()) + return msg + } + m.nc = nc + loc, err := m.getNatsServerLocation() + if err != nil { + return err + } + m.setLoc(loc) + m.Cfg.Log.Debugf("health-agent: HELLOWORLD: "+ + "I am '%s' at '%v:%v'. "+ + "rank %v", + m.myLoc.Id, + m.myLoc.Host, + m.myLoc.Port, + m.myLoc.Rank) + + m.subjAllCall = sysMemberPrefix + "allcall" + m.subjAllReply = sysMemberPrefix + "allreply" + m.subjMemberLost = sysMemberPrefix + "lost" + m.subjMemberAdded = sysMemberPrefix + "added" + m.subjMembership = sysMemberPrefix + "list" + + nc.Subscribe(m.subjAllReply, func(msg *nats.Msg) { + if m.deaf() { + return + } + pc.receivePong(msg) + }) + + // allcall says: "who is out there? Are you a lead?" + nc.Subscribe(m.subjAllCall, func(msg *nats.Msg) { + m.trace("ALLCALL RECEIVED. msg:'%s'", string(msg.Data)) + if m.deaf() { + return + } + + // sanity check that we haven't moved. + loc, err := m.getNatsServerLocation() + if err != nil { + return // try again next time. + } + + // did we accidentally change + // server locacations? + // Yikes, we don't want to do that! + // We are supposed to be monitoring + // just our own server. + if m.locDifferent(loc) { + panic(fmt.Sprintf("\n very bad! health-agent "+ + "changed locations! "+ + "first: '%s',\n\nvs\n now:'%s'\n", + &m.myLoc, + loc)) + } + // Done with sanity check. + // INVAR: we haven't moved, and + // loc matches m.myLoc. 
+ + locWithLease := m.getMyLocWithAnyLease() + + hp, err := json.Marshal(&locWithLease) + panicOn(err) + if !m.deaf() { + m.trace("REPLYING TO ALLCALL on '%s' with my details: '%s'", msg.Reply, &locWithLease) + pong(nc, msg.Reply, hp) + } + }) + + // reporting + nc.Subscribe(m.subjMemberLost, func(msg *nats.Msg) { + if m.deaf() { + return + } + m.Cfg.Log.Tracef("health-agent: "+ + "Received on [%s]: '%s'", + msg.Subject, + string(msg.Data)) + }) + + // reporting + nc.Subscribe(m.subjMemberAdded, func(msg *nats.Msg) { + if m.deaf() { + return + } + m.Cfg.Log.Tracef("health-agent: Received on [%s]: '%s'", + msg.Subject, string(msg.Data)) + }) + + // reporting + nc.Subscribe(m.subjMembership, func(msg *nats.Msg) { + if m.deaf() { + return + } + m.Cfg.Log.Tracef("health-agent: "+ + "Received on [%s]: '%s'", + msg.Subject, + string(msg.Data)) + }) + + return nil +} + +func (m *Membership) locDifferent(b *nats.ServerLoc) bool { + m.mu.Lock() + defer m.mu.Unlock() + if b.Id != m.myLoc.Id { + return true + } + if b.Rank != m.myLoc.Rank { + return true + } + if b.Host != m.myLoc.Host { + return true + } + if b.Port != m.myLoc.Port { + return true + } + return false +} + +func (m *Membership) setLoc(b *nats.ServerLoc) { + m.mu.Lock() + m.myLoc.Id = b.Id + m.myLoc.Rank = b.Rank + m.myLoc.Host = b.Host + m.myLoc.Port = b.Port + m.myLoc.Pid = os.Getpid() + m.mu.Unlock() + m.elec.setMyLoc(&m.myLoc) +} + +func (m *Membership) getNatsServerLocation() (*nats.ServerLoc, error) { + loc, err := m.nc.ServerLocation() + if err != nil { + return nil, err + } + // fill in the rank because server + // doesn't have the rank correct under + // various test scenarios where we + // spin up an embedded gnatsd. + // + // This is still correct in non-test, + // since the health-agent will + // have read from the command line + // -rank options and then + // configured Cfg.MyRank when running + // embedded as an internal client. 
+ loc.Rank = m.Cfg.MyRank + return loc, nil +} diff --git a/health/health_test.go b/health/health_test.go new file mode 100644 index 00000000000..ec04ebb7d35 --- /dev/null +++ b/health/health_test.go @@ -0,0 +1,623 @@ +package health + +import ( + "encoding/json" + "fmt" + "log" + "math/rand" + "net" + "testing" + "time" + + cv "github.com/glycerine/goconvey/convey" + "github.com/nats-io/gnatsd/logger" + "github.com/nats-io/gnatsd/server" + gnatsd "github.com/nats-io/gnatsd/test" + "github.com/nats-io/go-nats" +) + +const TEST_PORT = 8198 +const DefaultTimeout = 2 * time.Second + +// warning: don't assume TEST_PORT is correct now... +// we use a dynamic port to avoid test conflicts. +var cliOpts = nats.Options{ + Url: fmt.Sprintf("nats://localhost:%d", TEST_PORT), + AllowReconnect: true, + MaxReconnect: 10, + ReconnectWait: 10 * time.Millisecond, + Timeout: DefaultTimeout, +} + +// DefaultTestOptions are default options for the unit tests. +// Warning: don't assume TEST_PORT is correct now... +// we use a dynamic port to avoid test conflicts. 
+var serverOpts = server.Options{ + Host: "localhost", + Port: TEST_PORT, + NoLog: true, + NoSigs: true, + MaxControlLine: 256, +} + +func Test101StressTestManyClients(t *testing.T) { + + cv.Convey("when stress testing with 50 clients coming up and shutting down, we should survive and prosper", t, func() { + + tport, ls := getAvailPort() + ls.Close() + s := RunServerOnPort(tport) + defer s.Shutdown() + + n := 50 + var ms []*Membership + for i := 0; i < n; i++ { + cli, srv, err := NewInternalClientPair() + panicOn(err) + + s.InternalCliRegisterCallback(srv) + cfg := &MembershipCfg{ + CliConn: cli, + MaxClockSkew: 1 * time.Nanosecond, + LeaseTime: 30 * time.Millisecond, + BeatDur: 10 * time.Millisecond, + NatsUrl: fmt.Sprintf("nats://localhost:%v", tport), + MyRank: i, // ranks 0..n-1 + } + + m := NewMembership(cfg) + err = m.Start() + if err != nil { + panic(err) + } + ms = append(ms, m) + defer m.Stop() + } + // the test here is basically that we didn't crash + // or hang. So if we got here, success. + cv.So(true, cv.ShouldBeTrue) + }) +} + +func Test102ConvergenceToOneLowRankLeaderAndLiveness(t *testing.T) { + + cv.Convey("Given a cluster of one server with rank 0, no matter what other servers arrive thinking they are the leader (say, after a partition is healed), as long as those other nodes have rank 1, our rank 0 process will persist in leading and all other arrivals will give up their leadership claims (after their leases expire). 
In addition to safety, this is also a liveness check: After a single lease term + clockskew, a leader will have been chosen.", t, func() { + + tport, ls := getAvailPort() + ls.Close() + + s := RunServerOnPort(tport) + defer func() { + p("starting gnatsd shutdown...") + s.Shutdown() + }() + + n := 10 + tot := 50 + pause := make([]int, n) + for i := 0; i < n; i++ { + pause[i] = 20 + rand.Intn(50) + tot += pause[i] + } + + var ms []*Membership + for i := 0; i < n; i++ { + + cfg := &MembershipCfg{ + MaxClockSkew: 1 * time.Nanosecond, + LeaseTime: 800 * time.Millisecond, + BeatDur: 200 * time.Millisecond, + NatsUrl: fmt.Sprintf("nats://localhost:%v", tport), + MyRank: i, // ranks 0,1,2,3,... + deaf: DEAF_TRUE, // don't ping or pong + historyCount: 10000, + } + + cli, srv, err := NewInternalClientPair() + panicOn(err) + + s.InternalCliRegisterCallback(srv) + cfg.CliConn = cli + + if i == 0 { + cfg.deaf = DEAF_FALSE + aLogger := logger.NewStdLogger(micros, debug, trace, colors, pid, log.LUTC) + _ = aLogger + // to follow the prints, uncomment: + cfg.Log = aLogger + } + + m := NewMembership(cfg) + err = m.Start() + if err != nil { + panic(err) + } + ms = append(ms, m) + defer func() { + m.Stop() + }() + } + + // let them all get past init phase. + time.Sleep(5 * (ms[0].Cfg.LeaseTime + ms[0].Cfg.MaxClockSkew)) + + // verify liveness, a leader exists. + //p("verifying everyone thinks there is a leader:") + for i := 0; i < n; i++ { + h := ms[i].elec.copyLeadHistory() + //fmt.Printf("verifying %v thinks there is a leader. avail = %v\n", i, h.Avail()) + cv.So(h.Avail(), cv.ShouldBeGreaterThan, 0) + } + + // bring in jobs after their random pause time + for i := 0; i < n; i++ { + dur := time.Duration(pause[i]) * time.Millisecond + //p("%v on i = %v/dur=%v ", time.Now().UTC(), i, dur) + time.Sleep(dur) + ms[i].unDeaf() + } + + // let the laggards get in a few more cycles, so + // we get enough history to evaluate. 
+ time.Sleep(20 * (ms[0].Cfg.LeaseTime + ms[0].Cfg.MaxClockSkew)) + + // check that the history from the rank 0 + // always shows rank 0 as lead. + h := ms[0].elec.copyLeadHistory() + av := h.Avail() + //p("ms[0].myLoc.Port = %v", ms[0].myLoc.Port) + cv.So(ms[0].myLoc.Id, cv.ShouldNotEqual, "") + cv.So(av, cv.ShouldBeGreaterThan, 9) + //p("av: available history len = %v", av) + + // prints first: + + for i := 0; i < av; i++ { + sloc := h.A[h.Kth(i)].(*AgentLoc) + _ = sloc + //fmt.Printf("history print i = %v. sloc.Id=%v / sloc.Rank=%v, port=%v\n", i, sloc.Id, sloc.Rank, sloc.Port) + } + // checks second: + for i := 0; i < av; i++ { + sloc := h.A[h.Kth(i)].(*AgentLoc) + //fmt.Printf("history check Id at i = %v. sloc.Id=%v\n", i, sloc.Id) + cv.So(sloc.Id, cv.ShouldEqual, ms[0].myLoc.Id) + // ports will be the only thing different when + // running off of the one gnatsd that has the + // same rank and Id for all clients. + cv.So(sloc.Port, cv.ShouldEqual, ms[0].myLoc.Port) + } + + for i := 0; i < av; i++ { + sloc := h.A[h.Kth(i)].(*AgentLoc) + //p("history check Rank at i = %v. sloc.Rank=%v", i, sloc.Rank) + cv.So(sloc.Rank, cv.ShouldEqual, 0) + } + + // check that the other ranks have + // histories that converge + // on the rank 0 process quickly + for j := 1; j < n; j++ { + h := ms[j].elec.copyLeadHistory() + av := h.Avail() + //p("ms[j=%v].myLoc.Port = %v has history av = %v", j, ms[j].myLoc.Port, av) + cv.So(ms[j].myLoc.Id, cv.ShouldNotEqual, "") + cv.So(av, cv.ShouldBeGreaterThan, 12) + //p("av: available history len = %v", av) + + // prints first: + + for i := 0; i < av; i++ { + sloc := h.A[h.Kth(i)].(*AgentLoc) + _ = sloc + //fmt.Printf("history print i = %v. sloc.Id=%v / sloc.Rank=%v, port=%v\n", i, sloc.Id, sloc.Rank, sloc.Port) + } + // checks second: + + // after the preample of heartbeats, everybody + // should have chosen the rank 0 leader. + // start scanning from 10,... 
+ for i := 10; i < av; i++ { + sloc := h.A[h.Kth(i)].(*AgentLoc) + //fmt.Printf("j=%v, history check Id at i = %v. sloc.Port=%v/rank %v vs. ms[0].myLoc.Port=%v/rank %v\n", j, i, sloc.Port, sloc.Rank, ms[0].myLoc.Port, ms[0].myLoc.Rank) + + // ports will be the only thing different when + // running off of the one gnatsd that has the + // same rank and Id for all clients. + cv.So(sloc.Port, cv.ShouldEqual, ms[0].myLoc.Port) + } + + for i := 10; i < av; i++ { + sloc := h.A[h.Kth(i)].(*AgentLoc) + //p("j=%v history check Rank at i = %v. sloc.Rank=%v", j, i, sloc.Rank) + cv.So(sloc.Rank, cv.ShouldEqual, 0) + } + } + }) +} + +func Test103TiedRanksUseIdAndDoNotAlternate(t *testing.T) { + + cv.Convey("Given a cluster of two servers with rank 0 and different IDs, one should win after the initial period, and they should not alternate leadership as they carry forward.", t, func() { + + tport, ls := getAvailPort() + ls.Close() + + s := RunServerOnPort(tport) + defer func() { + p("starting gnatsd shutdown...") + s.Shutdown() + }() + + n := 2 + + aLogger := logger.NewStdLogger(micros, debug, trace, colors, pid, log.LUTC) + var ms []*Membership + for i := 0; i < n; i++ { + + cfg := &MembershipCfg{ + MaxClockSkew: 1 * time.Nanosecond, + LeaseTime: 400 * time.Millisecond, + BeatDur: 100 * time.Millisecond, + NatsUrl: fmt.Sprintf("nats://localhost:%v", tport), + MyRank: 0, + historyCount: 10000, + } + + cli, srv, err := NewInternalClientPair() + panicOn(err) + + s.InternalCliRegisterCallback(srv) + cfg.CliConn = cli + + cfg.Log = aLogger + + m := NewMembership(cfg) + err = m.Start() + if err != nil { + panic(err) + } + ms = append(ms, m) + defer m.Stop() + } + + // let them get past init phase. + time.Sleep(2 * (ms[0].Cfg.LeaseTime + ms[0].Cfg.MaxClockSkew)) + + // verify liveness, a leader exists. 
+ p("at %v, verifying everyone thinks there is a leader:", time.Now().UTC()) + for i := 0; i < n; i++ { + h := ms[i].elec.copyLeadHistory() + //fmt.Printf("verifying %v thinks there is a leader, avail history len= %v\n", i, h.Avail()) + cv.So(h.Avail(), cv.ShouldBeGreaterThan, 0) + } + + rounds := 10 + // sleep for rounds lease cycles - check for alternation + time.Sleep(time.Duration(rounds+1) * (ms[0].Cfg.LeaseTime + ms[0].Cfg.MaxClockSkew)) + + // who should be winner after lease expiration... + zeroWins := AgentLocLessThan(&ms[0].myLoc, &ms[1].myLoc) + p("zeroWins: %v, [0].myLoc=%v [1].myLoc=%v", zeroWins, &ms[0].myLoc, &ms[1].myLoc) + winner := &ms[1].myLoc + if zeroWins { + winner = &ms[0].myLoc + } + + for j := 0; j < n; j++ { + + // check that the history doesn't alternate + // between ports / servers. + h := ms[j].elec.copyLeadHistory() + av := h.Avail() + p("ms[j=%v].myLoc.Id = %v", j, ms[j].myLoc.Id) + p("av: j=%v, available history len = %v", j, av) + cv.So(av, cv.ShouldBeGreaterThan, rounds) + + // prints first: + for i := 0; i < av; i++ { + sloc := h.A[h.Kth(i)].(*AgentLoc) + _ = sloc + //fmt.Printf("server j=%v, history print i = %v. / sloc.Port=%v, winner.Port=%v\n", j, i, sloc.Port, winner.Port) + } + } + for j := 0; j < n; j++ { + + // check that the history doesn't alternate + // between ports / servers. + h := ms[j].elec.copyLeadHistory() + av := h.Avail() + + // checks second: + for i := 0; i < av; i++ { + sloc := h.A[h.Kth(i)].(*AgentLoc) + //fmt.Printf("server j=%v, history check Id at i = %v. 
sloc.Port=%v, winner.Port=%v\n", j, i, sloc.Port, winner.Port) + cv.So(sloc.Port, cv.ShouldEqual, winner.Port) + } + + } + }) +} + +func RunServerOnPort(port int) *server.Server { + opts := serverOpts + opts.Port = port + return gnatsd.RunServer(&opts) +} + +func StartClusterOnPort(port, clusterPort int) *server.Server { + opts := serverOpts + opts.Port = port + + opts.Cluster = server.ClusterOpts{ + Port: clusterPort, + Host: opts.Host, + } + return gnatsd.RunServer(&opts) +} +func AddToClusterOnPort( + port, clusterPort int, routesStr string, +) *server.Server { + + opts := serverOpts + opts.Port = port + opts.Routes = server.RoutesFromStr(routesStr) + opts.Cluster = server.ClusterOpts{ + Port: clusterPort, + Host: opts.Host, + } + return gnatsd.RunServer(&opts) +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} + +func max(a, b int) int { + if a > b { + return a + } + return b +} + +func Test104ReceiveOwnSends(t *testing.T) { + + cv.Convey("If we transmit on a topic we are subscribed to, then we should receive our own send.", t, func() { + + tport, ls := getAvailPort() + ls.Close() + + s := RunServerOnPort(tport) + defer func() { + p("starting gnatsd shutdown...") + s.Shutdown() + }() + cli, srv, err := NewInternalClientPair() + panicOn(err) + s.InternalCliRegisterCallback(srv) + + cfg := &MembershipCfg{ + CliConn: cli, + MaxClockSkew: 1 * time.Nanosecond, + LeaseTime: 30 * time.Millisecond, + BeatDur: 10 * time.Millisecond, + NatsUrl: fmt.Sprintf("nats://localhost:%v", tport), + MyRank: 0, + } + + aLogger := logger.NewStdLogger(micros, debug, trace, colors, pid, log.LUTC) + cfg.Log = aLogger + + m := NewMembership(cfg) + + // like m.Start() but manually: + m.Cfg.SetDefaults() + + // unroll setupNatsClient... 
+ + discon := func(nc *nats.Conn) { + m.Cfg.Log.Tracef("health-agent: Disconnected from nats!") + } + optdis := nats.DisconnectHandler(discon) + norand := nats.DontRandomize() + + recon := func(nc *nats.Conn) { + loc, err := nc.ServerLocation() + panicOn(err) + m.Cfg.Log.Tracef("health-agent: Reconnect to nats!: loc = '%s'", loc) + } + optrecon := nats.ReconnectHandler(recon) + + opts := []nats.Option{optdis, optrecon, norand} + if m.Cfg.CliConn != nil { + opts = append(opts, nats.Dialer(&m.Cfg)) + } + + nc, err := nats.Connect(m.Cfg.NatsUrl, opts...) + panicOn(err) + + loc, err := nc.ServerLocation() + panicOn(err) + loc.Rank = m.Cfg.MyRank + m.setLoc(loc) + + m.subjAllCall = sysMemberPrefix + "allcall" + m.subjAllReply = sysMemberPrefix + "allreply" + m.subjMemberLost = sysMemberPrefix + "lost" + m.subjMemberAdded = sysMemberPrefix + "added" + m.subjMembership = sysMemberPrefix + "list" + m.nc = nc + + gotAllCall := make(chan bool) + repliedInAllCall := make(chan bool) + gotAllRep := make(chan bool) + nc.Subscribe(m.subjAllReply, func(msg *nats.Msg) { + p("I received on subjAllReply: msg='%#v'", string(msg.Data)) + close(gotAllRep) + }) + + nc.Subscribe(m.subjAllCall, func(msg *nats.Msg) { + close(gotAllCall) + p("test104, port %v, at 999 allcall received '%s'", m.myLoc.Port, string(msg.Data)) + loc, err := nc.ServerLocation() + panicOn(err) + hp, err := json.Marshal(loc) + panicOn(err) + + p("test104, port %v, at 222 in allcall handler: replying to allcall with our loc: '%s'", m.myLoc.Port, loc) + pong(nc, msg.Reply, hp) + close(repliedInAllCall) + }) + + now := time.Now().UTC() + // send on subjAllCall + sl := AgentLoc{ + Id: "abc", + Host: "here", + Port: 99, + Rank: -100, + LeaseExpires: now.Add(time.Hour), + } + won, _ := m.elec.setLeader(sl, now) + if !won { + panic("must be able to set leader") + } + m.allcall() + <-gotAllCall + <-repliedInAllCall + // expect to have gotAllRep closed. 
+ <-gotAllRep + }) +} + +func Test105OnlyConnectToOriginalGnatsd(t *testing.T) { + + cv.Convey("If a heath-agent is disconnected from gnatsd, it should only ever reconnect to that same gnatsd--the server whose health it is responsible for monitoring.", t, func() { + tport, ls := getAvailPort() + ls.Close() + tport2, ls2 := getAvailPort() + ls2.Close() + + cluster1Port, lsn1 := getAvailPort() + cluster2Port, lsn2 := getAvailPort() + // now that we've bound different available ports, + // we can close the listeners to free these up. + lsn1.Close() + lsn2.Close() + routesString := fmt.Sprintf("nats://127.0.0.1:%v", cluster1Port) + s := StartClusterOnPort(tport, cluster1Port) + s2 := AddToClusterOnPort(tport2, cluster2Port, routesString) + defer s2.Shutdown() + + cli, srv, err := NewInternalClientPair() + panicOn(err) + s.InternalCliRegisterCallback(srv) + cfg := &MembershipCfg{ + CliConn: cli, + MaxClockSkew: 1 * time.Nanosecond, + LeaseTime: 30 * time.Millisecond, + BeatDur: 10 * time.Millisecond, + NatsUrl: fmt.Sprintf("nats://localhost:%v", tport), + } + aLogger := logger.NewStdLogger(micros, debug, trace, colors, pid, log.LUTC) + cfg.Log = aLogger + + m := NewMembership(cfg) + err = m.Start() + panicOn(err) + defer m.Stop() + + _, err = m.nc.ServerLocation() + panicOn(err) + + time.Sleep(100 * time.Millisecond) + s.Shutdown() + // allow time for any unwanted + // auto-reconnect to be attempted; + // we are testing that no reconnect + // happens. 
+ time.Sleep(300 * time.Millisecond) + + _, err = m.nc.ServerLocation() + p("attempting to contact failed server; err is '%v'", err) + // this *should* *have* *failed* + cv.So(err, cv.ShouldNotBeNil) + cv.So(err.Error(), cv.ShouldEqual, "nats: invalid connection") + select { + case <-m.halt.Done.Chan: + p("good: Membership shut itself down.") + } + }) +} + +func getAvailPort() (int, net.Listener) { + l, _ := net.Listen("tcp", ":0") + r := l.Addr() + return r.(*net.TCPAddr).Port, l +} + +func Test106AgentLocLessThan(t *testing.T) { + + cv.Convey("To properly handle empty AgentLoc, AgentLocLessThan should sort sloc with and Id as smaller (more preferred) compared to an sloc (AgentLoc) with an empty string Id, even if their ranks are different.", t, func() { + var s1, s2 AgentLoc + s1.Id = "a" + s1.Rank = 1 + cv.So(AgentLocLessThan(&s1, &s2), cv.ShouldBeTrue) + }) +} + +func Test107OneNodeAloneWaitsLeaseTermBeforeRenewal(t *testing.T) { + + cv.Convey("Given a cluster of one server, it should elect itself leader and then wait a full lease term before considering who to elect again", t, func() { + + tport, ls := getAvailPort() + ls.Close() + + s := RunServerOnPort(tport) + defer func() { + p("starting gnatsd shutdown...") + s.Shutdown() + }() + + cfg := &MembershipCfg{ + MaxClockSkew: 1 * time.Nanosecond, + LeaseTime: 3000 * time.Millisecond, + BeatDur: 1000 * time.Millisecond, + NatsUrl: fmt.Sprintf("nats://localhost:%v", tport), + MyRank: 0, + historyCount: 10000, + } + + cli, srv, err := NewInternalClientPair() + panicOn(err) + + s.InternalCliRegisterCallback(srv) + cfg.CliConn = cli + + aLogger := logger.NewStdLogger(micros, debug, trace, colors, pid, log.LUTC) + _ = aLogger + // to follow the prints, uncomment: + cfg.Log = aLogger + + m := NewMembership(cfg) + err = m.Start() + if err != nil { + panic(err) + } + defer func() { + m.Stop() + }() + + // let it get past init phase. 
+ time.Sleep(3 * (m.Cfg.LeaseTime + m.Cfg.MaxClockSkew)) + + h := m.elec.copyLeadHistory() + av := h.Avail() + fmt.Printf("verifying at most 3 leader changes: %v\n", av) + cv.So(av, cv.ShouldBeLessThan, 4) + + }) +} diff --git a/health/healthcmd/main.go b/health/healthcmd/main.go new file mode 100644 index 00000000000..4b3cc1c0e26 --- /dev/null +++ b/health/healthcmd/main.go @@ -0,0 +1,61 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + "strconv" + + "github.com/nats-io/gnatsd/health" + "github.com/nats-io/gnatsd/logger" +) + +// healthcmd runs an allcall election from a standalone +// command line nats client. It exercises the same +// gnatsd/health library code that runs as an internal client +// in process with gnatsd. + +func usage() { + log.Fatalf("use: healthcmd {host}:port {rank}\n") +} + +func main() { + + log.SetFlags(0) + flag.Usage = usage + flag.Parse() + + args := flag.Args() + if len(args) < 1 { + usage() + } + + rank := 0 + var err error + if len(args) >= 2 { + rank, err = strconv.Atoi(args[1]) + if err != nil { + fmt.Fprintf(os.Stderr, "2nd arg should be our numeric rank") + } + } + + const colors = false + const micros, pid = true, true + const trace = false + const debug = true + aLogger := logger.NewStdLogger(micros, debug, trace, colors, pid, log.LUTC) + + cfg := &health.MembershipCfg{ + NatsUrl: "nats://" + args[0], // "nats://127.0.0.1:4222" + MyRank: rank, + Log: aLogger, + } + m := health.NewMembership(cfg) + err = m.Start() + if err != nil { + panic(err) + } + + select {} +} diff --git a/health/icc.go b/health/icc.go new file mode 100644 index 00000000000..60bc5b4a63f --- /dev/null +++ b/health/icc.go @@ -0,0 +1,64 @@ +package health + +import ( + "net" +) + +// Icc allows the server to +// detect a net.Conn as +// an internal client connection +// by checking if it implements the +// LocalInternalClient interface. 
+// +type Icc struct { + *net.TCPConn +} + +// IsInternal satisfies the LocalInternalClient interface +func (c *Icc) IsInternal() {} + +// NewInternalClientPair() constructs a client/server +// pair that wraps tcp endpoints in Icc to let +// the server recognize them as internal. +// +func NewInternalClientPair() (cli, srv *Icc, err error) { + + lsn, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return + } + + srvDone := make(chan struct{}) + go func() { + s, err2 := lsn.Accept() + if err2 == nil { + srv = &Icc{TCPConn: s.(*net.TCPConn)} + } else { + err = err2 + } + lsn.Close() + close(srvDone) + }() + + addr := lsn.Addr() + c, err3 := net.Dial(addr.Network(), addr.String()) + <-srvDone + + if err3 != nil { + err = err3 + if srv != nil { + srv.Close() + srv = nil + } + return + } + cli = &Icc{TCPConn: c.(*net.TCPConn)} + // INVAR: cli ok. + if err != nil { + cli.Close() + cli = nil + return + } + // INVAR: srv ok. + return +} diff --git a/health/icc_test.go b/health/icc_test.go new file mode 100644 index 00000000000..4ffae665c47 --- /dev/null +++ b/health/icc_test.go @@ -0,0 +1,48 @@ +package health + +import ( + "net" + "testing" + + "github.com/nats-io/gnatsd/server" +) + +func TestIccTypeSwitchWorks(t *testing.T) { + var nc net.Conn = &Icc{} + _, isIcc := nc.(server.LocalInternalClient) + if !isIcc { + t.Fatalf("nc was not LocalInternalClient, as it should be!") + } +} + +func TestIccAsNetConn(t *testing.T) { + + // write to a, read from b + a, b, err := NewInternalClientPair() + if err != nil { + panic(err) + } + + msg := "hello-world" + + n, err := a.Write([]byte(msg)) + if err != nil { + t.Errorf("err = %v", err) + } + if n != len(msg) { + t.Errorf("Write truncated at %v < %v", n, len(msg)) + } + + readbuf := make([]byte, len(msg)) + m, err := b.Read(readbuf) + if err != nil { + t.Errorf("err = %v", err) + } + if m != n { + t.Errorf("Read truncated at %v !=n %v", m, n) + } + back := string(readbuf[:m]) + if back != msg { + t.Errorf("msg corrupted, 
wrote '%v', read '%v'", msg, back) + } +} diff --git a/health/idem.go b/health/idem.go new file mode 100644 index 00000000000..107399d3215 --- /dev/null +++ b/health/idem.go @@ -0,0 +1,106 @@ +package health + +import ( + "fmt" + "sync" +) + +// Copyright (c) 2017 Jason E. Aten, Ph.D. +// https://github.com/glycerine/idem +// MIT license. + +// idemCloseChan can have Close() called on it +// multiple times, and it will only close +// Chan once. +type idemCloseChan struct { + Chan chan bool + closed bool + mut sync.Mutex +} + +// Reinit re-allocates the Chan, assigning +// a new channel and resetting the state +// as if brand new. +func (c *idemCloseChan) Reinit() { + c.mut.Lock() + defer c.mut.Unlock() + c.Chan = make(chan bool) + c.closed = false +} + +// NewIdemCloseChan makes a new idemCloseChan. +func NewIdemCloseChan() *idemCloseChan { + return &idemCloseChan{ + Chan: make(chan bool), + } +} + +var ErrAlreadyClosed = fmt.Errorf("Chan already closed") + +// Close returns ErrAlreadyClosed if it has been +// called before. It never closes idemCloseChan.Chan more +// than once, so it is safe to ignore the returned +// error value. Close() is safe for concurrent access by multiple +// goroutines. Close returns nil only the first time +// it is called. +func (c *idemCloseChan) Close() error { + c.mut.Lock() + defer c.mut.Unlock() + if !c.closed { + close(c.Chan) + c.closed = true + return nil + } + return ErrAlreadyClosed +} + +// IsClosed tells you if Chan is already closed or not. +func (c *idemCloseChan) IsClosed() bool { + c.mut.Lock() + defer c.mut.Unlock() + return c.closed +} + +// halter helps shutdown a goroutine +type halter struct { + // The owning goroutine should call Done.Close() as its last + // action once it has received the ReqStop() signal. + Done idemCloseChan + + // Other goroutines call ReqStop.Close() in order + // to request that the owning goroutine stop immediately. 
+ // The owning goroutine should select on ReqStop.Chan + // in order to recognize shutdown requests. + ReqStop idemCloseChan +} + +func newHalter() *halter { + return &halter{ + Done: *NewIdemCloseChan(), + ReqStop: *NewIdemCloseChan(), + } +} + +// RequestStop closes the h.ReqStop channel +// if it has not already done so. Safe for +// multiple goroutine access. +func (h *halter) RequestStop() { + h.ReqStop.Close() +} + +// MarkDone closes the h.Done channel +// if it has not already done so. Safe for +// multiple goroutine access. +func (h *halter) MarkDone() { + h.Done.Close() +} + +// IsStopRequested returns true iff h.ReqStop has been Closed(). +func (h *halter) IsStopRequested() bool { + return h.ReqStop.IsClosed() +} + +// IsDone returns true iff h.Done has been Closed(). +func (h *halter) IsDone() bool { + return h.Done.IsClosed() +} diff --git a/health/rbuf.go b/health/rbuf.go new file mode 100644 index 00000000000..2c2d4936fb7 --- /dev/null +++ b/health/rbuf.go @@ -0,0 +1,140 @@ +package health + +// https://github.com/glycerine/rbuf +// copyright (c) 2014, Jason E. Aten +// license: MIT + +import "io" + +// ringBuf: +// +// a fixed-size circular ring buffer. Just what it says. +// +type ringBuf struct { + A []interface{} + N int // MaxViewInBytes, the size of A + Beg int // start of data in A + Readable int // number of bytes available to read in A +} + +// newRingBuf constructs a new ringBuf. +func newRingBuf(maxViewInBytes int) *ringBuf { + n := maxViewInBytes + r := &ringBuf{ + N: n, + Beg: 0, + Readable: 0, + } + r.A = make([]interface{}, n, n) + + return r +} + +// clone makes a copy of b. +func (b *ringBuf) clone() *ringBuf { + a := &ringBuf{} + for i := range b.A { + a.A = append(a.A, b.A[i]) + } + a.N = b.N + a.Beg = b.Beg + a.Readable = b.Readable + return a +} + +// Reset quickly forgets any data stored in the ring buffer. The +// data is still there, but the ring buffer will ignore it and +// overwrite those buffers as new data comes in. 
+func (b *ringBuf) Reset() { + b.Beg = 0 + b.Readable = 0 +} + +// Advance(): non-standard, but better than Next(), +// because we don't have to unwrap our buffer and pay the cpu time +// for the copy that unwrapping may need. +// Useful in conjunction with/after ReadWithoutAdvance() (upstream rbuf; not defined in this file). +func (b *ringBuf) Advance(n int) { + if n <= 0 { + return + } + if n > b.Readable { + n = b.Readable + } + b.Readable -= n + b.Beg = (b.Beg + n) % b.N +} + +func intMin(a, b int) int { + if a < b { + return a + } else { + return b + } +} + +func (f *ringBuf) Avail() int { + return f.Readable +} + +// First returns the earliest index, or -1 if +// the ring is empty. +func (f *ringBuf) First() int { + if f.Readable == 0 { + return -1 + } + return f.Beg +} + +// Last returns the index of the last element, +// or -1 if the ring is empty. +func (f *ringBuf) Last() int { + if f.Readable == 0 { + return -1 + } + + last := f.Beg + f.Readable - 1 + if last < f.N { + // we fit without wrapping + return last + } + + return last % f.N +} + +// Kth presents the contents of the +// ring as a strictly linear sequence, +// so the user doesn't need to think +// about modular arithmetic. Here k indexes from +// [0, f.Readable-1], assuming f.Avail() +// is greater than 0. Kth() returns an +// actual index where the logical k-th +// element, starting from f.Beg, resides. +// f.Beg itself lives at k = 0. If k is +// out of bounds, or the ring is empty, +// -1 is returned. +func (f *ringBuf) Kth(k int) int { + if f.Readable == 0 || k < 0 || k >= f.Readable { + return -1 + } + return (f.Beg + k) % f.N +} + +// +// Append returns an error if there is no more +// space in the ring. Otherwise it returns nil +// and writes p into the ring in last position. +// +func (b *ringBuf) Append(p interface{}) error { + writeCapacity := b.N - b.Readable + if writeCapacity <= 0 { + // we are all full up already. 
+ return io.ErrShortWrite + } + + writeStart := (b.Beg + b.Readable) % b.N + b.A[writeStart] = p + + b.Readable += 1 + return nil +} diff --git a/health/setutil.go b/health/setutil.go new file mode 100644 index 00000000000..585ed338872 --- /dev/null +++ b/health/setutil.go @@ -0,0 +1,63 @@ +package health + +import ( + "fmt" +) + +// utilities and sets stuff + +// p is a shortcut for a call to fmt.Printf that implicitly starts +// and ends its message with a newline. +func p(format string, stuff ...interface{}) { + fmt.Printf("\n "+format+"\n", stuff...) +} + +func panicOn(err error) { + if err != nil { + panic(err) + } +} + +type members struct { + DedupTree *ranktree `json:"Mem"` +} + +func (m *members) insert(s AgentLoc) { + m.DedupTree.insert(s) +} +func (m *members) clear() { + m.DedupTree = newRanktree() +} + +func (m *members) minrank() AgentLoc { + return m.DedupTree.minrank() +} + +func (m *members) clone() *members { + cp := newMembers() + if m.DedupTree == nil { + return cp + } + cp.DedupTree = m.DedupTree.clone() + return cp +} + +func (m *members) setEmpty() bool { + return m.DedupTree.Len() == 0 +} + +func (m *members) String() string { + return string(m.mustJsonBytes()) +} + +func newMembers() *members { + return &members{ + DedupTree: newRanktree(), + } +} + +func (m *members) mustJsonBytes() []byte { + by, err := m.DedupTree.MarshalJSON() + panicOn(err) + return by +} diff --git a/logger/log.go b/logger/log.go index 485ae12e7ee..a266de2f108 100644 --- a/logger/log.go +++ b/logger/log.go @@ -22,8 +22,7 @@ type Logger struct { } // NewStdLogger creates a logger with output directed to Stderr -func NewStdLogger(time, debug, trace, colors, pid bool) *Logger { - flags := 0 +func NewStdLogger(time, debug, trace, colors, pid bool, flags int) *Logger { if time { flags = log.LstdFlags | log.Lmicroseconds } @@ -49,14 +48,13 @@ func NewStdLogger(time, debug, trace, colors, pid bool) *Logger { } // NewFileLogger creates a logger with output directed to a file 
-func NewFileLogger(filename string, time, debug, trace, pid bool) *Logger { +func NewFileLogger(filename string, time, debug, trace, pid bool, flags int) *Logger { fileflags := os.O_WRONLY | os.O_APPEND | os.O_CREATE f, err := os.OpenFile(filename, fileflags, 0660) if err != nil { log.Fatalf("error opening file: %v", err) } - flags := 0 if time { flags = log.LstdFlags | log.Lmicroseconds } @@ -126,3 +124,7 @@ func (l *Logger) Tracef(format string, v ...interface{}) { l.logger.Printf(l.traceLabel+format, v...) } } + +func (l *Logger) Utc() { + +} diff --git a/main.go b/main.go index 425372390f2..5fe761f596e 100644 --- a/main.go +++ b/main.go @@ -8,8 +8,10 @@ import ( "net" "net/url" "os" + "time" "github.com/nats-io/gnatsd/auth" + "github.com/nats-io/gnatsd/health" "github.com/nats-io/gnatsd/logger" "github.com/nats-io/gnatsd/server" ) @@ -52,6 +54,11 @@ Cluster Options: --no_advertise Advertise known cluster IPs to clients --connect_retries For implicit routes, number of connect retries +Cluster Health Monitor: + --health Run the health monitoring/leader election agent + --lease Duration of leader leases. default: 12s + --beat Time between heartbeats (want 3-4/lease). default: 3s + --rank Smaller rank gives priority in leader election Common Options: -h, --help Show this message @@ -118,6 +125,10 @@ func main() { flag.StringVar(&opts.TLSCert, "tlscert", "", "Server certificate file.") flag.StringVar(&opts.TLSKey, "tlskey", "", "Private key for server certificate.") flag.StringVar(&opts.TLSCaCert, "tlscacert", "", "Client certificate CA for verification.") + flag.BoolVar(&opts.HealthAgent, "health", false, "Run the health agent, elect a leader.") + flag.IntVar(&opts.HealthRank, "rank", 7, "leader election priority: the smaller the rank, the more preferred the server is as a leader. Negative ranks are allowed. 
Ties are broken by the random ServerId.") + flag.DurationVar(&opts.HealthLease, "lease", time.Second*12, "leader lease duration (should allow 3-4 beats within a lease)") + flag.DurationVar(&opts.HealthBeat, "beat", time.Second*3, "heart beat every this often (should get 3-4 beats within a lease)") flag.Usage = func() { fmt.Printf("%s\n", usageStr) @@ -175,6 +186,10 @@ func main() { server.PrintAndDie(err.Error()) } + if opts.HealthAgent { + opts.InternalCli = append(opts.InternalCli, health.NewAgent(&opts)) + } + // Create the server with appropriate options. s := server.New(&opts) @@ -220,7 +235,7 @@ func configureLogger(s *server.Server, opts *server.Options) { var log server.Logger if opts.LogFile != "" { - log = logger.NewFileLogger(opts.LogFile, opts.Logtime, opts.Debug, opts.Trace, true) + log = logger.NewFileLogger(opts.LogFile, opts.Logtime, opts.Debug, opts.Trace, true, 0) } else if opts.RemoteSyslog != "" { log = logger.NewRemoteSysLogger(opts.RemoteSyslog, opts.Debug, opts.Trace) } else if opts.Syslog { @@ -233,7 +248,7 @@ func configureLogger(s *server.Server, opts *server.Options) { if err != nil || (stat.Mode()&os.ModeCharDevice) == 0 { colors = false } - log = logger.NewStdLogger(opts.Logtime, opts.Debug, opts.Trace, colors, true) + log = logger.NewStdLogger(opts.Logtime, opts.Debug, opts.Trace, colors, true, 0) } s.SetLogger(log, opts.Debug, opts.Trace) diff --git a/server/client.go b/server/client.go index 5a64a79c357..901749d0ef6 100644 --- a/server/client.go +++ b/server/client.go @@ -20,6 +20,9 @@ const ( CLIENT = iota // ROUTER is another router in the cluster. ROUTER + // INTERNALCLI is an internal client. + // An example is the health-agent. 
+ INTERNALCLI ) const ( @@ -216,6 +219,8 @@ func (c *client) initClient() { c.ncs = fmt.Sprintf("%s - cid:%d", conn, c.cid) case ROUTER: c.ncs = fmt.Sprintf("%s - rid:%d", conn, c.cid) + case INTERNALCLI: + c.ncs = fmt.Sprintf("internal:0 - hid:%d", c.cid) } } @@ -407,6 +412,8 @@ func (c *client) processErr(errStr string) { c.Errorf("Client Error %s", errStr) case ROUTER: c.Errorf("Route Error %s", errStr) + case INTERNALCLI: + c.Errorf("InternalClient Error %s", errStr) } c.closeConnection() } diff --git a/server/icli.go b/server/icli.go new file mode 100644 index 00000000000..3b5374b5432 --- /dev/null +++ b/server/icli.go @@ -0,0 +1,89 @@ +package server + +import ( + "net" + "sync" +) + +// LocalInternalClient is a trait interface. +// The net.Conn implementations of +// internal clients provided over +// the accept() callback (see Start below) +// should implement it to tell the server to ignore +// TLS and auth for internal clients. +// +type LocalInternalClient interface { + IsInternal() +} + +// iCli tracks the internal +// clients. +// +type iCli struct { + configured []InternalClient + mu sync.Mutex +} + +// InternalClient provides +// a plugin-like interface, +// supporting internal clients that live +// in-process with the Server +// on their own goroutines. +// +// An example of an internal client +// is the health monitoring client. +// In order to be effective, its lifetime +// must exactly match that of the +// server it monitors. +// +type InternalClient interface { + + // Name should return a readable + // human name for the InternalClient; + // it will be invoked as a part of + // startup/shutdown/error logging. + // + Name() string + + // Start should run the client on + // a background goroutine. + // + // The Server s will invoke Start() + // as a part of its own init and setup. + // + // The info and opts pointers will be + // viewable from an already locked Server + // instance, and so can be read without + // worrying about data races. 
+ // + // Any returned error will be logged. + // This will not prevent the Server + // from calling Stop() on termination, + // and Stop() must be expected (and + // not block) no matter what. + // + // By returning a net.Conn the client + // provides the server with the + // equivalent of a Listen/Accept created + // net.Conn for communication with + // the client. + // + // The iclient should log using logger. + // + Start(info Info, + opts Options, + logger Logger) (net.Conn, error) + + // Stop should shut down the goroutine(s) + // of the internal client. + // The Server will invoke Stop() as a part + // of its own shutdown process, *even* if + // Start() failed to start the background + // goroutine. Authors should take care + // to allow Stop() to be called even + // on a failed start. + // + // Stop is expected not to block for long. + // + Stop() +} diff --git a/server/log.go b/server/log.go index 26176d3eb82..5713bf21d9c 100644 --- a/server/log.go +++ b/server/log.go @@ -72,7 +72,7 @@ func (s *Server) ReOpenLogFile() { Noticef("File log re-open ignored, not a file logger") } else { fileLog := logger.NewFileLogger(s.opts.LogFile, - s.opts.Logtime, s.opts.Debug, s.opts.Trace, true) + s.opts.Logtime, s.opts.Debug, s.opts.Trace, true, 0) s.SetLogger(fileLog, s.opts.Debug, s.opts.Trace) Noticef("File log re-opened") } diff --git a/server/monitor.go b/server/monitor.go index 6cce144b7f3..dc473cc2047 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -473,10 +473,11 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { v.TotalConnections = s.totalClients v.Routes = len(s.routes) v.Remotes = len(s.remotes) - v.InMsgs = s.inMsgs - v.InBytes = s.inBytes - v.OutMsgs = s.outMsgs - v.OutBytes = s.outBytes + // atomic loads avoid data races with client.go:298,951 + v.InMsgs = atomic.LoadInt64(&s.inMsgs) + v.InBytes = atomic.LoadInt64(&s.inBytes) + v.OutMsgs = atomic.LoadInt64(&s.outMsgs) + v.OutBytes = atomic.LoadInt64(&s.outBytes) 
v.SlowConsumers = s.slowConsumers v.Subscriptions = s.sl.Count() s.httpReqStats[VarzPath]++ diff --git a/server/opts.go b/server/opts.go index 8b97fa451a6..46dc7d13c55 100644 --- a/server/opts.go +++ b/server/opts.go @@ -71,10 +71,10 @@ type Options struct { ProfPort int `json:"-"` PidFile string `json:"-"` LogFile string `json:"-"` - Syslog bool `json:"-"` RemoteSyslog string `json:"-"` Routes []*url.URL `json:"-"` RoutesStr string `json:"-"` + Syslog bool `json:"-"` TLSTimeout float64 `json:"tls_timeout"` TLS bool `json:"-"` TLSVerify bool `json:"-"` @@ -83,6 +83,12 @@ type Options struct { TLSCaCert string `json:"-"` TLSConfig *tls.Config `json:"-"` WriteDeadline time.Duration `json:"-"` + + InternalCli []InternalClient `json:"-"` + HealthAgent bool `json:"health_agent"` + HealthRank int `json:"health_rank"` + HealthLease time.Duration `json:"health_lease"` + HealthBeat time.Duration `json:"health_beat"` } // Configuration file authorization section. @@ -225,6 +231,14 @@ func ProcessConfigFile(configFile string) (*Options, error) { opts.PingInterval = time.Duration(int(v.(int64))) * time.Second case "ping_max": opts.MaxPingsOut = int(v.(int64)) + case "health_rank": + opts.HealthRank = int(v.(int64)) + case "health_lease": + opts.HealthLease = v.(time.Duration) + case "health_beat": + opts.HealthBeat = v.(time.Duration) + case "health_agent": + opts.HealthAgent = v.(bool) case "tls": tlsm := v.(map[string]interface{}) tc, err := parseTLS(tlsm) diff --git a/server/server.go b/server/server.go index 7d616d2b0dd..b8d4d808f2b 100644 --- a/server/server.go +++ b/server/server.go @@ -39,6 +39,7 @@ type Info struct { MaxPayload int `json:"max_payload"` IP string `json:"ip,omitempty"` ClientConnectURLs []string `json:"connect_urls,omitempty"` // Contains URLs a client can connect to. + ServerRank int `json:"server_rank"` // lowest rank wins leader election. // Used internally for quick look-ups. 
clientConnectURLs map[string]struct{} @@ -77,6 +78,7 @@ type Server struct { grRunning bool grWG sync.WaitGroup // to wait on various go routines cproto int64 // number of clients supporting async INFO + icli iCli // in-process internal clients } // Make sure all are 64bits for atomic use @@ -118,6 +120,7 @@ func New(opts *Options) *Server { trace: opts.Trace, done: make(chan bool, 1), start: time.Now(), + icli: iCli{configured: opts.InternalCli}, } s.mu.Lock() @@ -267,10 +270,53 @@ func (s *Server) Start() { s.StartProfiler() } + // Run the internal clients in + // s.icli.configured. + // + // A failed Start() is only logged; + // Shutdown() still calls Stop() on it. + // + s.icli.mu.Lock() + go func(info Info, opts Options) { + defer s.icli.mu.Unlock() + + // wait for server to be accepting clients + select { + case <-s.rcQuit: + return + case <-clientListenReady: + } + + n := len(s.icli.configured) + if n == 0 { + return + } + Debugf("Starting the %v internal client(s).", n) + for _, ic := range s.icli.configured { + srv, err := ic.Start(info, opts, log.logger) + if err == nil { + if srv != nil { + s.InternalCliRegisterCallback(srv) + } + Noticef("InternalClient ['%s'] started.", ic.Name()) + } else { + Errorf("InternalClient ['%s'] failed to Start(): %s", ic.Name(), err) + } + } + }(s.info, *s.opts) + // Wait for clients. s.AcceptLoop(clientListenReady) } +// InternalCliRegisterCallback is public only for testing. +func (s *Server) InternalCliRegisterCallback(srv net.Conn) { + s.startGoRoutine(func() { + s.createClient(srv) + s.grWG.Done() + }) +} + // Shutdown will shutdown the server instance by kicking out the AcceptLoop // and closing all associated clients. 
func (s *Server) Shutdown() { @@ -287,6 +333,13 @@ func (s *Server) Shutdown() { s.grRunning = false s.grMu.Unlock() + // stop any internal clients + s.icli.mu.Lock() + for _, ic := range s.icli.configured { + ic.Stop() + } + s.icli.mu.Unlock() + conns := make(map[uint64]*client) // Copy off the clients @@ -531,6 +584,11 @@ func (s *Server) startMonitoring(secure bool) { func (s *Server) createClient(conn net.Conn) *client { c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: s.info.MaxPayload, start: time.Now()} + _, isInternal := conn.(LocalInternalClient) + if isInternal { + c.typ = INTERNALCLI + } + // Grab JSON info string s.mu.Lock() info := s.infoJSON @@ -548,7 +606,7 @@ func (s *Server) createClient(conn net.Conn) *client { c.Debugf("Client connection created") // Check for Auth - if authRequired { + if !isInternal && authRequired { c.setAuthTimer(secondsToDuration(s.opts.AuthTimeout)) } @@ -582,7 +640,7 @@ func (s *Server) createClient(conn net.Conn) *client { c.mu.Lock() // Check for TLS - if tlsRequired { + if !isInternal && tlsRequired { c.Debugf("Starting TLS client connection handshake") c.nc = tls.Server(c.nc, s.opts.TLSConfig) conn := c.nc.(*tls.Conn) @@ -613,7 +671,7 @@ func (s *Server) createClient(conn net.Conn) *client { return c } - if tlsRequired { + if !isInternal && tlsRequired { // Rewrap bw c.bw = bufio.NewWriterSize(c.nc, startBufSize) } @@ -621,12 +679,14 @@ func (s *Server) createClient(conn net.Conn) *client { // Do final client initialization // Set the Ping timer - c.setPingTimer() + if !isInternal { + c.setPingTimer() + } // Spin up the read loop. 
s.startGoRoutine(func() { c.readLoop() }) - if tlsRequired { + if !isInternal && tlsRequired { c.Debugf("TLS handshake complete") cs := c.nc.(*tls.Conn).ConnectionState() c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tlsCipher(cs.CipherSuite)) @@ -755,6 +815,8 @@ func (s *Server) checkAuth(c *client) bool { return s.checkClientAuth(c) case ROUTER: return s.checkRouterAuth(c) + case INTERNALCLI: + return true default: return false }