Skip to content

Commit

Permalink
fix: misc fixes from negative testing (#407)
Browse files Browse the repository at this point in the history
  • Loading branch information
aloknerurkar authored Sep 20, 2024
1 parent 5fe2843 commit 3aa76a7
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 32 deletions.
34 changes: 19 additions & 15 deletions oracle/pkg/updater/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,20 +487,20 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx
txns, ok := u.l1BlockCache.Get(blockNum)
if ok {
u.metrics.BlockTxnCacheHits.Inc()
u.logger.Info("cache hit for block transactions", "blockNum", blockNum)
u.logger.Debug("cache hit for block transactions", "blockNum", blockNum)
return txns, nil
}

u.metrics.BlockTxnCacheMisses.Inc()
u.logger.Info("cache miss for block transactions", "blockNum", blockNum)
u.logger.Debug("cache miss for block transactions", "blockNum", blockNum)

block, err := u.l1Client.BlockByNumber(ctx, big.NewInt(0).SetUint64(blockNum))
if err != nil {
u.logger.Error("failed to get block by number", "blockNum", blockNum, "error", err)
return nil, fmt.Errorf("failed to get block by number: %w", err)
}

u.logger.Info("retrieved block", "blockNum", blockNum, "blockHash", block.Hash().Hex())
u.logger.Debug("retrieved block", "blockNum", blockNum, "blockHash", block.Hash().Hex())

var txnReceipts sync.Map
eg, ctx := errgroup.WithContext(ctx)
Expand All @@ -527,22 +527,22 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx
for _, bucket := range buckets {
eg.Go(func() error {
start := time.Now()
u.logger.Info("requesting batch receipts", "bucketSize", len(bucket))
u.logger.Debug("requesting batch receipts", "bucketSize", len(bucket))
results, err := u.receiptBatcher.BatchReceipts(ctx, bucket)
if err != nil {
u.logger.Error("failed to get batch receipts", "error", err)
return fmt.Errorf("failed to get batch receipts: %w", err)
}
u.metrics.TxnReceiptRequestDuration.Observe(time.Since(start).Seconds())
u.logger.Info("received batch receipts", "duration", time.Since(start).Seconds())
u.logger.Debug("received batch receipts", "duration", time.Since(start).Seconds())
for _, result := range results {
if result.Err != nil {
u.logger.Error("failed to get receipt for txn", "txnHash", result.Receipt.TxHash.Hex(), "error", result.Err)
return fmt.Errorf("failed to get receipt for txn: %s", result.Err)
continue
}

txnReceipts.Store(result.Receipt.TxHash.Hex(), result.Receipt)
u.logger.Info("stored receipt", "txnHash", result.Receipt.TxHash.Hex())
u.logger.Debug("stored receipt", "txnHash", result.Receipt.TxHash.Hex())
}

return nil
Expand All @@ -562,14 +562,14 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx
receipt, ok := txnReceipts.Load(tx.Hex())
if !ok {
u.logger.Error("receipt not found for txn", "txnHash", tx.Hex())
return nil, fmt.Errorf("receipt not found for txn: %s", tx)
continue
}
txnsMap[strings.TrimPrefix(tx.Hex(), "0x")] = TxMetadata{PosInBlock: i, Succeeded: receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful}
u.logger.Info("added txn to map", "txnHash", tx.Hex(), "posInBlock", i, "succeeded", receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful)
u.logger.Debug("added txn to map", "txnHash", tx.Hex(), "posInBlock", i, "succeeded", receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful)
}

_ = u.l1BlockCache.Add(blockNum, txnsMap)
u.logger.Info("added block transactions to cache", "blockNum", blockNum)
u.logger.Debug("added block transactions to cache", "blockNum", blockNum)

return txnsMap, nil
}
Expand All @@ -579,24 +579,28 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx
// (e.g they could be unix or unixMili timestamps)
func (u *Updater) computeDecayPercentage(startTimestamp, endTimestamp, commitTimestamp uint64) int64 {
if startTimestamp >= endTimestamp || startTimestamp > commitTimestamp || endTimestamp <= commitTimestamp {
u.logger.Info("timestamp out of range", "startTimestamp", startTimestamp, "endTimestamp", endTimestamp, "commitTimestamp", commitTimestamp)
u.logger.Debug("timestamp out of range", "startTimestamp", startTimestamp, "endTimestamp", endTimestamp, "commitTimestamp", commitTimestamp)
return 0
}

// Calculate the total time in seconds
totalTime := endTimestamp - startTimestamp
u.logger.Info("totalTime", "totalTime", totalTime)
// Calculate the time passed in seconds
timePassed := commitTimestamp - startTimestamp
u.logger.Info("timePassed", "timePassed", timePassed)
// Calculate the decay percentage
decayPercentage := float64(timePassed) / float64(totalTime)
u.logger.Info("decayPercentage", "decayPercentage", decayPercentage)

decayPercentageRound := int64(math.Round(decayPercentage * 100))
if decayPercentageRound > 100 {
decayPercentageRound = 100
}
u.logger.Info("decayPercentageRound", "decayPercentageRound", decayPercentageRound)
u.logger.Debug("decay information",
"startTimestamp", startTimestamp,
"endTimestamp", endTimestamp,
"commitTimestamp", commitTimestamp,
"totalTime", totalTime,
"timePassed", timePassed,
"decayPercentage", decayPercentage,
)
return decayPercentageRound
}
5 changes: 5 additions & 0 deletions p2p/pkg/rpc/provider/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package providerapi

func (s *Service) ActiveReceivers() int {
return int(s.activeReceivers.Load())
}
8 changes: 8 additions & 0 deletions p2p/pkg/rpc/provider/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/big"
"strings"
"sync"
"sync/atomic"

"github.com/bufbuild/protovalidate-go"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand Down Expand Up @@ -38,6 +39,7 @@ type Service struct {
optsGetter OptsGetter
metrics *metrics
validator *protovalidate.Validator
activeReceivers atomic.Int32
}

type ProviderRegistryContract interface {
Expand Down Expand Up @@ -89,6 +91,9 @@ func (s *Service) ProcessBid(
ctx context.Context,
bid *preconfpb.Bid,
) (chan ProcessedBidResponse, error) {
if s.activeReceivers.Load() == 0 {
return nil, status.Error(codes.Internal, "no active receivers")
}
var revertingTxnHashes []string
if bid.RevertingTxHashes != "" {
revertingTxnHashes = strings.Split(bid.RevertingTxHashes, ",")
Expand Down Expand Up @@ -139,6 +144,9 @@ func (s *Service) ReceiveBids(
_ *providerapiv1.EmptyMessage,
srv providerapiv1.Provider_ReceiveBidsServer,
) error {
s.activeReceivers.Add(1)
defer s.activeReceivers.Add(-1)

for {
select {
case <-srv.Context().Done():
Expand Down
11 changes: 11 additions & 0 deletions p2p/pkg/rpc/provider/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,17 @@ func TestBidHandling(t *testing.T) {
}
}()

activeReceiverTimeout := time.Now().Add(2 * time.Second)
for {
if svc.ActiveReceivers() > 0 {
break
}
if time.Now().After(activeReceiverTimeout) {
t.Fatalf("timed out waiting for active receivers")
}
time.Sleep(10 * time.Millisecond)
}

sndr, err := client.SendProcessedBids(context.Background())
if err != nil {
t.Fatalf("error sending processed bids: %v", err)
Expand Down
8 changes: 4 additions & 4 deletions x/contracts/events/publisher/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func (h *httpPublisher) Start(ctx context.Context, contracts ...common.Address)
case <-ticker.C:
blockNumber, err := h.evmClient.BlockNumber(ctx)
if err != nil {
h.logger.Error("failed to get block number", "error", err)
return
h.logger.Warn("failed to get block number", "error", err)
continue
}

if blockNumber > lastBlock {
Expand All @@ -84,8 +84,8 @@ func (h *httpPublisher) Start(ctx context.Context, contracts ...common.Address)

logs, err := h.evmClient.FilterLogs(ctx, q)
if err != nil {
h.logger.Error("failed to filter logs", "error", err)
return
h.logger.Warn("failed to filter logs", "error", err)
continue
}

for _, logMsg := range logs {
Expand Down
37 changes: 24 additions & 13 deletions x/contracts/txmonitor/eth_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func NewEVMHelperWithLogger(client *ethclient.Client, logger *slog.Logger, contr

// BatchReceipts retrieves multiple receipts for a list of transaction hashes.
func (e *EVMHelperImpl) BatchReceipts(ctx context.Context, txHashes []common.Hash) ([]Result, error) {
e.logger.Debug("Starting BatchReceipts", "txHashes", txHashes)
batch := make([]rpc.BatchElem, len(txHashes))

for i, hash := range txHashes {
Expand Down Expand Up @@ -92,19 +91,31 @@ func (e *EVMHelperImpl) BatchReceipts(ctx context.Context, txHashes []common.Has

// Retry individual failed transactions
for i, receipt := range receipts {
if receipt.Err != nil {
e.logger.Info("Retrying failed transaction", "index", i, "hash", txHashes[i].Hex())
for attempts := 0; attempts < 50; attempts++ {
e.logger.Info("Attempting individual call", "attempt", attempts+1, "hash", txHashes[i].Hex())
err = e.client.Client().CallContext(context.Background(), receipt.Receipt, "eth_getTransactionReceipt", txHashes[i])
if err == nil {
e.logger.Info("Individual call succeeded", "attempt", attempts+1, "hash", txHashes[i].Hex())
receipts[i].Err = nil
break
}
e.logger.Error("Individual call attempt failed", "attempt", attempts+1, "hash", txHashes[i].Hex(), "error", err)
time.Sleep(1 * time.Second)
switch {
case receipt.Err == nil:
continue
case errors.Is(receipt.Err, ethereum.NotFound):
e.logger.Info("Transaction not found", "index", i, "hash", txHashes[i].Hex())
continue
case errors.Is(receipt.Err, rpc.ErrMissingBatchResponse):
e.logger.Info("Missing batch response", "index", i, "hash", txHashes[i].Hex())
case errors.Is(receipt.Err, rpc.ErrNoResult):
e.logger.Info("No result from batch call", "index", i, "hash", txHashes[i].Hex())
default:
e.logger.Error("Unknown error", "index", i, "hash", txHashes[i].Hex(), "error", receipt.Err)
continue
}
e.logger.Info("Retrying failed transaction", "index", i, "hash", txHashes[i].Hex())
for attempts := 0; attempts < 10; attempts++ {
e.logger.Info("Attempting individual call", "attempt", attempts+1, "hash", txHashes[i].Hex())
err = e.client.Client().CallContext(context.Background(), receipt.Receipt, "eth_getTransactionReceipt", txHashes[i])
if err == nil {
e.logger.Info("Individual call succeeded", "attempt", attempts+1, "hash", txHashes[i].Hex())
receipt.Err = nil
break
}
e.logger.Error("Individual call attempt failed", "attempt", attempts+1, "hash", txHashes[i].Hex(), "error", err)
time.Sleep(1 * time.Second)
}
}

Expand Down

0 comments on commit 3aa76a7

Please sign in to comment.