Skip to content

Commit

Permalink
fix: improve error handling and metric counters
Browse files Browse the repository at this point in the history
  • Loading branch information
chliddle committed Nov 15, 2024
1 parent e49216f commit f575942
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 43 deletions.
22 changes: 13 additions & 9 deletions cmd/exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
Expand All @@ -13,6 +14,8 @@ import (
"cosmos-evm-exporter/internal/config"
"cosmos-evm-exporter/internal/logger"
"cosmos-evm-exporter/internal/metrics"

"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
Expand All @@ -33,27 +36,25 @@ func main() {
LogFile: cfg.LogFile,
})

// Initialize metrics
blockMetrics := metrics.NewBlockMetrics()
// Initialize metrics and register with default prometheus handler
metrics := metrics.NewBlockMetrics()
http.Handle("/metrics", promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{}))

// Start metrics server
metricsServer := metrics.NewServer(cfg.MetricsPort, blockMetrics.Registry)
go func() {
if err := metricsServer.Start(); err != nil {
log.WriteJSONLog("error", "Metrics server failed", nil, err)
if err := http.ListenAndServe(cfg.MetricsPort, nil); err != nil {
log.WriteJSONLog("error", "Failed to start metrics server", nil, err)
os.Exit(1)
}
}()

// Initialize block processor
processor, err := blockchain.NewBlockProcessor(cfg, blockMetrics, log)
processor, err := blockchain.NewBlockProcessor(cfg, metrics, log)
if err != nil {
log.WriteJSONLog("error", "Failed to create block processor", nil, err)
os.Exit(1)
}

// Start metrics updater
processor.StartMetricsUpdater(5 * time.Second)

// Setup graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -68,6 +69,9 @@ func main() {
cancel()
}()

// Start metrics updater
processor.StartMetricsUpdater(ctx, 5*time.Second)

// Log startup
log.WriteJSONLog("info", "Starting exporter", map[string]interface{}{
"metrics_port": cfg.MetricsPort,
Expand Down
72 changes: 49 additions & 23 deletions internal/blockchain/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ func NewBlockProcessor(config *config.Config, metrics *metrics.BlockMetrics, log
}

func (p *BlockProcessor) ProcessBlock(block *BlockResponse) error {
if block == nil {
return fmt.Errorf("block is nil")
if block == nil || block.Result.BlockID.Hash == "" {
p.metrics.Errors.Inc()
return fmt.Errorf("block is nil or invalid")
}

header := block.Result.Block.Header
Expand All @@ -41,6 +42,7 @@ func (p *BlockProcessor) ProcessBlock(block *BlockResponse) error {
}, nil)

if header.ProposerAddress == "" {
p.metrics.Errors.Inc()
return fmt.Errorf("received empty proposer address for height %s", header.Height)
}

Expand Down Expand Up @@ -75,7 +77,17 @@ func (p *BlockProcessor) checkExecutionBlocks(clHeight, expectedELHeight int64)

// Check if consensus block was empty
block, err := GetBlock(httpClient.NewClient(), p.config.RPCEndpoint, clHeight)
if err == nil && len(block.Result.Block.Data.Txs) == 0 {
if err != nil {
p.metrics.Errors.Inc()
return fmt.Errorf("failed to get consensus block: %w", err)
}

if block == nil || block.Result.BlockID.Hash == "" {
p.metrics.Errors.Inc()
return fmt.Errorf("failed to get consensus block: invalid response")
}

if len(block.Result.Block.Data.Txs) == 0 {
p.metrics.EmptyConsensusBlocks.Inc()
p.logger.WriteJSONLog("info", "Empty consensus block", map[string]interface{}{
"height": clHeight,
Expand All @@ -86,6 +98,7 @@ func (p *BlockProcessor) checkExecutionBlocks(clHeight, expectedELHeight int64)
for height := startHeight; height <= endHeight; height++ {
block, err := p.client.BlockByNumber(context.Background(), big.NewInt(height))
if err != nil {
p.metrics.Errors.Inc()
p.logger.WriteJSONLog("error", "Failed to fetch block", map[string]interface{}{
"height": height,
}, err)
Expand Down Expand Up @@ -135,6 +148,7 @@ func (p *BlockProcessor) Start(ctx context.Context) {
if currentHeight == 0 {
height, err := p.GetCurrentHeight()
if err != nil {
p.metrics.Errors.Inc()
p.logger.WriteJSONLog("error", "Failed to get current height", nil, err)
time.Sleep(2 * time.Second)
continue
Expand All @@ -156,13 +170,15 @@ func (p *BlockProcessor) Start(ctx context.Context) {

// Process the block
if err := p.ProcessBlock(block); err != nil {
p.metrics.Errors.Inc()
p.logger.WriteJSONLog("error", "Error processing block", map[string]interface{}{
"height": currentHeight,
}, err)
errorBlocks++
// Only increment height if it's not our validator's block
// This ensures we don't miss our validator's blocks due to temporary errors
if err.Error() != "block is nil" && block.Result.Block.Header.ProposerAddress != p.config.TargetValidator {
// Only increment height if it's not our validator's block and block is valid
if err.Error() != "block is nil" && block != nil &&
block.Result.BlockID.Hash != "" && // Check for valid block using hash instead
block.Result.Block.Header.ProposerAddress != p.config.TargetValidator {
currentHeight++
}
continue
Expand All @@ -176,32 +192,42 @@ func (p *BlockProcessor) Start(ctx context.Context) {
}

// StartMetricsUpdater starts goroutines that continuously update metrics
func (p *BlockProcessor) StartMetricsUpdater(interval time.Duration) {
// Update current height metric
func (p *BlockProcessor) StartMetricsUpdater(ctx context.Context, interval time.Duration) {
// Current height updater
go func() {
for {
height, err := p.GetCurrentHeight()
if err != nil {
p.logger.WriteJSONLog("error", "Failed to get current height", nil, err)
p.metrics.Errors.Inc()
} else {
p.metrics.CurrentHeight.Set(float64(height))
select {
case <-ctx.Done():
return
default:
height, err := p.GetCurrentHeight()
if err != nil {
p.logger.WriteJSONLog("error", "Failed to get current height", nil, err)
p.metrics.Errors.Inc()
} else {
p.metrics.CurrentHeight.Set(float64(height))
}
time.Sleep(interval)
}
time.Sleep(interval)
}
}()

// Update gap metric
// Gap metric updater
go func() {
for {
gap, err := p.GetCurrentGap()
if err != nil {
p.logger.WriteJSONLog("error", "Failed to get current gap", nil, err)
p.metrics.Errors.Inc()
} else {
p.metrics.ElToClGap.Set(float64(gap))
select {
case <-ctx.Done():
return
default:
gap, err := p.GetCurrentGap()
if err != nil {
p.logger.WriteJSONLog("error", "Failed to get current gap", nil, err)
p.metrics.Errors.Inc()
} else {
p.metrics.ElToClGap.Set(float64(gap))
}
time.Sleep(interval)
}
time.Sleep(interval)
}
}()
}
88 changes: 77 additions & 11 deletions internal/blockchain/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"cosmos-evm-exporter/internal/logger"
"cosmos-evm-exporter/internal/metrics"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

Expand All @@ -26,7 +27,15 @@ func (m *MockEthClient) BlockByNumber(ctx context.Context, number *big.Int) (*ty
if block, ok := m.blocks[number.Int64()]; ok {
return block, nil
}
return nil, fmt.Errorf("block not found")
// Create a mock block for testing
header := &types.Header{
Number: number,
Time: uint64(time.Now().Unix()),
Coinbase: common.HexToAddress("0x1234"), // Match the test case EVMAddress
}
block := types.NewBlockWithHeader(header)
m.blocks[number.Int64()] = block
return block, nil
}

func TestProcessBlock(t *testing.T) {
Expand All @@ -36,13 +45,14 @@ func TestProcessBlock(t *testing.T) {
// Setup mock servers for EL and CL endpoints
elServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"jsonrpc":"2.0","id":"1","result":"0x64"}`) // hex for height 100
fmt.Fprintln(w, `{"jsonrpc":"2.0","id":1,"result":"0x64"}`) // hex for height 100
}))
defer elServer.Close()

clServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if r.URL.Path == "/status" {
switch r.URL.Path {
case "/status":
json.NewEncoder(w).Encode(StatusResponse{
Result: struct {
SyncInfo struct {
Expand All @@ -56,6 +66,31 @@ func TestProcessBlock(t *testing.T) {
},
},
})
case "/block":
// Return a valid block response
json.NewEncoder(w).Encode(map[string]interface{}{
"jsonrpc": "2.0",
"id": 1,
"result": map[string]interface{}{
"block_id": map[string]interface{}{
"hash": "test_hash_123",
"parts": map[string]interface{}{
"total": 1,
"hash": "parts_hash_123",
},
},
"block": map[string]interface{}{
"header": map[string]interface{}{
"height": "100",
"proposer_address": "validator1",
"time": time.Now(),
},
"data": map[string]interface{}{
"txs": []string{"tx1", "tx2"},
},
},
},
})
}
}))
defer clServer.Close()
Expand Down Expand Up @@ -113,6 +148,22 @@ func TestProcessBlock(t *testing.T) {
} `json:"data"`
} `json:"block"`
}{
BlockID: struct {
Hash string `json:"hash"`
Parts struct {
Total int `json:"total"`
Hash string `json:"hash"`
} `json:"parts"`
}{
Hash: "test_hash_123",
Parts: struct {
Total int `json:"total"`
Hash string `json:"hash"`
}{
Total: 1,
Hash: "parts_hash_123",
},
},
Block: struct {
Header struct {
Version struct {
Expand Down Expand Up @@ -229,6 +280,22 @@ func TestProcessBlock(t *testing.T) {
} `json:"data"`
} `json:"block"`
}{
BlockID: struct {
Hash string `json:"hash"`
Parts struct {
Total int `json:"total"`
Hash string `json:"hash"`
} `json:"parts"`
}{
Hash: "test_hash_123",
Parts: struct {
Total int `json:"total"`
Hash string `json:"hash"`
}{
Total: 1,
Hash: "parts_hash_123",
},
},
Block: struct {
Header struct {
Version struct {
Expand Down Expand Up @@ -312,18 +379,17 @@ func TestProcessBlock(t *testing.T) {
RPCEndpoint: clServer.URL,
}

mockClient := &MockEthClient{
blocks: make(map[int64]*types.Block),
processor, err := NewBlockProcessor(config, metrics, testLogger)
if err != nil {
t.Fatalf("Failed to create processor: %v", err)
}

processor := &BlockProcessor{
config: config,
logger: testLogger,
metrics: metrics,
client: mockClient,
// Initialize mock client
processor.client = &MockEthClient{
blocks: make(map[int64]*types.Block),
}

err := processor.ProcessBlock(tt.block)
err = processor.ProcessBlock(tt.block)
if (err != nil) != tt.wantError {
t.Errorf("ProcessBlock() error = %v, wantError %v", err, tt.wantError)
}
Expand Down

0 comments on commit f575942

Please sign in to comment.