Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
colinlyguo committed Jan 31, 2024
1 parent 19b89d5 commit f70f390
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 36 deletions.
4 changes: 2 additions & 2 deletions rollup/cmd/gas_oracle/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ func action(ctx *cli.Context) error {

l1watcher := watcher.NewL1WatcherClient(ctx.Context, l1client, cfg.L1Config.StartHeight, cfg.L1Config.Confirmations, cfg.L1Config.L1MessageQueueAddress, cfg.L1Config.ScrollChainContractAddress, db, registry)

l1relayer, err := relayer.NewLayer1Relayer(ctx.Context, db, cfg.L1Config.RelayerConfig, registry)
l1relayer, err := relayer.NewLayer1Relayer(ctx.Context, db, cfg.L1Config.RelayerConfig, true, registry)
if err != nil {
log.Crit("failed to create new l1 relayer", "config file", cfgFile, "error", err)
}
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, db, cfg.L2Config.RelayerConfig, false /* initGenesis */, registry)
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, db, cfg.L2Config.RelayerConfig, false /* initGenesis */, true, registry)
if err != nil {
log.Crit("failed to create new l2 relayer", "config file", cfgFile, "error", err)
}
Expand Down
2 changes: 1 addition & 1 deletion rollup/cmd/rollup_relayer/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func action(ctx *cli.Context) error {
}

initGenesis := ctx.Bool(utils.ImportGenesisFlag.Name)
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, db, cfg.L2Config.RelayerConfig, initGenesis, registry)
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, db, cfg.L2Config.RelayerConfig, initGenesis, false, registry)
if err != nil {
log.Crit("failed to create l2 relayer", "config file", cfgFile, "error", err)
}
Expand Down
7 changes: 6 additions & 1 deletion rollup/internal/controller/relayer/l1_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,18 @@ type Layer1Relayer struct {
}

// NewLayer1Relayer will return a new instance of Layer1RelayerClient
func NewLayer1Relayer(ctx context.Context, db *gorm.DB, cfg *config.RelayerConfig, reg prometheus.Registerer) (*Layer1Relayer, error) {
func NewLayer1Relayer(ctx context.Context, db *gorm.DB, cfg *config.RelayerConfig, gasOracle bool, reg prometheus.Registerer) (*Layer1Relayer, error) {
gasOracleSender, err := sender.NewSender(ctx, cfg.SenderConfig, cfg.GasOracleSenderPrivateKey, "l1_relayer", "gas_oracle_sender", types.SenderTypeL1GasOracle, db, reg)
if err != nil {
addr := crypto.PubkeyToAddress(cfg.GasOracleSenderPrivateKey.PublicKey)
return nil, fmt.Errorf("new gas oracle sender failed for address %s, err: %v", addr.Hex(), err)
}

// Currently L1 relayer is only used as gas-oracle.
if gasOracle {
go gasOracleSender.CheckPendingTransactionLoop(ctx)
}

// Ensure test features aren't enabled on the mainnet.
if gasOracleSender.GetChainID() == big.NewInt(1) && cfg.EnableTestEnvBypassFeatures {
return nil, fmt.Errorf("cannot enable test env features in mainnet")
Expand Down
6 changes: 3 additions & 3 deletions rollup/internal/controller/relayer/l1_relayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func setupL1RelayerDB(t *testing.T) *gorm.DB {
func testCreateNewL1Relayer(t *testing.T) {
db := setupL1RelayerDB(t)
defer database.CloseDB(db)
relayer, err := NewLayer1Relayer(context.Background(), db, cfg.L2Config.RelayerConfig, nil)
relayer, err := NewLayer1Relayer(context.Background(), db, cfg.L2Config.RelayerConfig, true, nil)
assert.NoError(t, err)
assert.NotNil(t, relayer)
}
Expand All @@ -56,7 +56,7 @@ func testL1RelayerGasOracleConfirm(t *testing.T) {
l1Cfg := cfg.L1Config
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
l1Relayer, err := NewLayer1Relayer(ctx, db, l1Cfg.RelayerConfig, nil)
l1Relayer, err := NewLayer1Relayer(ctx, db, l1Cfg.RelayerConfig, true, nil)
assert.NoError(t, err)

// Simulate message confirmations.
Expand Down Expand Up @@ -88,7 +88,7 @@ func testL1RelayerProcessGasPriceOracle(t *testing.T) {
l1Cfg := cfg.L1Config
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
l1Relayer, err := NewLayer1Relayer(ctx, db, l1Cfg.RelayerConfig, nil)
l1Relayer, err := NewLayer1Relayer(ctx, db, l1Cfg.RelayerConfig, true, nil)
assert.NoError(t, err)
assert.NotNil(t, l1Relayer)

Expand Down
9 changes: 8 additions & 1 deletion rollup/internal/controller/relayer/l2_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Layer2Relayer struct {
}

// NewLayer2Relayer will return a new instance of Layer2RelayerClient
func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db *gorm.DB, cfg *config.RelayerConfig, initGenesis bool, reg prometheus.Registerer) (*Layer2Relayer, error) {
func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db *gorm.DB, cfg *config.RelayerConfig, initGenesis bool, gasOracle bool, reg prometheus.Registerer) (*Layer2Relayer, error) {
commitSender, err := sender.NewSender(ctx, cfg.SenderConfig, cfg.CommitSenderPrivateKey, "l2_relayer", "commit_sender", types.SenderTypeCommitBatch, db, reg)
if err != nil {
addr := crypto.PubkeyToAddress(cfg.CommitSenderPrivateKey.PublicKey)
Expand All @@ -80,6 +80,13 @@ func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db *gorm.
return nil, fmt.Errorf("new gas oracle sender failed for address %s, err: %w", addr.Hex(), err)
}

if gasOracle {
go gasOracleSender.CheckPendingTransactionLoop(ctx)
} else {
go commitSender.CheckPendingTransactionLoop(ctx)
go finalizeSender.CheckPendingTransactionLoop(ctx)
}

// Ensure test features aren't enabled on the mainnet.
if commitSender.GetChainID() == big.NewInt(1) && cfg.EnableTestEnvBypassFeatures {
return nil, fmt.Errorf("cannot enable test env features in mainnet")
Expand Down
16 changes: 8 additions & 8 deletions rollup/internal/controller/relayer/l2_relayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func setupL2RelayerDB(t *testing.T) *gorm.DB {
func testCreateNewRelayer(t *testing.T) {
db := setupL2RelayerDB(t)
defer database.CloseDB(db)
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig, false, nil)
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig, false, false, nil)
assert.NoError(t, err)
assert.NotNil(t, relayer)
}
Expand All @@ -48,7 +48,7 @@ func testL2RelayerProcessPendingBatches(t *testing.T) {
defer database.CloseDB(db)

l2Cfg := cfg.L2Config
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig, false, nil)
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig, false, false, nil)
assert.NoError(t, err)

l2BlockOrm := orm.NewL2Block(db)
Expand Down Expand Up @@ -82,7 +82,7 @@ func testL2RelayerProcessCommittedBatches(t *testing.T) {
defer database.CloseDB(db)

l2Cfg := cfg.L2Config
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig, false, nil)
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig, false, false, nil)
assert.NoError(t, err)
batchMeta := &types.BatchMeta{
StartChunkIndex: 0,
Expand Down Expand Up @@ -128,7 +128,7 @@ func testL2RelayerFinalizeTimeoutBatches(t *testing.T) {
l2Cfg := cfg.L2Config
l2Cfg.RelayerConfig.EnableTestEnvBypassFeatures = true
l2Cfg.RelayerConfig.FinalizeBatchWithoutProofTimeoutSec = 0
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig, false, nil)
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig, false, false, nil)
assert.NoError(t, err)
batchMeta := &types.BatchMeta{
StartChunkIndex: 0,
Expand Down Expand Up @@ -160,7 +160,7 @@ func testL2RelayerCommitConfirm(t *testing.T) {
l2Cfg := cfg.L2Config
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
l2Relayer, err := NewLayer2Relayer(ctx, l2Cli, db, l2Cfg.RelayerConfig, false, nil)
l2Relayer, err := NewLayer2Relayer(ctx, l2Cli, db, l2Cfg.RelayerConfig, false, false, nil)
assert.NoError(t, err)

// Simulate message confirmations.
Expand Down Expand Up @@ -214,7 +214,7 @@ func testL2RelayerFinalizeConfirm(t *testing.T) {
l2Cfg := cfg.L2Config
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
l2Relayer, err := NewLayer2Relayer(ctx, l2Cli, db, l2Cfg.RelayerConfig, false, nil)
l2Relayer, err := NewLayer2Relayer(ctx, l2Cli, db, l2Cfg.RelayerConfig, false, false, nil)
assert.NoError(t, err)

// Simulate message confirmations.
Expand Down Expand Up @@ -287,7 +287,7 @@ func testL2RelayerGasOracleConfirm(t *testing.T) {
l2Cfg := cfg.L2Config
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
l2Relayer, err := NewLayer2Relayer(ctx, l2Cli, db, l2Cfg.RelayerConfig, false, nil)
l2Relayer, err := NewLayer2Relayer(ctx, l2Cli, db, l2Cfg.RelayerConfig, false, false, nil)
assert.NoError(t, err)

// Simulate message confirmations.
Expand Down Expand Up @@ -326,7 +326,7 @@ func testLayer2RelayerProcessGasPriceOracle(t *testing.T) {
db := setupL2RelayerDB(t)
defer database.CloseDB(db)

relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig, false, nil)
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig, false, true, nil)
assert.NoError(t, err)
assert.NotNil(t, relayer)

Expand Down
24 changes: 12 additions & 12 deletions rollup/internal/controller/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,6 @@ func NewSender(ctx context.Context, config *config.SenderConfig, priv *ecdsa.Pri
}
sender.metrics = initSenderMetrics(reg)

go sender.loop(ctx)

return sender, nil
}

Expand Down Expand Up @@ -298,15 +296,15 @@ func (s *Sender) resetNonce(ctx context.Context) {
s.auth.Nonce = big.NewInt(int64(nonce))
}

func (s *Sender) resubmitTransaction(auth *bind.TransactOpts, tx *gethTypes.Transaction, baseFee uint64) (*gethTypes.Transaction, error) {
func (s *Sender) resubmitTransaction(tx *gethTypes.Transaction, baseFee uint64) (*gethTypes.Transaction, error) {
escalateMultipleNum := new(big.Int).SetUint64(s.config.EscalateMultipleNum)
escalateMultipleDen := new(big.Int).SetUint64(s.config.EscalateMultipleDen)
maxGasPrice := new(big.Int).SetUint64(s.config.MaxGasPrice)

txInfo := map[string]interface{}{
"tx_hash": tx.Hash().String(),
"tx_type": s.config.TxType,
"from": auth.From.String(),
"from": s.auth.From.String(),
"nonce": tx.Nonce(),
}

Expand Down Expand Up @@ -434,7 +432,8 @@ func (s *Sender) checkPendingTransaction() {
return nil
})
if err != nil {
log.Error("db transaction failed", "err", err)
log.Error("db transaction failed after receiving confirmation", "err", err)
return
}

// send confirm message
Expand All @@ -456,7 +455,7 @@ func (s *Sender) checkPendingTransaction() {
}
if status == types.TxStatusConfirmedFailed {
log.Warn("transaction already marked as failed, skipping resubmission", "hash", tx.Hash().String())
return
continue
}

log.Info("resubmit transaction",
Expand All @@ -467,7 +466,7 @@ func (s *Sender) checkPendingTransaction() {
"currentBlockNumber", blockNumber,
"escalateBlocks", s.config.EscalateBlocks)

if newTx, err := s.resubmitTransaction(s.auth, tx, baseFee); err != nil {
if newTx, err := s.resubmitTransaction(tx, baseFee); err != nil {
s.metrics.resubmitTransactionFailedTotal.WithLabelValues(s.service, s.name).Inc()
log.Error("failed to resubmit transaction", "context ID", txnToCheck.ContextID, "sender meta", s.getSenderMeta(), "from", s.auth.From.String(), "nonce", newTx.Nonce(), "err", err)
} else {
Expand All @@ -477,21 +476,22 @@ func (s *Sender) checkPendingTransaction() {
return fmt.Errorf("failed to update status of transaction with hash %s to TxStatusReplaced, err: %w", tx.Hash().String(), err)
}
// Record the new transaction that has replaced the original one.
if err := s.pendingTransactionOrm.InsertPendingTransaction(s.ctx, txnToCheck.ContextID, s.getSenderMeta(), newTx, txnToCheck.SubmitBlockNumber, dbTX); err != nil {
return fmt.Errorf("failed to insert new pending transaction with context ID: %s, nonce: %d, hash: %v, err: %w", txnToCheck.ContextID, newTx.Nonce(), newTx.Hash().String(), err)
if err := s.pendingTransactionOrm.InsertPendingTransaction(s.ctx, txnToCheck.ContextID, s.getSenderMeta(), newTx, blockNumber, dbTX); err != nil {
return fmt.Errorf("failed to insert new pending transaction with context ID: %s, nonce: %d, hash: %v, previous block number: %v, current block number: %v, err: %w", txnToCheck.ContextID, newTx.Nonce(), newTx.Hash().String(), txnToCheck.SubmitBlockNumber, blockNumber, err)
}
return nil
})
if err != nil {
log.Error("db transaction failed", "err", err)
log.Error("db transaction failed after resubmitting", "err", err)
return
}
}
}
}
}

// Loop is the main event loop
func (s *Sender) loop(ctx context.Context) {
// CheckPendingTransactionLoop is the main event loop that checks for the confirmation of pending transactions.
func (s *Sender) CheckPendingTransactionLoop(ctx context.Context) {
checkTick := time.NewTicker(time.Duration(s.config.CheckPendingTime) * time.Second)
defer checkTick.Stop()

Expand Down
52 changes: 48 additions & 4 deletions rollup/internal/controller/sender/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func TestSender(t *testing.T) {
t.Run("test check pending transaction tx confirmed", testCheckPendingTransactionTxConfirmed)
t.Run("test check pending transaction resubmit tx confirmed", testCheckPendingTransactionResubmitTxConfirmed)
t.Run("test check pending transaction replaced tx confirmed", testCheckPendingTransactionReplacedTxConfirmed)
t.Run("test check pending transaction multiple times with only one transaction pending", testCheckPendingTransactionTxMultipleTimesWithOnlyOneTxPending)
}

func testNewSender(t *testing.T) {
Expand Down Expand Up @@ -219,7 +220,7 @@ func testResubmitZeroGasPriceTransaction(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, tx)
// Increase at least 1 wei in gas price, gas tip cap and gas fee cap.
_, err = s.resubmitTransaction(s.auth, tx, 0)
_, err = s.resubmitTransaction(tx, 0)
assert.NoError(t, err)
s.Stop()
}
Expand Down Expand Up @@ -278,7 +279,7 @@ func testResubmitNonZeroGasPriceTransaction(t *testing.T) {
tx, err := s.createAndSendTx(feeData, &common.Address{}, big.NewInt(0), nil, nil)
assert.NoError(t, err)
assert.NotNil(t, tx)
_, err = s.resubmitTransaction(s.auth, tx, 0)
_, err = s.resubmitTransaction(tx, 0)
assert.NoError(t, err)
s.Stop()
}
Expand Down Expand Up @@ -306,7 +307,7 @@ func testResubmitUnderpricedTransaction(t *testing.T) {
tx, err := s.createAndSendTx(feeData, &common.Address{}, big.NewInt(0), nil, nil)
assert.NoError(t, err)
assert.NotNil(t, tx)
_, err = s.resubmitTransaction(s.auth, tx, 0)
_, err = s.resubmitTransaction(tx, 0)
assert.Error(t, err, "replacement transaction underpriced")
s.Stop()
}
Expand All @@ -328,7 +329,7 @@ func testResubmitTransactionWithRisingBaseFee(t *testing.T) {
// bump the basefee by 10x
baseFeePerGas *= 10
// resubmit and check that the gas fee has been adjusted accordingly
newTx, err := s.resubmitTransaction(s.auth, tx, baseFeePerGas)
newTx, err := s.resubmitTransaction( tx, baseFeePerGas)
assert.NoError(t, err)

escalateMultipleNum := new(big.Int).SetUint64(s.config.EscalateMultipleNum)
Expand Down Expand Up @@ -504,3 +505,46 @@ func testCheckPendingTransactionReplacedTxConfirmed(t *testing.T) {
patchGuard.Reset()
}
}

func testCheckPendingTransactionTxMultipleTimesWithOnlyOneTxPending(t *testing.T) {
for _, txType := range txTypes {
sqlDB, err := db.DB()
assert.NoError(t, err)
assert.NoError(t, migrate.ResetDB(sqlDB))

cfgCopy := *cfg.L1Config.RelayerConfig.SenderConfig
cfgCopy.TxType = txType
cfgCopy.EscalateBlocks = 0
s, err := NewSender(context.Background(), &cfgCopy, privateKey, "test", "test", types.SenderTypeCommitBatch, db, nil)
assert.NoError(t, err)

_, err = s.SendTransaction("test", &common.Address{}, big.NewInt(0), nil, 0)
assert.NoError(t, err)

txs, err := s.pendingTransactionOrm.GetPendingOrReplacedTransactionsBySenderType(context.Background(), s.senderType, 1)
assert.NoError(t, err)
assert.Len(t, txs, 1)
assert.Equal(t, types.TxStatusPending, txs[0].Status)
assert.Equal(t, types.SenderTypeCommitBatch, txs[0].SenderType)

patchGuard := gomonkey.ApplyMethodFunc(s.client, "TransactionReceipt", func(_ context.Context, hash common.Hash) (*gethTypes.Receipt, error) {
return nil, fmt.Errorf("simulated transaction receipt error")
})

for i := 1; i <= 12; i++ {
s.checkPendingTransaction()
assert.NoError(t, err)

txs, err = s.pendingTransactionOrm.GetPendingOrReplacedTransactionsBySenderType(context.Background(), s.senderType, 100)
assert.NoError(t, err)
assert.Len(t, txs, i+1)
for j := 0; j < i; j++ {
assert.Equal(t, types.TxStatusReplaced, txs[j].Status)
}
assert.Equal(t, types.TxStatusPending, txs[i].Status)
}

s.Stop()
patchGuard.Reset()
}
}
4 changes: 2 additions & 2 deletions rollup/tests/gas_oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func testImportL1GasPrice(t *testing.T) {
l1Cfg := rollupApp.Config.L1Config

// Create L1Relayer
l1Relayer, err := relayer.NewLayer1Relayer(context.Background(), db, l1Cfg.RelayerConfig, nil)
l1Relayer, err := relayer.NewLayer1Relayer(context.Background(), db, l1Cfg.RelayerConfig, true, nil)
assert.NoError(t, err)

// Create L1Watcher
Expand Down Expand Up @@ -67,7 +67,7 @@ func testImportL2GasPrice(t *testing.T) {
prepareContracts(t)

l2Cfg := rollupApp.Config.L2Config
l2Relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig, false, nil)
l2Relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig, false, true, nil)
assert.NoError(t, err)

// add fake chunk
Expand Down
4 changes: 2 additions & 2 deletions rollup/tests/rollup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func testCommitAndFinalizeGenesisBatch(t *testing.T) {
prepareContracts(t)

l2Cfg := rollupApp.Config.L2Config
l2Relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig, true, nil)
l2Relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig, true, false, nil)
assert.NoError(t, err)
assert.NotNil(t, l2Relayer)

Expand Down Expand Up @@ -56,7 +56,7 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) {

// Create L2Relayer
l2Cfg := rollupApp.Config.L2Config
l2Relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig, false, nil)
l2Relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig, false, false, nil)
assert.NoError(t, err)

// Create L1Watcher
Expand Down

0 comments on commit f70f390

Please sign in to comment.