diff --git a/health/ALGORITHM.md b/health/ALGORITHM.md new file mode 100644 index 00000000000..5a133331aa3 --- /dev/null +++ b/health/ALGORITHM.md @@ -0,0 +1,163 @@ +# The ALLCALL leased-leader election algorithm. + +Jason E. Aten + +February 2017 + +constants (examples) +----------- + +Let heartBeat = 1 sec. + +Let leaseTime = 10 sec. + +Let maxClockSkew = 1 sec, where maxClockSkew +is a bound on how far out of sync our local +clocks may drift. + +givens +-------- + +* Given: Let each server have a numeric integer rank, that is distinct +and unique to that server. If necessary an extremely long +true random number is used to break ties between server ranks, so +that we may assert, with probability 1, that all ranks are distinct integers, +and that each server, absent an in-force lease, can be put into a strict total order. + +* Rule: The lower the rank is preferred for being the leader. + +* Ordering by lease time then rank: we order the pair (leaseExpires, rank) first +by largest leaseExpires time, then by lower rank. If +both leases have expired, then lease time is not considered as +a part of the ordering, and rank alone determines the +new leader. + +ALLCALL Algorithm phases +=========================== + +### I. Init phase + +When a server joins the cluster, +it does not issue allcalls (a ping of all members) +until after leaseTime + maxClockSkew time has elapsed. + +During init, the server does, however, accept and respond to allcalls() from +other cluster members. The allcall() ping will contain +the current (lease, leader-rank) and leader-id +according to the issuer of the allcall(). Every +recipient of the allcall updates her local information +of who she thinks the leader is, so long as the +received information is monotone in the (lease, leader-rank) +ordering; so a later unexpired lease will replace an +earlier unexpired lease, and if both are expired then +the lower rank will replace the larger rank as winner +of the current leader role. + +### II. regular ping phase + +After a server has finished its Init phase, it +enters its ping phase, where is stays until it +is shut down. + +During ping phase, the server continues to accept and respond +to allcall requests from other servers. Now in addition, +the server also issues its own allcall() pings every +heartBeat seconds. + +### III. Election and Lease determination + +Election and leasing are computed locally, by each +node, one heartBeat after receiving any replies +from the allcall(). The localnode who issued +the allcall sorts the respondents, including +itself, and if all leases have expired, it +determines who is the new leader and marks +their lease as starting from now. Lease +and leader computation is done locally +and independently on each server. +Once in ping phase, this new determination +is broadcast at the next heartbeat when +the local node issues an allcall(). + +If a node receives an allcall with a leader +claim, and that leader claim has a shorter +lease expirtaion time than the existing +leader, the new proposed leader is rejected +in favor of the current leader. This +favors continuity of leadership until +the end of the current leaders term, and +is a product of the sort order +described above where we sort +candidates by lease time then rank. + +## Properties of the allcall + +The allcalls() are heard by all active cluster members, and +contain the sender's computed result of who the current leader is, +and replies answer back with the recipient's own rank and id. Each +recipient of an allcall() replies to all cluster members. +Both the sending and the replying to the allcall are +broadcasts that are published to a well known topic. + +## Safety/Convergence: ALLCALL converges to one leader + +Suppose two nodes are partitioned and so both are leaders on +their own side of the network. Then suppose the network +is joined again, so the two leaders are brought together +by a healing of the network, or by adding a new link +between the networks. The two nodes exchange Ids and lease +times, and the node with the shorter valid lease time +adopts the node with the longer lease as leader, +since that is the sort order. The adoption takes +effect as soon as the loser is informed of the +other leader's lease via its allcall. + +## Liveness: a leader will be chosen + +Given the total order among nodes, exactly one +will be lowest rank and thus be the preferred +leader at the end of the any current leader's +lease, even if the current lease holder +has failed. Hence, with at least one live +node (that lives beyond one lease term), +the system can run for at most one +lease term before electing a leader. + + +## commentary + +ALLCALL does not guarantee that there will +never be more than one leader. Availability +in the face of network partition is +desirable in many cases, and ALLCALL is +appropriate for these. ALLCALL does +not guarantee that a leader will always +be present, but with live nodes it +does provide that the cluster will +have a leader after one lease term + +maxClockSkew has expired. + +By design, ALLCALL functions well +in cluster with only one and two +nodes, or with any number, including +an even number, of nodes. Compared to quorum +based elections like raft and paxos, +where an odd number of at least three +nodes is required, this can be very desirable. +ALLCALL is appropriate for AP, +rather than CP, style systems, where +availability is more important +than having a single writer. When +writes are idempotent or deduplicated +downstream, it is better for uptime +to run an always-on leadership system. + +implementation +------------ + +ALLCALL is implented on top of +the Nats system here, currently +on a branch. + +https://github.com/glycerine/gnatsd/blob/minhealth/health/health.go + diff --git a/health/agent.go b/health/agent.go new file mode 100644 index 00000000000..3e5a20c7b0a --- /dev/null +++ b/health/agent.go @@ -0,0 +1,74 @@ +package health + +import ( + "net" + "time" + + "github.com/nats-io/gnatsd/server" + "github.com/nats-io/gnatsd/server/lcon" +) + +// Agent implements the InternalClient interface. +// It provides health status checks and +// leader election from among the candidate +// gnatsd instances in a cluster. +type Agent struct { + opts *server.Options + mship *Membership +} + +// NewAgent makes a new Agent. +func NewAgent(opts *server.Options) *Agent { + return &Agent{ + opts: opts, + } +} + +// Name should identify the internal client for logging. +func (h *Agent) Name() string { + return "health-agent" +} + +// Start makes an internal +// entirely in-process client that monitors +// cluster health and manages group +// membership functions. +// +func (h *Agent) Start( + info server.Info, + opts server.Options, + logger server.Logger, + +) (net.Conn, error) { + + // To keep the health client fast and its traffic + // internal-only, we use an bi-directional, + // in-memory version of a TCP stream. + // + // The buffers really do have to be of + // sufficient size, or we will + // deadlock/livelock the system. + // + cli, srv := lcon.NewBidir(info.MaxPayload * 2) + + rank := opts.HealthRank + beat := opts.HealthBeat + lease := opts.HealthLease + + cfg := &MembershipCfg{ + MaxClockSkew: time.Second, + BeatDur: beat, + LeaseTime: lease, + MyRank: rank, + CliConn: cli, + Log: logger, + } + h.mship = NewMembership(cfg) + go h.mship.Start() + return srv, nil +} + +// Stop halts the background goroutine. +func (h *Agent) Stop() { + h.mship.Stop() +} diff --git a/health/amap.go b/health/amap.go new file mode 100644 index 00000000000..8315dac6ec6 --- /dev/null +++ b/health/amap.go @@ -0,0 +1,49 @@ +package health + +import "sync" + +// atomic map from string to *ServerLoc + +type AtomicServerLocMap struct { + U map[string]*ServerLoc `json:"U"` + tex sync.RWMutex +} + +func NewAtomicServerLocMap() *AtomicServerLocMap { + return &AtomicServerLocMap{ + U: make(map[string]*ServerLoc), + } +} + +func (m *AtomicServerLocMap) Len() int { + m.tex.RLock() + n := len(m.U) + m.tex.RUnlock() + return n +} + +func (m *AtomicServerLocMap) Get(key string) *ServerLoc { + m.tex.RLock() + s := m.U[key] + m.tex.RUnlock() + return s +} + +func (m *AtomicServerLocMap) Get2(key string) (*ServerLoc, bool) { + m.tex.RLock() + v, ok := m.U[key] + m.tex.RUnlock() + return v, ok +} + +func (m *AtomicServerLocMap) Set(key string, val *ServerLoc) { + m.tex.Lock() + m.U[key] = val + m.tex.Unlock() +} + +func (m *AtomicServerLocMap) Del(key string) { + m.tex.Lock() + delete(m.U, key) + m.tex.Unlock() +} diff --git a/health/config.go b/health/config.go new file mode 100644 index 00000000000..be58fb58034 --- /dev/null +++ b/health/config.go @@ -0,0 +1,67 @@ +package health + +import ( + "net" + "time" + + "github.com/nats-io/gnatsd/logger" + "github.com/nats-io/gnatsd/server" +) + +type MembershipCfg struct { + + // max we allow for clocks to be out of sync. + // default to 1 second if not set. + MaxClockSkew time.Duration + + // how often we heartbeat. defaults to 100msec + // if not set. + BeatDur time.Duration + + // NatsUrl example "nats://127.0.0.1:4222" + NatsUrl string + + // defaults to "_nats.cluster.members." + SysMemberPrefix string + + // LeaseTime is the minimum time the + // leader is elected for. Defaults to 10 sec. + LeaseTime time.Duration + + // provide a default until the server gives us rank + MyRank int + + // optional, if provided we will use this connection on + // the client side. + CliConn net.Conn + + Log server.Logger +} + +func (cfg *MembershipCfg) SetDefaults() { + if cfg.LeaseTime == 0 { + cfg.LeaseTime = time.Second * 10 + } + if cfg.SysMemberPrefix == "" { + cfg.SysMemberPrefix = "_nats.cluster.members." + } + if cfg.BeatDur == 0 { + cfg.BeatDur = 100 * time.Millisecond + } + if cfg.MaxClockSkew == 0 { + cfg.MaxClockSkew = time.Second + } + if cfg.NatsUrl == "" { + cfg.NatsUrl = "nats://127.0.0.1:4222" + } + if cfg.Log == nil { + // stderr + const micros, pid = true, true + const colors, trace, debug = false, false, false + cfg.Log = logger.NewStdLogger(micros, debug, trace, colors, pid) + } +} + +func (cfg *MembershipCfg) Dial(network, address string) (net.Conn, error) { + return cfg.CliConn, nil +} diff --git a/health/health.go b/health/health.go new file mode 100644 index 00000000000..3d017b512f4 --- /dev/null +++ b/health/health.go @@ -0,0 +1,615 @@ +package health + +import ( + "encoding/json" + "log" + "sort" + "sync" + "time" + + "github.com/nats-io/go-nats" +) + +// sysMemberPrefix creates a namespace +// for system cluster membership communication. +// This prefix aims to avoid collisions +// with user-level topics. Only system +// processes / internal clients should +// write to these topics, but everyone +// is welcome to listen on them. +// +// note: `_nats` is for now, can easily +// changed to be `_SYS` later once +// we're sure everything is working. +// +const sysMemberPrefix = "_nats.cluster.members." + +// ServerLoc conveys to interested parties +// the Id and location of one gnatsd +// server in the cluster. +type ServerLoc struct { + Id string `json:"serverId"` + Host string `json:"host"` + Port int `json:"port"` + + // Are we the leader? + IsLeader bool `json:"leader"` + + // LeaseExpires is zero for any + // non-leader. For the leader, + // LeaseExpires tells you when + // the leaders lease expires. + LeaseExpires time.Time `json:"leaseExpires"` + + // lower rank is leader until lease + // expires. Ties are broken by Id. + // Rank should be assignable on the + // gnatsd command line with -rank to + // let the operator prioritize + // leadership for certain hosts. + Rank int `json:"rank"` +} + +func (s *ServerLoc) String() string { + by, err := json.Marshal(s) + panicOn(err) + return string(by) +} + +func (s *ServerLoc) fromBytes(by []byte) error { + return json.Unmarshal(by, s) +} + +// Membership tracks the nats server cluster +// membership, issuing health checks and +// choosing a leader. +type Membership struct { + Cfg MembershipCfg + + elec *leadHolder + nc *nats.Conn + + subjAllCall string + subjAllReply string + subjMemberLost string + subjMemberAdded string + subjMembership string + + halt *halter + mu sync.Mutex + stopping bool +} + +func NewMembership(cfg *MembershipCfg) *Membership { + return &Membership{ + Cfg: *cfg, + halt: newHalter(), + elec: newLeadHolder(), + } +} + +// leadHolder holds who is the current leader, +// and what their lease is. Used to synchronize +// access between various goroutines. +type leadHolder struct { + mu sync.Mutex + sloc ServerLoc + + history *RingBuf +} + +func newLeadHolder() *leadHolder { + return &leadHolder{ + history: NewRingBuf(100), + } +} + +// getLeader retreives the stored e.sloc value. +func (e *leadHolder) getLeader() ServerLoc { + e.mu.Lock() + defer e.mu.Unlock() + return e.sloc +} + +// setLeader aims to copy sloc and store it +// for future getLeader() calls to access. +// +// However we reject any attempt to replace +// a leader with a one that doesn't rank lower, where rank +// includes the LeaseExpires time +// (see the ServerLocLessThan() function). +// +// If we accept sloc +// we return slocWon true. If we reject sloc then +// we return slocWon false. In short, we will only +// accept sloc if ServerLocLessThan(sloc, e.sloc), +// and we return ServerLocLessThan(sloc, e.sloc). +// +// If we return slocWon false, alt contains the +// value we favored, which is the current value +// of our retained e.sloc. If we return true, +// then alt contains a copy of sloc. We +// return a value in alt to avoid data races. +// +func (e *leadHolder) setLeader(sloc *ServerLoc) (slocWon bool, alt ServerLoc) { + e.mu.Lock() + defer e.mu.Unlock() + + if sloc == nil { + return false, e.sloc + } + + slocWon = ServerLocLessThan(sloc, &e.sloc, time.Now()) + if !slocWon { + return false, e.sloc + } + + e.sloc = *sloc + histcp := *sloc + e.history.Append(&histcp) + return true, e.sloc +} + +func (e *leadHolder) getLeaderAsBytes() []byte { + lead := e.getLeader() + by, err := json.Marshal(&lead) + panicOn(err) + return by +} + +// Stop blocks until the Membership goroutine +// acknowledges the shutdown request. +func (m *Membership) Stop() { + m.mu.Lock() + if m.stopping { + m.mu.Unlock() + return + } + m.stopping = true + m.mu.Unlock() + m.halt.ReqStop.Close() + <-m.halt.Done.Chan +} + +func (m *Membership) Start() error { + //p("Membership.Start() running") + m.Cfg.SetDefaults() + + pc := newPongCollector() + nc, err := m.setupNatsClient(pc) + if err != nil { + m.halt.Done.Close() + return err + } + m.nc = nc + go m.start(nc, pc) + return nil +} + +func (m *Membership) start(nc *nats.Conn, pc *pongCollector) { + //p("Membership.start(nc=%p) running", nc) + + defer func() { + m.halt.Done.Close() + }() + + // history + history := NewRingBuf(10) + + m.Cfg.Log.Debugf("health-agent: Listening on [%s]\n", m.subjAllCall) + log.SetFlags(log.LstdFlags) + + prevCount, curCount := 0, 0 + var curMember, prevMember *members + var curLead *ServerLoc + + // do an initial allcall() to discover any + // current leader. + m.Cfg.Log.Tracef("health-agent: init: doing initial allcall to discover any existing leader...") + err := allcall(nc, m.subjAllCall, m.subjAllReply, m.elec) + if err != nil { + m.Cfg.Log.Debugf("health-agent: error back from allcall, terminating on: %s", err) + return + } + + select { + case <-time.After(m.Cfg.BeatDur): + case <-m.halt.ReqStop.Chan: + return + } + + prevCount, prevMember = pc.unsubAndGetSet() + + history.Append(prevMember) + + now := time.Now() + + firstSeenLead := m.elec.getLeader() + xpire := firstSeenLead.LeaseExpires + + limit := xpire.Add(m.Cfg.MaxClockSkew) + if !xpire.IsZero() && limit.After(now) { + m.Cfg.Log.Tracef("health-agent: init: after one heartbeat, we detect current leader '%s' of rank %v with lease good for %v until expiration + maxClockSkew=='%v'", + firstSeenLead.Id, firstSeenLead.Rank, limit.Sub(now), limit, + ) + } else { + m.Cfg.Log.Tracef("health-agent: init: after one heartbeat, no leader found. waiting for a full leader lease term of %s to expire...", m.Cfg.LeaseTime) + + select { + case <-time.After(m.Cfg.LeaseTime): + case <-m.halt.ReqStop.Chan: + return + } + } + + // prev responses should be back by now. + var expired bool + var prevLead *ServerLoc + var nextLeadReportTm time.Time + + for { + // NB: replies to an allcall can/will change what the current leader is in elec. + err = allcall(nc, m.subjAllCall, m.subjAllReply, m.elec) + if err != nil { + m.Cfg.Log.Tracef("health-agent: error on allcall, shutting down Membership: %s", err) + // e.g. "write on closed buffer" when shutting down. + return + } + + select { + case <-time.After(m.Cfg.BeatDur): + case <-m.halt.ReqStop.Chan: + return + } + lastSeenLead := m.elec.getLeader() + + // cur responses should be back by now + // and we can compare prev and cur. + curCount, curMember = pc.unsubAndGetSet() + + history.Append(curMember) + + now = time.Now() + expired, curLead = curMember.leaderLeaseExpired( + now, m.Cfg.LeaseTime, &lastSeenLead, m.Cfg.MaxClockSkew, + ) + + // tell pong + won, alt := m.elec.setLeader(curLead) + if !won { + curLead = &alt + } + + loc, _ := nc.ServerLocation() + if loc != nil { + loc.Rank = m.Cfg.MyRank + if loc.Id == curLead.Id { + if now.After(nextLeadReportTm) || prevLead == nil || prevLead.Id != curLead.Id { + left := curLead.LeaseExpires.Sub(now) + m.Cfg.Log.Debugf("health-agent: I am LEAD, my Id: '%s', rank %v. lease expires in %s", loc.Id, loc.Rank, left) + nextLeadReportTm = now.Add(left).Add(time.Second) + } + } else { + if prevLead != nil && prevLead.Id == loc.Id { + m.Cfg.Log.Debugf("health-agent: I am no longer lead, new LEAD is '%s', rank %v. lease expires in %s", curLead.Id, curLead.Rank, curLead.LeaseExpires.Sub(now)) + + } else { + if curLead != nil && (nextLeadReportTm.IsZero() || now.After(nextLeadReportTm)) { + left := curLead.LeaseExpires.Sub(now) + m.Cfg.Log.Debugf("health-agent: I am not lead. lead is '%s', rank %v, for %v", curLead.Id, curLead.Rank, left) + nextLeadReportTm = now.Add(left).Add(time.Second) + } + } + } + } + + lost := setDiff(prevMember, curMember, curLead) + gained := setDiff(curMember, prevMember, curLead) + same := setsEqual(prevMember, curMember) + + if same { + // nothing more to do. + // This is the common case when nothing changes. + } else { + lostBytes := lost.mustJsonBytes() + if !lost.setEmpty() { + nc.Publish(m.subjMemberLost, lostBytes) + // ignore errors on purpose, don't crash mid-health-report. + } + gainedBytes := gained.mustJsonBytes() + if !gained.setEmpty() { + nc.Publish(m.subjMemberAdded, gainedBytes) + // ignore errors on purpose + } + } + if curCount < prevCount { + m.Cfg.Log.Debugf("health-agent: ARG PAGE PAGE PAGE!! we went down a server, from %v -> %v. lost: '%s'", prevCount, curCount, lost) + + } else if curCount > prevCount { + m.Cfg.Log.Debugf("health-agent: +++++++ MORE ROBUSTNESS GAINED; we went from %v -> %v. gained: '%s'", prevCount, curCount, gained) + + } + + if expired { + curBytes := curMember.mustJsonBytes() + nc.Publish(m.subjMembership, curBytes) + } + + // done with compare, now loop + prevCount = curCount + prevMember = curMember + prevLead = curLead + } +} + +func pong(nc *nats.Conn, subj string, msg []byte) { + err := nc.Publish(subj, msg) + panicOn(err) + nc.Flush() + // ignore error on nc.Flush(). // nats: connection closed on shutdown. +} + +// allcall makes a new inbox and pings everyone +// to reply to that inbox. +// +// The ping consists of sending the ServerLoc +// of the current leader, which provides lease +// and full contact info for the leader. +// +// This gives a full +// round-trip connectivity check while filtering +// out concurrent checks from other nodes, so +// we can get an accurate count. +// +func allcall( + nc *nats.Conn, + subjAllCall string, + subjAllReply string, + elec *leadHolder, + +) error { + // allcall broadcasts the current leader + lease + leadby := elec.getLeaderAsBytes() + return nc.PublishRequest(subjAllCall, subjAllReply, leadby) +} + +// pongCollector collects the responses +// from an allcall request +type pongCollector struct { + replies int + from *members + mu sync.Mutex +} + +func newPongCollector() *pongCollector { + return &pongCollector{ + from: newMembers(), + } +} + +// acumulate pong responses +func (pc *pongCollector) receivePong(msg *nats.Msg) { + pc.mu.Lock() + defer pc.mu.Unlock() + + pc.replies++ + + var loc ServerLoc + err := loc.fromBytes(msg.Data) + panicOn(err) + + pc.from.Mem.Set(loc.Id, &loc) +} + +func (pc *pongCollector) clear() { + pc.mu.Lock() + pc.from.clear() + pc.mu.Unlock() +} + +// unsubAndGetSet unsubscribes and returns the count and set so far. +func (pc *pongCollector) unsubAndGetSet() (int, *members) { + + mem := pc.from.clone() + mem.clearLeaderAndLease() + pc.clear() + return mem.Mem.Len(), mem +} + +// leaderLeaseExpired evaluates the lease as of now, +// and returns the leader or best candiate. Returns +// expired == true if any prior leader lease has +// lapsed. In this case we return the best new +// leader with its IsLeader bit set and its +// LeaseExpires set to now + lease. +// +// If expired == false then the we return +// the current leader in lead. +// +// PRE: there are only 0 or 1 leaders in m.Mem +// who have a non-zero LeaseExpires field. +// +// If m.Mem is empty, we return (true, nil). +// +func (m *members) leaderLeaseExpired( + now time.Time, + leaseLen time.Duration, + prevLead *ServerLoc, + maxClockSkew time.Duration, + +) (expired bool, lead *ServerLoc) { + + if prevLead.LeaseExpires.Add(maxClockSkew).After(now) { + // honor the leases until they expire + return false, prevLead + } + + if m.Mem.Len() == 0 { + return false, prevLead + } + + // INVAR: any lease has expired. + + var sortme []*ServerLoc + m.Mem.tex.Lock() + for _, v := range m.Mem.U { + sortme = append(sortme, v) + } + m.Mem.tex.Unlock() + m.clearLeaderAndLease() + + sort.Sort(&byRankThenId{s: sortme, now: now}) + lead = sortme[0] + lead.IsLeader = true + lead.LeaseExpires = now.Add(leaseLen).UTC() + + return true, lead +} + +func (m *members) clearLeaderAndLease() { + m.Mem.tex.Lock() + for _, v := range m.Mem.U { + v.IsLeader = false + v.LeaseExpires = time.Time{} + } + m.Mem.tex.Unlock() +} + +type byRankThenId struct { + s []*ServerLoc + now time.Time +} + +func (p byRankThenId) Len() int { return len(p.s) } +func (p byRankThenId) Swap(i, j int) { p.s[i], p.s[j] = p.s[j], p.s[i] } + +// Less must be stable and computable locally yet +// applicable globally: it is how we choose a leader +// in a stable fashion. +func (p byRankThenId) Less(i, j int) bool { + return ServerLocLessThan(p.s[i], p.s[j], p.now) +} + +// ServerLocLessThan returns true iff i < j, in terms of rank. +// Lower rank is more electable. We order first by LeaseExpires, +// then by Rank, Id, Host, and Port; in that order. The +// longer leaseExpires wins (is less than). +func ServerLocLessThan(i, j *ServerLoc, now time.Time) bool { + nowu := now.UnixNano() + itm := i.LeaseExpires.UnixNano() + jtm := j.LeaseExpires.UnixNano() + + // if both are expired, then its a tie. + if jtm <= nowu { + jtm = 0 + } + if itm <= nowu { + itm = 0 + } + if itm != jtm { + return itm > jtm // we want an actual time to sort before a zero-time. + } + if i.Rank != j.Rank { + return i.Rank < j.Rank + } + if i.Id != j.Id { + return i.Id < j.Id + } + if i.Host != j.Host { + return i.Host < j.Host + } + return i.Port < j.Port +} + +func (m *Membership) setupNatsClient(pc *pongCollector) (*nats.Conn, error) { + discon := func(nc *nats.Conn) { + m.Cfg.Log.Tracef("health-agent: Disconnected from nats!") + } + optdis := nats.DisconnectHandler(discon) + norand := nats.DontRandomize() + + recon := func(nc *nats.Conn) { + loc, err := nc.ServerLocation() + panicOn(err) + loc.Rank = m.Cfg.MyRank + m.Cfg.Log.Tracef("health-agent: Reconnect to nats!: loc = '%s'", loc) + } + optrecon := nats.ReconnectHandler(recon) + + opts := []nats.Option{optdis, optrecon, norand} + if m.Cfg.CliConn != nil { + opts = append(opts, nats.Dialer(&m.Cfg)) + } + + nc, err := nats.Connect(m.Cfg.NatsUrl, opts...) + if err != nil { + log.Fatalf("Can't connect: %v\n", err) + } + + loc, err := nc.ServerLocation() + if err != nil { + return nil, err + } + loc.Rank = m.Cfg.MyRank + m.Cfg.Log.Debugf("health-agent: HELLOWORLD: I am '%s' at '%v:%v'. rank %v", loc.Id, loc.Host, loc.Port, loc.Rank) + + m.subjAllCall = sysMemberPrefix + "allcall" + m.subjAllReply = sysMemberPrefix + "allreply" + m.subjMemberLost = sysMemberPrefix + "lost" + m.subjMemberAdded = sysMemberPrefix + "added" + m.subjMembership = sysMemberPrefix + "list" + + nc.Subscribe(m.subjAllReply, pc.receivePong) + + nc.Subscribe(m.subjAllCall, func(msg *nats.Msg) { + loc, err := nc.ServerLocation() + if err != nil { + return // try again next time. + } + //panicOn(err) // nats: invalid connection + loc.Rank = m.Cfg.MyRank + + // allcall broadcasts the leader + var lead ServerLoc + err = lead.fromBytes(msg.Data) + panicOn(err) + + if lead.Id != "" && !lead.LeaseExpires.IsZero() { + won, alt := m.elec.setLeader(&lead) + if !won { + // if we rejected, get our preferred leader. + lead = alt + } + + if loc.Id == lead.Id { + loc.IsLeader = true + loc.LeaseExpires = lead.LeaseExpires + } else { + loc.IsLeader = false + loc.LeaseExpires = time.Time{} + } + } + + hp, err := json.Marshal(loc) + panicOn(err) + pong(nc, msg.Reply, hp) + }) + + /* reporting */ + nc.Subscribe(m.subjMemberLost, func(msg *nats.Msg) { + m.Cfg.Log.Tracef("health-agent: Received on [%s]: '%s'", + msg.Subject, string(msg.Data)) + }) + + nc.Subscribe(m.subjMemberAdded, func(msg *nats.Msg) { + m.Cfg.Log.Tracef("health-agent: Received on [%s]: '%s'", + msg.Subject, string(msg.Data)) + }) + + nc.Subscribe(m.subjMembership, func(msg *nats.Msg) { + m.Cfg.Log.Tracef("health-agent: Received on [%s]: '%s'", + msg.Subject, string(msg.Data)) + }) + + return nc, nil +} diff --git a/health/health_test.go b/health/health_test.go new file mode 100644 index 00000000000..eeb0f653c3d --- /dev/null +++ b/health/health_test.go @@ -0,0 +1,115 @@ +package health + +import ( + "fmt" + cv "github.com/glycerine/goconvey/convey" + "github.com/nats-io/gnatsd/server" + "github.com/nats-io/gnatsd/server/lcon" + gnatsd "github.com/nats-io/gnatsd/test" + "github.com/nats-io/go-nats" + "testing" + "time" +) + +const TEST_PORT = 8392 +const DefaultTimeout = 2 * time.Second + +var cliOpts = nats.Options{ + Url: fmt.Sprintf("nats://localhost:%d", TEST_PORT), + AllowReconnect: true, + MaxReconnect: 10, + ReconnectWait: 10 * time.Millisecond, + Timeout: DefaultTimeout, +} + +// DefaultTestOptions are default options for the unit tests. +var serverOpts = server.Options{ + Host: "localhost", + Port: TEST_PORT, + NoLog: true, + NoSigs: true, + MaxControlLine: 256, +} + +func Test101StressTestManyClients(t *testing.T) { + + cv.Convey("when stress testing with 50 clients coming up and shutting down, we should survive and prosper", t, func() { + + maxPayload := 1024 * 1024 + s := RunServerOnPort(TEST_PORT) + defer s.Shutdown() + + n := 50 + var ms []*Membership + for i := 0; i < n; i++ { + cli, srv := lcon.NewBidir(maxPayload * 2) + + s.InternalCliRegisterCallback(srv) + cfg := &MembershipCfg{ + CliConn: cli, + MaxClockSkew: 1 * time.Millisecond, + LeaseTime: 30 * time.Millisecond, + BeatDur: 10 * time.Millisecond, + NatsUrl: fmt.Sprintf("nats://localhost:%v", TEST_PORT), + MyRank: i, // ranks 0..n-1 + } + + m := NewMembership(cfg) + err := m.Start() + if err != nil { + panic(err) + } + ms = append(ms, m) + defer m.Stop() + } + + }) +} + +func Test102ConvergenceToOneLowRankLeader(t *testing.T) { + + cv.Convey("given a cluster of one with rank 0, no matter what other nodes that arrive thinking they are the leader (from after a partition is healed), as long as those other nodes have rank 1, our rank 0 process will continue to be the leader and all other arrivals will give up their leadership after their lease expires.", t, func() { + + maxPayload := 1024 * 1024 + s := RunServerOnPort(TEST_PORT) + defer s.Shutdown() + + n := 50 + var ms []*Membership + for i := 0; i < n; i++ { + cli, srv := lcon.NewBidir(maxPayload * 2) + + s.InternalCliRegisterCallback(srv) + cfg := &MembershipCfg{ + CliConn: cli, + MaxClockSkew: 1 * time.Millisecond, + LeaseTime: 30 * time.Millisecond, + BeatDur: 10 * time.Millisecond, + NatsUrl: fmt.Sprintf("nats://localhost:%v", TEST_PORT), + MyRank: i, // ranks 0..n-1 + } + + m := NewMembership(cfg) + err := m.Start() + if err != nil { + panic(err) + } + ms = append(ms, m) + defer m.Stop() + } + cv.So(true, cv.ShouldBeTrue) // we should get here. + }) +} + +func TestLiveness(t *testing.T) { + + cv.Convey("After a single lease term + clockskew, a leader will have been chosen", t, func() { + + }) +} + +func RunServerOnPort(port int) *server.Server { + opts := serverOpts + opts.Port = port + return gnatsd.RunServer(&opts) +} diff --git a/health/healthcmd/main.go b/health/healthcmd/main.go new file mode 100644 index 00000000000..4cec1950cb2 --- /dev/null +++ b/health/healthcmd/main.go @@ -0,0 +1,56 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + "strconv" + "time" + + "github.com/nats-io/gnatsd/health" +) + +// healthcmd runs an allcall election from a standalone +// command line nats client. It exercises the same +// gnatsd/health library code that runs as an internal client +// in process with gnatsd. + +func usage() { + log.Fatalf("use: healthcmd {host}:port {rank}\n") +} + +func main() { + + log.SetFlags(0) + flag.Usage = usage + flag.Parse() + + args := flag.Args() + if len(args) < 1 { + usage() + } + + rank := 0 + var err error + if len(args) >= 2 { + rank, err = strconv.Atoi(args[1]) + if err != nil { + fmt.Fprintf(os.Stderr, "2nd arg should be our numeric rank") + } + } + + cfg := &health.MembershipCfg{ + MaxClockSkew: time.Second, + BeatDur: 100 * time.Millisecond, + NatsUrl: "nats://" + args[0], // "nats://127.0.0.1:4222" + MyRank: rank, + } + m := health.NewMembership(cfg) + err = m.Start() + if err != nil { + panic(err) + } + + select {} +} diff --git a/health/idem.go b/health/idem.go new file mode 100644 index 00000000000..107399d3215 --- /dev/null +++ b/health/idem.go @@ -0,0 +1,106 @@ +package health + +import ( + "fmt" + "sync" +) + +// Copyright (c) 2017 Jason E. Aten, Ph.D. +// https://github.com/glycerine/idem +// MIT license. + +// idemCloseChan can have Close() called on it +// multiple times, and it will only close +// Chan once. +type idemCloseChan struct { + Chan chan bool + closed bool + mut sync.Mutex +} + +// Reinit re-allocates the Chan, assinging +// a new channel and reseting the state +// as if brand new. +func (c *idemCloseChan) Reinit() { + c.mut.Lock() + defer c.mut.Unlock() + c.Chan = make(chan bool) + c.closed = false +} + +// NewIdemCloseChan makes a new idemCloseChan. +func NewIdemCloseChan() *idemCloseChan { + return &idemCloseChan{ + Chan: make(chan bool), + } +} + +var ErrAlreadyClosed = fmt.Errorf("Chan already closed") + +// Close returns ErrAlreadyClosed if it has been +// called before. It never closes IdemClose.Chan more +// than once, so it is safe to ignore the returned +// error value. Close() is safe for concurrent access by multiple +// goroutines. Close returns nil after the first time +// it is called. +func (c *idemCloseChan) Close() error { + c.mut.Lock() + defer c.mut.Unlock() + if !c.closed { + close(c.Chan) + c.closed = true + return nil + } + return ErrAlreadyClosed +} + +// IsClosed tells you if Chan is already closed or not. +func (c *idemCloseChan) IsClosed() bool { + c.mut.Lock() + defer c.mut.Unlock() + return c.closed +} + +// halter helps shutdown a goroutine +type halter struct { + // The owning goutine should call Done.Close() as its last + // actual once it has received the ReqStop() signal. + Done idemCloseChan + + // Other goroutines call ReqStop.Close() in order + // to request that the owning goroutine stop immediately. + // The owning goroutine should select on ReqStop.Chan + // in order to recognize shutdown requests. + ReqStop idemCloseChan +} + +func newHalter() *halter { + return &halter{ + Done: *NewIdemCloseChan(), + ReqStop: *NewIdemCloseChan(), + } +} + +// RequestStop closes the h.ReqStop channel +// if it has not already done so. Safe for +// multiple goroutine access. +func (h *halter) RequestStop() { + h.ReqStop.Close() +} + +// MarkDone closes the h.Done channel +// if it has not already done so. Safe for +// multiple goroutine access. +func (h *halter) MarkDone() { + h.Done.Close() +} + +// IsStopRequested returns true iff h.ReqStop has been Closed(). +func (h *halter) IsStopRequested() bool { + return h.ReqStop.IsClosed() +} + +// IsDone returns true iff h.Done has been Closed(). +func (h *halter) IsDone() bool { + return h.Done.IsClosed() +} diff --git a/health/rbuf.go b/health/rbuf.go new file mode 100644 index 00000000000..1f8aaa13798 --- /dev/null +++ b/health/rbuf.go @@ -0,0 +1,128 @@ +package health + +// https://github.com/glycerine/rbuf +// copyright (c) 2014, Jason E. Aten +// license: MIT + +import "io" + +// RingBuf: +// +// a fixed-size circular ring buffer. Just what it says. +// +type RingBuf struct { + A []interface{} + N int // MaxViewInBytes, the size of A + Beg int // start of data in A + Readable int // number of bytes available to read in A +} + +// NewRingBuf constructs a new RingBuf. +func NewRingBuf(maxViewInBytes int) *RingBuf { + n := maxViewInBytes + r := &RingBuf{ + N: n, + Beg: 0, + Readable: 0, + } + r.A = make([]interface{}, n, n) + + return r +} + +// Reset quickly forgets any data stored in the ring buffer. The +// data is still there, but the ring buffer will ignore it and +// overwrite those buffers as new data comes in. +func (b *RingBuf) Reset() { + b.Beg = 0 + b.Readable = 0 +} + +// Advance(): non-standard, but better than Next(), +// because we don't have to unwrap our buffer and pay the cpu time +// for the copy that unwrapping may need. +// Useful in conjuction/after ReadWithoutAdvance() above. +func (b *RingBuf) Advance(n int) { + if n <= 0 { + return + } + if n > b.Readable { + n = b.Readable + } + b.Readable -= n + b.Beg = (b.Beg + n) % b.N +} + +func intMin(a, b int) int { + if a < b { + return a + } else { + return b + } +} + +func (f *RingBuf) Avail() int { + return f.Readable +} + +// returns the earliest index, or -1 if +// the ring is empty +func (f *RingBuf) First() int { + if f.Readable == 0 { + return -1 + } + return f.Beg +} + +// returns the index of the last element, +// or -1 if the ring is empty. +func (f *RingBuf) Last() int { + if f.Readable == 0 { + return -1 + } + + last := f.Beg + f.Readable - 1 + if last < f.N { + // we fit without wrapping + return last + } + + return last % f.N +} + +// Kth presents the contents of the +// ring as a strictly linear sequence, +// so the user doesn't need to think +// about modular arithmetic. Here k indexes from +// [0, f.Readable-1], assuming f.Avail() +// is greater than 0. Kth() returns an +// actual index where the logical k-th +// element, starting from f.Beg, resides. +// f.Beg itself lives at k = 0. If k is +// out of bounds, or the ring is empty, +// -1 is returned. +func (f *RingBuf) Kth(k int) int { + if f.Readable == 0 || k < 0 || k >= f.Readable { + return -1 + } + return (f.Beg + k) % f.N +} + +// +// Append returns an error if there is no more +// space in the ring. Otherwise it returns nil +// and writes p into the ring in last position. +// +func (b *RingBuf) Append(p interface{}) error { + writeCapacity := b.N - b.Readable + if writeCapacity <= 0 { + // we are all full up already. + return io.ErrShortWrite + } + + writeStart := (b.Beg + b.Readable) % b.N + b.A[writeStart] = p + + b.Readable += 1 + return nil +} diff --git a/health/setutil.go b/health/setutil.go new file mode 100644 index 00000000000..3b24836c746 --- /dev/null +++ b/health/setutil.go @@ -0,0 +1,112 @@ +package health + +import ( + "encoding/json" + "fmt" +) + +// utilities and sets stuff + +// p is a shortcut for a call to fmt.Printf that implicitly starts +// and ends its message with a newline. +func p(format string, stuff ...interface{}) { + fmt.Printf("\n "+format+"\n", stuff...) +} + +func panicOn(err error) { + if err != nil { + panic(err) + } +} + +// return a minus b, where a and b are sets. +func setDiff(a, b *members, curLead *ServerLoc) *members { + + res := newMembers() + a.Mem.tex.Lock() + for k, v := range a.Mem.U { + + if curLead != nil { + // annotate leader as we go... + if v.Id == curLead.Id { + v.IsLeader = true + v.LeaseExpires = curLead.LeaseExpires + } + } + + if _, found := b.Mem.U[k]; !found { // data race + res.Mem.U[k] = v + } + } + a.Mem.tex.Unlock() + return res +} + +func setsEqual(a, b *members) bool { + a.Mem.tex.Lock() + b.Mem.tex.Lock() + defer b.Mem.tex.Unlock() + defer a.Mem.tex.Unlock() + + alen := len(a.Mem.U) + if alen != len(b.Mem.U) { + return false + } + // INVAR: len(a) == len(b) + if alen == 0 { + return true + } + for k := range a.Mem.U { + if _, found := b.Mem.U[k]; !found { + return false + } + } + // INVAR: all of a was found in b, and they + // are the same size + return true +} + +type members struct { + GroupName string `json:"GroupName"` + Mem *AtomicServerLocMap `json:"Mem"` +} + +func (m *members) clear() { + m.Mem = NewAtomicServerLocMap() +} + +func (m *members) clone() *members { + cp := newMembers() + cp.GroupName = m.GroupName + if m.Mem == nil { + return cp + } + m.Mem.tex.Lock() + cp.Mem.tex.Lock() + for k, v := range m.Mem.U { + cp.Mem.U[k] = v + } + cp.Mem.tex.Unlock() + m.Mem.tex.Unlock() + return cp +} + +func (m *members) setEmpty() bool { + return m.Mem.Len() == 0 +} + +func (m *members) String() string { + return string(m.mustJsonBytes()) +} + +func newMembers() *members { + return &members{ + Mem: NewAtomicServerLocMap(), + } +} + +func (m members) mustJsonBytes() []byte { + by, err := json.Marshal(m) + panicOn(err) + return by +} diff --git a/main.go b/main.go index 425372390f2..20d55f91d9d 100644 --- a/main.go +++ b/main.go @@ -8,8 +8,10 @@ import ( "net" "net/url" "os" + "time" "github.com/nats-io/gnatsd/auth" + "github.com/nats-io/gnatsd/health" "github.com/nats-io/gnatsd/logger" "github.com/nats-io/gnatsd/server" ) @@ -52,6 +54,11 @@ Cluster Options: --no_advertise Advertise known cluster IPs to clients --connect_retries For implicit routes, number of connect retries +Cluster Health Monitor: + --health Run the health monitoring/leader election agent + --lease Duration of leader leases + --beat Duration of heart-beat pings (want 3-4/lease) + --rank Smaller rank gives priority in leader election Common Options: -h, --help Show this message @@ -118,6 +125,10 @@ func main() { flag.StringVar(&opts.TLSCert, "tlscert", "", "Server certificate file.") flag.StringVar(&opts.TLSKey, "tlskey", "", "Private key for server certificate.") flag.StringVar(&opts.TLSCaCert, "tlscacert", "", "Client certificate CA for verification.") + flag.BoolVar(&opts.HealthAgent, "health", false, "Run the health agent, elect a leader.") + flag.IntVar(&opts.HealthRank, "rank", 7, "leader election priority: the smaller the rank, the more preferred the server is as a leader. Negative ranks are allowed. Ties are broken by the random ServerId.") + flag.DurationVar(&opts.HealthLease, "lease", time.Second*12, "leader lease duration (should allow 3-4 beats within a lease)") + flag.DurationVar(&opts.HealthBeat, "beat", time.Second*3, "heart beat every this often (should get 3-4 beats within a lease)") flag.Usage = func() { fmt.Printf("%s\n", usageStr) @@ -175,6 +186,10 @@ func main() { server.PrintAndDie(err.Error()) } + if opts.HealthAgent { + opts.InternalCli = append(opts.InternalCli, health.NewAgent(&opts)) + } + // Create the server with appropriate options. s := server.New(&opts) diff --git a/server/client.go b/server/client.go index 5a64a79c357..64f6cd1b96c 100644 --- a/server/client.go +++ b/server/client.go @@ -20,6 +20,8 @@ const ( CLIENT = iota // ROUTER is another router in the cluster. ROUTER + // HEALTH is the internal client that monitors cluster membership. + HEALTH ) const ( @@ -216,6 +218,8 @@ func (c *client) initClient() { c.ncs = fmt.Sprintf("%s - cid:%d", conn, c.cid) case ROUTER: c.ncs = fmt.Sprintf("%s - rid:%d", conn, c.cid) + case HEALTH: + c.ncs = fmt.Sprintf("internal:0 - hid:%d", c.cid) } } @@ -299,7 +303,7 @@ func (c *client) readLoop() { // Check pending clients for flush. for cp := range c.pcd { // Flush those in the set - cp.mu.Lock() + cp.mu.Lock() //2nd gen deadlock, jea if cp.nc != nil { // Gather the flush calls that happened before now. // This is a signal into us about dynamic buffer allocation tuning. @@ -407,6 +411,8 @@ func (c *client) processErr(errStr string) { c.Errorf("Client Error %s", errStr) case ROUTER: c.Errorf("Route Error %s", errStr) + case HEALTH: + c.Errorf("Health Error %s", errStr) } c.closeConnection() } @@ -898,7 +904,7 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { return } client := sub.client - client.mu.Lock() + client.mu.Lock() // 2nd gen deadlock, jea sub.nm++ // Check if we should auto-unsubscribe. if sub.max > 0 { diff --git a/server/icli.go b/server/icli.go new file mode 100644 index 00000000000..ee194ec1d66 --- /dev/null +++ b/server/icli.go @@ -0,0 +1,89 @@ +package server + +import ( + "net" + "sync" +) + +// LocalInternalClient is a trait interface. +// The net.Conn implementations of +// internal clients provided over +// the accept() callback (see Start below) +// should implement it to tell the server to ignore +// TLS and auth for internal clients. +// +type LocalInternalClient interface { + InMemory() +} + +// iCli tracks the internal +// clients. +// +type iCli struct { + configured []InternalClient + mu sync.Mutex +} + +// InternalClient provides +// a plugin-like interface, +// supporting internal clients that live +// in-process with the Server +// on their own goroutines. +// +// An example of an internal client +// is the health monitoring client. +// In order to be effective, its lifetime +// must exactly match that of the +// server it monitors. +// +type InternalClient interface { + + // Name should return a readable + // human name for the InternalClient; + // it will be invoked as a part of + // startup/shutdown/error logging. + // + Name() string + + // Start should run the client on + // a background goroutine. + // + // The Server s will invoke Start() + // as a part of its own init and setup. + // + // The info and opts pointers will be + // viewable from an already locked Server + // instance, and so can be read without + // worrying about data races. + // + // Any returned error will be logged. + // This will not prevent the Server + // from calling Stop() on termination, + // and Stop() must be expected (and + // not block) no matter what. + // + // By returning an net.Conn the client + // provides the server with the + // equivalent of a Listen/Accept created + // net.Conn for communication with + // the client. + // + // The iclient should log using logger. + // + Start(info Info, + opts Options, + logger Logger) (net.Conn, error) + + // Stop should shutdown the goroutine(s) + // of the internal client. + // The Server will invoke Stop() as a part + // of its own shutdown process, *even* if + // Start() failed to start the background + // goroutine. Authors should take care + // to allow Stop() to be called even + // on a failed start. + // + // Stop is expected not to block for long. + // + Stop() +} diff --git a/server/lcon/LICENSE b/server/lcon/LICENSE new file mode 100644 index 00000000000..3a7d3ca8d3f --- /dev/null +++ b/server/lcon/LICENSE @@ -0,0 +1,29 @@ +Copyright (c) 2014 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following disclaimer + in the documentation and/or other materials provided with the + distribution. + + * Neither the name of Google Inc. nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/server/lcon/README.md b/server/lcon/README.md new file mode 100644 index 00000000000..05ceb810d7d --- /dev/null +++ b/server/lcon/README.md @@ -0,0 +1,11 @@ +# lcon + +Emulate a bi-directional TCP connection entirely in memory. + +See NewBidir() in the godocs: https://godoc.org/github.com/glycerine/lcon + +Derived from https://github.com/bradfitz/http2/pull/8/files + +motivation: https://groups.google.com/forum/#!topic/golang-dev/k0bSal8eDyE + +Repo: https://github.com/glycerine/lcon diff --git a/server/lcon/bidir.go b/server/lcon/bidir.go new file mode 100644 index 00000000000..97cb786a3be --- /dev/null +++ b/server/lcon/bidir.go @@ -0,0 +1,85 @@ +// package lcon gives a local in-memory net.Conn +// that can be used to simulate a TCP connection. +// It uses bounded buffers in both directions, +// so writers will be blocked until readers +// make progress, and vice-versa. +package lcon + +import ( + "net" + "time" +) + +// Bidir emulates a TCP connection +// entirely in memory, providing +// bi-directional buffering. +type Bidir struct { + Send *Pipe + Recv *Pipe +} + +// satify LocalInternalClient interface +func (r *Bidir) InMemory() {} + +// NewBidir returns a pair of Bidir, +// each of which can be treated as +// a net.Conn in order to communicate +// over memory buffers. +func NewBidir(sz int) (*Bidir, *Bidir) { + s := make([]byte, sz) + r := make([]byte, sz) + + sp := NewPipe(s) + rp := NewPipe(r) + return &Bidir{ + Send: sp, + Recv: rp, + }, &Bidir{ + Send: rp, + Recv: sp, + } +} + +func (r *Bidir) Read(p []byte) (n int, err error) { + return r.Recv.Read(p) +} + +// Write copies bytes from p into the buffer and wakes a reader. +// It is an error to write more data than the buffer can hold. +func (r *Bidir) Write(p []byte) (n int, err error) { + return r.Send.Write(p) +} + +func (c *Bidir) Close() error { + return c.Send.Close() +} + +func (r *Bidir) SetErrorAndClose(err error) { + r.Recv.SetErrorAndClose(err) + r.Send.SetErrorAndClose(err) +} + +// Bidir fullfills the net.Conn interface + +func (r *Bidir) LocalAddr() net.Addr { return addr{} } +func (r *Bidir) RemoteAddr() net.Addr { return addr{} } + +// SetDeadline implements the net.Conn method +func (r *Bidir) SetDeadline(t time.Time) error { + err := r.SetReadDeadline(t) + err2 := r.SetWriteDeadline(t) + if err != nil { + return err + } + return err2 +} + +// SetWriteDeadline implements the net.Conn method +func (r *Bidir) SetWriteDeadline(t time.Time) error { + return r.Send.SetWriteDeadline(t) +} + +// SetReadDeadline implements the net.Conn method +func (r *Bidir) SetReadDeadline(t time.Time) error { + return r.Recv.SetReadDeadline(t) +} diff --git a/server/lcon/bidir_test.go b/server/lcon/bidir_test.go new file mode 100644 index 00000000000..e20cea61319 --- /dev/null +++ b/server/lcon/bidir_test.go @@ -0,0 +1,35 @@ +package lcon + +import ( + "net" + "testing" +) + +func TestBidirAsNetConn(t *testing.T) { + + // write to a, read from b + var a, b net.Conn = NewBidir(100) + + msg := "hello-world" + + n, err := a.Write([]byte(msg)) + if err != nil { + t.Errorf("err = %v", err) + } + if n != len(msg) { + t.Errorf("Write truncated at %v < %v", n, len(msg)) + } + + readbuf := make([]byte, len(msg)) + m, err := b.Read(readbuf) + if err != nil { + t.Errorf("err = %v", err) + } + if m != n { + t.Errorf("Read truncated at %v !=n %v", m, n) + } + back := string(readbuf[:m]) + if back != msg { + t.Errorf("msg corrupted, wrote '%v', read '%v'", msg, back) + } +} diff --git a/server/lcon/buffer.go b/server/lcon/buffer.go new file mode 100644 index 00000000000..e569f3fee50 --- /dev/null +++ b/server/lcon/buffer.go @@ -0,0 +1,88 @@ +// from https://github.com/bradfitz/http2/pull/8/files +// +// motivation: https://groups.google.com/forum/#!topic/golang-dev/k0bSal8eDyE +// +// Copyright 2014 The Go Authors. +// See https://code.google.com/p/go/source/browse/CONTRIBUTORS +// Licensed under the same terms as Go itself: +// https://code.google.com/p/go/source/browse/LICENSE + +package lcon + +import ( + "errors" +) + +// buffer is an io.ReadWriteCloser backed by a fixed size buffer. +// It never allocates, but moves old data as new data is written. +type buffer struct { + buf []byte + r, w int + closed bool + err error // err to return to reader + late bool +} + +var ( + errReadEmpty = errors.New("read from empty buffer") + errWriteClosed = errors.New("write on closed buffer") + errWriteFull = errors.New("write on full buffer") +) + +// Read copies bytes from the buffer into p. +// It is an error to read when no data is available. +func (b *buffer) Read(p []byte) (n int, err error) { + n = copy(p, b.buf[b.r:b.w]) + b.r += n + if b.closed && b.r == b.w { + err = b.err + } else if b.r == b.w && n == 0 { + err = errReadEmpty + } + return n, err +} + +// Len returns the number of bytes of the unread portion of the buffer. +func (b *buffer) Len() int { + return b.w - b.r +} + +func (b *buffer) freeBytes() int { + return len(b.buf) - b.w +} + +// Write copies bytes from p into the buffer. +// It is an error to write more data than the buffer can hold. +func (b *buffer) Write(p []byte) (n int, err error) { + if b.late { + return 0, ErrDeadline + } + if b.closed { + return 0, errWriteClosed + } + + // Slide existing data to beginning. + if b.r > 0 && len(p) > len(b.buf)-b.w { + copy(b.buf, b.buf[b.r:b.w]) + b.w -= b.r + b.r = 0 + } + + // Write new data. + n = copy(b.buf[b.w:], p) + b.w += n + if n < len(p) { + err = errWriteFull + } + return n, err +} + +// Close marks the buffer as closed. Future calls to Write will +// return an error. Future calls to Read, once the buffer is +// empty, will return err. +func (b *buffer) Close(err error) { + if !b.closed { + b.closed = true + b.err = err + } +} diff --git a/server/lcon/buffer_test.go b/server/lcon/buffer_test.go new file mode 100644 index 00000000000..68b5125c67c --- /dev/null +++ b/server/lcon/buffer_test.go @@ -0,0 +1,158 @@ +// from https://github.com/bradfitz/http2/pull/8/files +// +// motivation: https://groups.google.com/forum/#!topic/golang-dev/k0bSal8eDyE +// +// Copyright 2014 The Go Authors. +// See https://code.google.com/p/go/source/browse/CONTRIBUTORS +// Licensed under the same terms as Go itself: +// https://code.google.com/p/go/source/browse/LICENSE + +package lcon + +import ( + "io" + "reflect" + "testing" +) + +var bufferReadTests = []struct { + buf buffer + read, wn int + werr error + wp []byte + wbuf buffer +}{ + { + buffer{[]byte{'a', 0}, 0, 1, false, nil, false}, + 5, 1, nil, []byte{'a'}, + buffer{[]byte{'a', 0}, 1, 1, false, nil, false}, + }, + { + buffer{[]byte{'a', 0}, 0, 1, true, io.EOF, false}, + 5, 1, io.EOF, []byte{'a'}, + buffer{[]byte{'a', 0}, 1, 1, true, io.EOF, false}, + }, + { + buffer{[]byte{0, 'a'}, 1, 2, false, nil, false}, + 5, 1, nil, []byte{'a'}, + buffer{[]byte{0, 'a'}, 2, 2, false, nil, false}, + }, + { + buffer{[]byte{0, 'a'}, 1, 2, true, io.EOF, false}, + 5, 1, io.EOF, []byte{'a'}, + buffer{[]byte{0, 'a'}, 2, 2, true, io.EOF, false}, + }, + { + buffer{[]byte{}, 0, 0, false, nil, false}, + 5, 0, errReadEmpty, []byte{}, + buffer{[]byte{}, 0, 0, false, nil, false}, + }, + { + buffer{[]byte{}, 0, 0, true, io.EOF, false}, + 5, 0, io.EOF, []byte{}, + buffer{[]byte{}, 0, 0, true, io.EOF, false}, + }, +} + +func TestBufferRead(t *testing.T) { + for i, tt := range bufferReadTests { + read := make([]byte, tt.read) + n, err := tt.buf.Read(read) + if n != tt.wn { + t.Errorf("#%d: wn = %d want %d", i, n, tt.wn) + continue + } + if err != tt.werr { + t.Errorf("#%d: werr = %v want %v", i, err, tt.werr) + continue + } + read = read[:n] + if !reflect.DeepEqual(read, tt.wp) { + t.Errorf("#%d: read = %+v want %+v", i, read, tt.wp) + } + if !reflect.DeepEqual(tt.buf, tt.wbuf) { + t.Errorf("#%d: buf = %+v want %+v", i, tt.buf, tt.wbuf) + } + } +} + +var bufferWriteTests = []struct { + buf buffer + write, wn int + werr error + wbuf buffer +}{ + { + buf: buffer{ + buf: []byte{}, + }, + wbuf: buffer{ + buf: []byte{}, + }, + }, + { + buf: buffer{ + buf: []byte{1, 'a'}, + }, + write: 1, + wn: 1, + wbuf: buffer{ + buf: []byte{0, 'a'}, + w: 1, + }, + }, + { + buf: buffer{ + buf: []byte{'a', 1}, + r: 1, + w: 1, + }, + write: 2, + wn: 2, + wbuf: buffer{ + buf: []byte{0, 0}, + w: 2, + }, + }, + { + buf: buffer{ + buf: []byte{}, + r: 1, + closed: true, + }, + write: 5, + werr: errWriteClosed, + wbuf: buffer{ + buf: []byte{}, + r: 1, + closed: true, + }, + }, + { + buf: buffer{ + buf: []byte{}, + }, + write: 5, + werr: errWriteFull, + wbuf: buffer{ + buf: []byte{}, + }, + }, +} + +func TestBufferWrite(t *testing.T) { + for i, tt := range bufferWriteTests { + n, err := tt.buf.Write(make([]byte, tt.write)) + if n != tt.wn { + t.Errorf("#%d: wrote %d bytes; want %d", i, n, tt.wn) + continue + } + if err != tt.werr { + t.Errorf("#%d: error = %v; want %v", i, err, tt.werr) + continue + } + if !reflect.DeepEqual(tt.buf, tt.wbuf) { + t.Errorf("#%d: buf = %+v; want %+v", i, tt.buf, tt.wbuf) + } + } +} diff --git a/server/lcon/pipe.go b/server/lcon/pipe.go new file mode 100644 index 00000000000..48be9d816fa --- /dev/null +++ b/server/lcon/pipe.go @@ -0,0 +1,178 @@ +// from https://github.com/bradfitz/http2/pull/8/files +// +// motivation: https://groups.google.com/forum/#!topic/golang-dev/k0bSal8eDyE +// +// Copyright 2014 The Go Authors. +// See https://code.google.com/p/go/source/browse/CONTRIBUTORS +// Licensed under the same terms as Go itself: +// https://code.google.com/p/go/source/browse/LICENSE + +package lcon + +import ( + "fmt" + "net" + "sync" + "time" +) + +// Pipe is buffered version of net.Pipe. Reads +// will block until data is available. +type Pipe struct { + b buffer + rc sync.Cond + wc sync.Cond + rm sync.Mutex + wm sync.Mutex + Flushed chan bool + + readDeadline time.Time + writeDeadline time.Time +} + +// NewPipe must be given a buf of +// pre-allocated size to use as the +// internal buffer between reads +// and writes. +func NewPipe(buf []byte) *Pipe { + p := &Pipe{ + b: buffer{buf: buf}, + Flushed: make(chan bool, 1), + } + p.rc = *sync.NewCond(&p.rm) + return p +} + +var ErrDeadline = fmt.Errorf("deadline exceeded") + +// Read waits until data is available and copies bytes +// from the buffer into p. +func (r *Pipe) Read(p []byte) (n int, err error) { + + r.rc.L.Lock() + defer r.rc.L.Unlock() + if !r.readDeadline.IsZero() { + now := time.Now() + dur := r.readDeadline.Sub(now) + if dur <= 0 { + return 0, ErrDeadline + } + nextReadDone := make(chan struct{}) + defer close(nextReadDone) + go func(dur time.Duration) { + select { + case <-time.After(dur): + r.rc.L.Lock() + r.b.late = true + r.rc.L.Unlock() + r.rc.Broadcast() + case <-nextReadDone: + } + }(dur) + } + for r.b.Len() == 0 && !r.b.closed && !r.b.late { + r.rc.Wait() + } + defer func() { + // we already hold the lock + r.b.late = false + r.readDeadline = time.Time{} + }() + return r.b.Read(p) +} + +// Write copies bytes from p into the buffer and wakes a reader. +// It is an error to write more data than the buffer can hold. +func (r *Pipe) Write(p []byte) (n int, err error) { + + r.rc.L.Lock() + defer r.rc.L.Unlock() + if !r.writeDeadline.IsZero() { + now := time.Now() + dur := r.writeDeadline.Sub(now) + if dur <= 0 { + return 0, ErrDeadline + } + nextWriteDone := make(chan struct{}) + defer close(nextWriteDone) + go func(dur time.Duration) { + select { + case <-time.After(dur): + r.rc.L.Lock() + r.b.late = true + r.rc.L.Unlock() + r.rc.Broadcast() + case <-nextWriteDone: + } + }(dur) + } + defer r.rc.Broadcast() + defer r.flush() + + for r.b.freeBytes() < len(p) && !r.b.closed && !r.b.late { + r.rc.Wait() + } + defer func() { + // we already hold the lock + r.b.late = false + r.writeDeadline = time.Time{} + }() + + return r.b.Write(p) +} + +var ErrLconPipeClosed = fmt.Errorf("lcon pipe closed") + +func (c *Pipe) Close() error { + c.SetErrorAndClose(ErrLconPipeClosed) + return nil +} + +func (r *Pipe) SetErrorAndClose(err error) { + r.rc.L.Lock() + defer r.rc.L.Unlock() + defer r.rc.Broadcast() + r.b.Close(err) +} + +// Pipe technically fullfills the net.Conn interface + +func (r *Pipe) LocalAddr() net.Addr { return addr{} } +func (r *Pipe) RemoteAddr() net.Addr { return addr{} } + +func (r *Pipe) flush() { + if len(r.Flushed) == 0 { + r.Flushed <- true + } +} + +type addr struct{} + +func (a addr) String() string { return "memory.pipe:0" } +func (a addr) Network() string { return "in-process-internal" } + +// SetDeadline implements the net.Conn method +func (r *Pipe) SetDeadline(t time.Time) error { + err := r.SetReadDeadline(t) + err2 := r.SetWriteDeadline(t) + if err != nil { + return err + } + return err2 +} + +// SetWriteDeadline implements the net.Conn method +func (r *Pipe) SetWriteDeadline(t time.Time) error { + r.rc.L.Lock() + r.writeDeadline = t + r.rc.L.Unlock() + return nil +} + +// SetReadDeadline implements the net.Conn method +func (r *Pipe) SetReadDeadline(t time.Time) error { + r.rc.L.Lock() + r.readDeadline = t + r.rc.L.Unlock() + return nil +} diff --git a/server/lcon/pipe_test.go b/server/lcon/pipe_test.go new file mode 100644 index 00000000000..e2e7bdf439e --- /dev/null +++ b/server/lcon/pipe_test.go @@ -0,0 +1,255 @@ +// from https://github.com/bradfitz/http2/pull/8/files +// +// motivation: https://groups.google.com/forum/#!topic/golang-dev/k0bSal8eDyE +// +// Copyright 2014 The Go Authors. +// See https://code.google.com/p/go/source/browse/CONTRIBUTORS +// Licensed under the same terms as Go itself: +// https://code.google.com/p/go/source/browse/LICENSE + +package lcon + +import ( + "errors" + "fmt" + "net" + "runtime" + "testing" + "time" +) + +func TestPipeClose(t *testing.T) { + var p Pipe + p.rc.L = &p.rm + a := errors.New("a") + b := errors.New("b") + p.SetErrorAndClose(a) + p.SetErrorAndClose(b) + _, err := p.Read(make([]byte, 1)) + if err != a { + t.Errorf("err = %v want %v", err, a) + } +} + +func TestPipeAsNetConn(t *testing.T) { + + var nc net.Conn = NewPipe(make([]byte, 100)) + + msg := "hello-world" + ms2 := "finkleworms" + for i := 0; i < 2; i++ { + if i == 1 { + msg = ms2 + } + + n, err := nc.Write([]byte(msg)) + if err != nil { + t.Errorf("err = %v", err) + } + if n != len(msg) { + t.Errorf("Write truncated at %v < %v", n, len(msg)) + } + + readbuf := make([]byte, len(msg)) + m, err := nc.Read(readbuf) + if err != nil { + t.Errorf("err = %v", err) + } + if m != n { + t.Errorf("Read truncated at %v !=n %v", m, n) + } + back := string(readbuf[:m]) + if back != msg { + t.Errorf("msg corrupted, wrote '%v', read '%v'", msg, back) + } + } + + // write-write, read-read + { + n, err := nc.Write([]byte(msg)) + if err != nil { + t.Errorf("err = %v", err) + } + if n != len(msg) { + t.Errorf("Write truncated at %v < %v", n, len(msg)) + } + + n2, err := nc.Write([]byte(ms2)) + if err != nil { + t.Errorf("err = %v", err) + } + if n2 != len(ms2) { + t.Errorf("Write truncated at %v < %v", n, len(msg)) + } + + readbuf := make([]byte, len(msg)) + m, err := nc.Read(readbuf) + if err != nil { + t.Errorf("err = %v", err) + } + if m != n { + t.Errorf("Read truncated at %v !=n %v", m, n) + } + back := string(readbuf[:m]) + if back != msg { + t.Errorf("msg corrupted, wrote '%v', read '%v'", msg, back) + } + + // 2nd read + readbuf2 := make([]byte, len(msg)) + m2, err := nc.Read(readbuf2) + if err != nil { + t.Errorf("err = %v", err) + } + if m2 != n2 { + t.Errorf("Read truncated at %v !=n %v", m2, n2) + } + back2 := string(readbuf2[:m2]) + if back != ms2 { + t.Errorf("msg corrupted, wrote '%v', read '%v'", ms2, back2) + } + } + + // blocking read + readdone := make(chan struct{}) + readbuf3 := make([]byte, len(msg)) + var m3 int + var err error + go func() { + + // should block + m3, err = nc.Read(readbuf3) + if err != nil { + t.Errorf("err = %v", err) + } + close(readdone) + }() + select { + case <-readdone: + t.Fatal("read did not block") + case <-time.After(60 * time.Millisecond): + // good, read should have blocked. + } + + msg3 := "heya" + // write and see release + n3, err := nc.Write([]byte(msg3)) + if err != nil { + t.Errorf("err = %v", err) + } + if n3 != len(msg3) { + t.Errorf("Write truncated at %v < %v", n3, len(msg3)) + } + <-readdone + got := string(readbuf3[:m3]) + if got != msg3 { + t.Errorf(fmt.Errorf("msg corrupted, wrote '%v', read '%v'", msg3, got).Error()) + } + //fmt.Printf("\n got = '%s'\n", got) + +} + +func TestReadDeadlinesWork(t *testing.T) { + + var nc net.Conn = NewPipe(make([]byte, 100)) + + // deadlines should work + readbuf4 := make([]byte, 100) + + timeout := 50 * time.Millisecond + err := nc.SetReadDeadline(time.Now().Add(timeout)) + if err != nil { + t.Fatalf("must be able to SetReadDeadline") + } + deadlineFired, checkDone := make(chan bool), make(chan bool) + go func() { + select { + case <-time.After(6 * timeout): + + buf := make([]byte, 1<<20) + stacklen := runtime.Stack(buf, true) + fmt.Printf("\n%s\n\n", buf[:stacklen]) + + panic(fmt.Sprintf("%v deadline didn't fire after %v", + timeout, 6*timeout)) + case <-deadlineFired: + close(checkDone) + } + }() + + t0 := time.Now() + _, err = nc.Read(readbuf4) + elap := time.Since(t0) + close(deadlineFired) + <-checkDone + if err == nil { + t.Fatalf("Read beyond deadline should have returned an error") + } + if elap < timeout { + t.Fatalf("Read returned before deadline timeout") + } + fmt.Printf("good, err = '%v' after %s.\n", err, elap) + + // and should be able to read successfully after timeout: + msg := []byte("jabber") + _, err = nc.Write(msg) + if err != nil { + t.Fatalf("should have been able to write") + } + nr, err := nc.Read(readbuf4) + if nr != len(msg) { + t.Fatalf("should have been able to read all of msg") + } + if err != nil { + t.Fatalf("should have been able to read after previous read-deadline timeout: '%s'", err) + } +} + +func TestWriteDeadlinesWork(t *testing.T) { + + var nc net.Conn = NewPipe(make([]byte, 10)) + + // deadlines should work, trying to write more + // than we have space for... + writebuf := make([]byte, 100) + + timeout := 50 * time.Millisecond + err := nc.SetWriteDeadline(time.Now().Add(timeout)) + if err != nil { + t.Fatalf("must be able to SetWriteDeadline") + } + deadlineFired, checkDone := make(chan bool), make(chan bool) + go func() { + select { + case <-time.After(6 * timeout): + + buf := make([]byte, 1<<20) + stacklen := runtime.Stack(buf, true) + fmt.Printf("\n%s\n\n", buf[:stacklen]) + + panic(fmt.Sprintf("%v deadline didn't fire after %v", + timeout, 6*timeout)) + case <-deadlineFired: + close(checkDone) + } + }() + + t0 := time.Now() + _, err = nc.Write(writebuf) + elap := time.Since(t0) + close(deadlineFired) + <-checkDone + if err == nil { + t.Fatalf("Write beyond deadline should have returned an error") + } + if elap < timeout { + t.Fatalf("Write returned before deadline timeout") + } + fmt.Printf("good, err = '%v' after %s.\n", err, elap) + + // should be able to write small ok... + _, err = nc.Write(writebuf[:5]) + if err != nil { + t.Fatalf("small write of 5 to a capacity 10 buffer should work fine: '%s'", err) + } +} diff --git a/server/monitor.go b/server/monitor.go index 6cce144b7f3..dc473cc2047 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -473,10 +473,11 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { v.TotalConnections = s.totalClients v.Routes = len(s.routes) v.Remotes = len(s.remotes) - v.InMsgs = s.inMsgs - v.InBytes = s.inBytes - v.OutMsgs = s.outMsgs - v.OutBytes = s.outBytes + // atomic loads avoid data races with client.go:298,951 + v.InMsgs = atomic.LoadInt64(&s.inMsgs) + v.InBytes = atomic.LoadInt64(&s.inBytes) + v.OutMsgs = atomic.LoadInt64(&s.outMsgs) + v.OutBytes = atomic.LoadInt64(&s.outBytes) v.SlowConsumers = s.slowConsumers v.Subscriptions = s.sl.Count() s.httpReqStats[VarzPath]++ diff --git a/server/opts.go b/server/opts.go index 8b97fa451a6..46dc7d13c55 100644 --- a/server/opts.go +++ b/server/opts.go @@ -71,10 +71,10 @@ type Options struct { ProfPort int `json:"-"` PidFile string `json:"-"` LogFile string `json:"-"` - Syslog bool `json:"-"` RemoteSyslog string `json:"-"` Routes []*url.URL `json:"-"` RoutesStr string `json:"-"` + Syslog bool `json:"-"` TLSTimeout float64 `json:"tls_timeout"` TLS bool `json:"-"` TLSVerify bool `json:"-"` @@ -83,6 +83,12 @@ type Options struct { TLSCaCert string `json:"-"` TLSConfig *tls.Config `json:"-"` WriteDeadline time.Duration `json:"-"` + + InternalCli []InternalClient `json:"-"` + HealthAgent bool `json:"health_agent"` + HealthRank int `json:"health_rank"` + HealthLease time.Duration `json:"health_lease"` + HealthBeat time.Duration `json:"health_beat"` } // Configuration file authorization section. @@ -225,6 +231,14 @@ func ProcessConfigFile(configFile string) (*Options, error) { opts.PingInterval = time.Duration(int(v.(int64))) * time.Second case "ping_max": opts.MaxPingsOut = int(v.(int64)) + case "health_rank": + opts.HealthRank = int(v.(int64)) + case "health_lease": + opts.HealthLease = v.(time.Duration) + case "health_beat": + opts.HealthBeat = v.(time.Duration) + case "health_agent": + opts.HealthAgent = v.(bool) case "tls": tlsm := v.(map[string]interface{}) tc, err := parseTLS(tlsm) diff --git a/server/route.go b/server/route.go index 6e004bc16ed..1363e2864a7 100644 --- a/server/route.go +++ b/server/route.go @@ -661,6 +661,7 @@ func (s *Server) StartRouting(clientListenReady chan struct{}) { TLSVerify: tlsReq, MaxPayload: s.info.MaxPayload, ClientConnectURLs: clientConnectURLs, + HealthRank: s.opts.HealthRank, } // Check for Auth items if s.opts.Cluster.Username != "" { diff --git a/server/server.go b/server/server.go index 7d616d2b0dd..580c7647f8a 100644 --- a/server/server.go +++ b/server/server.go @@ -39,6 +39,7 @@ type Info struct { MaxPayload int `json:"max_payload"` IP string `json:"ip,omitempty"` ClientConnectURLs []string `json:"connect_urls,omitempty"` // Contains URLs a client can connect to. + HealthRank int `json:"health_rank"` // lowest rank wins leader election. // Used internally for quick look-ups. clientConnectURLs map[string]struct{} @@ -77,6 +78,7 @@ type Server struct { grRunning bool grWG sync.WaitGroup // to wait on various go routines cproto int64 // number of clients supporting async INFO + icli iCli // in-process internal clients } // Make sure all are 64bits for atomic use @@ -108,6 +110,7 @@ func New(opts *Options) *Server { TLSVerify: verify, MaxPayload: opts.MaxPayload, clientConnectURLs: make(map[string]struct{}), + HealthRank: opts.HealthRank, } s := &Server{ @@ -118,6 +121,7 @@ func New(opts *Options) *Server { trace: opts.Trace, done: make(chan bool, 1), start: time.Now(), + icli: iCli{configured: opts.InternalCli}, } s.mu.Lock() @@ -267,10 +271,53 @@ func (s *Server) Start() { s.StartProfiler() } + // Run the internal clients in + // s.icli.configured. + // + // Retain only those started + // successfully in s.icli.running. + // + s.icli.mu.Lock() + go func(info Info, opts Options) { + defer s.icli.mu.Unlock() + + // wait for server to be accepting clients + select { + case <-s.rcQuit: + return + case <-clientListenReady: + } + + n := len(s.icli.configured) + if n == 0 { + return + } + Debugf("Starting the %v internal client(s).", n) + for _, ic := range s.icli.configured { + srv, err := ic.Start(info, opts, log.logger) + if err == nil { + if srv != nil { + s.InternalCliRegisterCallback(srv) + } + Noticef("InternalClient ['%s'] started.", ic.Name()) + } else { + Errorf("InternalClient ['%s'] failed to Start(): %s", ic.Name(), err) + } + } + }(s.info, *s.opts) + // Wait for clients. s.AcceptLoop(clientListenReady) } +// InternalCliRegisterCallback is public only for testing. +func (s *Server) InternalCliRegisterCallback(srv net.Conn) { + s.startGoRoutine(func() { + s.createClient(srv) + s.grWG.Done() + }) +} + // Shutdown will shutdown the server instance by kicking out the AcceptLoop // and closing all associated clients. func (s *Server) Shutdown() { @@ -332,6 +379,13 @@ func (s *Server) Shutdown() { // Release the solicited routes connect go routines. close(s.rcQuit) + // stop any internal clients + s.icli.mu.Lock() + for _, ic := range s.icli.configured { + ic.Stop() + } + s.icli.mu.Unlock() + s.mu.Unlock() // Close client and route connections @@ -531,6 +585,11 @@ func (s *Server) startMonitoring(secure bool) { func (s *Server) createClient(conn net.Conn) *client { c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: s.info.MaxPayload, start: time.Now()} + _, isInternal := conn.(LocalInternalClient) + if isInternal { + c.typ = HEALTH + } + // Grab JSON info string s.mu.Lock() info := s.infoJSON @@ -548,7 +607,7 @@ func (s *Server) createClient(conn net.Conn) *client { c.Debugf("Client connection created") // Check for Auth - if authRequired { + if !isInternal && authRequired { c.setAuthTimer(secondsToDuration(s.opts.AuthTimeout)) } @@ -582,7 +641,7 @@ func (s *Server) createClient(conn net.Conn) *client { c.mu.Lock() // Check for TLS - if tlsRequired { + if !isInternal && tlsRequired { c.Debugf("Starting TLS client connection handshake") c.nc = tls.Server(c.nc, s.opts.TLSConfig) conn := c.nc.(*tls.Conn) @@ -613,7 +672,7 @@ func (s *Server) createClient(conn net.Conn) *client { return c } - if tlsRequired { + if !isInternal && tlsRequired { // Rewrap bw c.bw = bufio.NewWriterSize(c.nc, startBufSize) } @@ -621,12 +680,14 @@ func (s *Server) createClient(conn net.Conn) *client { // Do final client initialization // Set the Ping timer - c.setPingTimer() + if !isInternal { + c.setPingTimer() + } // Spin up the read loop. s.startGoRoutine(func() { c.readLoop() }) - if tlsRequired { + if !isInternal && tlsRequired { c.Debugf("TLS handshake complete") cs := c.nc.(*tls.Conn).ConnectionState() c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tlsCipher(cs.CipherSuite)) @@ -755,6 +816,8 @@ func (s *Server) checkAuth(c *client) bool { return s.checkClientAuth(c) case ROUTER: return s.checkRouterAuth(c) + case HEALTH: + return true default: return false }