Skip to content

Commit

Permalink
Merge pull request #2123 from iotaledger/mempool-metrics
Browse files Browse the repository at this point in the history
Refactor chain and BlockWAL metrics
  • Loading branch information
muXxer authored Mar 20, 2023
2 parents 12d3bf8 + 14d1126 commit 846ee3e
Show file tree
Hide file tree
Showing 28 changed files with 437 additions and 330 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ ARG GOLANG_IMAGE_TAG=1.20-bullseye
# Build stage
FROM golang:${GOLANG_IMAGE_TAG} AS build
ARG BUILD_TAGS=rocksdb
ARG BUILD_LD_FLAGS=""
ARG BUILD_LD_FLAGS="--X=github.com/iotaledger/wasp/core/app.Version=v0.0.0-testing"

LABEL org.label-schema.description="Wasp"
LABEL org.label-schema.name="iotaledger/wasp"
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.noncached
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ ARG GOLANG_IMAGE_TAG=1.20-bullseye
# Build stage
FROM golang:${GOLANG_IMAGE_TAG} AS build
ARG BUILD_TAGS=rocksdb
ARG BUILD_LD_FLAGS=""
ARG BUILD_LD_FLAGS="--X=github.com/iotaledger/wasp/core/app.Version=v0.0.0-testing"

LABEL org.label-schema.description="Wasp"
LABEL org.label-schema.name="iotaledger/wasp"
Expand Down
1 change: 1 addition & 0 deletions config_defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
"bindAddress": "0.0.0.0:2112",
"nodeMetrics": true,
"nodeConnMetrics": true,
"chainMetrics": true,
"blockWALMetrics": true,
"restAPIMetrics": true,
"goMetrics": true,
Expand Down
27 changes: 26 additions & 1 deletion core/chains/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,28 @@ func initConfigPars(c *dig.Container) error {
}

func provide(c *dig.Container) error {
type metricsDeps struct {
dig.In

NodeConnection chain.NodeConnection
}

type metricsResult struct {
dig.Out

ChainMetrics *metrics.ChainMetrics
BlockWALMetrics *metrics.BlockWALMetrics
}

if err := c.Provide(func(deps metricsDeps) metricsResult {
return metricsResult{
ChainMetrics: metrics.NewChainMetrics(deps.NodeConnection.GetMetrics()),
BlockWALMetrics: metrics.NewBlockWALMetrics(),
}
}); err != nil {
CoreComponent.LogPanic(err)
}

type chainsDeps struct {
dig.In

Expand All @@ -78,7 +100,8 @@ func provide(c *dig.Container) error {
NodeIdentityProvider registry.NodeIdentityProvider
ConsensusStateRegistry cmtLog.ConsensusStateRegistry
ChainListener *publisher.Publisher
Metrics *metrics.Metrics `optional:"true"`
ChainMetrics *metrics.ChainMetrics
BlockWALMetrics *metrics.BlockWALMetrics
}

type chainsResult struct {
Expand Down Expand Up @@ -107,6 +130,8 @@ func provide(c *dig.Container) error {
deps.ConsensusStateRegistry,
deps.ChainListener,
shutdown.NewCoordinator("chains", CoreComponent.Logger().Named("Shutdown")),
deps.ChainMetrics,
deps.BlockWALMetrics,
),
}
}); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions documentation/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ Example:
| bindAddress | The bind address on which the Prometheus exporter listens on | string | "0.0.0.0:2112" |
| nodeMetrics | Whether to include node metrics | boolean | true |
| nodeConnMetrics | Whether to include node connection metrics | boolean | true |
| chainMetrics | Whether to include chain metrics | boolean | true |
| blockWALMetrics | Whether to include block Write-Ahead Log (WAL) metrics | boolean | true |
| restAPIMetrics | Whether to include restAPI metrics | boolean | true |
| goMetrics | Whether to include go metrics | boolean | true |
Expand All @@ -364,6 +365,7 @@ Example:
"bindAddress": "0.0.0.0:2112",
"nodeMetrics": true,
"nodeConnMetrics": true,
"chainMetrics": true,
"blockWALMetrics": true,
"restAPIMetrics": true,
"goMetrics": true,
Expand Down
4 changes: 2 additions & 2 deletions packages/chain/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ type mempoolImpl struct {
netPeerPubs map[gpa.NodeID]*cryptolib.PublicKey
net peering.NetworkProvider
log *logger.Logger
metrics metrics.MempoolMetrics
metrics metrics.IMempoolMetrics
listener ChainListener
}

Expand Down Expand Up @@ -192,7 +192,7 @@ func New(
nodeIdentity *cryptolib.KeyPair,
net peering.NetworkProvider,
log *logger.Logger,
metrics metrics.MempoolMetrics,
metrics metrics.IMempoolMetrics,
listener ChainListener,
) Mempool {
netPeeringID := peering.HashPeeringIDFromBytes(chainID.Bytes(), []byte("Mempool")) // ChainID × Mempool
Expand Down
2 changes: 1 addition & 1 deletion packages/chain/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func (m *MockMempoolMetrics) IncRequestsProcessed() {
m.processedRequestCounter++
}

func (m *MockMempoolMetrics) SetRequestProcessingTime(reqID isc.RequestID, elapse time.Duration) {}
func (m *MockMempoolMetrics) SetRequestProcessingTime(_ time.Duration) {}

func (m *MockMempoolMetrics) IncBlocksPerChain() {}

Expand Down
4 changes: 2 additions & 2 deletions packages/chain/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ func New(
listener ChainListener,
accessNodesFromNode []*cryptolib.PublicKey,
net peering.NetworkProvider,
chainMetric metrics.IChainMetric,
shutdownCoordinator *shutdown.Coordinator,
log *logger.Logger,
) (Chain, error) {
Expand Down Expand Up @@ -303,7 +304,6 @@ func New(
cni.me = cni.pubKeyAsNodeID(nodeIdentity.GetPublicKey())
//
// Create sub-components.
chainMetrics := metrics.EmptyChainMetrics()
chainMgr, err := chainMgr.New(
cni.me,
cni.chainID,
Expand Down Expand Up @@ -339,7 +339,7 @@ func New(
nodeIdentity,
net,
cni.log.Named("MP"),
chainMetrics,
chainMetric,
cni.listener,
)
cni.chainMgr = gpa.NewAckHandler(cni.me, chainMgr.AsGPA(), redeliveryPeriod)
Expand Down
2 changes: 2 additions & 0 deletions packages/chain/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/iotaledger/wasp/packages/cryptolib"
"github.com/iotaledger/wasp/packages/isc"
"github.com/iotaledger/wasp/packages/kv/dict"
"github.com/iotaledger/wasp/packages/metrics"
"github.com/iotaledger/wasp/packages/metrics/nodeconnmetrics"
"github.com/iotaledger/wasp/packages/parameters"
"github.com/iotaledger/wasp/packages/peering"
Expand Down Expand Up @@ -446,6 +447,7 @@ func newEnv(t *testing.T, n, f int, reliable bool) *testEnv {
chain.NewEmptyChainListener(),
[]*cryptolib.PublicKey{}, // Access nodes.
te.networkProviders[i],
metrics.NewEmptyChainMetric(),
shutdown.NewCoordinator("test", log),
log,
)
Expand Down
31 changes: 16 additions & 15 deletions packages/chain/statemanager/smGPA/smGPAUtils/block_wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,29 @@ import (
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/runtime/ioutils"
"github.com/iotaledger/wasp/packages/isc"
"github.com/iotaledger/wasp/packages/metrics"
"github.com/iotaledger/wasp/packages/state"
)

type blockWAL struct {
*logger.WrappedLogger

dir string
metrics *BlockWALMetrics
dir string
blockWALMetrics metrics.IBlockWALMetric
}

const constFileSuffix = ".blk"

func NewBlockWAL(log *logger.Logger, baseDir string, chainID isc.ChainID, metrics *BlockWALMetrics) (BlockWAL, error) {
func NewBlockWAL(log *logger.Logger, baseDir string, chainID isc.ChainID, blockWALMetrics metrics.IBlockWALMetric) (BlockWAL, error) {
dir := filepath.Join(baseDir, chainID.String())
if err := ioutils.CreateDirectory(dir, 0o777); err != nil {
return nil, fmt.Errorf("BlockWAL cannot create folder %v: %w", dir, err)
}

result := &blockWAL{
WrappedLogger: logger.NewWrappedLogger(log),
dir: dir,
metrics: metrics,
WrappedLogger: logger.NewWrappedLogger(log),
dir: dir,
blockWALMetrics: blockWALMetrics,
}
result.LogDebugf("BlockWAL created in folder %v", dir)
return result, nil
Expand All @@ -44,21 +45,21 @@ func (bwT *blockWAL) Write(block state.Block) error {
bwT.LogDebugf("Writing block %s to wal; file name - %s", commitment, fileName)
f, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o666)
if err != nil {
bwT.metrics.failedWrites.Inc()
bwT.blockWALMetrics.IncFailedWrites()
return fmt.Errorf("openning file %s for writing failed: %w", fileName, err)
}
defer f.Close()
blockBytes := block.Bytes()
n, err := f.Write(blockBytes)
if err != nil {
bwT.metrics.failedReads.Inc()
bwT.blockWALMetrics.IncFailedReads()
return fmt.Errorf("writing block data to file %s failed: %w", fileName, err)
}
if len(blockBytes) != n {
bwT.metrics.failedReads.Inc()
bwT.blockWALMetrics.IncFailedReads()
return fmt.Errorf("only %v of total %v bytes of block were written to file %s", n, len(blockBytes), fileName)
}
bwT.metrics.segments.Inc()
bwT.blockWALMetrics.IncSegments()
return nil
}

Expand All @@ -72,28 +73,28 @@ func (bwT *blockWAL) Read(blockHash state.BlockHash) (state.Block, error) {
filePath := filepath.Join(bwT.dir, fileName)
f, err := os.OpenFile(filePath, os.O_RDONLY, 0o666)
if err != nil {
bwT.metrics.failedReads.Inc()
bwT.blockWALMetrics.IncFailedReads()
return nil, fmt.Errorf("opening file %s for reading failed: %w", fileName, err)
}
defer f.Close()
stat, err := f.Stat()
if err != nil {
bwT.metrics.failedReads.Inc()
bwT.blockWALMetrics.IncFailedReads()
return nil, fmt.Errorf("reading file %s information failed: %w", fileName, err)
}
blockBytes := make([]byte, stat.Size())
n, err := bufio.NewReader(f).Read(blockBytes)
if err != nil {
bwT.metrics.failedReads.Inc()
bwT.blockWALMetrics.IncFailedReads()
return nil, fmt.Errorf("reading file %s failed: %w", fileName, err)
}
if int64(n) != stat.Size() {
bwT.metrics.failedReads.Inc()
bwT.blockWALMetrics.IncFailedReads()
return nil, fmt.Errorf("only %v of total %v bytes of file %s were read", n, stat.Size(), fileName)
}
block, err := state.BlockFromBytes(blockBytes)
if err != nil {
bwT.metrics.failedReads.Inc()
bwT.blockWALMetrics.IncFailedReads()
return nil, fmt.Errorf("error parsing block from bytes read from file %s: %w", fileName, err)
}
return block, nil
Expand Down
42 changes: 0 additions & 42 deletions packages/chain/statemanager/smGPA/smGPAUtils/block_wal_metrics.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"pgregory.net/rapid"

"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/wasp/packages/metrics"
"github.com/iotaledger/wasp/packages/origin"
"github.com/iotaledger/wasp/packages/state"
"github.com/iotaledger/wasp/packages/testutil/testlogger"
Expand All @@ -31,7 +32,7 @@ func (bwtsmT *blockWALTestSM) Init(t *rapid.T) {
bwtsmT.factory = NewBlockFactory(t)
bwtsmT.lastBlockCommitment = origin.L1Commitment(nil, 0)
bwtsmT.log = testlogger.NewLogger(t)
bwtsmT.bw, err = NewBlockWAL(bwtsmT.log, constTestFolder, bwtsmT.factory.GetChainID(), NewBlockWALMetrics())
bwtsmT.bw, err = NewBlockWAL(bwtsmT.log, constTestFolder, bwtsmT.factory.GetChainID(), metrics.NewEmptyBlockWALMetric())
require.NoError(t, err)
bwtsmT.blocks = make(map[state.BlockHash]state.Block)
bwtsmT.blocksMoved = make([]state.BlockHash, 0)
Expand Down Expand Up @@ -164,7 +165,7 @@ func (bwtsmT *blockWALTestSM) ReadDamagedBlock(t *rapid.T) {

func (bwtsmT *blockWALTestSM) Restart(t *rapid.T) {
var err error
bwtsmT.bw, err = NewBlockWAL(bwtsmT.log, constTestFolder, bwtsmT.factory.GetChainID(), NewBlockWALMetrics())
bwtsmT.bw, err = NewBlockWAL(bwtsmT.log, constTestFolder, bwtsmT.factory.GetChainID(), metrics.NewEmptyBlockWALMetric())
require.NoError(t, err)
t.Log("Block WAL restarted")
}
Expand Down
11 changes: 6 additions & 5 deletions packages/chain/statemanager/smGPA/smGPAUtils/block_wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/iotaledger/wasp/packages/isc"
"github.com/iotaledger/wasp/packages/metrics"
"github.com/iotaledger/wasp/packages/state"
"github.com/iotaledger/wasp/packages/testutil/testlogger"
)
Expand All @@ -22,9 +23,9 @@ func TestBlockWALBasic(t *testing.T) {
factory := NewBlockFactory(t)
blocks := factory.GetBlocks(5, 1)
blocksInWAL := blocks[:4]
walGood, err := NewBlockWAL(log, constTestFolder, factory.GetChainID(), NewBlockWALMetrics())
walGood, err := NewBlockWAL(log, constTestFolder, factory.GetChainID(), metrics.NewEmptyBlockWALMetric())
require.NoError(t, err)
walBad, err := NewBlockWAL(log, constTestFolder, isc.RandomChainID(), NewBlockWALMetrics())
walBad, err := NewBlockWAL(log, constTestFolder, isc.RandomChainID(), metrics.NewEmptyBlockWALMetric())
require.NoError(t, err)
for i := range blocksInWAL {
err = walGood.Write(blocks[i])
Expand Down Expand Up @@ -57,7 +58,7 @@ func TestBlockWALOverwrite(t *testing.T) {

factory := NewBlockFactory(t)
blocks := factory.GetBlocks(4, 1)
wal, err := NewBlockWAL(log, constTestFolder, factory.GetChainID(), NewBlockWALMetrics())
wal, err := NewBlockWAL(log, constTestFolder, factory.GetChainID(), metrics.NewEmptyBlockWALMetric())
require.NoError(t, err)
for i := range blocks {
err = wal.Write(blocks[i])
Expand Down Expand Up @@ -99,15 +100,15 @@ func TestBlockWALRestart(t *testing.T) {

factory := NewBlockFactory(t)
blocks := factory.GetBlocks(4, 1)
wal, err := NewBlockWAL(log, constTestFolder, factory.GetChainID(), NewBlockWALMetrics())
wal, err := NewBlockWAL(log, constTestFolder, factory.GetChainID(), metrics.NewEmptyBlockWALMetric())
require.NoError(t, err)
for i := range blocks {
err = wal.Write(blocks[i])
require.NoError(t, err)
}

// Restart: WAL object is recreated
wal, err = NewBlockWAL(log, constTestFolder, factory.GetChainID(), NewBlockWALMetrics())
wal, err = NewBlockWAL(log, constTestFolder, factory.GetChainID(), metrics.NewEmptyBlockWALMetric())
require.NoError(t, err)
for i := range blocks {
require.True(t, wal.Contains(blocks[i].Hash()))
Expand Down
Loading

0 comments on commit 846ee3e

Please sign in to comment.