diff --git a/node/factory.go b/node/factory.go index 6e15cd4a..f7a58ca0 100644 --- a/node/factory.go +++ b/node/factory.go @@ -19,8 +19,8 @@ var ( func Factory() *factory { cfxOnce.Do(func() { cfxFactory = newFactory( - func(group Group, name, url string, hm HealthMonitor) (Node, error) { - return NewCfxNode(group, name, url, hm) + func(group Group, name, url string) (Node, error) { + return NewCfxNode(group, name, url) }, cfg.Endpoint, urlCfg, cfg.Router.NodeRPCURL, ) @@ -33,8 +33,8 @@ func Factory() *factory { func EthFactory() *factory { ethOnce.Do(func() { ethFactory = newFactory( - func(group Group, name, url string, hm HealthMonitor) (Node, error) { - return NewEthNode(group, name, url, hm) + func(group Group, name, url string) (Node, error) { + return NewEthNode(group, name, url) }, cfg.EthEndpoint, ethUrlCfg, cfg.Router.EthNodeRPCURL, ) diff --git a/node/manager.go b/node/manager.go index 4e76c192..19dbd50e 100644 --- a/node/manager.go +++ b/node/manager.go @@ -10,7 +10,7 @@ import ( ) // nodeFactory factory method to create node instance -type nodeFactory func(group Group, name, url string, hm HealthMonitor) (Node, error) +type nodeFactory func(group Group, name, url string) (Node, error) // Manager manages full node cluster, including: // 1. Monitor node health and disable/enable full node automatically. @@ -23,8 +23,9 @@ type Manager struct { resolver RepartitionResolver // support repartition for hash ring mu sync.RWMutex - nodeName2Epochs map[string]uint64 // node name => epoch - midEpoch uint64 // middle epoch of managed full nodes. + // health monitor + monitorStatuses map[string]monitorStatus // node name => monitor status + midEpoch uint64 // middle epoch of managed full nodes. } func NewManager(group Group) *Manager { @@ -36,21 +37,11 @@ func NewManagerWithRepartition(group Group, resolver RepartitionResolver) *Manag group: group, nodes: make(map[string]Node), resolver: resolver, - nodeName2Epochs: make(map[string]uint64), + monitorStatuses: make(map[string]monitorStatus), hashRing: consistent.New(nil, cfg.HashRingRaw()), } } -// Close closes the manager to reclaim resources -func (m *Manager) Close() { - m.mu.Lock() - defer m.mu.Unlock() - - for _, node := range m.nodes { - node.Close() - } -} - // Add adds fullnode to monitor func (m *Manager) Add(nodes ...Node) { m.mu.Lock() @@ -73,7 +64,7 @@ func (m *Manager) Remove(nodeNames ...string) { if node, ok := m.nodes[nn]; ok { node.Close() delete(m.nodes, nn) - delete(m.nodeName2Epochs, nn) + delete(m.monitorStatuses, nn) m.hashRing.Remove(nn) } } diff --git a/node/manager_monitor.go b/node/manager_monitor.go index 0209ba19..ab6edfdc 100644 --- a/node/manager_monitor.go +++ b/node/manager_monitor.go @@ -1,12 +1,20 @@ package node import ( - "sort" + "slices" "sync/atomic" + "time" "github.com/sirupsen/logrus" ) +// monitorStatus is the monitor status of managed nodes. +type monitorStatus struct { + epoch uint64 // the latest epoch height + unhealthy bool // whether the node is unhealthy + unhealthReportAt time.Time // the last unhealthy report time +} + // Implementations for HealthMonitor interface. // HealthyEpoch returns the middle epoch height collected from managed cluster nodes, @@ -15,29 +23,55 @@ func (m *Manager) HealthyEpoch() uint64 { return atomic.LoadUint64(&m.midEpoch) } +// HealthStatus checks the health status of a full node and returns whether it is unhealthy +// along with the time when it was reported as unhealthy. +// +// Parameters: +// - nodeName: The name of the node to check. 
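
// Illustrative sketch, assumed to live in the node package: with the
// HealthMonitor parameter dropped from nodeFactory, a node is built first and
// monitors are attached afterwards via Register, which is what the pool does
// when it adds the node to a group manager. The helper name is hypothetical;
// NewCfxNode, NewManager, Group and Node are the package's own exported APIs.
func exampleBuildAndRegister(group Group, name, url string) (Node, error) {
	n, err := NewCfxNode(group, name, url) // construction no longer needs a monitor
	if err != nil {
		return nil, err
	}

	m := NewManager(group) // *Manager implements HealthMonitor
	m.Add(n)               // manager tracks the node for routing
	n.Register(m)          // the node's monitor loop now reports health to the manager

	return n, nil
}
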
+// +// Returns: +// - isUnhealthy: A boolean indicating if the node is unhealthy. +// - reportedAt: The time when the node was reported as unhealthy. +func (m *Manager) HealthStatus(nodeName string) (isUnhealthy bool, reportedAt time.Time) { + m.mu.Lock() + defer m.mu.Unlock() + + status := m.monitorStatuses[nodeName] + return status.unhealthy, status.unhealthReportAt +} + // ReportEpoch reports latest epoch height of managed node to manager. func (m *Manager) ReportEpoch(nodeName string, epoch uint64) { m.mu.Lock() defer m.mu.Unlock() - m.nodeName2Epochs[nodeName] = epoch - if len(m.nodeName2Epochs) == 1 { + m.updateEpoch(nodeName, epoch) + + if len(m.monitorStatuses) == 1 { atomic.StoreUint64(&m.midEpoch, epoch) return } - var epochs []int - for _, epoch := range m.nodeName2Epochs { - epochs = append(epochs, int(epoch)) + var epochs []uint64 + for _, ms := range m.monitorStatuses { + epochs = append(epochs, ms.epoch) } - sort.Ints(epochs) + slices.Sort(epochs) - atomic.StoreUint64(&m.midEpoch, uint64(epochs[len(epochs)/2])) + atomic.StoreUint64(&m.midEpoch, epochs[len(epochs)/2]) +} + +func (m *Manager) updateEpoch(nodeName string, epoch uint64) { + status := m.monitorStatuses[nodeName] + status.epoch = epoch + m.monitorStatuses[nodeName] = status } // ReportUnhealthy reports unhealthy status of managed node to manager. func (m *Manager) ReportUnhealthy(nodeName string, remind bool, reason error) { + m.updateUnhealthy(nodeName) + logger := logrus.WithFields(logrus.Fields{ "node": nodeName, "group": m.group, @@ -56,6 +90,16 @@ func (m *Manager) ReportUnhealthy(nodeName string, remind bool, reason error) { // FIXME update repartition cache if configured } +func (m *Manager) updateUnhealthy(nodeName string) { + m.mu.Lock() + defer m.mu.Unlock() + + status := m.monitorStatuses[nodeName] + status.unhealthy = true + status.unhealthReportAt = time.Now() + m.monitorStatuses[nodeName] = status +} + // ReportHealthy reports healthy status of managed node to manager. 
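
// Self-contained sketch of the "middle epoch" selection performed by
// ReportEpoch above: the epochs of all managed nodes are sorted and the
// element at len/2 is taken, so a lagging minority cannot drag the healthy
// epoch down (requires Go 1.21+ for the slices package, matching the switch
// from sort to slices in this diff). The values are made up for illustration.
package main

import (
	"fmt"
	"slices"
)

func main() {
	statuses := map[string]uint64{
		"node0": 1_000_000, // lagging node
		"node1": 1_000_420,
		"node2": 1_000_425,
	}

	epochs := make([]uint64, 0, len(statuses))
	for _, e := range statuses {
		epochs = append(epochs, e)
	}
	slices.Sort(epochs)

	fmt.Println("healthy epoch:", epochs[len(epochs)/2]) // 1000420
}
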
func (m *Manager) ReportHealthy(nodeName string) { // alert @@ -68,3 +112,13 @@ func (m *Manager) ReportHealthy(nodeName string) { logrus.WithField("node", nodeName).Error("Node not found in manager") } } + +func (m *Manager) updateHealthy(nodeName string) { + m.mu.Lock() + defer m.mu.Unlock() + + status := m.monitorStatuses[nodeName] + status.unhealthy = false + status.unhealthReportAt = time.Time{} + m.monitorStatuses[nodeName] = status +} diff --git a/node/node.go b/node/node.go index 52256ec9..4f966598 100644 --- a/node/node.go +++ b/node/node.go @@ -2,6 +2,7 @@ package node import ( "context" + "sync" "sync/atomic" "time" @@ -26,17 +27,21 @@ type Node interface { Name() string Url() string Status() Status - LatestEpochNumber() (uint64, error) + Register(HealthMonitor) + Deregister(HealthMonitor) + Close() } type baseNode struct { + mu sync.Mutex name string url string cancel context.CancelFunc atomicStatus atomic.Value + monitors []HealthMonitor } func newBaseNode(name, url string, cancel context.CancelFunc) *baseNode { @@ -62,7 +67,7 @@ func (n *baseNode) String() string { } // monitor periodically heartbeats with node to monitor health status -func (n *baseNode) monitor(ctx context.Context, node Node, hm HealthMonitor) { +func (n *baseNode) monitor(ctx context.Context, node Node) { ticker := time.NewTicker(cfg.Monitor.Interval) defer ticker.Stop() @@ -73,14 +78,57 @@ func (n *baseNode) monitor(ctx context.Context, node Node, hm HealthMonitor) { return case <-ticker.C: status := n.atomicStatus.Load().(Status) - status.Update(node, hm) + status.Update(node) + + monitors := n.snapshot() + for _, hm := range monitors { + status.Report(hm) + } + + logrus.WithFields(logrus.Fields{ + "numHealthMonitors": len(monitors), + "status": status, + }).Debug("Node health status reported") + n.atomicStatus.Store(status) } } } +// snapshot snapshots all health monitors. +func (n *baseNode) snapshot() []HealthMonitor { + n.mu.Lock() + defer n.mu.Unlock() + + res := make([]HealthMonitor, 0, len(n.monitors)) + return append(res, n.monitors...) +} + +// Register registers a health monitor to node. +func (n *baseNode) Register(hm HealthMonitor) { + n.mu.Lock() + defer n.mu.Unlock() + + n.monitors = append(n.monitors, hm) +} + +// Deregister deregisters a health monitor from node. +func (n *baseNode) Deregister(hm HealthMonitor) { + n.mu.Lock() + defer n.mu.Unlock() + + monitors := []HealthMonitor{} + for _, m := range n.monitors { + if m != hm { + monitors = append(monitors, m) + } + } + n.monitors = monitors +} + func (n *baseNode) Close() { n.cancel() + status := n.Status() status.Close() } @@ -93,7 +141,7 @@ type EthNode struct { // NewEthNode creates an instance of evm space node and start to monitor // node health in a separate goroutine until node closed. -func NewEthNode(group Group, name, url string, hm HealthMonitor) (*EthNode, error) { +func NewEthNode(group Group, name, url string) (*EthNode, error) { eth, err := rpc.NewEthClient(url) if err != nil { return nil, err @@ -107,7 +155,7 @@ func NewEthNode(group Group, name, url string, hm HealthMonitor) (*EthNode, erro n.atomicStatus.Store(NewStatus(group, name)) - go n.monitor(ctx, n, hm) + go n.monitor(ctx, n) return n, nil } @@ -135,7 +183,7 @@ type CfxNode struct { // NewCfxNode creates an instance of core space fullnode and start to monitor // node health in a separate goroutine until node closed. 
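
// Self-contained sketch of the register/snapshot pattern used by baseNode:
// monitors are added and removed under a mutex, but callbacks run against a
// copied slice so the lock is never held while reporting. All names below are
// illustrative stand-ins, not taken from the package.
package main

import (
	"fmt"
	"sync"
)

type observer interface{ Notify(msg string) }

type subject struct {
	mu        sync.Mutex
	observers []observer
}

func (s *subject) Register(o observer) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.observers = append(s.observers, o)
}

func (s *subject) Deregister(o observer) {
	s.mu.Lock()
	defer s.mu.Unlock()
	kept := make([]observer, 0, len(s.observers))
	for _, ob := range s.observers {
		if ob != o {
			kept = append(kept, ob)
		}
	}
	s.observers = kept
}

// snapshot copies the observer list so notification can proceed without the lock.
func (s *subject) snapshot() []observer {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([]observer(nil), s.observers...)
}

func (s *subject) broadcast(msg string) {
	for _, o := range s.snapshot() {
		o.Notify(msg) // may block or re-enter Register/Deregister without deadlock
	}
}

type printer struct{ name string }

func (p *printer) Notify(msg string) { fmt.Println(p.name, "got", msg) }

func main() {
	s := &subject{}
	p := &printer{name: "monitor-1"}
	s.Register(p)
	s.broadcast("healthy") // monitor-1 got healthy
	s.Deregister(p)
	s.broadcast("dropped") // no output: observer was removed
}
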
-func NewCfxNode(group Group, name, url string, hm HealthMonitor) (*CfxNode, error) { +func NewCfxNode(group Group, name, url string) (*CfxNode, error) { cfx, err := rpc.NewCfxClient(url) if err != nil { return nil, err @@ -149,7 +197,7 @@ func NewCfxNode(group Group, name, url string, hm HealthMonitor) (*CfxNode, erro n.atomicStatus.Store(NewStatus(group, name)) - go n.monitor(ctx, n, hm) + go n.monitor(ctx, n) return n, nil } diff --git a/node/node_status.go b/node/node_status.go index 3b912717..bc106bf5 100644 --- a/node/node_status.go +++ b/node/node_status.go @@ -18,6 +18,10 @@ type HealthMonitor interface { // Usually, it is the middle epoch number of all full nodes. HealthyEpoch() uint64 + // HealthStatus checks the health status of a full node and returns whether it is unhealthy + // along with the time when it was reported as unhealthy. + HealthStatus(nodeName string) (isUnhealthy bool, reportedAt time.Time) + // ReportEpoch fired when epoch changes. ReportEpoch(nodeName string, epoch uint64) @@ -38,9 +42,6 @@ type Status struct { successCounter uint64 failureCounter uint64 - unhealthy bool - unhealthReportAt time.Time - latestHeartBeatErrs *ring.Ring } @@ -59,9 +60,13 @@ func NewStatus(group Group, nodeName string) Status { } // Update heartbeats with node and updates health status. -func (s *Status) Update(n Node, monitor HealthMonitor) { +func (s *Status) Update(n Node) { s.heartbeat(n) - s.updateHealth(monitor) +} + +// Report reports health status to monitor. +func (s *Status) Report(hm HealthMonitor) { + s.updateHealth(hm) } // MarshalJSON marshals as JSON. @@ -78,8 +83,6 @@ func (s *Status) MarshalJSON() ([]byte, error) { LatestStateEpoch uint64 `json:"latestStateEpoch"` SuccessCounter uint64 `json:"successCounter"` FailureCounter uint64 `json:"failureCounter"` - Unhealthy bool `json:"unhealthy"` - UnhealthReportAt string `json:"unhealthReportAt"` LatestHeartBeatErrs []string `json:"latestHeartBeatErrs"` } @@ -96,8 +99,6 @@ func (s *Status) MarshalJSON() ([]byte, error) { LatestStateEpoch: s.latestStateEpoch, SuccessCounter: s.successCounter, FailureCounter: s.failureCounter, - Unhealthy: s.unhealthy, - UnhealthReportAt: s.unhealthReportAt.Format(time.RFC3339), } hbErrors := s.latestHeartBeatErrs.Values() @@ -131,21 +132,19 @@ func (s *Status) heartbeat(n Node) { // updateHealth reports health status to monitor. 
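
// Sketch of a minimal HealthMonitor test double. With the unhealthy flag and
// the report time moved out of Status, the monitor is now the single source
// of truth for per-node health, so a stub only needs those two fields plus
// the reported epoch. Assumed to sit in a _test file of the node package
// (with "time" in its import block); the type name is made up.
type stubMonitor struct {
	epoch      uint64
	unhealthy  bool
	reportedAt time.Time
}

func (s *stubMonitor) HealthyEpoch() uint64               { return s.epoch }
func (s *stubMonitor) ReportEpoch(_ string, epoch uint64) { s.epoch = epoch }

func (s *stubMonitor) ReportUnhealthy(_ string, _ bool, _ error) {
	s.unhealthy, s.reportedAt = true, time.Now()
}

func (s *stubMonitor) ReportHealthy(string) {
	s.unhealthy, s.reportedAt = false, time.Time{}
}

func (s *stubMonitor) HealthStatus(string) (bool, time.Time) {
	return s.unhealthy, s.reportedAt
}

// compile-time assertion that the stub satisfies the extended interface
var _ HealthMonitor = (*stubMonitor)(nil)
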
func (s *Status) updateHealth(monitor HealthMonitor) { reason := s.checkHealth(monitor.HealthyEpoch()) + unhealthy, unhealthReportAt := monitor.HealthStatus(s.nodeName) - if s.unhealthy { + if unhealthy { if reason == nil { // node become healthy after N success if s.successCounter >= cfg.Monitor.Recover.SuccessCounter { - s.unhealthy = false - s.unhealthReportAt = time.Time{} monitor.ReportHealthy(s.nodeName) } } else { // remind long unhealthy every N minutes, even occasionally succeeded - remindTime := s.unhealthReportAt.Add(cfg.Monitor.Recover.RemindInterval) + remindTime := unhealthReportAt.Add(cfg.Monitor.Recover.RemindInterval) if now := time.Now(); now.After(remindTime) { monitor.ReportUnhealthy(s.nodeName, true, reason) - s.unhealthReportAt = now } } } else { @@ -153,8 +152,6 @@ func (s *Status) updateHealth(monitor HealthMonitor) { monitor.ReportEpoch(s.nodeName, s.latestStateEpoch) } else { // node become unhealthy - s.unhealthy = true - s.unhealthReportAt = time.Now() monitor.ReportUnhealthy(s.nodeName, false, reason) } } diff --git a/node/pool.go b/node/pool.go index a0505244..66b4f2bf 100644 --- a/node/pool.go +++ b/node/pool.go @@ -7,6 +7,12 @@ import ( "github.com/pkg/errors" ) +// refNode reusable nodes with reference count +type refNode struct { + Node + refCnt int +} + // nodePool manages all full nodes by group type nodePool struct { mu sync.Mutex @@ -16,12 +22,16 @@ type nodePool struct { // node cluster managers by group: // group name => node cluster manager managers map[Group]*Manager + // all managed nodes: + // node name => refNode + nodes map[string]refNode } func newNodePool(nf nodeFactory) *nodePool { return &nodePool{ nf: nf, managers: make(map[Group]*Manager), + nodes: make(map[string]refNode), } } @@ -40,23 +50,36 @@ func (p *nodePool) add(grp Group, urls ...string) error { } m := p.managers[grp] - nodes := make([]Node, 0, len(urls)) + refNodes := make([]refNode, 0, len(urls)) for i := range urls { nn := rpc.Url2NodeName(urls[i]) - if _, ok := m.Get(nn); ok { + if _, ok := m.Get(nn); ok { // node already grouped? + continue + } + + if node, ok := p.nodes[nn]; ok { // node already exists? + refNodes = append(refNodes, node) continue } - n, err := p.nf(grp, nn, urls[i], m) + n, err := p.nf(grp, nn, urls[i]) if err != nil { return errors.WithMessagef(err, "failed to new node with url %v", urls[i]) } - nodes = append(nodes, n) + refNodes = append(refNodes, refNode{Node: n}) + } + + for _, rn := range refNodes { + m.Add(rn) + rn.Register(m) + + // reference the shared node + rn.refCnt++ + p.nodes[rn.Name()] = rn } - m.Add(nodes...) return nil } @@ -71,12 +94,29 @@ func (p *nodePool) del(grp Group, urls ...string) { } for i := range urls { - m.Remove(rpc.Url2NodeName(urls[i])) + nn := rpc.Url2NodeName(urls[i]) + if _, ok := m.Get(nn); !ok { // node not grouped? 
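
// Self-contained sketch of the reference counting applied to shared nodes in
// the pool: the same underlying resource may be acquired by several groups
// and is only closed when the last reference is released, mirroring refNode
// and nodePool.add/del above. Names are illustrative.
package main

import "fmt"

type resource struct{ name string }

func (r *resource) Close() { fmt.Println("closed", r.name) }

type ref struct {
	*resource
	cnt int
}

type registry struct{ refs map[string]ref }

func (g *registry) acquire(name string) *resource {
	r, ok := g.refs[name]
	if !ok {
		r = ref{resource: &resource{name: name}}
	}
	r.cnt++
	g.refs[name] = r
	return r.resource
}

func (g *registry) release(name string) {
	r, ok := g.refs[name]
	if !ok {
		return
	}
	if r.cnt > 1 {
		r.cnt-- // still referenced elsewhere, keep it open
		g.refs[name] = r
		return
	}
	r.Close() // last reference gone, reclaim the resource
	delete(g.refs, name)
}

func main() {
	reg := &registry{refs: make(map[string]ref)}
	reg.acquire("node0") // referenced by group A
	reg.acquire("node0") // referenced by group B as well
	reg.release("node0") // group A gone, node stays open
	reg.release("node0") // group B gone too: prints "closed node0"
}
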
+ continue + } + + m.Remove(nn) + + if rn, ok := p.nodes[nn]; ok { + rn.Deregister(m) + + // unreference the shared node + if rn.refCnt > 1 { + rn.refCnt-- + p.nodes[nn] = rn + } else { + rn.Close() + delete(p.nodes, nn) + } + } } if len(m.List()) == 0 { // uninstall group manager if no node exists anymore - m.Close() delete(p.managers, grp) } } diff --git a/node/pool_test.go b/node/pool_test.go index 86838ee4..3e622db5 100644 --- a/node/pool_test.go +++ b/node/pool_test.go @@ -32,7 +32,7 @@ type dummyNode struct { *baseNode } -func newDummyNode(group Group, name, url string, hm HealthMonitor) (*dummyNode, error) { +func newDummyNode(group Group, name, url string) (*dummyNode, error) { return &dummyNode{newBaseNode(name, url, nil)}, nil } @@ -41,8 +41,10 @@ func (n *dummyNode) LatestEpochNumber() (uint64, error) { } func BenchmarkNodePoolRoute(b *testing.B) { - pool := newNodePool(func(group Group, name, url string, hm HealthMonitor) (Node, error) { - return newDummyNode(group, name, url, hm) + MustInit() + + pool := newNodePool(func(group Group, name, url string) (Node, error) { + return newDummyNode(group, name, url) }) var groups []Group diff --git a/util/rpc/client_cfx.go b/util/rpc/client_cfx.go index 60c0c002..bc3d14e5 100644 --- a/util/rpc/client_cfx.go +++ b/util/rpc/client_cfx.go @@ -69,9 +69,15 @@ func NewCfxClient(url string, options ...ClientOption) (*sdk.Client, error) { } cfx, err := sdk.NewClient(url, *opt.ClientOption) - if err == nil && opt.hookMetrics { - HookMiddlewares(cfx.Provider(), url, "cfx") + if err != nil { + return cfx, err + } + + hookFlag := MiddlewareHookAll + if !opt.hookMetrics { + hookFlag ^= MiddlewareHookLogMetrics } + HookMiddlewares(cfx.Provider(), url, "cfx", hookFlag) - return cfx, err + return cfx, nil } diff --git a/util/rpc/client_eth.go b/util/rpc/client_eth.go index 56ec000f..eed1dbff 100644 --- a/util/rpc/client_eth.go +++ b/util/rpc/client_eth.go @@ -71,9 +71,15 @@ func NewEthClient(url string, options ...ClientOption) (*web3go.Client, error) { } eth, err := web3go.NewClientWithOption(url, opt.ClientOption) - if err == nil && opt.hookMetrics { - HookMiddlewares(eth.Provider(), url, "eth") + if err != nil { + return eth, err + } + + hookFlag := MiddlewareHookAll + if !opt.hookMetrics { + hookFlag ^= MiddlewareHookLogMetrics } + HookMiddlewares(eth.Provider(), url, "eth", hookFlag) - return eth, err + return eth, nil } diff --git a/util/rpc/client_middlewares.go b/util/rpc/client_middlewares.go index 41199ad3..dbd847a0 100644 --- a/util/rpc/client_middlewares.go +++ b/util/rpc/client_middlewares.go @@ -44,10 +44,38 @@ func Url2NodeName(url string) string { return strings.TrimPrefix(nodeName, "/") } -func HookMiddlewares(provider *providers.MiddlewarableProvider, url, space string) { +// MiddlewareHookFlag represents the type for middleware hook flags. +type MiddlewareHookFlag int + +const ( + // MiddlewareHookNone represents no middleware hooks enabled. + MiddlewareHookNone MiddlewareHookFlag = 0 + + // MiddlewareHookAll enables all middleware hooks. + MiddlewareHookAll MiddlewareHookFlag = ^MiddlewareHookFlag(0) + + // MiddlewareHookLog enables logging middleware hook. + MiddlewareHookLog MiddlewareHookFlag = 1 << iota + + // MiddlewareHookLogMetrics enables metrics logging middleware hook. 
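
// Self-contained sketch of the flag arithmetic used by the middleware hooks:
// starting from an all-bits-set mask, XOR-ing a single flag clears exactly
// that bit (as NewCfxClient/NewEthClient do when hookMetrics is disabled),
// and AND tests whether a hook is enabled. The flag names here are generic
// stand-ins for the MiddlewareHook* constants.
package main

import "fmt"

type hookFlag int

const (
	hookNone hookFlag = 0
	hookAll  hookFlag = ^hookFlag(0) // every bit set

	hookLog     hookFlag = 1 << iota // 1 << 2
	hookMetrics                      // 1 << 3
)

func main() {
	flag := hookAll
	flag ^= hookMetrics // drop only the metrics hook

	fmt.Println(flag&hookLog != 0)     // true: log hook stays enabled
	fmt.Println(flag&hookMetrics != 0) // false: metrics hook disabled
	fmt.Println(flag == hookNone)      // false: other hooks remain
}
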
+ MiddlewareHookLogMetrics +) + +func HookMiddlewares(provider *providers.MiddlewarableProvider, url, space string, flags ...MiddlewareHookFlag) { nodeName := Url2NodeName(url) - provider.HookCallContext(middlewareLog(nodeName, space)) - provider.HookCallContext(middlewareMetrics(nodeName, space)) + + flag := MiddlewareHookAll + if len(flags) > 0 { + flag = flags[0] + } + + if flag&MiddlewareHookLog != 0 { + provider.HookCallContext(middlewareLog(nodeName, space)) + } + + if flag&MiddlewareHookLogMetrics != 0 { + provider.HookCallContext(middlewareMetrics(nodeName, space)) + } } func middlewareMetrics(fullnode, space string) providers.CallContextMiddleware {
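
// Usage sketch for the new variadic flags parameter of HookMiddlewares,
// assumed to be called from within this rpc package (callers elsewhere would
// qualify the names with the package). The provider argument is expected to
// come from an already constructed client; in practice only one of the calls
// below would be made per provider.
func exampleHookSelection(provider *providers.MiddlewarableProvider, url string) {
	// default: same behaviour as before, both log and metrics hooks installed
	HookMiddlewares(provider, url, "cfx")

	// keep request logging but opt out of the metrics hook, mirroring what
	// NewCfxClient does when the hookMetrics option is disabled
	HookMiddlewares(provider, url, "cfx", MiddlewareHookAll^MiddlewareHookLogMetrics)

	// install no hooks at all
	HookMiddlewares(provider, url, "cfx", MiddlewareHookNone)
}
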