diff --git a/internal/data/clustering.go b/internal/data/clustering.go index f95a836e..f476cf3a 100644 --- a/internal/data/clustering.go +++ b/internal/data/clustering.go @@ -10,6 +10,7 @@ import ( "path" "strconv" "strings" + "time" "github.com/NHAS/wag/internal/config" "go.etcd.io/etcd/client/pkg/v3/types" @@ -58,9 +59,41 @@ func StepDown() error { } func GetMembers() []*membership.Member { + return etcdServer.Server.Cluster().Members() } +func GetLastPing(idHex string) (time.Time, error) { + id, err := strconv.ParseUint(idHex, 16, 64) + if err != nil { + return time.Time{}, err + } + + if etcdServer.Server.Cluster().Member(types.ID(id)) == nil { + return time.Time{}, errors.New("id is not part of cluster") + } + + lastPing, err := etcd.Get(context.Background(), path.Join(NodeEvents, idHex, "ping")) + if err != nil { + return time.Time{}, err + } + + if lastPing.Count == 0 { + return time.Time{}, errors.New("node missing") + } + + if len(lastPing.Kvs) != 1 { + return time.Time{}, errors.New("multiple keys match ping") + } + + t, err := time.Parse(time.RFC1123Z, string(lastPing.Kvs[0].Value)) + if err != nil { + return t, err + } + + return t, nil +} + func SetDrained(idHex string, on bool) error { _, err := strconv.ParseUint(idHex, 16, 64) if err != nil { diff --git a/internal/data/events.go b/internal/data/events.go index 2e63d184..7f7e3015 100644 --- a/internal/data/events.go +++ b/internal/data/events.go @@ -53,6 +53,7 @@ var ( clusterHealthListeners = map[string]func(string){} EventsQueue = queue.NewQueue(40) + exit = make(chan bool) ) func RegisterEventListener[T any](path string, isPrefix bool, f func(key string, current, previous T, et EventType) error) (string, error) { @@ -173,29 +174,45 @@ func notifyClusterHealthListeners(event string) { func checkClusterHealth() { - for { + leaderMonitor := time.NewTicker(1 * time.Second) + go func() { + for range leaderMonitor.C { + if etcdServer.Server.Leader() == 0 { - select { - case <-etcdServer.Server.LeaderChangedNotify(): - notifyHealthy() + notifyClusterHealthListeners("electing") + time.Sleep(etcdServer.Server.Cfg.ElectionTimeout() * 2) - case <-time.After(1 * time.Second): - testState() + if etcdServer.Server.Leader() == 0 { + notifyClusterHealthListeners("dead") + } + } + } + }() + clusterMonitor := time.NewTicker(5 * time.Second) + go func() { + for range clusterMonitor.C { + testCluster() } + }() + + <-exit + + log.Println("etcd server was instructed to terminate") + leaderMonitor.Stop() + clusterMonitor.Stop() - } } -func testState() { - if etcdServer.Server.Leader() == 0 { - notifyClusterHealthListeners("electing") - <-time.After(etcdServer.Server.Cfg.ElectionTimeout() * 2) - if etcdServer.Server.Leader() == 0 { - notifyClusterHealthListeners("dead") - return - } - // Intentional drop through +func testCluster() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + + _, err := etcd.Put(ctx, path.Join(NodeEvents, GetServerID(), "ping"), time.Now().Format(time.RFC1123Z)) + cancel() + if err != nil { + log.Println("unable to write liveness value") + notifyClusterHealthListeners("dead") + return } notifyHealthy() diff --git a/internal/data/init.go b/internal/data/init.go index 0d9a6c32..427a8c80 100644 --- a/internal/data/init.go +++ b/internal/data/init.go @@ -462,6 +462,7 @@ func migrateFromSql(database *sql.DB) error { } func TearDown() { + close(exit) if etcdServer != nil { etcd.Close() diff --git a/ui/clustering.go b/ui/clustering.go index ce61b3cb..4cfe7314 100644 --- a/ui/clustering.go +++ b/ui/clustering.go @@ -4,6 +4,7 @@ import ( "encoding/json" "log" "net/http" + "time" "github.com/NHAS/wag/internal/data" "go.etcd.io/etcd/client/pkg/v3/types" @@ -14,6 +15,7 @@ type MembershipDTO struct { *membership.Member IsDrained bool + Ping string Status string } @@ -62,15 +64,36 @@ func clusterMembersUI(w http.ResponseWriter, r *http.Request) { if drained { status = "drained" } else if !members[i].IsStarted() { - status = "connecting..." + status = "wait for first connection..." } else if members[i].IsLearner { status = "learner" } + ping := "" + if status != "learner" { + lastPing, err := data.GetLastPing(members[i].ID.String()) + if err != nil { + log.Println("unable to fetch last ping: ", err) + status = "no last ping" + } else { + + if lastPing.Before(time.Now().Add(-6 * time.Second)) { + status += "(lagging ping)" + } + + if lastPing.Before(time.Now().Add(-14 * time.Second)) { + status = "dead" + } + + ping = lastPing.Format(time.RFC822) + } + } + d.Members = append(d.Members, MembershipDTO{ Member: members[i], IsDrained: drained, Status: status, + Ping: ping, }) } diff --git a/ui/templates/cluster/members.html b/ui/templates/cluster/members.html index 1108a00e..7f0a189b 100755 --- a/ui/templates/cluster/members.html +++ b/ui/templates/cluster/members.html @@ -14,7 +14,8 @@ {{range $index, $val := .Members}}