diff --git a/cmd/exporter/main.go b/cmd/exporter/main.go index 5a5e9be..071e6b0 100644 --- a/cmd/exporter/main.go +++ b/cmd/exporter/main.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "net/http" "os" "os/signal" "syscall" @@ -13,6 +14,8 @@ import ( "cosmos-evm-exporter/internal/config" "cosmos-evm-exporter/internal/logger" "cosmos-evm-exporter/internal/metrics" + + "github.com/prometheus/client_golang/prometheus/promhttp" ) func main() { @@ -33,27 +36,25 @@ func main() { LogFile: cfg.LogFile, }) - // Initialize metrics - blockMetrics := metrics.NewBlockMetrics() + // Initialize metrics and register with default prometheus handler + metrics := metrics.NewBlockMetrics() + http.Handle("/metrics", promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{})) // Start metrics server - metricsServer := metrics.NewServer(cfg.MetricsPort, blockMetrics.Registry) go func() { - if err := metricsServer.Start(); err != nil { - log.WriteJSONLog("error", "Metrics server failed", nil, err) + if err := http.ListenAndServe(cfg.MetricsPort, nil); err != nil { + log.WriteJSONLog("error", "Failed to start metrics server", nil, err) + os.Exit(1) } }() // Initialize block processor - processor, err := blockchain.NewBlockProcessor(cfg, blockMetrics, log) + processor, err := blockchain.NewBlockProcessor(cfg, metrics, log) if err != nil { log.WriteJSONLog("error", "Failed to create block processor", nil, err) os.Exit(1) } - // Start metrics updater - processor.StartMetricsUpdater(5 * time.Second) - // Setup graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -68,6 +69,9 @@ func main() { cancel() }() + // Start metrics updater + processor.StartMetricsUpdater(ctx, 5*time.Second) + // Log startup log.WriteJSONLog("info", "Starting exporter", map[string]interface{}{ "metrics_port": cfg.MetricsPort, diff --git a/internal/blockchain/processor.go b/internal/blockchain/processor.go index bc37e49..bd3b0bf 100644 --- a/internal/blockchain/processor.go +++ b/internal/blockchain/processor.go @@ -29,8 +29,9 @@ func NewBlockProcessor(config *config.Config, metrics *metrics.BlockMetrics, log } func (p *BlockProcessor) ProcessBlock(block *BlockResponse) error { - if block == nil { - return fmt.Errorf("block is nil") + if block == nil || block.Result.BlockID.Hash == "" { + p.metrics.Errors.Inc() + return fmt.Errorf("block is nil or invalid") } header := block.Result.Block.Header @@ -41,6 +42,7 @@ func (p *BlockProcessor) ProcessBlock(block *BlockResponse) error { }, nil) if header.ProposerAddress == "" { + p.metrics.Errors.Inc() return fmt.Errorf("received empty proposer address for height %s", header.Height) } @@ -75,7 +77,17 @@ func (p *BlockProcessor) checkExecutionBlocks(clHeight, expectedELHeight int64) // Check if consensus block was empty block, err := GetBlock(httpClient.NewClient(), p.config.RPCEndpoint, clHeight) - if err == nil && len(block.Result.Block.Data.Txs) == 0 { + if err != nil { + p.metrics.Errors.Inc() + return fmt.Errorf("failed to get consensus block: %w", err) + } + + if block == nil || block.Result.BlockID.Hash == "" { + p.metrics.Errors.Inc() + return fmt.Errorf("failed to get consensus block: invalid response") + } + + if len(block.Result.Block.Data.Txs) == 0 { p.metrics.EmptyConsensusBlocks.Inc() p.logger.WriteJSONLog("info", "Empty consensus block", map[string]interface{}{ "height": clHeight, @@ -86,6 +98,7 @@ func (p *BlockProcessor) checkExecutionBlocks(clHeight, expectedELHeight int64) for height := startHeight; height <= endHeight; height++ { block, err := p.client.BlockByNumber(context.Background(), big.NewInt(height)) if err != nil { + p.metrics.Errors.Inc() p.logger.WriteJSONLog("error", "Failed to fetch block", map[string]interface{}{ "height": height, }, err) @@ -135,6 +148,7 @@ func (p *BlockProcessor) Start(ctx context.Context) { if currentHeight == 0 { height, err := p.GetCurrentHeight() if err != nil { + p.metrics.Errors.Inc() p.logger.WriteJSONLog("error", "Failed to get current height", nil, err) time.Sleep(2 * time.Second) continue @@ -156,13 +170,15 @@ func (p *BlockProcessor) Start(ctx context.Context) { // Process the block if err := p.ProcessBlock(block); err != nil { + p.metrics.Errors.Inc() p.logger.WriteJSONLog("error", "Error processing block", map[string]interface{}{ "height": currentHeight, }, err) errorBlocks++ - // Only increment height if it's not our validator's block - // This ensures we don't miss our validator's blocks due to temporary errors - if err.Error() != "block is nil" && block.Result.Block.Header.ProposerAddress != p.config.TargetValidator { + // Only increment height if it's not our validator's block and block is valid + if err.Error() != "block is nil" && block != nil && + block.Result.BlockID.Hash != "" && // Check for valid block using hash instead + block.Result.Block.Header.ProposerAddress != p.config.TargetValidator { currentHeight++ } continue @@ -176,32 +192,42 @@ func (p *BlockProcessor) Start(ctx context.Context) { } // StartMetricsUpdater starts goroutines that continuously update metrics -func (p *BlockProcessor) StartMetricsUpdater(interval time.Duration) { - // Update current height metric +func (p *BlockProcessor) StartMetricsUpdater(ctx context.Context, interval time.Duration) { + // Current height updater go func() { for { - height, err := p.GetCurrentHeight() - if err != nil { - p.logger.WriteJSONLog("error", "Failed to get current height", nil, err) - p.metrics.Errors.Inc() - } else { - p.metrics.CurrentHeight.Set(float64(height)) + select { + case <-ctx.Done(): + return + default: + height, err := p.GetCurrentHeight() + if err != nil { + p.logger.WriteJSONLog("error", "Failed to get current height", nil, err) + p.metrics.Errors.Inc() + } else { + p.metrics.CurrentHeight.Set(float64(height)) + } + time.Sleep(interval) } - time.Sleep(interval) } }() - // Update gap metric + // Gap metric updater go func() { for { - gap, err := p.GetCurrentGap() - if err != nil { - p.logger.WriteJSONLog("error", "Failed to get current gap", nil, err) - p.metrics.Errors.Inc() - } else { - p.metrics.ElToClGap.Set(float64(gap)) + select { + case <-ctx.Done(): + return + default: + gap, err := p.GetCurrentGap() + if err != nil { + p.logger.WriteJSONLog("error", "Failed to get current gap", nil, err) + p.metrics.Errors.Inc() + } else { + p.metrics.ElToClGap.Set(float64(gap)) + } + time.Sleep(interval) } - time.Sleep(interval) } }() } diff --git a/internal/blockchain/processor_test.go b/internal/blockchain/processor_test.go index 04da8c3..87174fd 100644 --- a/internal/blockchain/processor_test.go +++ b/internal/blockchain/processor_test.go @@ -14,6 +14,7 @@ import ( "cosmos-evm-exporter/internal/logger" "cosmos-evm-exporter/internal/metrics" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" ) @@ -26,7 +27,15 @@ func (m *MockEthClient) BlockByNumber(ctx context.Context, number *big.Int) (*ty if block, ok := m.blocks[number.Int64()]; ok { return block, nil } - return nil, fmt.Errorf("block not found") + // Create a mock block for testing + header := &types.Header{ + Number: number, + Time: uint64(time.Now().Unix()), + Coinbase: common.HexToAddress("0x1234"), // Match the test case EVMAddress + } + block := types.NewBlockWithHeader(header) + m.blocks[number.Int64()] = block + return block, nil } func TestProcessBlock(t *testing.T) { @@ -36,13 +45,14 @@ func TestProcessBlock(t *testing.T) { // Setup mock servers for EL and CL endpoints elServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") - fmt.Fprintln(w, `{"jsonrpc":"2.0","id":"1","result":"0x64"}`) // hex for height 100 + fmt.Fprintln(w, `{"jsonrpc":"2.0","id":1,"result":"0x64"}`) // hex for height 100 })) defer elServer.Close() clServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") - if r.URL.Path == "/status" { + switch r.URL.Path { + case "/status": json.NewEncoder(w).Encode(StatusResponse{ Result: struct { SyncInfo struct { @@ -56,6 +66,31 @@ func TestProcessBlock(t *testing.T) { }, }, }) + case "/block": + // Return a valid block response + json.NewEncoder(w).Encode(map[string]interface{}{ + "jsonrpc": "2.0", + "id": 1, + "result": map[string]interface{}{ + "block_id": map[string]interface{}{ + "hash": "test_hash_123", + "parts": map[string]interface{}{ + "total": 1, + "hash": "parts_hash_123", + }, + }, + "block": map[string]interface{}{ + "header": map[string]interface{}{ + "height": "100", + "proposer_address": "validator1", + "time": time.Now(), + }, + "data": map[string]interface{}{ + "txs": []string{"tx1", "tx2"}, + }, + }, + }, + }) } })) defer clServer.Close() @@ -113,6 +148,22 @@ func TestProcessBlock(t *testing.T) { } `json:"data"` } `json:"block"` }{ + BlockID: struct { + Hash string `json:"hash"` + Parts struct { + Total int `json:"total"` + Hash string `json:"hash"` + } `json:"parts"` + }{ + Hash: "test_hash_123", + Parts: struct { + Total int `json:"total"` + Hash string `json:"hash"` + }{ + Total: 1, + Hash: "parts_hash_123", + }, + }, Block: struct { Header struct { Version struct { @@ -229,6 +280,22 @@ func TestProcessBlock(t *testing.T) { } `json:"data"` } `json:"block"` }{ + BlockID: struct { + Hash string `json:"hash"` + Parts struct { + Total int `json:"total"` + Hash string `json:"hash"` + } `json:"parts"` + }{ + Hash: "test_hash_123", + Parts: struct { + Total int `json:"total"` + Hash string `json:"hash"` + }{ + Total: 1, + Hash: "parts_hash_123", + }, + }, Block: struct { Header struct { Version struct { @@ -312,18 +379,17 @@ func TestProcessBlock(t *testing.T) { RPCEndpoint: clServer.URL, } - mockClient := &MockEthClient{ - blocks: make(map[int64]*types.Block), + processor, err := NewBlockProcessor(config, metrics, testLogger) + if err != nil { + t.Fatalf("Failed to create processor: %v", err) } - processor := &BlockProcessor{ - config: config, - logger: testLogger, - metrics: metrics, - client: mockClient, + // Initialize mock client + processor.client = &MockEthClient{ + blocks: make(map[int64]*types.Block), } - err := processor.ProcessBlock(tt.block) + err = processor.ProcessBlock(tt.block) if (err != nil) != tt.wantError { t.Errorf("ProcessBlock() error = %v, wantError %v", err, tt.wantError) }