Skip to content

Commit

Permalink
enhance sender test
Browse files Browse the repository at this point in the history
  • Loading branch information
colinlyguo committed Jan 24, 2024
1 parent 0213777 commit 80a613b
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 31 deletions.
27 changes: 13 additions & 14 deletions rollup/internal/controller/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (s *Sender) checkPendingTransaction() {

transactionsToCheck, err := s.pendingTransactionOrm.GetPendingOrReplacedTransactionsBySenderType(s.ctx, s.senderType, 100)
if err != nil {
log.Error("failed to load pending transactions", "sender meta", s.getSenderMeta(), "error", err)
log.Error("failed to load pending transactions", "sender meta", s.getSenderMeta(), "err", err)
return
}

Expand All @@ -412,24 +412,23 @@ func (s *Sender) checkPendingTransaction() {

for _, txnToCheck := range transactionsToCheck {
tx := new(gethTypes.Transaction)
rlpStream := rlp.NewStream(bytes.NewReader(txnToCheck.RLPEncoding), 0)
if err := tx.DecodeRLP(rlpStream); err != nil {
log.Error("failed to decode RLP", "context ID", txnToCheck.ContextID, "sender meta", s.getSenderMeta(), "error", err)
if err := tx.DecodeRLP(rlp.NewStream(bytes.NewReader(txnToCheck.RLPEncoding), 0)); err != nil {
log.Error("failed to decode RLP", "context ID", txnToCheck.ContextID, "sender meta", s.getSenderMeta(), "err", err)
continue
}

receipt, err := s.client.TransactionReceipt(s.ctx, tx.Hash())
if (err == nil) && (receipt != nil) {
if (err == nil) && (receipt != nil) { // tx confirmed.
if receipt.BlockNumber.Uint64() <= confirmed {
err := s.db.Transaction(func(dbTX *gorm.DB) error {
// Update the status of the transaction to TxStatusConfirmed.
if err := s.pendingTransactionOrm.UpdatePendingTransactionStatusByTxHash(s.ctx, tx.Hash().String(), types.TxStatusConfirmed, dbTX); err != nil {
if err := s.pendingTransactionOrm.UpdatePendingTransactionStatusByTxHash(s.ctx, tx.Hash(), types.TxStatusConfirmed, dbTX); err != nil {
log.Error("failed to update transaction status by tx hash", "hash", tx.Hash().String(), "sender meta", s.getSenderMeta(), "from", s.auth.From.String(), "nonce", tx.Nonce(), "err", err)
return err
}
// Update other transactions with the same nonce and sender address as failed.
if err := s.pendingTransactionOrm.UpdateOtherTransactionsAsFailedByNonce(s.ctx, txnToCheck.SenderAddress, txnToCheck.Nonce, txnToCheck.Hash, dbTX); err != nil {
log.Error("failed to update other transactions as failed by nonce", "senderAddress", txnToCheck.SenderAddress, "nonce", txnToCheck.Nonce, "excludedTxHash", txnToCheck.Hash, "err", err)
if err := s.pendingTransactionOrm.UpdateOtherTransactionsAsFailedByNonce(s.ctx, txnToCheck.SenderAddress, tx.Nonce(), tx.Hash(), dbTX); err != nil {
log.Error("failed to update other transactions as failed by nonce", "senderAddress", txnToCheck.SenderAddress, "nonce", tx.Nonce(), "excludedTxHash", tx.Hash(), "err", err)
return err
}
return nil
Expand All @@ -446,11 +445,11 @@ func (s *Sender) checkPendingTransaction() {
SenderType: s.senderType,
}
}
} else if txnToCheck.Status == types.TxStatusPending && // Only resubmit the last pending transaction of the same ContextID.
} else if txnToCheck.Status == types.TxStatusPending && // Only check the last transaction (status pending) of the same ContextID.
s.config.EscalateBlocks+txnToCheck.SubmitBlockNumber <= blockNumber {
// It's possible that the pending transaction was marked as failed earlier in this loop (e.g., if one of its replacements has already been confirmed).
// Therefore, we fetch the current transaction status again for accuracy before proceeding.
status, err := s.pendingTransactionOrm.GetTxStatusByTxHash(s.ctx, tx.Hash().String())
status, err := s.pendingTransactionOrm.GetTxStatusByTxHash(s.ctx, tx.Hash())
if err != nil {
log.Error("failed to get transaction status by tx hash", "hash", tx.Hash().String(), "err", err)
return
Expand All @@ -464,17 +463,17 @@ func (s *Sender) checkPendingTransaction() {
"hash", tx.Hash().String(),
"from", s.auth.From.String(),
"nonce", tx.Nonce(),
"submit block number", txnToCheck.SubmitBlockNumber,
"current block number", blockNumber,
"configured escalateBlocks", s.config.EscalateBlocks)
"submitBlockNumber", txnToCheck.SubmitBlockNumber,
"currentBlockNumber", blockNumber,
"escalateBlocks", s.config.EscalateBlocks)

if newTx, err := s.resubmitTransaction(s.auth, tx, baseFee); err != nil {
s.metrics.resubmitTransactionFailedTotal.WithLabelValues(s.service, s.name).Inc()
log.Error("failed to resubmit transaction", "context ID", txnToCheck.ContextID, "sender meta", s.getSenderMeta(), "from", s.auth.From.String(), "nonce", newTx.Nonce(), "err", err)
} else {
err := s.db.Transaction(func(dbTX *gorm.DB) error {
// Update the status of the original transaction as replaced, while still checking its confirmation status.
if err := s.pendingTransactionOrm.UpdatePendingTransactionStatusByTxHash(s.ctx, tx.Hash().String(), types.TxStatusReplaced, dbTX); err != nil {
if err := s.pendingTransactionOrm.UpdatePendingTransactionStatusByTxHash(s.ctx, tx.Hash(), types.TxStatusReplaced, dbTX); err != nil {
return fmt.Errorf("failed to update status of transaction with hash %s to TxStatusReplaced, err: %w", tx.Hash().String(), err)
}
// Record the new transaction that has replaced the original one.
Expand Down
38 changes: 32 additions & 6 deletions rollup/internal/controller/sender/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func testCheckPendingTransactionTxConfirmed(t *testing.T) {

cfgCopy := *cfg.L1Config.RelayerConfig.SenderConfig
cfgCopy.TxType = txType
s, err := NewSender(context.Background(), &cfgCopy, privateKey, "test", "test", types.SenderTypeUnknown, db, nil)
s, err := NewSender(context.Background(), &cfgCopy, privateKey, "test", "test", types.SenderTypeCommitBatch, db, nil)
assert.NoError(t, err)

_, err = s.SendTransaction("test", &common.Address{}, big.NewInt(0), nil, 0)
Expand All @@ -366,6 +366,8 @@ func testCheckPendingTransactionTxConfirmed(t *testing.T) {
txs, err := s.pendingTransactionOrm.GetPendingOrReplacedTransactionsBySenderType(context.Background(), s.senderType, 1)
assert.NoError(t, err)
assert.Len(t, txs, 1)
assert.Equal(t, types.TxStatusPending, txs[0].Status)
assert.Equal(t, types.SenderTypeCommitBatch, txs[0].SenderType)

patchGuard := gomonkey.ApplyMethodFunc(s.client, "TransactionReceipt", func(_ context.Context, hash common.Hash) (*gethTypes.Receipt, error) {
return &gethTypes.Receipt{TxHash: hash, BlockNumber: big.NewInt(0), Status: gethTypes.ReceiptStatusSuccessful}, nil
Expand All @@ -392,7 +394,7 @@ func testCheckPendingTransactionResubmitTxConfirmed(t *testing.T) {
cfgCopy := *cfg.L1Config.RelayerConfig.SenderConfig
cfgCopy.TxType = txType
cfgCopy.EscalateBlocks = 0
s, err := NewSender(context.Background(), &cfgCopy, privateKey, "test", "test", types.SenderTypeUnknown, db, nil)
s, err := NewSender(context.Background(), &cfgCopy, privateKey, "test", "test", types.SenderTypeFinalizeBatch, db, nil)
assert.NoError(t, err)

originTxHash, err := s.SendTransaction("test", &common.Address{}, big.NewInt(0), nil, 0)
Expand All @@ -401,6 +403,8 @@ func testCheckPendingTransactionResubmitTxConfirmed(t *testing.T) {
txs, err := s.pendingTransactionOrm.GetPendingOrReplacedTransactionsBySenderType(context.Background(), s.senderType, 1)
assert.NoError(t, err)
assert.Len(t, txs, 1)
assert.Equal(t, types.TxStatusPending, txs[0].Status)
assert.Equal(t, types.SenderTypeFinalizeBatch, txs[0].SenderType)

patchGuard := gomonkey.ApplyMethodFunc(s.client, "TransactionReceipt", func(_ context.Context, hash common.Hash) (*gethTypes.Receipt, error) {
if hash == originTxHash {
Expand All @@ -413,6 +417,16 @@ func testCheckPendingTransactionResubmitTxConfirmed(t *testing.T) {
s.checkPendingTransaction()
assert.NoError(t, err)

status, err := s.pendingTransactionOrm.GetTxStatusByTxHash(context.Background(), originTxHash)
assert.NoError(t, err)
assert.Equal(t, types.TxStatusReplaced, status)

txs, err = s.pendingTransactionOrm.GetPendingOrReplacedTransactionsBySenderType(context.Background(), s.senderType, 2)
assert.NoError(t, err)
assert.Len(t, txs, 2)
assert.Equal(t, types.TxStatusReplaced, txs[0].Status)
assert.Equal(t, types.TxStatusPending, txs[1].Status)

// Check the pending transactions again after attempting to resubmit.
s.checkPendingTransaction()
assert.NoError(t, err)
Expand All @@ -435,21 +449,23 @@ func testCheckPendingTransactionReplacedTxConfirmed(t *testing.T) {
cfgCopy := *cfg.L1Config.RelayerConfig.SenderConfig
cfgCopy.TxType = txType
cfgCopy.EscalateBlocks = 0
s, err := NewSender(context.Background(), &cfgCopy, privateKey, "test", "test", types.SenderTypeUnknown, db, nil)
s, err := NewSender(context.Background(), &cfgCopy, privateKey, "test", "test", types.SenderTypeL1GasOracle, db, nil)
assert.NoError(t, err)

_, err = s.SendTransaction("test", &common.Address{}, big.NewInt(0), nil, 0)
txHash, err := s.SendTransaction("test", &common.Address{}, big.NewInt(0), nil, 0)
assert.NoError(t, err)

txs, err := s.pendingTransactionOrm.GetPendingOrReplacedTransactionsBySenderType(context.Background(), s.senderType, 1)
assert.NoError(t, err)
assert.Len(t, txs, 1)
assert.Equal(t, types.TxStatusPending, txs[0].Status)
assert.Equal(t, types.SenderTypeL1GasOracle, txs[0].SenderType)

patchGuard := gomonkey.ApplyMethodFunc(s.client, "TransactionReceipt", func(_ context.Context, hash common.Hash) (*gethTypes.Receipt, error) {
var status types.TxStatus
status, err = s.pendingTransactionOrm.GetTxStatusByTxHash(context.Background(), hash.Hex())
status, err = s.pendingTransactionOrm.GetTxStatusByTxHash(context.Background(), hash)
if err != nil {
return nil, fmt.Errorf("failed to get transaction status, hash: %s, err: %w", hash.Hex(), err)
return nil, fmt.Errorf("failed to get transaction status, hash: %s, err: %w", hash.String(), err)
}
// If the transaction status is 'replaced', return a successful receipt.
if status == types.TxStatusReplaced {
Expand All @@ -466,6 +482,16 @@ func testCheckPendingTransactionReplacedTxConfirmed(t *testing.T) {
s.checkPendingTransaction()
assert.NoError(t, err)

status, err := s.pendingTransactionOrm.GetTxStatusByTxHash(context.Background(), txHash)
assert.NoError(t, err)
assert.Equal(t, types.TxStatusReplaced, status)

txs, err = s.pendingTransactionOrm.GetPendingOrReplacedTransactionsBySenderType(context.Background(), s.senderType, 2)
assert.NoError(t, err)
assert.Len(t, txs, 2)
assert.Equal(t, types.TxStatusReplaced, txs[0].Status)
assert.Equal(t, types.TxStatusPending, txs[1].Status)

// Check the pending transactions again after attempting to resubmit.
s.checkPendingTransaction()
assert.NoError(t, err)
Expand Down
8 changes: 4 additions & 4 deletions rollup/internal/orm/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func TestTransactionOrm(t *testing.T) {
err = pendingTransactionOrm.InsertPendingTransaction(context.Background(), "test", senderMeta, tx1, 0)
assert.NoError(t, err)

err = pendingTransactionOrm.UpdatePendingTransactionStatusByTxHash(context.Background(), tx0.Hash().String(), types.TxStatusReplaced)
err = pendingTransactionOrm.UpdatePendingTransactionStatusByTxHash(context.Background(), tx0.Hash(), types.TxStatusReplaced)
assert.NoError(t, err)

txs, err := pendingTransactionOrm.GetPendingOrReplacedTransactionsBySenderType(context.Background(), senderMeta.Type, 2)
Expand All @@ -377,21 +377,21 @@ func TestTransactionOrm(t *testing.T) {
assert.Equal(t, senderMeta.Address.String(), txs[1].SenderAddress)
assert.Equal(t, senderMeta.Type, txs[1].SenderType)

err = pendingTransactionOrm.UpdatePendingTransactionStatusByTxHash(context.Background(), tx1.Hash().String(), types.TxStatusConfirmed)
err = pendingTransactionOrm.UpdatePendingTransactionStatusByTxHash(context.Background(), tx1.Hash(), types.TxStatusConfirmed)
assert.NoError(t, err)

txs, err = pendingTransactionOrm.GetPendingOrReplacedTransactionsBySenderType(context.Background(), senderMeta.Type, 2)
assert.NoError(t, err)
assert.Len(t, txs, 1)

err = pendingTransactionOrm.UpdateOtherTransactionsAsFailedByNonce(context.Background(), senderMeta.Address.String(), tx1.Nonce(), tx1.Hash().String())
err = pendingTransactionOrm.UpdateOtherTransactionsAsFailedByNonce(context.Background(), senderMeta.Address.String(), tx1.Nonce(), tx1.Hash())
assert.NoError(t, err)

txs, err = pendingTransactionOrm.GetPendingOrReplacedTransactionsBySenderType(context.Background(), senderMeta.Type, 2)
assert.NoError(t, err)
assert.Len(t, txs, 0)

status, err := pendingTransactionOrm.GetTxStatusByTxHash(context.Background(), tx0.Hash().String())
status, err := pendingTransactionOrm.GetTxStatusByTxHash(context.Background(), tx0.Hash())
assert.NoError(t, err)
assert.Equal(t, types.TxStatusConfirmedFailed, status)
}
14 changes: 7 additions & 7 deletions rollup/internal/orm/pending_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ func NewPendingTransaction(db *gorm.DB) *PendingTransaction {
}

// GetTxStatusByTxHash retrieves the status of a transaction by its hash.
func (o *PendingTransaction) GetTxStatusByTxHash(ctx context.Context, hash string) (types.TxStatus, error) {
func (o *PendingTransaction) GetTxStatusByTxHash(ctx context.Context, hash common.Hash) (types.TxStatus, error) {
var status types.TxStatus
db := o.db.WithContext(ctx)
db = db.Model(&PendingTransaction{})
db = db.Select("status")
db = db.Where("hash = ?", hash)
db = db.Where("hash = ?", hash.String())
if err := db.Scan(&status).Error; err != nil {
return types.TxStatusUnknown, fmt.Errorf("failed to get tx status by hash, hash: %v, err: %w", hash, err)
}
Expand Down Expand Up @@ -123,22 +123,22 @@ func (o *PendingTransaction) InsertPendingTransaction(ctx context.Context, conte
}

// UpdatePendingTransactionStatusByTxHash updates the status of a transaction based on the transaction hash.
func (o *PendingTransaction) UpdatePendingTransactionStatusByTxHash(ctx context.Context, hash string, status types.TxStatus, dbTX ...*gorm.DB) error {
func (o *PendingTransaction) UpdatePendingTransactionStatusByTxHash(ctx context.Context, hash common.Hash, status types.TxStatus, dbTX ...*gorm.DB) error {
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&PendingTransaction{})
db = db.Where("hash = ?", hash)
db = db.Where("hash = ?", hash.String())
if err := db.Update("status", status).Error; err != nil {
return fmt.Errorf("failed to UpdatePendingTransactionStatusByTxHash, txHash: %s, error: %w", hash, err)
}
return nil
}

// UpdateOtherTransactionsAsFailedByNonce updates the status of all transactions to TxStatusConfirmedFailed for a specific nonce and sender address, excluding a specified transaction hash.
func (o *PendingTransaction) UpdateOtherTransactionsAsFailedByNonce(ctx context.Context, senderAddress string, nonce uint64, txHash string, dbTX ...*gorm.DB) error {
func (o *PendingTransaction) UpdateOtherTransactionsAsFailedByNonce(ctx context.Context, senderAddress string, nonce uint64, hash common.Hash, dbTX ...*gorm.DB) error {
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
Expand All @@ -147,9 +147,9 @@ func (o *PendingTransaction) UpdateOtherTransactionsAsFailedByNonce(ctx context.
db = db.Model(&PendingTransaction{})
db = db.Where("sender_address = ?", senderAddress)
db = db.Where("nonce = ?", nonce)
db = db.Where("hash != ?", txHash)
db = db.Where("hash != ?", hash.String())
if err := db.Update("status", types.TxStatusConfirmedFailed).Error; err != nil {
return fmt.Errorf("failed to update other transactions as failed by nonce, senderAddress: %s, nonce: %d, txHash: %s, error: %w", senderAddress, nonce, txHash, err)
return fmt.Errorf("failed to update other transactions as failed by nonce, senderAddress: %s, nonce: %d, txHash: %s, error: %w", senderAddress, nonce, hash, err)
}
return nil
}

0 comments on commit 80a613b

Please sign in to comment.