Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: misc fixes from negative testing #407

Merged
merged 2 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions oracle/pkg/updater/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,20 +487,20 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx
txns, ok := u.l1BlockCache.Get(blockNum)
if ok {
u.metrics.BlockTxnCacheHits.Inc()
u.logger.Info("cache hit for block transactions", "blockNum", blockNum)
u.logger.Debug("cache hit for block transactions", "blockNum", blockNum)
return txns, nil
}

u.metrics.BlockTxnCacheMisses.Inc()
u.logger.Info("cache miss for block transactions", "blockNum", blockNum)
u.logger.Debug("cache miss for block transactions", "blockNum", blockNum)

block, err := u.l1Client.BlockByNumber(ctx, big.NewInt(0).SetUint64(blockNum))
if err != nil {
u.logger.Error("failed to get block by number", "blockNum", blockNum, "error", err)
return nil, fmt.Errorf("failed to get block by number: %w", err)
}

u.logger.Info("retrieved block", "blockNum", blockNum, "blockHash", block.Hash().Hex())
u.logger.Debug("retrieved block", "blockNum", blockNum, "blockHash", block.Hash().Hex())

var txnReceipts sync.Map
eg, ctx := errgroup.WithContext(ctx)
Expand All @@ -527,22 +527,22 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx
for _, bucket := range buckets {
eg.Go(func() error {
start := time.Now()
u.logger.Info("requesting batch receipts", "bucketSize", len(bucket))
u.logger.Debug("requesting batch receipts", "bucketSize", len(bucket))
results, err := u.receiptBatcher.BatchReceipts(ctx, bucket)
if err != nil {
u.logger.Error("failed to get batch receipts", "error", err)
return fmt.Errorf("failed to get batch receipts: %w", err)
}
u.metrics.TxnReceiptRequestDuration.Observe(time.Since(start).Seconds())
u.logger.Info("received batch receipts", "duration", time.Since(start).Seconds())
u.logger.Debug("received batch receipts", "duration", time.Since(start).Seconds())
for _, result := range results {
if result.Err != nil {
u.logger.Error("failed to get receipt for txn", "txnHash", result.Receipt.TxHash.Hex(), "error", result.Err)
return fmt.Errorf("failed to get receipt for txn: %s", result.Err)
continue
}

txnReceipts.Store(result.Receipt.TxHash.Hex(), result.Receipt)
u.logger.Info("stored receipt", "txnHash", result.Receipt.TxHash.Hex())
u.logger.Debug("stored receipt", "txnHash", result.Receipt.TxHash.Hex())
}

return nil
Expand All @@ -562,14 +562,14 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx
receipt, ok := txnReceipts.Load(tx.Hex())
if !ok {
u.logger.Error("receipt not found for txn", "txnHash", tx.Hex())
return nil, fmt.Errorf("receipt not found for txn: %s", tx)
continue
}
txnsMap[strings.TrimPrefix(tx.Hex(), "0x")] = TxMetadata{PosInBlock: i, Succeeded: receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful}
u.logger.Info("added txn to map", "txnHash", tx.Hex(), "posInBlock", i, "succeeded", receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful)
u.logger.Debug("added txn to map", "txnHash", tx.Hex(), "posInBlock", i, "succeeded", receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful)
}

_ = u.l1BlockCache.Add(blockNum, txnsMap)
u.logger.Info("added block transactions to cache", "blockNum", blockNum)
u.logger.Debug("added block transactions to cache", "blockNum", blockNum)

return txnsMap, nil
}
Expand All @@ -579,24 +579,28 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx
// (e.g they could be unix or unixMili timestamps)
func (u *Updater) computeDecayPercentage(startTimestamp, endTimestamp, commitTimestamp uint64) int64 {
if startTimestamp >= endTimestamp || startTimestamp > commitTimestamp || endTimestamp <= commitTimestamp {
u.logger.Info("timestamp out of range", "startTimestamp", startTimestamp, "endTimestamp", endTimestamp, "commitTimestamp", commitTimestamp)
u.logger.Debug("timestamp out of range", "startTimestamp", startTimestamp, "endTimestamp", endTimestamp, "commitTimestamp", commitTimestamp)
return 0
}

// Calculate the total time in seconds
totalTime := endTimestamp - startTimestamp
u.logger.Info("totalTime", "totalTime", totalTime)
// Calculate the time passed in seconds
timePassed := commitTimestamp - startTimestamp
u.logger.Info("timePassed", "timePassed", timePassed)
// Calculate the decay percentage
decayPercentage := float64(timePassed) / float64(totalTime)
u.logger.Info("decayPercentage", "decayPercentage", decayPercentage)

decayPercentageRound := int64(math.Round(decayPercentage * 100))
if decayPercentageRound > 100 {
decayPercentageRound = 100
}
u.logger.Info("decayPercentageRound", "decayPercentageRound", decayPercentageRound)
u.logger.Debug("decay information",
"startTimestamp", startTimestamp,
"endTimestamp", endTimestamp,
"commitTimestamp", commitTimestamp,
"totalTime", totalTime,
"timePassed", timePassed,
"decayPercentage", decayPercentage,
)
return decayPercentageRound
}
5 changes: 5 additions & 0 deletions p2p/pkg/rpc/provider/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package providerapi

func (s *Service) ActiveReceivers() int {
return int(s.activeReceivers.Load())
}
8 changes: 8 additions & 0 deletions p2p/pkg/rpc/provider/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/big"
"strings"
"sync"
"sync/atomic"

"github.com/bufbuild/protovalidate-go"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand Down Expand Up @@ -38,6 +39,7 @@ type Service struct {
optsGetter OptsGetter
metrics *metrics
validator *protovalidate.Validator
activeReceivers atomic.Int32
}

type ProviderRegistryContract interface {
Expand Down Expand Up @@ -89,6 +91,9 @@ func (s *Service) ProcessBid(
ctx context.Context,
bid *preconfpb.Bid,
) (chan ProcessedBidResponse, error) {
if s.activeReceivers.Load() == 0 {
return nil, status.Error(codes.Internal, "no active receivers")
}
var revertingTxnHashes []string
if bid.RevertingTxHashes != "" {
revertingTxnHashes = strings.Split(bid.RevertingTxHashes, ",")
Expand Down Expand Up @@ -139,6 +144,9 @@ func (s *Service) ReceiveBids(
_ *providerapiv1.EmptyMessage,
srv providerapiv1.Provider_ReceiveBidsServer,
) error {
s.activeReceivers.Add(1)
defer s.activeReceivers.Add(-1)

for {
select {
case <-srv.Context().Done():
Expand Down
11 changes: 11 additions & 0 deletions p2p/pkg/rpc/provider/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,17 @@ func TestBidHandling(t *testing.T) {
}
}()

activeReceiverTimeout := time.Now().Add(2 * time.Second)
for {
if svc.ActiveReceivers() > 0 {
break
}
if time.Now().After(activeReceiverTimeout) {
t.Fatalf("timed out waiting for active receivers")
}
time.Sleep(10 * time.Millisecond)
}

sndr, err := client.SendProcessedBids(context.Background())
if err != nil {
t.Fatalf("error sending processed bids: %v", err)
Expand Down
8 changes: 4 additions & 4 deletions x/contracts/events/publisher/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func (h *httpPublisher) Start(ctx context.Context, contracts ...common.Address)
case <-ticker.C:
blockNumber, err := h.evmClient.BlockNumber(ctx)
if err != nil {
h.logger.Error("failed to get block number", "error", err)
return
h.logger.Warn("failed to get block number", "error", err)
continue
}

if blockNumber > lastBlock {
Expand All @@ -84,8 +84,8 @@ func (h *httpPublisher) Start(ctx context.Context, contracts ...common.Address)

logs, err := h.evmClient.FilterLogs(ctx, q)
if err != nil {
h.logger.Error("failed to filter logs", "error", err)
return
h.logger.Warn("failed to filter logs", "error", err)
continue
}

for _, logMsg := range logs {
Expand Down
37 changes: 24 additions & 13 deletions x/contracts/txmonitor/eth_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func NewEVMHelperWithLogger(client *ethclient.Client, logger *slog.Logger, contr

// BatchReceipts retrieves multiple receipts for a list of transaction hashes.
func (e *EVMHelperImpl) BatchReceipts(ctx context.Context, txHashes []common.Hash) ([]Result, error) {
e.logger.Debug("Starting BatchReceipts", "txHashes", txHashes)
batch := make([]rpc.BatchElem, len(txHashes))

for i, hash := range txHashes {
Expand Down Expand Up @@ -92,19 +91,31 @@ func (e *EVMHelperImpl) BatchReceipts(ctx context.Context, txHashes []common.Has

// Retry individual failed transactions
for i, receipt := range receipts {
if receipt.Err != nil {
e.logger.Info("Retrying failed transaction", "index", i, "hash", txHashes[i].Hex())
for attempts := 0; attempts < 50; attempts++ {
e.logger.Info("Attempting individual call", "attempt", attempts+1, "hash", txHashes[i].Hex())
err = e.client.Client().CallContext(context.Background(), receipt.Receipt, "eth_getTransactionReceipt", txHashes[i])
if err == nil {
e.logger.Info("Individual call succeeded", "attempt", attempts+1, "hash", txHashes[i].Hex())
receipts[i].Err = nil
break
}
e.logger.Error("Individual call attempt failed", "attempt", attempts+1, "hash", txHashes[i].Hex(), "error", err)
time.Sleep(1 * time.Second)
switch {
case receipt.Err == nil:
continue
case errors.Is(receipt.Err, ethereum.NotFound):
e.logger.Info("Transaction not found", "index", i, "hash", txHashes[i].Hex())
continue
case errors.Is(receipt.Err, rpc.ErrMissingBatchResponse):
e.logger.Info("Missing batch response", "index", i, "hash", txHashes[i].Hex())
case errors.Is(receipt.Err, rpc.ErrNoResult):
e.logger.Info("No result from batch call", "index", i, "hash", txHashes[i].Hex())
default:
e.logger.Error("Unknown error", "index", i, "hash", txHashes[i].Hex(), "error", receipt.Err)
continue
}
e.logger.Info("Retrying failed transaction", "index", i, "hash", txHashes[i].Hex())
for attempts := 0; attempts < 10; attempts++ {
e.logger.Info("Attempting individual call", "attempt", attempts+1, "hash", txHashes[i].Hex())
err = e.client.Client().CallContext(context.Background(), receipt.Receipt, "eth_getTransactionReceipt", txHashes[i])
if err == nil {
e.logger.Info("Individual call succeeded", "attempt", attempts+1, "hash", txHashes[i].Hex())
receipt.Err = nil
break
}
e.logger.Error("Individual call attempt failed", "attempt", attempts+1, "hash", txHashes[i].Hex(), "error", err)
time.Sleep(1 * time.Second)
}
}

Expand Down
Loading