From 3aa76a77711e86b27bcd6e33866965f92058182d Mon Sep 17 00:00:00 2001 From: aloknerurkar Date: Fri, 20 Sep 2024 22:16:04 +0530 Subject: [PATCH] fix: misc fixes from negative testing (#407) --- oracle/pkg/updater/updater.go | 34 ++++++++++++++----------- p2p/pkg/rpc/provider/export_test.go | 5 ++++ p2p/pkg/rpc/provider/service.go | 8 ++++++ p2p/pkg/rpc/provider/service_test.go | 11 +++++++++ x/contracts/events/publisher/http.go | 8 +++--- x/contracts/txmonitor/eth_helper.go | 37 ++++++++++++++++++---------- 6 files changed, 71 insertions(+), 32 deletions(-) create mode 100644 p2p/pkg/rpc/provider/export_test.go diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index a42c5389e..072784b14 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -487,12 +487,12 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx txns, ok := u.l1BlockCache.Get(blockNum) if ok { u.metrics.BlockTxnCacheHits.Inc() - u.logger.Info("cache hit for block transactions", "blockNum", blockNum) + u.logger.Debug("cache hit for block transactions", "blockNum", blockNum) return txns, nil } u.metrics.BlockTxnCacheMisses.Inc() - u.logger.Info("cache miss for block transactions", "blockNum", blockNum) + u.logger.Debug("cache miss for block transactions", "blockNum", blockNum) block, err := u.l1Client.BlockByNumber(ctx, big.NewInt(0).SetUint64(blockNum)) if err != nil { @@ -500,7 +500,7 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx return nil, fmt.Errorf("failed to get block by number: %w", err) } - u.logger.Info("retrieved block", "blockNum", blockNum, "blockHash", block.Hash().Hex()) + u.logger.Debug("retrieved block", "blockNum", blockNum, "blockHash", block.Hash().Hex()) var txnReceipts sync.Map eg, ctx := errgroup.WithContext(ctx) @@ -527,22 +527,22 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx for _, bucket := range buckets { eg.Go(func() error { start := time.Now() - u.logger.Info("requesting batch receipts", "bucketSize", len(bucket)) + u.logger.Debug("requesting batch receipts", "bucketSize", len(bucket)) results, err := u.receiptBatcher.BatchReceipts(ctx, bucket) if err != nil { u.logger.Error("failed to get batch receipts", "error", err) return fmt.Errorf("failed to get batch receipts: %w", err) } u.metrics.TxnReceiptRequestDuration.Observe(time.Since(start).Seconds()) - u.logger.Info("received batch receipts", "duration", time.Since(start).Seconds()) + u.logger.Debug("received batch receipts", "duration", time.Since(start).Seconds()) for _, result := range results { if result.Err != nil { u.logger.Error("failed to get receipt for txn", "txnHash", result.Receipt.TxHash.Hex(), "error", result.Err) - return fmt.Errorf("failed to get receipt for txn: %s", result.Err) + continue } txnReceipts.Store(result.Receipt.TxHash.Hex(), result.Receipt) - u.logger.Info("stored receipt", "txnHash", result.Receipt.TxHash.Hex()) + u.logger.Debug("stored receipt", "txnHash", result.Receipt.TxHash.Hex()) } return nil @@ -562,14 +562,14 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx receipt, ok := txnReceipts.Load(tx.Hex()) if !ok { u.logger.Error("receipt not found for txn", "txnHash", tx.Hex()) - return nil, fmt.Errorf("receipt not found for txn: %s", tx) + continue } txnsMap[strings.TrimPrefix(tx.Hex(), "0x")] = TxMetadata{PosInBlock: i, Succeeded: receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful} - u.logger.Info("added txn to map", "txnHash", tx.Hex(), "posInBlock", i, "succeeded", receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful) + u.logger.Debug("added txn to map", "txnHash", tx.Hex(), "posInBlock", i, "succeeded", receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful) } _ = u.l1BlockCache.Add(blockNum, txnsMap) - u.logger.Info("added block transactions to cache", "blockNum", blockNum) + u.logger.Debug("added block transactions to cache", "blockNum", blockNum) return txnsMap, nil } @@ -579,24 +579,28 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx // (e.g they could be unix or unixMili timestamps) func (u *Updater) computeDecayPercentage(startTimestamp, endTimestamp, commitTimestamp uint64) int64 { if startTimestamp >= endTimestamp || startTimestamp > commitTimestamp || endTimestamp <= commitTimestamp { - u.logger.Info("timestamp out of range", "startTimestamp", startTimestamp, "endTimestamp", endTimestamp, "commitTimestamp", commitTimestamp) + u.logger.Debug("timestamp out of range", "startTimestamp", startTimestamp, "endTimestamp", endTimestamp, "commitTimestamp", commitTimestamp) return 0 } // Calculate the total time in seconds totalTime := endTimestamp - startTimestamp - u.logger.Info("totalTime", "totalTime", totalTime) // Calculate the time passed in seconds timePassed := commitTimestamp - startTimestamp - u.logger.Info("timePassed", "timePassed", timePassed) // Calculate the decay percentage decayPercentage := float64(timePassed) / float64(totalTime) - u.logger.Info("decayPercentage", "decayPercentage", decayPercentage) decayPercentageRound := int64(math.Round(decayPercentage * 100)) if decayPercentageRound > 100 { decayPercentageRound = 100 } - u.logger.Info("decayPercentageRound", "decayPercentageRound", decayPercentageRound) + u.logger.Debug("decay information", + "startTimestamp", startTimestamp, + "endTimestamp", endTimestamp, + "commitTimestamp", commitTimestamp, + "totalTime", totalTime, + "timePassed", timePassed, + "decayPercentage", decayPercentage, + ) return decayPercentageRound } diff --git a/p2p/pkg/rpc/provider/export_test.go b/p2p/pkg/rpc/provider/export_test.go new file mode 100644 index 000000000..2c568982e --- /dev/null +++ b/p2p/pkg/rpc/provider/export_test.go @@ -0,0 +1,5 @@ +package providerapi + +func (s *Service) ActiveReceivers() int { + return int(s.activeReceivers.Load()) +} diff --git a/p2p/pkg/rpc/provider/service.go b/p2p/pkg/rpc/provider/service.go index ece3c3670..f5211907e 100644 --- a/p2p/pkg/rpc/provider/service.go +++ b/p2p/pkg/rpc/provider/service.go @@ -9,6 +9,7 @@ import ( "math/big" "strings" "sync" + "sync/atomic" "github.com/bufbuild/protovalidate-go" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -38,6 +39,7 @@ type Service struct { optsGetter OptsGetter metrics *metrics validator *protovalidate.Validator + activeReceivers atomic.Int32 } type ProviderRegistryContract interface { @@ -89,6 +91,9 @@ func (s *Service) ProcessBid( ctx context.Context, bid *preconfpb.Bid, ) (chan ProcessedBidResponse, error) { + if s.activeReceivers.Load() == 0 { + return nil, status.Error(codes.Internal, "no active receivers") + } var revertingTxnHashes []string if bid.RevertingTxHashes != "" { revertingTxnHashes = strings.Split(bid.RevertingTxHashes, ",") @@ -139,6 +144,9 @@ func (s *Service) ReceiveBids( _ *providerapiv1.EmptyMessage, srv providerapiv1.Provider_ReceiveBidsServer, ) error { + s.activeReceivers.Add(1) + defer s.activeReceivers.Add(-1) + for { select { case <-srv.Context().Done(): diff --git a/p2p/pkg/rpc/provider/service_test.go b/p2p/pkg/rpc/provider/service_test.go index 69eef7ac9..7fb0c93fb 100644 --- a/p2p/pkg/rpc/provider/service_test.go +++ b/p2p/pkg/rpc/provider/service_test.go @@ -379,6 +379,17 @@ func TestBidHandling(t *testing.T) { } }() + activeReceiverTimeout := time.Now().Add(2 * time.Second) + for { + if svc.ActiveReceivers() > 0 { + break + } + if time.Now().After(activeReceiverTimeout) { + t.Fatalf("timed out waiting for active receivers") + } + time.Sleep(10 * time.Millisecond) + } + sndr, err := client.SendProcessedBids(context.Background()) if err != nil { t.Fatalf("error sending processed bids: %v", err) diff --git a/x/contracts/events/publisher/http.go b/x/contracts/events/publisher/http.go index 2a9e5b101..87cfa0a31 100644 --- a/x/contracts/events/publisher/http.go +++ b/x/contracts/events/publisher/http.go @@ -71,8 +71,8 @@ func (h *httpPublisher) Start(ctx context.Context, contracts ...common.Address) case <-ticker.C: blockNumber, err := h.evmClient.BlockNumber(ctx) if err != nil { - h.logger.Error("failed to get block number", "error", err) - return + h.logger.Warn("failed to get block number", "error", err) + continue } if blockNumber > lastBlock { @@ -84,8 +84,8 @@ func (h *httpPublisher) Start(ctx context.Context, contracts ...common.Address) logs, err := h.evmClient.FilterLogs(ctx, q) if err != nil { - h.logger.Error("failed to filter logs", "error", err) - return + h.logger.Warn("failed to filter logs", "error", err) + continue } for _, logMsg := range logs { diff --git a/x/contracts/txmonitor/eth_helper.go b/x/contracts/txmonitor/eth_helper.go index b27ab10f6..6b3dba840 100644 --- a/x/contracts/txmonitor/eth_helper.go +++ b/x/contracts/txmonitor/eth_helper.go @@ -49,7 +49,6 @@ func NewEVMHelperWithLogger(client *ethclient.Client, logger *slog.Logger, contr // BatchReceipts retrieves multiple receipts for a list of transaction hashes. func (e *EVMHelperImpl) BatchReceipts(ctx context.Context, txHashes []common.Hash) ([]Result, error) { - e.logger.Debug("Starting BatchReceipts", "txHashes", txHashes) batch := make([]rpc.BatchElem, len(txHashes)) for i, hash := range txHashes { @@ -92,19 +91,31 @@ func (e *EVMHelperImpl) BatchReceipts(ctx context.Context, txHashes []common.Has // Retry individual failed transactions for i, receipt := range receipts { - if receipt.Err != nil { - e.logger.Info("Retrying failed transaction", "index", i, "hash", txHashes[i].Hex()) - for attempts := 0; attempts < 50; attempts++ { - e.logger.Info("Attempting individual call", "attempt", attempts+1, "hash", txHashes[i].Hex()) - err = e.client.Client().CallContext(context.Background(), receipt.Receipt, "eth_getTransactionReceipt", txHashes[i]) - if err == nil { - e.logger.Info("Individual call succeeded", "attempt", attempts+1, "hash", txHashes[i].Hex()) - receipts[i].Err = nil - break - } - e.logger.Error("Individual call attempt failed", "attempt", attempts+1, "hash", txHashes[i].Hex(), "error", err) - time.Sleep(1 * time.Second) + switch { + case receipt.Err == nil: + continue + case errors.Is(receipt.Err, ethereum.NotFound): + e.logger.Info("Transaction not found", "index", i, "hash", txHashes[i].Hex()) + continue + case errors.Is(receipt.Err, rpc.ErrMissingBatchResponse): + e.logger.Info("Missing batch response", "index", i, "hash", txHashes[i].Hex()) + case errors.Is(receipt.Err, rpc.ErrNoResult): + e.logger.Info("No result from batch call", "index", i, "hash", txHashes[i].Hex()) + default: + e.logger.Error("Unknown error", "index", i, "hash", txHashes[i].Hex(), "error", receipt.Err) + continue + } + e.logger.Info("Retrying failed transaction", "index", i, "hash", txHashes[i].Hex()) + for attempts := 0; attempts < 10; attempts++ { + e.logger.Info("Attempting individual call", "attempt", attempts+1, "hash", txHashes[i].Hex()) + err = e.client.Client().CallContext(context.Background(), receipt.Receipt, "eth_getTransactionReceipt", txHashes[i]) + if err == nil { + e.logger.Info("Individual call succeeded", "attempt", attempts+1, "hash", txHashes[i].Hex()) + receipt.Err = nil + break } + e.logger.Error("Individual call attempt failed", "attempt", attempts+1, "hash", txHashes[i].Hex(), "error", err) + time.Sleep(1 * time.Second) } }