Skip to content

Commit

Permalink
Add cluster ping to ensure node health
Browse files Browse the repository at this point in the history
  • Loading branch information
NHAS committed May 1, 2024
1 parent d11c0a9 commit e61068e
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 19 deletions.
33 changes: 33 additions & 0 deletions internal/data/clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path"
"strconv"
"strings"
"time"

"github.com/NHAS/wag/internal/config"
"go.etcd.io/etcd/client/pkg/v3/types"
Expand Down Expand Up @@ -58,9 +59,41 @@ func StepDown() error {
}

func GetMembers() []*membership.Member {

return etcdServer.Server.Cluster().Members()
}

func GetLastPing(idHex string) (time.Time, error) {
id, err := strconv.ParseUint(idHex, 16, 64)
if err != nil {
return time.Time{}, err
}

if etcdServer.Server.Cluster().Member(types.ID(id)) == nil {
return time.Time{}, errors.New("id is not part of cluster")
}

lastPing, err := etcd.Get(context.Background(), path.Join(NodeEvents, idHex, "ping"))
if err != nil {
return time.Time{}, err
}

if lastPing.Count == 0 {
return time.Time{}, errors.New("node missing")
}

if len(lastPing.Kvs) != 1 {
return time.Time{}, errors.New("multiple keys match ping")
}

t, err := time.Parse(time.RFC1123Z, string(lastPing.Kvs[0].Value))
if err != nil {
return t, err
}

return t, nil
}

func SetDrained(idHex string, on bool) error {
_, err := strconv.ParseUint(idHex, 16, 64)
if err != nil {
Expand Down
49 changes: 33 additions & 16 deletions internal/data/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var (
clusterHealthListeners = map[string]func(string){}

EventsQueue = queue.NewQueue(40)
exit = make(chan bool)
)

func RegisterEventListener[T any](path string, isPrefix bool, f func(key string, current, previous T, et EventType) error) (string, error) {
Expand Down Expand Up @@ -173,29 +174,45 @@ func notifyClusterHealthListeners(event string) {

func checkClusterHealth() {

for {
leaderMonitor := time.NewTicker(1 * time.Second)
go func() {
for range leaderMonitor.C {
if etcdServer.Server.Leader() == 0 {

select {
case <-etcdServer.Server.LeaderChangedNotify():
notifyHealthy()
notifyClusterHealthListeners("electing")
time.Sleep(etcdServer.Server.Cfg.ElectionTimeout() * 2)

case <-time.After(1 * time.Second):
testState()
if etcdServer.Server.Leader() == 0 {
notifyClusterHealthListeners("dead")
}
}
}
}()

clusterMonitor := time.NewTicker(5 * time.Second)
go func() {
for range clusterMonitor.C {
testCluster()
}
}()

<-exit

log.Println("etcd server was instructed to terminate")
leaderMonitor.Stop()
clusterMonitor.Stop()

}
}

func testState() {
if etcdServer.Server.Leader() == 0 {
notifyClusterHealthListeners("electing")
<-time.After(etcdServer.Server.Cfg.ElectionTimeout() * 2)
if etcdServer.Server.Leader() == 0 {
notifyClusterHealthListeners("dead")
return
}
// Intentional drop through
func testCluster() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)

_, err := etcd.Put(ctx, path.Join(NodeEvents, GetServerID(), "ping"), time.Now().Format(time.RFC1123Z))
cancel()
if err != nil {
log.Println("unable to write liveness value")
notifyClusterHealthListeners("dead")
return
}

notifyHealthy()
Expand Down
1 change: 1 addition & 0 deletions internal/data/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ func migrateFromSql(database *sql.DB) error {
}

func TearDown() {
close(exit)
if etcdServer != nil {

etcd.Close()
Expand Down
25 changes: 24 additions & 1 deletion ui/clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"log"
"net/http"
"time"

"github.com/NHAS/wag/internal/data"
"go.etcd.io/etcd/client/pkg/v3/types"
Expand All @@ -14,6 +15,7 @@ type MembershipDTO struct {
*membership.Member
IsDrained bool

Ping string
Status string
}

Expand Down Expand Up @@ -62,15 +64,36 @@ func clusterMembersUI(w http.ResponseWriter, r *http.Request) {
if drained {
status = "drained"
} else if !members[i].IsStarted() {
status = "connecting..."
status = "wait for first connection..."
} else if members[i].IsLearner {
status = "learner"
}

ping := ""
if status != "learner" {
lastPing, err := data.GetLastPing(members[i].ID.String())
if err != nil {
log.Println("unable to fetch last ping: ", err)
status = "no last ping"
} else {

if lastPing.Before(time.Now().Add(-6 * time.Second)) {
status += "(lagging ping)"
}

if lastPing.Before(time.Now().Add(-14 * time.Second)) {
status = "dead"
}

ping = lastPing.Format(time.RFC822)
}
}

d.Members = append(d.Members, MembershipDTO{
Member: members[i],
IsDrained: drained,
Status: status,
Ping: ping,
})

}
Expand Down
11 changes: 10 additions & 1 deletion ui/templates/cluster/members.html
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

{{range $index, $val := .Members}}
<div class="col fit-content">
<div class='card border-left-{{if (eq .Status "healthy")}}success{{else}}warning{{end}} shadow mb-2'>
<div
class='card border-left-{{if (eq .Status "healthy")}}success{{else if (eq .Status "dead")}}danger{{else}}warning{{end}} shadow mb-2'>
<div class="card-body">

<div class="card-title ">
Expand Down Expand Up @@ -56,6 +57,14 @@ <h5 class="font-weight-bold text-truncate">{{if
{{.Status}}
</div>
</div>
<div class="row mb-1">
<div class="col-4">
Last Ping:
</div>
<div class="col">
{{if .Ping}}{{.Ping}}{{else}}N/A{{end}}
</div>
</div>
<div class="row mt-2">
<div class="col-4">
Address{{if gt (len .PeerURLs) 1}}es{{end}}:
Expand Down
2 changes: 1 addition & 1 deletion ui/ui_webserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func Teardown() {
}

if config.Values.ManagementUI.Enabled {
log.Println("Stopped MFA portal")
log.Println("Stopped Management UI")
}

}
Expand Down

0 comments on commit e61068e

Please sign in to comment.