Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
colinlyguo committed Jan 31, 2024
1 parent 19b89d5 commit 4124f86
Show file tree
Hide file tree
Showing 12 changed files with 104 additions and 54 deletions.
2 changes: 1 addition & 1 deletion common/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"runtime/debug"
)

var tag = "v4.3.60"
var tag = "v4.3.61"

var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {
Expand Down
4 changes: 2 additions & 2 deletions rollup/cmd/gas_oracle/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ func action(ctx *cli.Context) error {

l1watcher := watcher.NewL1WatcherClient(ctx.Context, l1client, cfg.L1Config.StartHeight, cfg.L1Config.Confirmations, cfg.L1Config.L1MessageQueueAddress, cfg.L1Config.ScrollChainContractAddress, db, registry)

l1relayer, err := relayer.NewLayer1Relayer(ctx.Context, db, cfg.L1Config.RelayerConfig, registry)
l1relayer, err := relayer.NewLayer1Relayer(ctx.Context, db, cfg.L1Config.RelayerConfig, true, registry)
if err != nil {
log.Crit("failed to create new l1 relayer", "config file", cfgFile, "error", err)
}
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, db, cfg.L2Config.RelayerConfig, false /* initGenesis */, registry)
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, db, cfg.L2Config.RelayerConfig, false /* initGenesis */, true, registry)
if err != nil {
log.Crit("failed to create new l2 relayer", "config file", cfgFile, "error", err)
}
Expand Down
2 changes: 1 addition & 1 deletion rollup/cmd/rollup_relayer/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func action(ctx *cli.Context) error {
}

initGenesis := ctx.Bool(utils.ImportGenesisFlag.Name)
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, db, cfg.L2Config.RelayerConfig, initGenesis, registry)
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, db, cfg.L2Config.RelayerConfig, initGenesis, false, registry)
if err != nil {
log.Crit("failed to create l2 relayer", "config file", cfgFile, "error", err)
}
Expand Down
4 changes: 2 additions & 2 deletions rollup/internal/controller/relayer/l1_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type Layer1Relayer struct {
}

// NewLayer1Relayer will return a new instance of Layer1RelayerClient
func NewLayer1Relayer(ctx context.Context, db *gorm.DB, cfg *config.RelayerConfig, reg prometheus.Registerer) (*Layer1Relayer, error) {
gasOracleSender, err := sender.NewSender(ctx, cfg.SenderConfig, cfg.GasOracleSenderPrivateKey, "l1_relayer", "gas_oracle_sender", types.SenderTypeL1GasOracle, db, reg)
func NewLayer1Relayer(ctx context.Context, db *gorm.DB, cfg *config.RelayerConfig, gasOracle bool, reg prometheus.Registerer) (*Layer1Relayer, error) {
gasOracleSender, err := sender.NewSender(ctx, cfg.SenderConfig, cfg.GasOracleSenderPrivateKey, "l1_relayer", "gas_oracle_sender", types.SenderTypeL1GasOracle, db, gasOracle, reg)
if err != nil {
addr := crypto.PubkeyToAddress(cfg.GasOracleSenderPrivateKey.PublicKey)
return nil, fmt.Errorf("new gas oracle sender failed for address %s, err: %v", addr.Hex(), err)
Expand Down
6 changes: 3 additions & 3 deletions rollup/internal/controller/relayer/l1_relayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func setupL1RelayerDB(t *testing.T) *gorm.DB {
func testCreateNewL1Relayer(t *testing.T) {
db := setupL1RelayerDB(t)
defer database.CloseDB(db)
relayer, err := NewLayer1Relayer(context.Background(), db, cfg.L2Config.RelayerConfig, nil)
relayer, err := NewLayer1Relayer(context.Background(), db, cfg.L2Config.RelayerConfig, true, nil)
assert.NoError(t, err)
assert.NotNil(t, relayer)
}
Expand All @@ -56,7 +56,7 @@ func testL1RelayerGasOracleConfirm(t *testing.T) {
l1Cfg := cfg.L1Config
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
l1Relayer, err := NewLayer1Relayer(ctx, db, l1Cfg.RelayerConfig, nil)
l1Relayer, err := NewLayer1Relayer(ctx, db, l1Cfg.RelayerConfig, true, nil)
assert.NoError(t, err)

// Simulate message confirmations.
Expand Down Expand Up @@ -88,7 +88,7 @@ func testL1RelayerProcessGasPriceOracle(t *testing.T) {
l1Cfg := cfg.L1Config
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
l1Relayer, err := NewLayer1Relayer(ctx, db, l1Cfg.RelayerConfig, nil)
l1Relayer, err := NewLayer1Relayer(ctx, db, l1Cfg.RelayerConfig, true, nil)
assert.NoError(t, err)
assert.NotNil(t, l1Relayer)

Expand Down
8 changes: 4 additions & 4 deletions rollup/internal/controller/relayer/l2_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,19 @@ type Layer2Relayer struct {
}

// NewLayer2Relayer will return a new instance of Layer2RelayerClient
func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db *gorm.DB, cfg *config.RelayerConfig, initGenesis bool, reg prometheus.Registerer) (*Layer2Relayer, error) {
commitSender, err := sender.NewSender(ctx, cfg.SenderConfig, cfg.CommitSenderPrivateKey, "l2_relayer", "commit_sender", types.SenderTypeCommitBatch, db, reg)
func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db *gorm.DB, cfg *config.RelayerConfig, initGenesis bool, gasOracle bool, reg prometheus.Registerer) (*Layer2Relayer, error) {
commitSender, err := sender.NewSender(ctx, cfg.SenderConfig, cfg.CommitSenderPrivateKey, "l2_relayer", "commit_sender", types.SenderTypeCommitBatch, db, !gasOracle, reg)
if err != nil {
addr := crypto.PubkeyToAddress(cfg.CommitSenderPrivateKey.PublicKey)
return nil, fmt.Errorf("new commit sender failed for address %s, err: %w", addr.Hex(), err)
}
finalizeSender, err := sender.NewSender(ctx, cfg.SenderConfig, cfg.FinalizeSenderPrivateKey, "l2_relayer", "finalize_sender", types.SenderTypeFinalizeBatch, db, reg)
finalizeSender, err := sender.NewSender(ctx, cfg.SenderConfig, cfg.FinalizeSenderPrivateKey, "l2_relayer", "finalize_sender", types.SenderTypeFinalizeBatch, db, !gasOracle, reg)
if err != nil {
addr := crypto.PubkeyToAddress(cfg.FinalizeSenderPrivateKey.PublicKey)
return nil, fmt.Errorf("new finalize sender failed for address %s, err: %w", addr.Hex(), err)
}

gasOracleSender, err := sender.NewSender(ctx, cfg.SenderConfig, cfg.GasOracleSenderPrivateKey, "l2_relayer", "gas_oracle_sender", types.SenderTypeL2GasOracle, db, reg)
gasOracleSender, err := sender.NewSender(ctx, cfg.SenderConfig, cfg.GasOracleSenderPrivateKey, "l2_relayer", "gas_oracle_sender", types.SenderTypeL2GasOracle, db, gasOracle, reg)
if err != nil {
addr := crypto.PubkeyToAddress(cfg.GasOracleSenderPrivateKey.PublicKey)
return nil, fmt.Errorf("new gas oracle sender failed for address %s, err: %w", addr.Hex(), err)
Expand Down
18 changes: 9 additions & 9 deletions rollup/internal/controller/relayer/l2_relayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func setupL2RelayerDB(t *testing.T) *gorm.DB {
func testCreateNewRelayer(t *testing.T) {
db := setupL2RelayerDB(t)
defer database.CloseDB(db)
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig, false, nil)
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig, false, false, nil)
assert.NoError(t, err)
assert.NotNil(t, relayer)
}
Expand All @@ -48,7 +48,7 @@ func testL2RelayerProcessPendingBatches(t *testing.T) {
defer database.CloseDB(db)

l2Cfg := cfg.L2Config
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig, false, nil)
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig, false, false, nil)
assert.NoError(t, err)

l2BlockOrm := orm.NewL2Block(db)
Expand Down Expand Up @@ -82,7 +82,7 @@ func testL2RelayerProcessCommittedBatches(t *testing.T) {
defer database.CloseDB(db)

l2Cfg := cfg.L2Config
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig, false, nil)
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig, false, false, nil)
assert.NoError(t, err)
batchMeta := &types.BatchMeta{
StartChunkIndex: 0,
Expand Down Expand Up @@ -128,7 +128,7 @@ func testL2RelayerFinalizeTimeoutBatches(t *testing.T) {
l2Cfg := cfg.L2Config
l2Cfg.RelayerConfig.EnableTestEnvBypassFeatures = true
l2Cfg.RelayerConfig.FinalizeBatchWithoutProofTimeoutSec = 0
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig, false, nil)
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig, false, false, nil)
assert.NoError(t, err)
batchMeta := &types.BatchMeta{
StartChunkIndex: 0,
Expand Down Expand Up @@ -160,7 +160,7 @@ func testL2RelayerCommitConfirm(t *testing.T) {
l2Cfg := cfg.L2Config
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
l2Relayer, err := NewLayer2Relayer(ctx, l2Cli, db, l2Cfg.RelayerConfig, false, nil)
l2Relayer, err := NewLayer2Relayer(ctx, l2Cli, db, l2Cfg.RelayerConfig, false, false, nil)
assert.NoError(t, err)

// Simulate message confirmations.
Expand Down Expand Up @@ -214,7 +214,7 @@ func testL2RelayerFinalizeConfirm(t *testing.T) {
l2Cfg := cfg.L2Config
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
l2Relayer, err := NewLayer2Relayer(ctx, l2Cli, db, l2Cfg.RelayerConfig, false, nil)
l2Relayer, err := NewLayer2Relayer(ctx, l2Cli, db, l2Cfg.RelayerConfig, false, false, nil)
assert.NoError(t, err)

// Simulate message confirmations.
Expand Down Expand Up @@ -287,7 +287,7 @@ func testL2RelayerGasOracleConfirm(t *testing.T) {
l2Cfg := cfg.L2Config
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
l2Relayer, err := NewLayer2Relayer(ctx, l2Cli, db, l2Cfg.RelayerConfig, false, nil)
l2Relayer, err := NewLayer2Relayer(ctx, l2Cli, db, l2Cfg.RelayerConfig, false, false, nil)
assert.NoError(t, err)

// Simulate message confirmations.
Expand Down Expand Up @@ -326,7 +326,7 @@ func testLayer2RelayerProcessGasPriceOracle(t *testing.T) {
db := setupL2RelayerDB(t)
defer database.CloseDB(db)

relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig, false, nil)
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig, false, true, nil)
assert.NoError(t, err)
assert.NotNil(t, relayer)

Expand Down Expand Up @@ -439,7 +439,7 @@ func testGetBatchStatusByIndex(t *testing.T) {
assert.NoError(t, err)

cfg.L2Config.RelayerConfig.ChainMonitor.Enabled = true
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig, false, nil)
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig, false, false, nil)
assert.NoError(t, err)
assert.NotNil(t, relayer)

Expand Down
28 changes: 17 additions & 11 deletions rollup/internal/controller/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type Sender struct {
}

// NewSender returns a new instance of transaction sender
func NewSender(ctx context.Context, config *config.SenderConfig, priv *ecdsa.PrivateKey, service, name string, senderType types.SenderType, db *gorm.DB, reg prometheus.Registerer) (*Sender, error) {
func NewSender(ctx context.Context, config *config.SenderConfig, priv *ecdsa.PrivateKey, service, name string, senderType types.SenderType, db *gorm.DB, enableSender bool, reg prometheus.Registerer) (*Sender, error) {
if config.EscalateMultipleNum <= config.EscalateMultipleDen {
return nil, fmt.Errorf("invalid params, EscalateMultipleNum; %v, EscalateMultipleDen: %v", config.EscalateMultipleNum, config.EscalateMultipleDen)
}
Expand Down Expand Up @@ -126,7 +126,9 @@ func NewSender(ctx context.Context, config *config.SenderConfig, priv *ecdsa.Pri
}
sender.metrics = initSenderMetrics(reg)

go sender.loop(ctx)
if enableSender {
go sender.loop(ctx)
}

return sender, nil
}
Expand Down Expand Up @@ -298,15 +300,15 @@ func (s *Sender) resetNonce(ctx context.Context) {
s.auth.Nonce = big.NewInt(int64(nonce))
}

func (s *Sender) resubmitTransaction(auth *bind.TransactOpts, tx *gethTypes.Transaction, baseFee uint64) (*gethTypes.Transaction, error) {
func (s *Sender) resubmitTransaction(tx *gethTypes.Transaction, baseFee uint64) (*gethTypes.Transaction, error) {
escalateMultipleNum := new(big.Int).SetUint64(s.config.EscalateMultipleNum)
escalateMultipleDen := new(big.Int).SetUint64(s.config.EscalateMultipleDen)
maxGasPrice := new(big.Int).SetUint64(s.config.MaxGasPrice)

txInfo := map[string]interface{}{
"tx_hash": tx.Hash().String(),
"tx_type": s.config.TxType,
"from": auth.From.String(),
"from": s.auth.From.String(),
"nonce": tx.Nonce(),
}

Expand Down Expand Up @@ -375,7 +377,7 @@ func (s *Sender) resubmitTransaction(auth *bind.TransactOpts, tx *gethTypes.Tran
txInfo["adjusted_gas_fee_cap"] = gasFeeCap.Uint64()
}

log.Info("Transaction gas adjustment details", "txInfo", txInfo)
log.Info("Transaction gas adjustment details", "service", s.service, "name", s.name, "txInfo", txInfo)

nonce := tx.Nonce()
s.metrics.resubmitTransactionTotal.WithLabelValues(s.service, s.name).Inc()
Expand Down Expand Up @@ -434,7 +436,8 @@ func (s *Sender) checkPendingTransaction() {
return nil
})
if err != nil {
log.Error("db transaction failed", "err", err)
log.Error("db transaction failed after receiving confirmation", "err", err)
return
}

// send confirm message
Expand All @@ -456,18 +459,20 @@ func (s *Sender) checkPendingTransaction() {
}
if status == types.TxStatusConfirmedFailed {
log.Warn("transaction already marked as failed, skipping resubmission", "hash", tx.Hash().String())
return
continue
}

log.Info("resubmit transaction",
"service", s.service,
"name", s.name,
"hash", tx.Hash().String(),
"from", s.auth.From.String(),
"nonce", tx.Nonce(),
"submitBlockNumber", txnToCheck.SubmitBlockNumber,
"currentBlockNumber", blockNumber,
"escalateBlocks", s.config.EscalateBlocks)

if newTx, err := s.resubmitTransaction(s.auth, tx, baseFee); err != nil {
if newTx, err := s.resubmitTransaction(tx, baseFee); err != nil {
s.metrics.resubmitTransactionFailedTotal.WithLabelValues(s.service, s.name).Inc()
log.Error("failed to resubmit transaction", "context ID", txnToCheck.ContextID, "sender meta", s.getSenderMeta(), "from", s.auth.From.String(), "nonce", newTx.Nonce(), "err", err)
} else {
Expand All @@ -477,13 +482,14 @@ func (s *Sender) checkPendingTransaction() {
return fmt.Errorf("failed to update status of transaction with hash %s to TxStatusReplaced, err: %w", tx.Hash().String(), err)
}
// Record the new transaction that has replaced the original one.
if err := s.pendingTransactionOrm.InsertPendingTransaction(s.ctx, txnToCheck.ContextID, s.getSenderMeta(), newTx, txnToCheck.SubmitBlockNumber, dbTX); err != nil {
return fmt.Errorf("failed to insert new pending transaction with context ID: %s, nonce: %d, hash: %v, err: %w", txnToCheck.ContextID, newTx.Nonce(), newTx.Hash().String(), err)
if err := s.pendingTransactionOrm.InsertPendingTransaction(s.ctx, txnToCheck.ContextID, s.getSenderMeta(), newTx, blockNumber, dbTX); err != nil {
return fmt.Errorf("failed to insert new pending transaction with context ID: %s, nonce: %d, hash: %v, previous block number: %v, current block number: %v, err: %w", txnToCheck.ContextID, newTx.Nonce(), newTx.Hash().String(), txnToCheck.SubmitBlockNumber, blockNumber, err)
}
return nil
})
if err != nil {
log.Error("db transaction failed", "err", err)
log.Error("db transaction failed after resubmitting", "err", err)
return
}
}
}
Expand Down
Loading

0 comments on commit 4124f86

Please sign in to comment.