Optimize node mgr by eliminating duplicate heartbeats (#214)
- Eliminate redundant heartbeat pings sent to identical full nodes.
wanliqun authored Nov 4, 2024
1 parent 913e7e4 commit db84e68
Showing 10 changed files with 241 additions and 69 deletions.
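
In short (a rough usage sketch, not code from this commit; only the signatures visible in the diffs below are taken from the repository): a full node no longer receives a HealthMonitor when it is constructed. Each node runs a single heartbeat goroutine and fans the collected status out to every monitor registered against it, so several managers sharing the same full node no longer trigger duplicate pings.

// Illustrative wiring only; variable names and the surrounding call site are assumptions.
node, err := NewCfxNode(group, "cfx-node-0", "http://cfx-node-0:12537")
if err != nil {
	return err
}

mgrA := NewManager(group)        // e.g. the group served over HTTP
mgrB := NewManager(anotherGroup) // hypothetical second group reusing the same full node

// Both managers observe the same node, but the node keeps a single heartbeat loop
// and reports each collected status to every registered HealthMonitor.
node.Register(mgrA)
node.Register(mgrB)
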
8 changes: 4 additions & 4 deletions node/factory.go
@@ -19,8 +19,8 @@ var (
func Factory() *factory {
cfxOnce.Do(func() {
cfxFactory = newFactory(
func(group Group, name, url string, hm HealthMonitor) (Node, error) {
return NewCfxNode(group, name, url, hm)
func(group Group, name, url string) (Node, error) {
return NewCfxNode(group, name, url)
},
cfg.Endpoint, urlCfg, cfg.Router.NodeRPCURL,
)
@@ -33,8 +33,8 @@ func Factory() *factory {
func EthFactory() *factory {
ethOnce.Do(func() {
ethFactory = newFactory(
func(group Group, name, url string, hm HealthMonitor) (Node, error) {
return NewEthNode(group, name, url, hm)
func(group Group, name, url string) (Node, error) {
return NewEthNode(group, name, url)
},
cfg.EthEndpoint, ethUrlCfg, cfg.Router.EthNodeRPCURL,
)
21 changes: 6 additions & 15 deletions node/manager.go
@@ -10,7 +10,7 @@ import (
)

// nodeFactory factory method to create node instance
type nodeFactory func(group Group, name, url string, hm HealthMonitor) (Node, error)
type nodeFactory func(group Group, name, url string) (Node, error)

// Manager manages full node cluster, including:
// 1. Monitor node health and disable/enable full node automatically.
@@ -23,8 +23,9 @@ type Manager struct {
resolver RepartitionResolver // support repartition for hash ring
mu sync.RWMutex

nodeName2Epochs map[string]uint64 // node name => epoch
midEpoch uint64 // middle epoch of managed full nodes.
// health monitor
monitorStatuses map[string]monitorStatus // node name => monitor status
midEpoch uint64 // middle epoch of managed full nodes.
}

func NewManager(group Group) *Manager {
@@ -36,21 +37,11 @@ func NewManagerWithRepartition(group Group, resolver RepartitionResolver) *Manag
group: group,
nodes: make(map[string]Node),
resolver: resolver,
nodeName2Epochs: make(map[string]uint64),
monitorStatuses: make(map[string]monitorStatus),
hashRing: consistent.New(nil, cfg.HashRingRaw()),
}
}

// Close closes the manager to reclaim resources
func (m *Manager) Close() {
m.mu.Lock()
defer m.mu.Unlock()

for _, node := range m.nodes {
node.Close()
}
}

// Add adds fullnode to monitor
func (m *Manager) Add(nodes ...Node) {
m.mu.Lock()
@@ -73,7 +64,7 @@ func (m *Manager) Remove(nodeNames ...string) {
if node, ok := m.nodes[nn]; ok {
node.Close()
delete(m.nodes, nn)
delete(m.nodeName2Epochs, nn)
delete(m.monitorStatuses, nn)
m.hashRing.Remove(nn)
}
}
70 changes: 62 additions & 8 deletions node/manager_monitor.go
@@ -1,12 +1,20 @@
package node

import (
"sort"
"slices"
"sync/atomic"
"time"

"github.com/sirupsen/logrus"
)

// monitorStatus is the monitor status of managed nodes.
type monitorStatus struct {
epoch uint64 // the latest epoch height
unhealthy bool // whether the node is unhealthy
unhealthReportAt time.Time // the last unhealthy report time
}

// Implementations for HealthMonitor interface.

// HealthyEpoch returns the middle epoch height collected from managed cluster nodes,
@@ -15,29 +23,55 @@ func (m *Manager) HealthyEpoch() uint64 {
return atomic.LoadUint64(&m.midEpoch)
}

// HealthStatus checks the health status of a full node and returns whether it is unhealthy
// along with the time when it was reported as unhealthy.
//
// Parameters:
// - nodeName: The name of the node to check.
//
// Returns:
// - isUnhealthy: A boolean indicating if the node is unhealthy.
// - reportedAt: The time when the node was reported as unhealthy.
func (m *Manager) HealthStatus(nodeName string) (isUnhealthy bool, reportedAt time.Time) {
m.mu.Lock()
defer m.mu.Unlock()

status := m.monitorStatuses[nodeName]
return status.unhealthy, status.unhealthReportAt
}

// ReportEpoch reports latest epoch height of managed node to manager.
func (m *Manager) ReportEpoch(nodeName string, epoch uint64) {
m.mu.Lock()
defer m.mu.Unlock()

m.nodeName2Epochs[nodeName] = epoch
if len(m.nodeName2Epochs) == 1 {
m.updateEpoch(nodeName, epoch)

if len(m.monitorStatuses) == 1 {
atomic.StoreUint64(&m.midEpoch, epoch)
return
}

var epochs []int
for _, epoch := range m.nodeName2Epochs {
epochs = append(epochs, int(epoch))
var epochs []uint64
for _, ms := range m.monitorStatuses {
epochs = append(epochs, ms.epoch)
}

sort.Ints(epochs)
slices.Sort(epochs)

atomic.StoreUint64(&m.midEpoch, uint64(epochs[len(epochs)/2]))
atomic.StoreUint64(&m.midEpoch, epochs[len(epochs)/2])
}

func (m *Manager) updateEpoch(nodeName string, epoch uint64) {
status := m.monitorStatuses[nodeName]
status.epoch = epoch
m.monitorStatuses[nodeName] = status
}

// ReportUnhealthy reports unhealthy status of managed node to manager.
func (m *Manager) ReportUnhealthy(nodeName string, remind bool, reason error) {
m.updateUnhealthy(nodeName)

logger := logrus.WithFields(logrus.Fields{
"node": nodeName,
"group": m.group,
@@ -56,6 +90,16 @@ func (m *Manager) ReportUnhealthy(nodeName string, remind bool, reason error) {
// FIXME update repartition cache if configured
}

func (m *Manager) updateUnhealthy(nodeName string) {
m.mu.Lock()
defer m.mu.Unlock()

status := m.monitorStatuses[nodeName]
status.unhealthy = true
status.unhealthReportAt = time.Now()
m.monitorStatuses[nodeName] = status
}

// ReportHealthy reports healthy status of managed node to manager.
func (m *Manager) ReportHealthy(nodeName string) {
// alert
@@ -68,3 +112,13 @@ func (m *Manager) ReportHealthy(nodeName string) {
logrus.WithField("node", nodeName).Error("Node not found in manager")
}
}

func (m *Manager) updateHealthy(nodeName string) {
m.mu.Lock()
defer m.mu.Unlock()

status := m.monitorStatuses[nodeName]
status.unhealthy = false
status.unhealthReportAt = time.Time{}
m.monitorStatuses[nodeName] = status
}
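
For reference, the healthy epoch exposed by the manager is the median of the latest epochs reported by its managed nodes. A worked example of the computation in ReportEpoch above (the epoch values are illustrative):

// Suppose three managed nodes last reported epochs 998, 1004 and 1000.
epochs := []uint64{998, 1004, 1000}
slices.Sort(epochs)          // [998 1000 1004]
mid := epochs[len(epochs)/2] // 1000; with an even count this picks the upper of the two middle values
// mid is what gets stored into m.midEpoch and returned by HealthyEpoch().
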
62 changes: 55 additions & 7 deletions node/node.go
@@ -2,6 +2,7 @@ package node

import (
"context"
"sync"
"sync/atomic"
"time"

@@ -26,17 +27,21 @@ type Node interface {
Name() string
Url() string
Status() Status

LatestEpochNumber() (uint64, error)

Register(HealthMonitor)
Deregister(HealthMonitor)

Close()
}

type baseNode struct {
mu sync.Mutex
name string
url string
cancel context.CancelFunc
atomicStatus atomic.Value
monitors []HealthMonitor
}

func newBaseNode(name, url string, cancel context.CancelFunc) *baseNode {
@@ -62,7 +67,7 @@ func (n *baseNode) String() string {
}

// monitor periodically heartbeats with node to monitor health status
func (n *baseNode) monitor(ctx context.Context, node Node, hm HealthMonitor) {
func (n *baseNode) monitor(ctx context.Context, node Node) {
ticker := time.NewTicker(cfg.Monitor.Interval)
defer ticker.Stop()

@@ -73,14 +78,57 @@ func (n *baseNode) monitor(ctx context.Context, node Node, hm HealthMonitor) {
return
case <-ticker.C:
status := n.atomicStatus.Load().(Status)
status.Update(node, hm)
status.Update(node)

monitors := n.snapshot()
for _, hm := range monitors {
status.Report(hm)
}

logrus.WithFields(logrus.Fields{
"numHealthMonitors": len(monitors),
"status": status,
}).Debug("Node health status reported")

n.atomicStatus.Store(status)
}
}
}

// snapshot snapshots all health monitors.
func (n *baseNode) snapshot() []HealthMonitor {
n.mu.Lock()
defer n.mu.Unlock()

res := make([]HealthMonitor, 0, len(n.monitors))
return append(res, n.monitors...)
}

// Register registers a health monitor to node.
func (n *baseNode) Register(hm HealthMonitor) {
n.mu.Lock()
defer n.mu.Unlock()

n.monitors = append(n.monitors, hm)
}

// Deregister deregisters a health monitor from node.
func (n *baseNode) Deregister(hm HealthMonitor) {
n.mu.Lock()
defer n.mu.Unlock()

monitors := []HealthMonitor{}
for _, m := range n.monitors {
if m != hm {
monitors = append(monitors, m)
}
}
n.monitors = monitors
}

func (n *baseNode) Close() {
n.cancel()

status := n.Status()
status.Close()
}
Expand All @@ -93,7 +141,7 @@ type EthNode struct {

// NewEthNode creates an instance of evm space node and start to monitor
// node health in a separate goroutine until node closed.
func NewEthNode(group Group, name, url string, hm HealthMonitor) (*EthNode, error) {
func NewEthNode(group Group, name, url string) (*EthNode, error) {
eth, err := rpc.NewEthClient(url)
if err != nil {
return nil, err
@@ -107,7 +155,7 @@ func NewEthNode(group Group, name, url string, hm HealthMonitor) (*EthNode, erro

n.atomicStatus.Store(NewStatus(group, name))

go n.monitor(ctx, n, hm)
go n.monitor(ctx, n)

return n, nil
}
@@ -135,7 +183,7 @@ type CfxNode struct {

// NewCfxNode creates an instance of core space fullnode and start to monitor
// node health in a separate goroutine until node closed.
func NewCfxNode(group Group, name, url string, hm HealthMonitor) (*CfxNode, error) {
func NewCfxNode(group Group, name, url string) (*CfxNode, error) {
cfx, err := rpc.NewCfxClient(url)
if err != nil {
return nil, err
@@ -149,7 +197,7 @@ func NewCfxNode(group Group, name, url string, hm HealthMonitor) (*CfxNode, erro

n.atomicStatus.Store(NewStatus(group, name))

go n.monitor(ctx, n, hm)
go n.monitor(ctx, n)

return n, nil
}
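
One consequence of the new Register/Deregister pair (a usage sketch, not code from this commit): a manager that stops observing a node still shared with other groups should deregister itself rather than close the node, so the remaining monitors keep receiving the single shared heartbeat; closing is reserved for when the node is dropped entirely, as Manager.Remove does above.

// This manager no longer needs health reports from the shared node.
node.Deregister(mgr)

// Only when no group uses the node any more is it shut down,
// which cancels its monitor goroutine (see baseNode.Close above).
node.Close()
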
(The remaining 6 changed files are not shown.)