Skip to content

Commit

Permalink
monitor failed transaction to all gateways
Browse files Browse the repository at this point in the history
  • Loading branch information
colinlyguo committed Jan 22, 2024
1 parent 7bb2c34 commit f7b669a
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 61 deletions.
8 changes: 4 additions & 4 deletions bridge-history-api/internal/logic/event_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func (b *EventUpdateLogic) L1InsertOrUpdate(ctx context.Context, l1FetcherResult
return err
}

if err := b.crossMessageOrm.InsertFailedL1GatewayRouterAndL1MessengerTxs(ctx, l1FetcherResult.RevertedTxs); err != nil {
log.Error("failed to insert failed L1 gateway router and L1 messenger transactions", "err", err)
if err := b.crossMessageOrm.InsertFailedL1GatewayTxs(ctx, l1FetcherResult.RevertedTxs); err != nil {
log.Error("failed to insert failed L1 gateway transactions", "err", err)
return err
}
return nil
Expand Down Expand Up @@ -185,8 +185,8 @@ func (b *EventUpdateLogic) L2InsertOrUpdate(ctx context.Context, l2FetcherResult
log.Error("failed to update L2 relayed messages of L1 deposits", "err", err)
return err
}
if err := b.crossMessageOrm.InsertFailedL2GatewayRouterTxs(ctx, l2FetcherResult.OtherRevertedTxs); err != nil {
log.Error("failed to insert failed L2 gateway router transactions", "err", err)
if err := b.crossMessageOrm.InsertFailedL2GatewayTxs(ctx, l2FetcherResult.OtherRevertedTxs); err != nil {
log.Error("failed to insert failed L2 gateway transactions", "err", err)
return err
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions bridge-history-api/internal/logic/l1_event_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(ctx context.Context, logs []t
}
from := event.Sender.String()
if event.Sender.String() == e.cfg.GatewayRouterAddr {
tx, isPending, err := e.client.TransactionByHash(ctx, vlog.TxHash)
if err != nil || isPending {
log.Warn("Failed to get tx or the tx is still pending", "err", err, "isPending", isPending)
return nil, nil, err
tx, isPending, rpcErr := e.client.TransactionByHash(ctx, vlog.TxHash)
if rpcErr != nil || isPending {
log.Warn("Failed to get tx or the tx is still pending", "rpcErr", rpcErr, "isPending", isPending)
return nil, nil, rpcErr
}
signer := types.LatestSignerForChainID(new(big.Int).SetUint64(tx.ChainId().Uint64()))
sender, senderErr := signer.Sender(tx)
Expand Down
32 changes: 23 additions & 9 deletions bridge-history-api/internal/logic/l1_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type L1FetcherLogic struct {
cfg *config.FetcherConfig
client *ethclient.Client
addressList []common.Address
gatewayList []common.Address
parser *L1EventParser
db *gorm.DB
crossMessageOrm *orm.CrossMessage
Expand Down Expand Up @@ -65,16 +66,34 @@ func NewL1FetcherLogic(cfg *config.FetcherConfig, db *gorm.DB, client *ethclient
common.HexToAddress(cfg.MessageQueueAddr),
}

gatewayList := []common.Address{
common.HexToAddress(cfg.ETHGatewayAddr),

common.HexToAddress(cfg.StandardERC20GatewayAddr),
common.HexToAddress(cfg.CustomERC20GatewayAddr),
common.HexToAddress(cfg.WETHGatewayAddr),
common.HexToAddress(cfg.DAIGatewayAddr),

common.HexToAddress(cfg.ERC721GatewayAddr),
common.HexToAddress(cfg.ERC1155GatewayAddr),

common.HexToAddress(cfg.MessengerAddr),

common.HexToAddress(cfg.GatewayRouterAddr),
}

// Optional erc20 gateways.
if common.HexToAddress(cfg.USDCGatewayAddr) != (common.Address{}) {
addressList = append(addressList, common.HexToAddress(cfg.USDCGatewayAddr))
gatewayList = append(gatewayList, common.HexToAddress(cfg.USDCGatewayAddr))
}

if common.HexToAddress(cfg.LIDOGatewayAddr) != (common.Address{}) {
addressList = append(addressList, common.HexToAddress(cfg.LIDOGatewayAddr))
gatewayList = append(gatewayList, common.HexToAddress(cfg.LIDOGatewayAddr))
}

log.Info("L1 Fetcher configured with the following address list", "addresses", addressList)
log.Info("L1 Fetcher configured with the following address list", "addresses", addressList, "gateways", gatewayList)

f := &L1FetcherLogic{
db: db,
Expand All @@ -83,6 +102,7 @@ func NewL1FetcherLogic(cfg *config.FetcherConfig, db *gorm.DB, client *ethclient
cfg: cfg,
client: client,
addressList: addressList,
gatewayList: gatewayList,
parser: NewL1EventParser(cfg, client),
}

Expand Down Expand Up @@ -131,15 +151,9 @@ func (f *L1FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, bl
blockTimestampsMap[block.NumberU64()] = block.Time()

for _, tx := range block.Transactions() {
txTo := tx.To()
if txTo == nil {
continue
}
toAddress := txTo.String()

// GatewayRouter: L1 deposit.
// Gateways: L1 deposit.
// Messenger: L1 deposit retry (replayMessage), L1 deposit refund (dropMessage), L2 withdrawal's claim (relayMessageWithProof).
if toAddress != f.cfg.GatewayRouterAddr && toAddress != f.cfg.MessengerAddr {
if !isTransactionToGateway(tx, f.gatewayList) {
continue
}

Expand Down
6 changes: 3 additions & 3 deletions bridge-history-api/internal/logic/l2_event_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ func (e *L2EventParser) ParseL2EventLogs(ctx context.Context, logs []types.Log,
}
from := event.Sender.String()
if event.Sender.String() == e.cfg.GatewayRouterAddr {
tx, isPending, err := e.client.TransactionByHash(ctx, vlog.TxHash)
tx, isPending, rpcErr := e.client.TransactionByHash(ctx, vlog.TxHash)
if err != nil || isPending {
log.Warn("Failed to get tx or the tx is still pending", "err", err, "isPending", isPending)
return nil, nil, err
log.Warn("Failed to get tx or the tx is still pending", "rpcErr", rpcErr, "isPending", isPending)
return nil, nil, rpcErr
}
signer := types.LatestSignerForChainID(new(big.Int).SetUint64(tx.ChainId().Uint64()))
sender, senderErr := signer.Sender(tx)
Expand Down
103 changes: 66 additions & 37 deletions bridge-history-api/internal/logic/l2_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type L2FetcherLogic struct {
cfg *config.FetcherConfig
client *ethclient.Client
addressList []common.Address
gatewayList []common.Address
parser *L2EventParser
db *gorm.DB
crossMessageOrm *orm.CrossMessage
Expand All @@ -60,16 +61,34 @@ func NewL2FetcherLogic(cfg *config.FetcherConfig, db *gorm.DB, client *ethclient
common.HexToAddress(cfg.MessengerAddr),
}

gatewayList := []common.Address{
common.HexToAddress(cfg.ETHGatewayAddr),

common.HexToAddress(cfg.StandardERC20GatewayAddr),
common.HexToAddress(cfg.CustomERC20GatewayAddr),
common.HexToAddress(cfg.WETHGatewayAddr),
common.HexToAddress(cfg.DAIGatewayAddr),

common.HexToAddress(cfg.ERC721GatewayAddr),
common.HexToAddress(cfg.ERC1155GatewayAddr),

common.HexToAddress(cfg.MessengerAddr),

common.HexToAddress(cfg.GatewayRouterAddr),
}

// Optional erc20 gateways.
if common.HexToAddress(cfg.USDCGatewayAddr) != (common.Address{}) {
addressList = append(addressList, common.HexToAddress(cfg.USDCGatewayAddr))
gatewayList = append(gatewayList, common.HexToAddress(cfg.USDCGatewayAddr))
}

if common.HexToAddress(cfg.LIDOGatewayAddr) != (common.Address{}) {
addressList = append(addressList, common.HexToAddress(cfg.LIDOGatewayAddr))
gatewayList = append(gatewayList, common.HexToAddress(cfg.USDCGatewayAddr))
}

log.Info("L2 Fetcher configured with the following address list", "addresses", addressList)
log.Info("L2 Fetcher configured with the following address list", "addresses", addressList, "gateways", gatewayList)

f := &L2FetcherLogic{
db: db,
Expand All @@ -78,6 +97,7 @@ func NewL2FetcherLogic(cfg *config.FetcherConfig, db *gorm.DB, client *ethclient
cfg: cfg,
client: client,
addressList: addressList,
gatewayList: gatewayList,
parser: NewL2EventParser(cfg, client),
}

Expand Down Expand Up @@ -127,42 +147,7 @@ func (f *L2FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, bl
blockTimestampsMap[block.NumberU64()] = block.Time()

for _, tx := range block.Transactions() {
txTo := tx.To()
if txTo == nil {
continue
}
toAddress := txTo.String()

// GatewayRouter: L2 withdrawal.
if toAddress == f.cfg.GatewayRouterAddr {
receipt, receiptErr := f.client.TransactionReceipt(ctx, tx.Hash())
if receiptErr != nil {
log.Error("Failed to get transaction receipt", "txHash", tx.Hash().String(), "err", receiptErr)
return nil, nil, nil, receiptErr
}

// Check if the transaction is failed
if receipt.Status == types.ReceiptStatusFailed {
signer := types.LatestSignerForChainID(new(big.Int).SetUint64(tx.ChainId().Uint64()))
sender, signerErr := signer.Sender(tx)
if signerErr != nil {
log.Error("get sender failed", "chain id", tx.ChainId().Uint64(), "tx hash", tx.Hash().String(), "err", signerErr)
return nil, nil, nil, signerErr
}

l2RevertedUserTxs = append(l2RevertedUserTxs, &orm.CrossMessage{
L2TxHash: tx.Hash().String(),
MessageType: int(orm.MessageTypeL2SentMessage),
Sender: sender.String(),
Receiver: (*tx.To()).String(),
L2BlockNumber: receipt.BlockNumber.Uint64(),
BlockTimestamp: block.Time(),
TxStatus: int(orm.TxStatusTypeSentTxReverted),
})
}
}

if tx.Type() == types.L1MessageTxType {
if tx.IsL1MessageTx() {
receipt, receiptErr := f.client.TransactionReceipt(ctx, tx.Hash())
if receiptErr != nil {
log.Error("Failed to get transaction receipt", "txHash", tx.Hash().String(), "err", receiptErr)
Expand All @@ -179,6 +164,38 @@ func (f *L2FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, bl
MessageType: int(orm.MessageTypeL1SentMessage),
})
}
continue
}

// Gateways: L2 withdrawal.
if !isTransactionToGateway(tx, f.gatewayList) {
continue
}

receipt, receiptErr := f.client.TransactionReceipt(ctx, tx.Hash())
if receiptErr != nil {
log.Error("Failed to get transaction receipt", "txHash", tx.Hash().String(), "err", receiptErr)
return nil, nil, nil, receiptErr
}

// Check if the transaction is failed
if receipt.Status == types.ReceiptStatusFailed {
signer := types.LatestSignerForChainID(new(big.Int).SetUint64(tx.ChainId().Uint64()))
sender, signerErr := signer.Sender(tx)
if signerErr != nil {
log.Error("get sender failed", "chain id", tx.ChainId().Uint64(), "tx hash", tx.Hash().String(), "err", signerErr)
return nil, nil, nil, signerErr
}

l2RevertedUserTxs = append(l2RevertedUserTxs, &orm.CrossMessage{
L2TxHash: tx.Hash().String(),
MessageType: int(orm.MessageTypeL2SentMessage),
Sender: sender.String(),
Receiver: (*tx.To()).String(),
L2BlockNumber: receipt.BlockNumber.Uint64(),
BlockTimestamp: block.Time(),
TxStatus: int(orm.TxStatusTypeSentTxReverted),
})
}
}
}
Expand Down Expand Up @@ -279,3 +296,15 @@ func (f *L2FetcherLogic) updateMetrics(res L2FilterResult) {
}
}
}

func isTransactionToGateway(tx *types.Transaction, gatewayList []common.Address) bool {
if tx.To() == nil {
return false
}
for _, gateway := range gatewayList {
if *tx.To() == gateway {
return true
}
}
return false
}
8 changes: 4 additions & 4 deletions bridge-history-api/internal/orm/cross_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,10 @@ func (c *CrossMessage) InsertOrUpdateL2Messages(ctx context.Context, messages []
return nil
}

// InsertFailedL2GatewayRouterTxs inserts a list of transactions that failed to interact with the L2 gateway router into the database.
// InsertFailedL2GatewayTxs inserts a list of transactions that failed to interact with the L2 gateways into the database.
// To resolve unique index confliction, L2 tx hash is used as the MessageHash.
// The OnConflict clause is used to prevent inserting same failed transactions multiple times.
func (c *CrossMessage) InsertFailedL2GatewayRouterTxs(ctx context.Context, messages []*CrossMessage) error {
func (c *CrossMessage) InsertFailedL2GatewayTxs(ctx context.Context, messages []*CrossMessage) error {
if len(messages) == 0 {
return nil
}
Expand All @@ -417,10 +417,10 @@ func (c *CrossMessage) InsertFailedL2GatewayRouterTxs(ctx context.Context, messa
return nil
}

// InsertFailedL1GatewayRouterAndL1MessengerTxs inserts a list of transactions that failed to interact with the L1 gateway router and L1 messenger into the database.
// InsertFailedL1GatewayTxs inserts a list of transactions that failed to interact with the L1 gateways into the database.
// To resolve unique index confliction, L1 tx hash is used as the MessageHash.
// The OnConflict clause is used to prevent inserting same failed transactions multiple times.
func (c *CrossMessage) InsertFailedL1GatewayRouterAndL1MessengerTxs(ctx context.Context, messages []*CrossMessage) error {
func (c *CrossMessage) InsertFailedL1GatewayTxs(ctx context.Context, messages []*CrossMessage) error {
if len(messages) == 0 {
return nil
}
Expand Down

0 comments on commit f7b669a

Please sign in to comment.