From 662470e3e20fc11912c426d2c5d3a6978a60f109 Mon Sep 17 00:00:00 2001 From: djkazic Date: Wed, 24 Jan 2024 08:34:16 -0500 Subject: [PATCH] lndmon: cache closedChannels response --- collectors/channels_collector.go | 71 +++++++++++++++++++++++++++----- collectors/prometheus.go | 10 +++-- lndmon.go | 4 +- 3 files changed, 69 insertions(+), 16 deletions(-) diff --git a/collectors/channels_collector.go b/collectors/channels_collector.go index 085e308..138374a 100644 --- a/collectors/channels_collector.go +++ b/collectors/channels_collector.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "strconv" + "sync" + "time" "github.com/btcsuite/btcd/btcutil" "github.com/lightninglabs/lndclient" @@ -11,6 +13,9 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// Cache refresh interval magic number. +const cacheRefreshInterval = 10 * time.Minute + // ChannelsCollector is a collector that keeps track of channel information. type ChannelsCollector struct { channelBalanceDesc *prometheus.Desc @@ -51,17 +56,25 @@ type ChannelsCollector struct { // errChan is a channel that we send any errors that we encounter into. // This channel should be buffered so that it does not block sends. errChan chan<- error + + // quit is a channel that we use to signal for graceful shutdown. + quit chan struct{} + + // cache is for storing results from a ticker to reduce grpc server + // load on lnd. + closedChannelsCache []lndclient.ClosedChannel + cacheMutex sync.RWMutex } // NewChannelsCollector returns a new instance of the ChannelsCollector for the // target lnd client. func NewChannelsCollector(lnd lndclient.LightningClient, errChan chan<- error, - cfg *MonitoringConfig) *ChannelsCollector { + quitChan chan struct{}, cfg *MonitoringConfig) *ChannelsCollector { // Our set of labels, status should either be active or inactive. The // initiator is "true" if we are the initiator, and "false" otherwise. labels := []string{"chan_id", "status", "initiator", "peer"} - return &ChannelsCollector{ + collector := &ChannelsCollector{ channelBalanceDesc: prometheus.NewDesc( "lnd_channels_open_balance_sat", "total balance of channels in satoshis", @@ -174,10 +187,49 @@ func NewChannelsCollector(lnd lndclient.LightningClient, errChan chan<- error, []string{"amount"}, nil, ), - lnd: lnd, - primaryNode: cfg.PrimaryNode, - errChan: errChan, + lnd: lnd, + primaryNode: cfg.PrimaryNode, + closedChannelsCache: nil, + errChan: errChan, + quit: quitChan, } + + // Start a ticker to update the cache once per 10m + go func() { + ticker := time.NewTicker(cacheRefreshInterval) + defer ticker.Stop() + + for { + err := collector.refreshClosedChannelsCache() + if err != nil { + errChan <- err + } + + select { + case <-ticker.C: + continue + + case <-collector.quit: + return + } + } + }() + + return collector +} + +// refreshClosedChannelsCache acquires a mutex write lock to update +// the closedChannelsCache. +func (c *ChannelsCollector) refreshClosedChannelsCache() error { + data, err := c.lnd.ClosedChannels(context.Background()) + if err != nil { + return err + } + c.cacheMutex.Lock() + c.closedChannelsCache = data + c.cacheMutex.Unlock() + + return nil } // Describe sends the super-set of all possible descriptors of metrics @@ -452,12 +504,9 @@ func (c *ChannelsCollector) Collect(ch chan<- prometheus.Metric) { ) // Get the list of closed channels. - closedChannelsResp, err := c.lnd.ClosedChannels(context.Background()) - if err != nil { - c.errChan <- fmt.Errorf("ChannelsCollector ClosedChannels "+ - "failed with: %v", err) - return - } + c.cacheMutex.RLock() + closedChannelsResp := c.closedChannelsCache + c.cacheMutex.RUnlock() closeCounts := make(map[string]int) for _, channel := range closedChannelsResp { typeString, ok := closeTypeLabelMap[channel.CloseType] diff --git a/collectors/prometheus.go b/collectors/prometheus.go index c5368cc..1e97236 100644 --- a/collectors/prometheus.go +++ b/collectors/prometheus.go @@ -84,7 +84,7 @@ func DefaultConfig() *PrometheusConfig { // NewPrometheusExporter makes a new instance of the PrometheusExporter given // the address to listen for Prometheus on and an lnd gRPC client. func NewPrometheusExporter(cfg *PrometheusConfig, lnd *lndclient.LndServices, - monitoringCfg *MonitoringConfig) *PrometheusExporter { + monitoringCfg *MonitoringConfig, quitChan chan struct{}) *PrometheusExporter { // We have six collectors and a htlc monitor running, so we buffer our // error channel by 7 so that we do not need to consume all errors from @@ -94,11 +94,13 @@ func NewPrometheusExporter(cfg *PrometheusConfig, lnd *lndclient.LndServices, htlcMonitor := newHtlcMonitor(lnd.Router, errChan) + chanCollector := NewChannelsCollector( + lnd.Client, errChan, quitChan, monitoringCfg, + ) + collectors := []prometheus.Collector{ NewChainCollector(lnd.Client, errChan), - NewChannelsCollector( - lnd.Client, errChan, monitoringCfg, - ), + chanCollector, NewWalletCollector(lnd, errChan), NewPeerCollector(lnd.Client, errChan), NewInfoCollector(lnd.Client, errChan), diff --git a/lndmon.go b/lndmon.go index bcf062b..538177f 100755 --- a/lndmon.go +++ b/lndmon.go @@ -31,6 +31,7 @@ func start() error { return err } + quit := make(chan struct{}) interceptor, err := signal.Intercept() if err != nil { return fmt.Errorf("could not intercept signal: %v", err) @@ -72,7 +73,7 @@ func start() error { // Start our Prometheus exporter. This exporter spawns a goroutine // that pulls metrics from our lnd client on a set interval. exporter := collectors.NewPrometheusExporter( - cfg.Prometheus, &lnd.LndServices, &monitoringCfg, + cfg.Prometheus, &lnd.LndServices, &monitoringCfg, quit, ) if err := exporter.Start(); err != nil { return err @@ -83,6 +84,7 @@ func start() error { var stopErr error select { case <-interceptor.ShutdownChannel(): + close(quit) fmt.Println("Exiting lndmon.") case stopErr = <-exporter.Errors():