Skip to content

Commit

Permalink
Merge pull request #2478 from iotaledger/metrics-pipes
Browse files Browse the repository at this point in the history
Metrics for pipe lengths.
  • Loading branch information
kape1395 authored May 16, 2023
2 parents bb36783 + ef3e0cb commit 7bfa0b2
Show file tree
Hide file tree
Showing 17 changed files with 189 additions and 18 deletions.
3 changes: 3 additions & 0 deletions components/prometheus/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ func configure() error {
if ParamsPrometheus.ChainNodeConnMetrics {
register("chain node conn", deps.ChainMetrics.PrometheusCollectorsChainNodeConn()...)
}
if ParamsPrometheus.ChainPipeMetrics {
deps.ChainMetrics.PrometheusRegisterChainPipeMetrics(deps.PrometheusRegistry)
}
if ParamsPrometheus.PeeringMetrics {
register("peering", deps.PeeringMetrics.Collectors()...)
}
Expand Down
1 change: 1 addition & 0 deletions components/prometheus/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type ParametersPrometheus struct {
ChainStateMetrics bool `default:"true" usage:"whether to include chain state metrics"`
ChainStateManagerMetrics bool `default:"true" usage:"whether to include chain state manager metrics"`
ChainNodeConnMetrics bool `default:"true" usage:"whether to include chain node conn metrics"`
ChainPipeMetrics bool `default:"true" usage:"whether to include chain pipe metrics"`
PeeringMetrics bool `default:"true" usage:"whether to include peering metrics"`
RestAPIMetrics bool `default:"true" usage:"whether to include restAPI metrics"`
GoMetrics bool `default:"true" usage:"whether to include go metrics"`
Expand Down
2 changes: 2 additions & 0 deletions config_defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@
"chainStateMetrics": true,
"chainStateManagerMetrics": true,
"chainNodeConnMetrics": true,
"chainPipeMetrics": true,
"peeringMetrics": true,
"restAPIMetrics": true,
"goMetrics": true,
"processMetrics": true,
Expand Down
4 changes: 4 additions & 0 deletions documentation/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ Example:
| chainStateMetrics | Whether to include chain state metrics | boolean | true |
| chainStateManagerMetrics | Whether to include chain state manager metrics | boolean | true |
| chainNodeConnMetrics | Whether to include chain node conn metrics | boolean | true |
| chainPipeMetrics | Whether to include chain pipe metrics | boolean | true |
| peeringMetrics | Whether to include peering metrics | boolean | true |
| restAPIMetrics | Whether to include restAPI metrics | boolean | true |
| goMetrics | Whether to include go metrics | boolean | true |
| processMetrics | Whether to include process metrics | boolean | true |
Expand All @@ -464,6 +466,8 @@ Example:
"chainStateMetrics": true,
"chainStateManagerMetrics": true,
"chainNodeConnMetrics": true,
"chainPipeMetrics": true,
"peeringMetrics": true,
"restAPIMetrics": true,
"goMetrics": true,
"processMetrics": true,
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -880,8 +880,6 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 h1:5llv2sWeaMSnA3w2kS57ouQQ4pudlXrR0dCgw51QK9o=
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea h1:vLCWI/yYrdEHyN2JzIzPO3aaQJHQdp89IZBA/+azVC4=
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down
7 changes: 7 additions & 0 deletions packages/chain/cons/cons_gr/gr.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type ConsGr struct {
netDisconnect context.CancelFunc
net peering.NetworkProvider
ctx context.Context
pipeMetrics metrics.IChainPipeMetrics
log *logger.Logger
}

Expand All @@ -136,6 +137,7 @@ func New(
redeliveryPeriod time.Duration,
printStatusPeriod time.Duration,
chainMetrics metrics.IChainConsensusMetrics,
pipeMetrics metrics.IChainPipeMetrics,
log *logger.Logger,
) *ConsGr {
cmtPubKey := dkShare.GetSharedPublic()
Expand Down Expand Up @@ -163,8 +165,12 @@ func New(
netDisconnect: nil, // Set bellow.
net: net,
ctx: ctx,
pipeMetrics: pipeMetrics,
log: log,
}

pipeMetrics.TrackPipeLenMax("cons-gr-netRecvPipe", netPeeringID.String(), cgr.netRecvPipe.Len)

constInstRaw := cons.New(chainID, chainStore, me, myNodeIdentity.GetPrivateKey(), dkShare, procCache, netPeeringID[:], gpa.NodeIDFromPublicKey, log).AsGPA()
cgr.consInst = gpa.NewAckHandler(me, constInstRaw, redeliveryPeriod)

Expand Down Expand Up @@ -202,6 +208,7 @@ func (cgr *ConsGr) Time(t time.Time) {
func (cgr *ConsGr) run() { //nolint:gocyclo,funlen
defer util.ExecuteIfNotNil(cgr.netDisconnect)
defer func() {
cgr.pipeMetrics.ForgetPipeLenMax("cons-gr-netRecvPipe", cgr.netPeeringID.String())
cgr.netRecvPipe.Discard()
}()

Expand Down
1 change: 1 addition & 0 deletions packages/chain/cons/cons_gr/gr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func testGrBasic(t *testing.T, n, f int, reliable bool) {
1*time.Second, // RedeliveryPeriod
5*time.Second, // PrintStatusPeriod
metrics.NewEmptyChainConsensusMetric(),
metrics.NewEmptyChainPipeMetrics(),
log.Named(fmt.Sprintf("N#%v", i)),
)
}
Expand Down
12 changes: 12 additions & 0 deletions packages/chain/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func New(
net peering.NetworkProvider,
log *logger.Logger,
metrics metrics.IChainMempoolMetrics,
pipeMetrics metrics.IChainPipeMetrics,
listener ChainListener,
) Mempool {
netPeeringID := peering.HashPeeringIDFromBytes(chainID.Bytes(), []byte("Mempool")) // ChainID × Mempool
Expand Down Expand Up @@ -230,6 +231,17 @@ func New(
metrics: metrics,
listener: listener,
}

pipeMetrics.TrackPipeLen("mp-serverNodesUpdatedPipe", mpi.serverNodesUpdatedPipe.Len)
pipeMetrics.TrackPipeLen("mp-accessNodesUpdatedPipe", mpi.accessNodesUpdatedPipe.Len)
pipeMetrics.TrackPipeLen("mp-reqConsensusProposalPipe", mpi.reqConsensusProposalPipe.Len)
pipeMetrics.TrackPipeLen("mp-reqConsensusRequestsPipe", mpi.reqConsensusRequestsPipe.Len)
pipeMetrics.TrackPipeLen("mp-reqReceiveOnLedgerRequestPipe", mpi.reqReceiveOnLedgerRequestPipe.Len)
pipeMetrics.TrackPipeLen("mp-reqReceiveOffLedgerRequestPipe", mpi.reqReceiveOffLedgerRequestPipe.Len)
pipeMetrics.TrackPipeLen("mp-reqTangleTimeUpdatedPipe", mpi.reqTangleTimeUpdatedPipe.Len)
pipeMetrics.TrackPipeLen("mp-reqTrackNewChainHeadPipe", mpi.reqTrackNewChainHeadPipe.Len)
pipeMetrics.TrackPipeLen("mp-netRecvPipe", mpi.netRecvPipe.Len)

mpi.distSync = distsync.New(
mpi.pubKeyAsNodeID(nodeIdentity.GetPublicKey()),
mpi.distSyncRequestNeededCB,
Expand Down
1 change: 1 addition & 0 deletions packages/chain/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func newEnv(t *testing.T, n, f int, reliable bool) *testEnv {
te.networkProviders[i],
te.log.Named(fmt.Sprintf("N#%v", i)),
metrics.NewEmptyChainMempoolMetric(),
metrics.NewEmptyChainPipeMetrics(),
chain.NewEmptyChainListener(),
)
}
Expand Down
12 changes: 12 additions & 0 deletions packages/chain/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,15 @@ func New(
chainMetrics: chainMetrics,
log: log,
}

cni.chainMetrics.TrackPipeLen("node-recvAliasOutputPipe", cni.recvAliasOutputPipe.Len)
cni.chainMetrics.TrackPipeLen("node-recvTxPublishedPipe", cni.recvTxPublishedPipe.Len)
cni.chainMetrics.TrackPipeLen("node-recvMilestonePipe", cni.recvMilestonePipe.Len)
cni.chainMetrics.TrackPipeLen("node-consOutputPipe", cni.consOutputPipe.Len)
cni.chainMetrics.TrackPipeLen("node-consRecoverPipe", cni.consRecoverPipe.Len)
cni.chainMetrics.TrackPipeLen("node-serversUpdatedPipe", cni.serversUpdatedPipe.Len)
cni.chainMetrics.TrackPipeLen("node-netRecvPipe", cni.netRecvPipe.Len)

cni.tryRecoverStoreFromWAL(chainStore, blockWAL)
cni.me = cni.pubKeyAsNodeID(nodeIdentity.GetPublicKey())
//
Expand Down Expand Up @@ -387,6 +396,7 @@ func New(
chainStore,
shutdownCoordinator.Nested("StateMgr"),
chainMetrics,
chainMetrics,
cni.log.Named("SM"),
)
if err != nil {
Expand All @@ -399,6 +409,7 @@ func New(
net,
cni.log.Named("MP"),
chainMetrics,
chainMetrics,
cni.listener,
)
cni.chainMgr = gpa.NewAckHandler(cni.me, chainMgr.AsGPA(), redeliveryPeriod)
Expand Down Expand Up @@ -849,6 +860,7 @@ func (cni *chainNodeImpl) ensureConsensusInst(ctx context.Context, needConsensus
cni.procCache, cni.mempool, cni.stateMgr, cni.net,
recoveryTimeout, redeliveryPeriod, printStatusPeriod,
cni.chainMetrics,
cni.chainMetrics,
cni.log.Named(fmt.Sprintf("C-%v.LI-%v", committeeAddr.String()[:10], logIndexCopy)),
)
consensusInstances.Set(addLogIndex, &consensusInst{
Expand Down
7 changes: 7 additions & 0 deletions packages/chain/statemanager/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func New(
store state.Store,
shutdownCoordinator *shutdown.Coordinator,
metrics metrics.IChainStateManagerMetrics,
pipeMetrics metrics.IChainPipeMetrics,
log *logger.Logger,
timersOpt ...sm_gpa.StateManagerTimers,
) (StateMgr, error) {
Expand Down Expand Up @@ -147,6 +148,12 @@ func New(
ctx: ctx,
shutdownCoordinator: shutdownCoordinator,
}

pipeMetrics.TrackPipeLen("sm-inputPipe", result.inputPipe.Len)
pipeMetrics.TrackPipeLen("sm-messagePipe", result.messagePipe.Len)
pipeMetrics.TrackPipeLen("sm-nodePubKeysPipe", result.nodePubKeysPipe.Len)
pipeMetrics.TrackPipeLen("sm-preliminaryBlockPipe", result.preliminaryBlockPipe.Len)

result.handleNodePublicKeys(&reqChainNodesUpdated{
serverNodes: peerPubKeys,
accessNodes: []*cryptolib.PublicKey{},
Expand Down
1 change: 1 addition & 0 deletions packages/chain/statemanager/state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func TestCruelWorld(t *testing.T) {
stores[i],
nil,
metrics.NewEmptyChainStateManagerMetric(),
metrics.NewEmptyChainPipeMetrics(),
log.Named(peeringURLs[i]),
timers,
)
Expand Down
14 changes: 14 additions & 0 deletions packages/metrics/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

const (
labelNameChain = "chain"
labelNamePipeName = "pipe_name"
labelNameMessageType = "message_type"
labelNameInMilestone = "in_milestone"
labelNameInStateOutputMetrics = "in_state_output"
Expand All @@ -32,6 +33,7 @@ const (
)

type IChainMetrics interface {
IChainPipeMetrics
IChainBlockWALMetrics
IChainConsensusMetrics
IChainMempoolMetrics
Expand Down Expand Up @@ -97,6 +99,7 @@ func (m *messageMetric[T]) LastMessage() T {
}

type emptyChainMetrics struct {
IChainPipeMetrics
IChainBlockWALMetrics
IChainConsensusMetrics
IChainMempoolMetrics
Expand All @@ -109,6 +112,7 @@ type emptyChainMetrics struct {

func NewEmptyChainMetrics() IChainMetrics {
return &emptyChainMetrics{
IChainPipeMetrics: NewEmptyChainPipeMetrics(),
IChainBlockWALMetrics: NewEmptyChainBlockWALMetrics(),
IChainConsensusMetrics: NewEmptyChainConsensusMetric(),
IChainMempoolMetrics: NewEmptyChainMempoolMetric(),
Expand All @@ -121,6 +125,7 @@ func NewEmptyChainMetrics() IChainMetrics {
}

type chainMetrics struct {
*chainPipeMetrics
*chainBlockWALMetrics
*chainConsensusMetric
*chainMempoolMetric
Expand All @@ -133,6 +138,7 @@ type chainMetrics struct {

func newChainMetrics(provider *ChainMetricsProvider, chainID isc.ChainID) *chainMetrics {
return &chainMetrics{
chainPipeMetrics: newChainPipeMetric(provider, chainID),
chainBlockWALMetrics: newChainBlockWALMetrics(provider, chainID),
chainConsensusMetric: newChainConsensusMetric(provider, chainID),
chainMempoolMetric: newChainMempoolMetric(provider, chainID),
Expand All @@ -146,6 +152,10 @@ func newChainMetrics(provider *ChainMetricsProvider, chainID isc.ChainID) *chain

// ChainMetricsProvider holds all metrics for all chains per chain
type ChainMetricsProvider struct {
// We use Func variant of a metric here, thus we register them
// explicitly when they are created. Therefore we need a registry here.
pipeLenRegistry *prometheus.Registry

// blockWAL
blockWALFailedWrites *prometheus.CounterVec
blockWALFailedReads *prometheus.CounterVec
Expand Down Expand Up @@ -645,6 +655,10 @@ func (m *ChainMetricsProvider) PrometheusCollectorsChainNodeConn() []prometheus.
}
}

func (m *ChainMetricsProvider) PrometheusRegisterChainPipeMetrics(reg *prometheus.Registry) {
m.pipeLenRegistry = reg
}

func (m *ChainMetricsProvider) PrometheusCollectorsWebAPI() []prometheus.Collector {
return []prometheus.Collector{
m.webAPIRequests,
Expand Down
118 changes: 118 additions & 0 deletions packages/metrics/chain_pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package metrics

import (
"fmt"

"github.com/prometheus/client_golang/prometheus"

"github.com/iotaledger/wasp/packages/isc"
)

// IChainPipeMetrics is the per-chain facade used by chain components to
// report how many items are queued in their internal pipes.
type IChainPipeMetrics interface {
	// TrackPipeLen reports the value returned by lenFunc as the length of
	// the pipe called name.
	TrackPipeLen(name string, lenFunc func() int)
	// TrackPipeLenMax adds lenFunc, identified by key, to the group of
	// pipes sharing name; the group is reported as a single maximum.
	TrackPipeLenMax(name, key string, lenFunc func() int)
	// ForgetPipeLenMax removes the length function previously added under
	// (name, key) by TrackPipeLenMax.
	ForgetPipeLenMax(name, key string)
}

// emptyChainPipeMetrics is an IChainPipeMetrics implementation whose methods
// do nothing; the tests in this change pass it where no metrics are wanted.
type emptyChainPipeMetrics struct{}

// NewEmptyChainPipeMetrics returns an IChainPipeMetrics that discards
// everything it is given.
func NewEmptyChainPipeMetrics() IChainPipeMetrics {
	return new(emptyChainPipeMetrics)
}

// TrackPipeLen does nothing; lenFunc is never called.
func (*emptyChainPipeMetrics) TrackPipeLen(name string, lenFunc func() int) {
}

// TrackPipeLenMax does nothing; lenFunc is never called.
func (*emptyChainPipeMetrics) TrackPipeLenMax(name, key string, lenFunc func() int) {
}

// ForgetPipeLenMax does nothing.
func (*emptyChainPipeMetrics) ForgetPipeLenMax(name, key string) {
}

// chainPipeMetrics holds the pipe-length metric state of a single chain.
type chainPipeMetrics struct {
// chainID is rendered into the chain label of every pipe metric.
chainID isc.ChainID
// provider supplies the Prometheus registry (pipeLenRegistry); when that
// registry is nil the tracking methods return without doing anything.
provider *ChainMetricsProvider
// lenMetrics is indexed by pipe name; TrackPipeLen consults it to find a
// previously created collector for the same name.
lenMetrics map[string]prometheus.Collector
// maxMetrics is indexed by pipe name and groups the length functions that
// are reported together as one maximum value.
maxMetrics map[string]*chainPipeMaxCollector
}

// chainPipeMaxCollector is the state behind one "maximum pipe length" gauge.
type chainPipeMaxCollector struct {
// collector is the gauge created for this pipe name by TrackPipeLenMax.
collector prometheus.Collector
// valueFuncs maps a caller-chosen key to a length function; the gauge
// reports the largest value returned by any of them.
valueFuncs map[string]func() int
}

// newChainPipeMetric builds the pipe metric state for chainID, backed by
// provider and starting with no tracked pipes.
func newChainPipeMetric(provider *ChainMetricsProvider, chainID isc.ChainID) *chainPipeMetrics {
	pipeMetrics := new(chainPipeMetrics)
	pipeMetrics.chainID = chainID
	pipeMetrics.provider = provider
	pipeMetrics.lenMetrics = make(map[string]prometheus.Collector)
	pipeMetrics.maxMetrics = make(map[string]*chainPipeMaxCollector)
	return pipeMetrics
}

// makeLabels returns the constant label set for a pipe metric: the string
// form of this chain's ID plus the given pipe name.
func (m *chainPipeMetrics) makeLabels(pipeName string) prometheus.Labels {
	labels := make(prometheus.Labels, 2)
	labels[labelNameChain] = m.chainID.String()
	labels[labelNamePipeName] = pipeName
	return labels
}

// TrackPipeLen exposes the length of the pipe called name as a gauge labelled
// with this chain's ID and the pipe name. The gauge obtains its value by
// calling lenFunc.
//
// Calling TrackPipeLen again with the same name replaces the earlier gauge:
// the old collector is unregistered before the new one is registered.
//
// It is a no-op when the provider has no pipe registry configured, and it
// panics if the registry rejects the new collector.
func (m *chainPipeMetrics) TrackPipeLen(name string, lenFunc func() int) {
	reg := m.provider.pipeLenRegistry
	if reg == nil {
		return
	}
	if oldCollector, ok := m.lenMetrics[name]; ok {
		reg.Unregister(oldCollector)
		delete(m.lenMetrics, name)
	}

	collector := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Namespace:   "iota_wasp",
		Subsystem:   "chain_pipe",
		Name:        "len",
		Help:        "Length of a pipe",
		ConstLabels: m.makeLabels(name),
	}, func() float64 { return float64(lenFunc()) })

	if err := reg.Register(collector); err != nil {
		panic(fmt.Errorf("failed to register pipe len metric: %w", err))
	}
	// Remember the collector; without this the replacement path above can
	// never find it and a second call for the same name would fail with a
	// duplicate registration.
	m.lenMetrics[name] = collector
}

// TrackPipeLenMax adds lenFunc, identified by key, to the group of pipes
// reported under name. The group is exposed as a single gauge whose value is
// the largest length returned by any registered function (0 when the group is
// empty).
//
// The gauge is created and registered the first time a name is seen; later
// calls only add or replace the function stored under key.
//
// It is a no-op when the provider has no pipe registry configured, and it
// panics if the registry rejects the new collector.
//
// NOTE(review): valueFuncs is read by the gauge callback and written here and
// in ForgetPipeLenMax without synchronization — confirm that metric collection
// cannot run concurrently with these calls, or guard the map.
func (m *chainPipeMetrics) TrackPipeLenMax(name string, key string, lenFunc func() int) {
	reg := m.provider.pipeLenRegistry
	if reg == nil {
		return
	}

	maxCollector, found := m.maxMetrics[name]
	if !found {
		valueFuncs := map[string]func() int{}
		collector := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
			Namespace:   "iota_wasp",
			Subsystem:   "chain_pipe",
			Name:        "len",
			Help:        "Length of a pipe",
			ConstLabels: m.makeLabels(name),
		}, func() float64 {
			longest := 0
			for _, f := range valueFuncs {
				if l := f(); longest < l {
					longest = l
				}
			}
			return float64(longest)
		})
		// The collector must be registered, otherwise the gauge is built
		// and cached but never exported.
		if err := reg.Register(collector); err != nil {
			panic(fmt.Errorf("failed to register pipe max len metric: %w", err))
		}
		maxCollector = &chainPipeMaxCollector{
			collector:  collector,
			valueFuncs: valueFuncs,
		}
		m.maxMetrics[name] = maxCollector
	}
	maxCollector.valueFuncs[key] = lenFunc
}

// ForgetPipeLenMax drops the length function stored under key from the group
// reported as name. Nothing happens when the provider has no pipe registry
// or when name has never been tracked.
func (m *chainPipeMetrics) ForgetPipeLenMax(name string, key string) {
	if m.provider.pipeLenRegistry == nil {
		return
	}
	if tracked, ok := m.maxMetrics[name]; ok {
		delete(tracked.valueFuncs, key)
	}
}
Loading

0 comments on commit 7bfa0b2

Please sign in to comment.