diff --git a/bridge-history-api/conf/config.json b/bridge-history-api/conf/config.json index f93044d48e..bcbf6384f2 100644 --- a/bridge-history-api/conf/config.json +++ b/bridge-history-api/conf/config.json @@ -3,8 +3,8 @@ "confirmation": 0, "endpoint": "https://rpc.ankr.com/eth", "startHeight": 18306000, - "blockTime": 10, - "fetchLimit": 30, + "blockTime": 12, + "fetchLimit": 16, "MessengerAddr": "0x6774Bcbd5ceCeF1336b5300fb5186a12DDD8b367", "ETHGatewayAddr": "0x7F2b8C31F88B6006c382775eea88297Ec1e3E905", "WETHGatewayAddr": "0x7AC440cAe8EB6328de4fA621163a792c1EA9D4fE", @@ -23,7 +23,7 @@ "confirmation": 0, "endpoint": "https://rpc.scroll.io", "blockTime": 3, - "fetchLimit": 100, + "fetchLimit": 64, "MessengerAddr": "0x781e90f1c8Fc4611c9b7497C3B47F99Ef6969CbC", "ETHGatewayAddr": "0x6EA73e05AdC79974B931123675ea8F78FfdacDF0", "WETHGatewayAddr": "0x7003E7B7186f0E6601203b99F7B8DECBfA391cf9", diff --git a/bridge-history-api/go.mod b/bridge-history-api/go.mod index b026dd2f1f..15951c2ed3 100644 --- a/bridge-history-api/go.mod +++ b/bridge-history-api/go.mod @@ -6,7 +6,6 @@ require ( github.com/gin-contrib/cors v1.5.0 github.com/gin-gonic/gin v1.9.1 github.com/go-redis/redis/v8 v8.11.5 - github.com/google/uuid v1.4.0 github.com/pressly/goose/v3 v3.16.0 github.com/prometheus/client_golang v1.14.0 github.com/scroll-tech/go-ethereum v1.10.14-0.20231130005111-38a3a9c9198c @@ -40,6 +39,7 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/google/go-cmp v0.6.0 // indirect + github.com/google/uuid v1.4.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/holiman/uint256 v1.2.4 // indirect github.com/huin/goupnp v1.3.0 // indirect diff --git a/bridge-history-api/internal/config/config.go b/bridge-history-api/internal/config/config.go index 4b932c29e2..d52a112ba4 100644 --- a/bridge-history-api/internal/config/config.go +++ b/bridge-history-api/internal/config/config.go @@ -8,11 +8,11 @@ import ( "scroll-tech/common/database" ) -// LayerConfig is the configuration of Layer1/Layer2 -type LayerConfig struct { +// FetcherConfig is the configuration of Layer1 or Layer2 fetcher. +type FetcherConfig struct { Confirmation uint64 `json:"confirmation"` Endpoint string `json:"endpoint"` - StartHeight uint64 `json:"startHeight"` // Can only be configured to contract deployment height, otherwise in the current implementation, the message proof could not be successfully updated. + StartHeight uint64 `json:"startHeight"` // Can only be configured to contract deployment height, message proof should be updated from the very beginning. BlockTime int64 `json:"blockTime"` FetchLimit uint64 `json:"fetchLimit"` MessengerAddr string `json:"MessengerAddr"` @@ -43,8 +43,8 @@ type RedisConfig struct { // Config is the configuration of the bridge history backend type Config struct { - L1 *LayerConfig `json:"L1"` - L2 *LayerConfig `json:"L2"` + L1 *FetcherConfig `json:"L1"` + L2 *FetcherConfig `json:"L2"` DB *database.Config `json:"db"` Redis *RedisConfig `json:"redis"` } diff --git a/bridge-history-api/internal/controller/fetcher/l1_fetcher.go b/bridge-history-api/internal/controller/fetcher/l1_fetcher.go index 9d1125270a..7e30171d18 100644 --- a/bridge-history-api/internal/controller/fetcher/l1_fetcher.go +++ b/bridge-history-api/internal/controller/fetcher/l1_fetcher.go @@ -20,7 +20,7 @@ import ( // L1MessageFetcher fetches cross message events from L1 and saves them to database. type L1MessageFetcher struct { ctx context.Context - cfg *config.LayerConfig + cfg *config.FetcherConfig client *ethclient.Client l1SyncHeight uint64 @@ -35,7 +35,7 @@ type L1MessageFetcher struct { } // NewL1MessageFetcher creates a new L1MessageFetcher instance. -func NewL1MessageFetcher(ctx context.Context, cfg *config.LayerConfig, db *gorm.DB, client *ethclient.Client) *L1MessageFetcher { +func NewL1MessageFetcher(ctx context.Context, cfg *config.FetcherConfig, db *gorm.DB, client *ethclient.Client) *L1MessageFetcher { c := &L1MessageFetcher{ ctx: ctx, cfg: cfg, diff --git a/bridge-history-api/internal/controller/fetcher/l2_fetcher.go b/bridge-history-api/internal/controller/fetcher/l2_fetcher.go index 9a488f131f..79d965e835 100644 --- a/bridge-history-api/internal/controller/fetcher/l2_fetcher.go +++ b/bridge-history-api/internal/controller/fetcher/l2_fetcher.go @@ -20,7 +20,7 @@ import ( // L2MessageFetcher fetches cross message events from L2 and saves them to database. type L2MessageFetcher struct { ctx context.Context - cfg *config.LayerConfig + cfg *config.FetcherConfig db *gorm.DB client *ethclient.Client l2SyncHeight uint64 @@ -35,7 +35,7 @@ type L2MessageFetcher struct { } // NewL2MessageFetcher creates a new L2MessageFetcher instance. -func NewL2MessageFetcher(ctx context.Context, cfg *config.LayerConfig, db *gorm.DB, client *ethclient.Client) *L2MessageFetcher { +func NewL2MessageFetcher(ctx context.Context, cfg *config.FetcherConfig, db *gorm.DB, client *ethclient.Client) *L2MessageFetcher { c := &L2MessageFetcher{ ctx: ctx, cfg: cfg, diff --git a/bridge-history-api/internal/logic/event_update.go b/bridge-history-api/internal/logic/event_update.go index 2ec430a938..a5229feda9 100644 --- a/bridge-history-api/internal/logic/event_update.go +++ b/bridge-history-api/internal/logic/event_update.go @@ -76,39 +76,30 @@ func (b *EventUpdateLogic) GetL2MessageSyncedHeightInDB(ctx context.Context) (ui // L1InsertOrUpdate inserts or updates l1 messages func (b *EventUpdateLogic) L1InsertOrUpdate(ctx context.Context, l1FetcherResult *L1FilterResult) error { - err := b.db.Transaction(func(tx *gorm.DB) error { - if txErr := b.crossMessageOrm.InsertOrUpdateL1Messages(ctx, l1FetcherResult.DepositMessages, tx); txErr != nil { - log.Error("failed to insert L1 deposit messages", "err", txErr) - return txErr - } - - if txErr := b.crossMessageOrm.InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx, l1FetcherResult.RelayedMessages, tx); txErr != nil { - log.Error("failed to update L1 relayed messages of L2 withdrawals", "err", txErr) - return txErr - } - - if txErr := b.batchEventOrm.InsertOrUpdateBatchEvents(ctx, l1FetcherResult.BatchEvents, tx); txErr != nil { - log.Error("failed to insert or update batch events", "err", txErr) - return txErr - } + if err := b.crossMessageOrm.InsertOrUpdateL1Messages(ctx, l1FetcherResult.DepositMessages); err != nil { + log.Error("failed to insert L1 deposit messages", "err", err) + return err + } - if txErr := b.crossMessageOrm.UpdateL1MessageQueueEventsInfo(ctx, l1FetcherResult.MessageQueueEvents, tx); txErr != nil { - log.Error("failed to insert L1 message queue events", "err", txErr) - return txErr - } + if err := b.crossMessageOrm.InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx, l1FetcherResult.RelayedMessages); err != nil { + log.Error("failed to update L1 relayed messages of L2 withdrawals", "err", err) + return err + } - if txErr := b.crossMessageOrm.InsertFailedGatewayRouterTxs(ctx, l1FetcherResult.RevertedTxs, tx); txErr != nil { - log.Error("failed to insert L1 failed gateway router transactions", "err", txErr) - return txErr - } - return nil - }) + if err := b.batchEventOrm.InsertOrUpdateBatchEvents(ctx, l1FetcherResult.BatchEvents); err != nil { + log.Error("failed to insert or update batch events", "err", err) + return err + } - if err != nil { - log.Error("failed to update db of L1 events", "err", err) + if err := b.crossMessageOrm.UpdateL1MessageQueueEventsInfo(ctx, l1FetcherResult.MessageQueueEvents); err != nil { + log.Error("failed to insert L1 message queue events", "err", err) return err } + if err := b.crossMessageOrm.InsertFailedL1GatewayTxs(ctx, l1FetcherResult.RevertedTxs); err != nil { + log.Error("failed to insert failed L1 gateway transactions", "err", err) + return err + } return nil } @@ -186,24 +177,18 @@ func (b *EventUpdateLogic) UpdateL1BatchIndexAndStatus(ctx context.Context, heig // L2InsertOrUpdate inserts or updates L2 messages func (b *EventUpdateLogic) L2InsertOrUpdate(ctx context.Context, l2FetcherResult *L2FilterResult) error { - err := b.db.Transaction(func(tx *gorm.DB) error { - if txErr := b.crossMessageOrm.InsertOrUpdateL2Messages(ctx, l2FetcherResult.WithdrawMessages, tx); txErr != nil { - log.Error("failed to insert L2 withdrawal messages", "err", txErr) - return txErr - } - if txErr := b.crossMessageOrm.InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx, l2FetcherResult.RelayedMessages, tx); txErr != nil { - log.Error("failed to update L2 relayed messages of L1 deposits", "err", txErr) - return txErr - } - if txErr := b.crossMessageOrm.InsertFailedGatewayRouterTxs(ctx, l2FetcherResult.OtherRevertedTxs, tx); txErr != nil { - log.Error("failed to insert L2 failed gateway router transactions", "err", txErr) - return txErr - } - return nil - }) + if err := b.crossMessageOrm.InsertOrUpdateL2Messages(ctx, l2FetcherResult.WithdrawMessages); err != nil { + log.Error("failed to insert L2 withdrawal messages", "err", err) + return err + } - if err != nil { - log.Error("failed to update db of L2 events", "err", err) + if err := b.crossMessageOrm.InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx, l2FetcherResult.RelayedMessages); err != nil { + log.Error("failed to update L2 relayed messages of L1 deposits", "err", err) + return err + } + + if err := b.crossMessageOrm.InsertFailedL2GatewayTxs(ctx, l2FetcherResult.OtherRevertedTxs); err != nil { + log.Error("failed to insert failed L2 gateway transactions", "err", err) return err } return nil diff --git a/bridge-history-api/internal/logic/l1_event_parser.go b/bridge-history-api/internal/logic/l1_event_parser.go index bf5dd1779c..5919176200 100644 --- a/bridge-history-api/internal/logic/l1_event_parser.go +++ b/bridge-history-api/internal/logic/l1_event_parser.go @@ -2,6 +2,7 @@ package logic import ( "context" + "math/big" "github.com/scroll-tech/go-ethereum/common" "github.com/scroll-tech/go-ethereum/core/types" @@ -10,21 +11,27 @@ import ( "github.com/scroll-tech/go-ethereum/log" backendabi "scroll-tech/bridge-history-api/abi" + "scroll-tech/bridge-history-api/internal/config" "scroll-tech/bridge-history-api/internal/orm" "scroll-tech/bridge-history-api/internal/utils" ) // L1EventParser the l1 event parser type L1EventParser struct { + cfg *config.FetcherConfig + client *ethclient.Client } // NewL1EventParser creates l1 event parser -func NewL1EventParser() *L1EventParser { - return &L1EventParser{} +func NewL1EventParser(cfg *config.FetcherConfig, client *ethclient.Client) *L1EventParser { + return &L1EventParser{ + cfg: cfg, + client: client, + } } // ParseL1CrossChainEventLogs parses L1 watched cross chain events. -func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimestampsMap map[uint64]uint64) ([]*orm.CrossMessage, []*orm.CrossMessage, error) { +func (e *L1EventParser) ParseL1CrossChainEventLogs(ctx context.Context, logs []types.Log, blockTimestampsMap map[uint64]uint64) ([]*orm.CrossMessage, []*orm.CrossMessage, error) { var l1DepositMessages []*orm.CrossMessage var l1RelayedMessages []*orm.CrossMessage for _, vlog := range logs { @@ -32,7 +39,7 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest case backendabi.L1DepositETHSig: event := backendabi.ETHMessageEvent{} if err := utils.UnpackLog(backendabi.IL1ETHGatewayABI, &event, "DepositETH", vlog); err != nil { - log.Warn("Failed to unpack DepositETH event", "err", err) + log.Error("Failed to unpack DepositETH event", "err", err) return nil, nil, err } lastMessage := l1DepositMessages[len(l1DepositMessages)-1] @@ -44,7 +51,7 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest event := backendabi.ERC20MessageEvent{} err := utils.UnpackLog(backendabi.IL1ERC20GatewayABI, &event, "DepositERC20", vlog) if err != nil { - log.Warn("Failed to unpack DepositERC20 event", "err", err) + log.Error("Failed to unpack DepositERC20 event", "err", err) return nil, nil, err } lastMessage := l1DepositMessages[len(l1DepositMessages)-1] @@ -57,7 +64,7 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest case backendabi.L1DepositERC721Sig: event := backendabi.ERC721MessageEvent{} if err := utils.UnpackLog(backendabi.IL1ERC721GatewayABI, &event, "DepositERC721", vlog); err != nil { - log.Warn("Failed to unpack DepositERC721 event", "err", err) + log.Error("Failed to unpack DepositERC721 event", "err", err) return nil, nil, err } lastMessage := l1DepositMessages[len(l1DepositMessages)-1] @@ -70,7 +77,7 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest case backendabi.L1BatchDepositERC721Sig: event := backendabi.BatchERC721MessageEvent{} if err := utils.UnpackLog(backendabi.IL1ERC721GatewayABI, &event, "BatchDepositERC721", vlog); err != nil { - log.Warn("Failed to unpack BatchDepositERC721 event", "err", err) + log.Error("Failed to unpack BatchDepositERC721 event", "err", err) return nil, nil, err } lastMessage := l1DepositMessages[len(l1DepositMessages)-1] @@ -83,7 +90,7 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest case backendabi.L1DepositERC1155Sig: event := backendabi.ERC1155MessageEvent{} if err := utils.UnpackLog(backendabi.IL1ERC1155GatewayABI, &event, "DepositERC1155", vlog); err != nil { - log.Warn("Failed to unpack DepositERC1155 event", "err", err) + log.Error("Failed to unpack DepositERC1155 event", "err", err) return nil, nil, err } lastMessage := l1DepositMessages[len(l1DepositMessages)-1] @@ -97,7 +104,7 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest case backendabi.L1BatchDepositERC1155Sig: event := backendabi.BatchERC1155MessageEvent{} if err := utils.UnpackLog(backendabi.IL1ERC1155GatewayABI, &event, "BatchDepositERC1155", vlog); err != nil { - log.Warn("Failed to unpack BatchDepositERC1155 event", "err", err) + log.Error("Failed to unpack BatchDepositERC1155 event", "err", err) return nil, nil, err } lastMessage := l1DepositMessages[len(l1DepositMessages)-1] @@ -111,12 +118,17 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest case backendabi.L1SentMessageEventSig: event := backendabi.L1SentMessageEvent{} if err := utils.UnpackLog(backendabi.IL1ScrollMessengerABI, &event, "SentMessage", vlog); err != nil { - log.Warn("Failed to unpack SentMessage event", "err", err) + log.Error("Failed to unpack SentMessage event", "err", err) + return nil, nil, err + } + from, err := getRealFromAddress(ctx, event.Sender, e.client, vlog.TxHash, e.cfg.GatewayRouterAddr) + if err != nil { + log.Error("Failed to get real 'from' address", "err", err) return nil, nil, err } l1DepositMessages = append(l1DepositMessages, &orm.CrossMessage{ L1BlockNumber: vlog.BlockNumber, - Sender: event.Sender.String(), + Sender: from, Receiver: event.Target.String(), TokenType: int(orm.TokenTypeETH), L1TxHash: vlog.TxHash.String(), @@ -130,7 +142,7 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest case backendabi.L1RelayedMessageEventSig: event := backendabi.L1RelayedMessageEvent{} if err := utils.UnpackLog(backendabi.IL1ScrollMessengerABI, &event, "RelayedMessage", vlog); err != nil { - log.Warn("Failed to unpack RelayedMessage event", "err", err) + log.Error("Failed to unpack RelayedMessage event", "err", err) return nil, nil, err } l1RelayedMessages = append(l1RelayedMessages, &orm.CrossMessage{ @@ -143,7 +155,7 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest case backendabi.L1FailedRelayedMessageEventSig: event := backendabi.L1FailedRelayedMessageEvent{} if err := utils.UnpackLog(backendabi.IL1ScrollMessengerABI, &event, "FailedRelayedMessage", vlog); err != nil { - log.Warn("Failed to unpack FailedRelayedMessage event", "err", err) + log.Error("Failed to unpack FailedRelayedMessage event", "err", err) return nil, nil, err } l1RelayedMessages = append(l1RelayedMessages, &orm.CrossMessage{ @@ -166,17 +178,17 @@ func (e *L1EventParser) ParseL1BatchEventLogs(ctx context.Context, logs []types. case backendabi.L1CommitBatchEventSig: event := backendabi.L1CommitBatchEvent{} if err := utils.UnpackLog(backendabi.IScrollChainABI, &event, "CommitBatch", vlog); err != nil { - log.Warn("Failed to unpack CommitBatch event", "err", err) + log.Error("Failed to unpack CommitBatch event", "err", err) return nil, err } commitTx, isPending, err := client.TransactionByHash(ctx, vlog.TxHash) if err != nil || isPending { - log.Warn("Failed to get commit Batch tx receipt or the tx is still pending", "err", err) + log.Error("Failed to get commit batch tx or the tx is still pending", "err", err, "isPending", isPending) return nil, err } startBlock, endBlock, err := utils.GetBatchRangeFromCalldata(commitTx.Data()) if err != nil { - log.Warn("Failed to get batch range from calldata", "hash", commitTx.Hash().String(), "height", vlog.BlockNumber) + log.Error("Failed to get batch range from calldata", "hash", commitTx.Hash().String(), "height", vlog.BlockNumber) return nil, err } l1BatchEvents = append(l1BatchEvents, &orm.BatchEvent{ @@ -190,7 +202,7 @@ func (e *L1EventParser) ParseL1BatchEventLogs(ctx context.Context, logs []types. case backendabi.L1RevertBatchEventSig: event := backendabi.L1RevertBatchEvent{} if err := utils.UnpackLog(backendabi.IScrollChainABI, &event, "RevertBatch", vlog); err != nil { - log.Warn("Failed to unpack RevertBatch event", "err", err) + log.Error("Failed to unpack RevertBatch event", "err", err) return nil, err } l1BatchEvents = append(l1BatchEvents, &orm.BatchEvent{ @@ -202,7 +214,7 @@ func (e *L1EventParser) ParseL1BatchEventLogs(ctx context.Context, logs []types. case backendabi.L1FinalizeBatchEventSig: event := backendabi.L1FinalizeBatchEvent{} if err := utils.UnpackLog(backendabi.IScrollChainABI, &event, "FinalizeBatch", vlog); err != nil { - log.Warn("Failed to unpack FinalizeBatch event", "err", err) + log.Error("Failed to unpack FinalizeBatch event", "err", err) return nil, err } l1BatchEvents = append(l1BatchEvents, &orm.BatchEvent{ @@ -229,7 +241,7 @@ func (e *L1EventParser) ParseL1MessageQueueEventLogs(logs []types.Log, l1Deposit case backendabi.L1QueueTransactionEventSig: event := backendabi.L1QueueTransactionEvent{} if err := utils.UnpackLog(backendabi.IL1MessageQueueABI, &event, "QueueTransaction", vlog); err != nil { - log.Warn("Failed to unpack QueueTransaction event", "err", err) + log.Error("Failed to unpack QueueTransaction event", "err", err) return nil, err } messageHash := common.BytesToHash(crypto.Keccak256(event.Data)) @@ -245,7 +257,7 @@ func (e *L1EventParser) ParseL1MessageQueueEventLogs(logs []types.Log, l1Deposit case backendabi.L1DequeueTransactionEventSig: event := backendabi.L1DequeueTransactionEvent{} if err := utils.UnpackLog(backendabi.IL1MessageQueueABI, &event, "DequeueTransaction", vlog); err != nil { - log.Warn("Failed to unpack DequeueTransaction event", "err", err) + log.Error("Failed to unpack DequeueTransaction event", "err", err) return nil, err } skippedIndices := utils.GetSkippedQueueIndices(event.StartIndex.Uint64(), event.SkippedBitmap) @@ -258,7 +270,7 @@ func (e *L1EventParser) ParseL1MessageQueueEventLogs(logs []types.Log, l1Deposit case backendabi.L1DropTransactionEventSig: event := backendabi.L1DropTransactionEvent{} if err := utils.UnpackLog(backendabi.IL1MessageQueueABI, &event, "DropTransaction", vlog); err != nil { - log.Warn("Failed to unpack DropTransaction event", "err", err) + log.Error("Failed to unpack DropTransaction event", "err", err) return nil, err } l1MessageQueueEvents = append(l1MessageQueueEvents, &orm.MessageQueueEvent{ @@ -270,3 +282,27 @@ func (e *L1EventParser) ParseL1MessageQueueEventLogs(logs []types.Log, l1Deposit } return l1MessageQueueEvents, nil } + +func getRealFromAddress(ctx context.Context, eventSender common.Address, client *ethclient.Client, txHash common.Hash, gatewayRouterAddr string) (string, error) { + from := eventSender.String() + if from == gatewayRouterAddr { + tx, isPending, rpcErr := client.TransactionByHash(ctx, txHash) + if rpcErr != nil || isPending { + log.Error("Failed to get transaction or the transaction is still pending", "rpcErr", rpcErr, "isPending", isPending) + return "", rpcErr + } + // Case 1: deposit/withdraw ETH: EOA -> multisig -> gateway router -> messenger. + if tx.To() != nil && (*tx.To()).String() != gatewayRouterAddr { + return (*tx.To()).String(), nil + } + // Case 2: deposit/withdraw ETH: EOA -> gateway router -> messenger. + signer := types.LatestSignerForChainID(new(big.Int).SetUint64(tx.ChainId().Uint64())) + sender, err := signer.Sender(tx) + if err != nil { + log.Error("Get sender failed", "chain id", tx.ChainId().Uint64(), "tx hash", tx.Hash().String(), "err", err) + return "", err + } + return sender.String(), nil + } + return from, nil +} diff --git a/bridge-history-api/internal/logic/l1_fetcher.go b/bridge-history-api/internal/logic/l1_fetcher.go index 49e297002f..d9e42a48df 100644 --- a/bridge-history-api/internal/logic/l1_fetcher.go +++ b/bridge-history-api/internal/logic/l1_fetcher.go @@ -34,9 +34,10 @@ type L1FilterResult struct { // L1FetcherLogic the L1 fetcher logic type L1FetcherLogic struct { - cfg *config.LayerConfig + cfg *config.FetcherConfig client *ethclient.Client addressList []common.Address + gatewayList []common.Address parser *L1EventParser db *gorm.DB crossMessageOrm *orm.CrossMessage @@ -46,7 +47,7 @@ type L1FetcherLogic struct { } // NewL1FetcherLogic creates L1 fetcher logic -func NewL1FetcherLogic(cfg *config.LayerConfig, db *gorm.DB, client *ethclient.Client) *L1FetcherLogic { +func NewL1FetcherLogic(cfg *config.FetcherConfig, db *gorm.DB, client *ethclient.Client) *L1FetcherLogic { addressList := []common.Address{ common.HexToAddress(cfg.ETHGatewayAddr), @@ -65,16 +66,34 @@ func NewL1FetcherLogic(cfg *config.LayerConfig, db *gorm.DB, client *ethclient.C common.HexToAddress(cfg.MessageQueueAddr), } + gatewayList := []common.Address{ + common.HexToAddress(cfg.ETHGatewayAddr), + + common.HexToAddress(cfg.StandardERC20GatewayAddr), + common.HexToAddress(cfg.CustomERC20GatewayAddr), + common.HexToAddress(cfg.WETHGatewayAddr), + common.HexToAddress(cfg.DAIGatewayAddr), + + common.HexToAddress(cfg.ERC721GatewayAddr), + common.HexToAddress(cfg.ERC1155GatewayAddr), + + common.HexToAddress(cfg.MessengerAddr), + + common.HexToAddress(cfg.GatewayRouterAddr), + } + // Optional erc20 gateways. if common.HexToAddress(cfg.USDCGatewayAddr) != (common.Address{}) { addressList = append(addressList, common.HexToAddress(cfg.USDCGatewayAddr)) + gatewayList = append(gatewayList, common.HexToAddress(cfg.USDCGatewayAddr)) } if common.HexToAddress(cfg.LIDOGatewayAddr) != (common.Address{}) { addressList = append(addressList, common.HexToAddress(cfg.LIDOGatewayAddr)) + gatewayList = append(gatewayList, common.HexToAddress(cfg.LIDOGatewayAddr)) } - log.Info("L1 Fetcher configured with the following address list", "addresses", addressList) + log.Info("L1 Fetcher configured with the following address list", "addresses", addressList, "gateways", gatewayList) f := &L1FetcherLogic{ db: db, @@ -83,7 +102,8 @@ func NewL1FetcherLogic(cfg *config.LayerConfig, db *gorm.DB, client *ethclient.C cfg: cfg, client: client, addressList: addressList, - parser: NewL1EventParser(), + gatewayList: gatewayList, + parser: NewL1EventParser(cfg, client), } reg := prometheus.DefaultRegisterer @@ -131,15 +151,9 @@ func (f *L1FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, bl blockTimestampsMap[block.NumberU64()] = block.Time() for _, tx := range block.Transactions() { - txTo := tx.To() - if txTo == nil { - continue - } - toAddress := txTo.String() - - // GatewayRouter: L1 deposit. + // Gateways: L1 deposit. // Messenger: L1 deposit retry (replayMessage), L1 deposit refund (dropMessage), L2 withdrawal's claim (relayMessageWithProof). - if toAddress != f.cfg.GatewayRouterAddr && toAddress != f.cfg.MessengerAddr { + if !isTransactionToGateway(tx, f.gatewayList) { continue } @@ -233,7 +247,7 @@ func (f *L1FetcherLogic) L1Fetcher(ctx context.Context, from, to uint64, lastBlo return false, 0, common.Hash{}, nil, err } - l1DepositMessages, l1RelayedMessages, err := f.parser.ParseL1CrossChainEventLogs(eventLogs, blockTimestampsMap) + l1DepositMessages, l1RelayedMessages, err := f.parser.ParseL1CrossChainEventLogs(ctx, eventLogs, blockTimestampsMap) if err != nil { log.Error("failed to parse L1 cross chain event logs", "from", from, "to", to, "err", err) return false, 0, common.Hash{}, nil, err diff --git a/bridge-history-api/internal/logic/l2_event_parser.go b/bridge-history-api/internal/logic/l2_event_parser.go index 20df49d13c..69e44a11db 100644 --- a/bridge-history-api/internal/logic/l2_event_parser.go +++ b/bridge-history-api/internal/logic/l2_event_parser.go @@ -1,26 +1,35 @@ package logic import ( + "context" + "github.com/scroll-tech/go-ethereum/common/hexutil" "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/ethclient" "github.com/scroll-tech/go-ethereum/log" backendabi "scroll-tech/bridge-history-api/abi" + "scroll-tech/bridge-history-api/internal/config" "scroll-tech/bridge-history-api/internal/orm" "scroll-tech/bridge-history-api/internal/utils" ) // L2EventParser the L2 event parser type L2EventParser struct { + cfg *config.FetcherConfig + client *ethclient.Client } // NewL2EventParser creates the L2 event parser -func NewL2EventParser() *L2EventParser { - return &L2EventParser{} +func NewL2EventParser(cfg *config.FetcherConfig, client *ethclient.Client) *L2EventParser { + return &L2EventParser{ + cfg: cfg, + client: client, + } } // ParseL2EventLogs parses L2 watched events -func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap map[uint64]uint64) ([]*orm.CrossMessage, []*orm.CrossMessage, error) { +func (e *L2EventParser) ParseL2EventLogs(ctx context.Context, logs []types.Log, blockTimestampsMap map[uint64]uint64) ([]*orm.CrossMessage, []*orm.CrossMessage, error) { var l2WithdrawMessages []*orm.CrossMessage var l2RelayedMessages []*orm.CrossMessage for _, vlog := range logs { @@ -29,7 +38,7 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma event := backendabi.ETHMessageEvent{} err := utils.UnpackLog(backendabi.IL2ETHGatewayABI, &event, "WithdrawETH", vlog) if err != nil { - log.Warn("Failed to unpack WithdrawETH event", "err", err) + log.Error("Failed to unpack WithdrawETH event", "err", err) return nil, nil, err } lastMessage := l2WithdrawMessages[len(l2WithdrawMessages)-1] @@ -41,7 +50,7 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma event := backendabi.ERC20MessageEvent{} err := utils.UnpackLog(backendabi.IL2ERC20GatewayABI, &event, "WithdrawERC20", vlog) if err != nil { - log.Warn("Failed to unpack WithdrawERC20 event", "err", err) + log.Error("Failed to unpack WithdrawERC20 event", "err", err) return nil, nil, err } lastMessage := l2WithdrawMessages[len(l2WithdrawMessages)-1] @@ -55,7 +64,7 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma event := backendabi.ERC721MessageEvent{} err := utils.UnpackLog(backendabi.IL2ERC721GatewayABI, &event, "WithdrawERC721", vlog) if err != nil { - log.Warn("Failed to unpack WithdrawERC721 event", "err", err) + log.Error("Failed to unpack WithdrawERC721 event", "err", err) return nil, nil, err } lastMessage := l2WithdrawMessages[len(l2WithdrawMessages)-1] @@ -69,7 +78,7 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma event := backendabi.BatchERC721MessageEvent{} err := utils.UnpackLog(backendabi.IL2ERC721GatewayABI, &event, "BatchWithdrawERC721", vlog) if err != nil { - log.Warn("Failed to unpack BatchWithdrawERC721 event", "err", err) + log.Error("Failed to unpack BatchWithdrawERC721 event", "err", err) return nil, nil, err } lastMessage := l2WithdrawMessages[len(l2WithdrawMessages)-1] @@ -83,7 +92,7 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma event := backendabi.ERC1155MessageEvent{} err := utils.UnpackLog(backendabi.IL2ERC1155GatewayABI, &event, "WithdrawERC1155", vlog) if err != nil { - log.Warn("Failed to unpack WithdrawERC1155 event", "err", err) + log.Error("Failed to unpack WithdrawERC1155 event", "err", err) return nil, nil, err } lastMessage := l2WithdrawMessages[len(l2WithdrawMessages)-1] @@ -98,7 +107,7 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma event := backendabi.BatchERC1155MessageEvent{} err := utils.UnpackLog(backendabi.IL2ERC1155GatewayABI, &event, "BatchWithdrawERC1155", vlog) if err != nil { - log.Warn("Failed to unpack BatchWithdrawERC1155 event", "err", err) + log.Error("Failed to unpack BatchWithdrawERC1155 event", "err", err) return nil, nil, err } lastMessage := l2WithdrawMessages[len(l2WithdrawMessages)-1] @@ -113,12 +122,17 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma event := backendabi.L2SentMessageEvent{} err := utils.UnpackLog(backendabi.IL2ScrollMessengerABI, &event, "SentMessage", vlog) if err != nil { - log.Warn("Failed to unpack SentMessage event", "err", err) + log.Error("Failed to unpack SentMessage event", "err", err) + return nil, nil, err + } + from, err := getRealFromAddress(ctx, event.Sender, e.client, vlog.TxHash, e.cfg.GatewayRouterAddr) + if err != nil { + log.Error("Failed to get real 'from' address", "err", err) return nil, nil, err } l2WithdrawMessages = append(l2WithdrawMessages, &orm.CrossMessage{ MessageHash: utils.ComputeMessageHash(event.Sender, event.Target, event.Value, event.MessageNonce, event.Message).String(), - Sender: event.Sender.String(), + Sender: from, Receiver: event.Target.String(), TokenType: int(orm.TokenTypeETH), L2TxHash: vlog.TxHash.String(), @@ -137,7 +151,7 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma event := backendabi.L2RelayedMessageEvent{} err := utils.UnpackLog(backendabi.IL2ScrollMessengerABI, &event, "RelayedMessage", vlog) if err != nil { - log.Warn("Failed to unpack RelayedMessage event", "err", err) + log.Error("Failed to unpack RelayedMessage event", "err", err) return nil, nil, err } l2RelayedMessages = append(l2RelayedMessages, &orm.CrossMessage{ @@ -151,7 +165,7 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma event := backendabi.L2RelayedMessageEvent{} err := utils.UnpackLog(backendabi.IL2ScrollMessengerABI, &event, "FailedRelayedMessage", vlog) if err != nil { - log.Warn("Failed to unpack FailedRelayedMessage event", "err", err) + log.Error("Failed to unpack FailedRelayedMessage event", "err", err) return nil, nil, err } l2RelayedMessages = append(l2RelayedMessages, &orm.CrossMessage{ diff --git a/bridge-history-api/internal/logic/l2_fetcher.go b/bridge-history-api/internal/logic/l2_fetcher.go index ab1d0d6c04..ec13e70ba6 100644 --- a/bridge-history-api/internal/logic/l2_fetcher.go +++ b/bridge-history-api/internal/logic/l2_fetcher.go @@ -33,9 +33,10 @@ type L2FilterResult struct { // L2FetcherLogic the L2 fetcher logic type L2FetcherLogic struct { - cfg *config.LayerConfig + cfg *config.FetcherConfig client *ethclient.Client addressList []common.Address + gatewayList []common.Address parser *L2EventParser db *gorm.DB crossMessageOrm *orm.CrossMessage @@ -45,7 +46,7 @@ type L2FetcherLogic struct { } // NewL2FetcherLogic create L2 fetcher logic -func NewL2FetcherLogic(cfg *config.LayerConfig, db *gorm.DB, client *ethclient.Client) *L2FetcherLogic { +func NewL2FetcherLogic(cfg *config.FetcherConfig, db *gorm.DB, client *ethclient.Client) *L2FetcherLogic { addressList := []common.Address{ common.HexToAddress(cfg.ETHGatewayAddr), @@ -60,16 +61,34 @@ func NewL2FetcherLogic(cfg *config.LayerConfig, db *gorm.DB, client *ethclient.C common.HexToAddress(cfg.MessengerAddr), } + gatewayList := []common.Address{ + common.HexToAddress(cfg.ETHGatewayAddr), + + common.HexToAddress(cfg.StandardERC20GatewayAddr), + common.HexToAddress(cfg.CustomERC20GatewayAddr), + common.HexToAddress(cfg.WETHGatewayAddr), + common.HexToAddress(cfg.DAIGatewayAddr), + + common.HexToAddress(cfg.ERC721GatewayAddr), + common.HexToAddress(cfg.ERC1155GatewayAddr), + + common.HexToAddress(cfg.MessengerAddr), + + common.HexToAddress(cfg.GatewayRouterAddr), + } + // Optional erc20 gateways. if common.HexToAddress(cfg.USDCGatewayAddr) != (common.Address{}) { addressList = append(addressList, common.HexToAddress(cfg.USDCGatewayAddr)) + gatewayList = append(gatewayList, common.HexToAddress(cfg.USDCGatewayAddr)) } if common.HexToAddress(cfg.LIDOGatewayAddr) != (common.Address{}) { addressList = append(addressList, common.HexToAddress(cfg.LIDOGatewayAddr)) + gatewayList = append(gatewayList, common.HexToAddress(cfg.USDCGatewayAddr)) } - log.Info("L2 Fetcher configured with the following address list", "addresses", addressList) + log.Info("L2 Fetcher configured with the following address list", "addresses", addressList, "gateways", gatewayList) f := &L2FetcherLogic{ db: db, @@ -78,7 +97,8 @@ func NewL2FetcherLogic(cfg *config.LayerConfig, db *gorm.DB, client *ethclient.C cfg: cfg, client: client, addressList: addressList, - parser: NewL2EventParser(), + gatewayList: gatewayList, + parser: NewL2EventParser(cfg, client), } reg := prometheus.DefaultRegisterer @@ -127,42 +147,7 @@ func (f *L2FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, bl blockTimestampsMap[block.NumberU64()] = block.Time() for _, tx := range block.Transactions() { - txTo := tx.To() - if txTo == nil { - continue - } - toAddress := txTo.String() - - // GatewayRouter: L2 withdrawal. - if toAddress == f.cfg.GatewayRouterAddr { - receipt, receiptErr := f.client.TransactionReceipt(ctx, tx.Hash()) - if receiptErr != nil { - log.Error("Failed to get transaction receipt", "txHash", tx.Hash().String(), "err", receiptErr) - return nil, nil, nil, receiptErr - } - - // Check if the transaction is failed - if receipt.Status == types.ReceiptStatusFailed { - signer := types.LatestSignerForChainID(new(big.Int).SetUint64(tx.ChainId().Uint64())) - sender, signerErr := signer.Sender(tx) - if signerErr != nil { - log.Error("get sender failed", "chain id", tx.ChainId().Uint64(), "tx hash", tx.Hash().String(), "err", signerErr) - return nil, nil, nil, signerErr - } - - l2RevertedUserTxs = append(l2RevertedUserTxs, &orm.CrossMessage{ - L2TxHash: tx.Hash().String(), - MessageType: int(orm.MessageTypeL2SentMessage), - Sender: sender.String(), - Receiver: (*tx.To()).String(), - L2BlockNumber: receipt.BlockNumber.Uint64(), - BlockTimestamp: block.Time(), - TxStatus: int(orm.TxStatusTypeSentTxReverted), - }) - } - } - - if tx.Type() == types.L1MessageTxType { + if tx.IsL1MessageTx() { receipt, receiptErr := f.client.TransactionReceipt(ctx, tx.Hash()) if receiptErr != nil { log.Error("Failed to get transaction receipt", "txHash", tx.Hash().String(), "err", receiptErr) @@ -179,6 +164,38 @@ func (f *L2FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, bl MessageType: int(orm.MessageTypeL1SentMessage), }) } + continue + } + + // Gateways: L2 withdrawal. + if !isTransactionToGateway(tx, f.gatewayList) { + continue + } + + receipt, receiptErr := f.client.TransactionReceipt(ctx, tx.Hash()) + if receiptErr != nil { + log.Error("Failed to get transaction receipt", "txHash", tx.Hash().String(), "err", receiptErr) + return nil, nil, nil, receiptErr + } + + // Check if the transaction is failed + if receipt.Status == types.ReceiptStatusFailed { + signer := types.LatestSignerForChainID(new(big.Int).SetUint64(tx.ChainId().Uint64())) + sender, signerErr := signer.Sender(tx) + if signerErr != nil { + log.Error("get sender failed", "chain id", tx.ChainId().Uint64(), "tx hash", tx.Hash().String(), "err", signerErr) + return nil, nil, nil, signerErr + } + + l2RevertedUserTxs = append(l2RevertedUserTxs, &orm.CrossMessage{ + L2TxHash: tx.Hash().String(), + MessageType: int(orm.MessageTypeL2SentMessage), + Sender: sender.String(), + Receiver: (*tx.To()).String(), + L2BlockNumber: receipt.BlockNumber.Uint64(), + BlockTimestamp: block.Time(), + TxStatus: int(orm.TxStatusTypeSentTxReverted), + }) } } } @@ -235,7 +252,7 @@ func (f *L2FetcherLogic) L2Fetcher(ctx context.Context, from, to uint64, lastBlo return false, 0, common.Hash{}, nil, err } - l2WithdrawMessages, l2RelayedMessages, err := f.parser.ParseL2EventLogs(eventLogs, blockTimestampsMap) + l2WithdrawMessages, l2RelayedMessages, err := f.parser.ParseL2EventLogs(ctx, eventLogs, blockTimestampsMap) if err != nil { log.Error("failed to parse L2 event logs", "from", from, "to", to, "err", err) return false, 0, common.Hash{}, nil, err @@ -279,3 +296,15 @@ func (f *L2FetcherLogic) updateMetrics(res L2FilterResult) { } } } + +func isTransactionToGateway(tx *types.Transaction, gatewayList []common.Address) bool { + if tx.To() == nil { + return false + } + for _, gateway := range gatewayList { + if *tx.To() == gateway { + return true + } + } + return false +} diff --git a/bridge-history-api/internal/orm/batch_event.go b/bridge-history-api/internal/orm/batch_event.go index d50fcb76a1..67135561f6 100644 --- a/bridge-history-api/internal/orm/batch_event.go +++ b/bridge-history-api/internal/orm/batch_event.go @@ -6,6 +6,7 @@ import ( "time" "gorm.io/gorm" + "gorm.io/gorm/clause" ) // BatchStatusType represents the type of batch status. @@ -89,19 +90,21 @@ func (c *BatchEvent) GetFinalizedBatchesLEBlockHeight(ctx context.Context, block } // InsertOrUpdateBatchEvents inserts a new batch event or updates an existing one based on the BatchStatusType. -func (c *BatchEvent) InsertOrUpdateBatchEvents(ctx context.Context, l1BatchEvents []*BatchEvent, dbTX ...*gorm.DB) error { +func (c *BatchEvent) InsertOrUpdateBatchEvents(ctx context.Context, l1BatchEvents []*BatchEvent) error { for _, l1BatchEvent := range l1BatchEvents { db := c.db - if len(dbTX) > 0 && dbTX[0] != nil { - db = dbTX[0] - } db = db.WithContext(ctx) db = db.Model(&BatchEvent{}) updateFields := make(map[string]interface{}) switch BatchStatusType(l1BatchEvent.BatchStatus) { case BatchStatusTypeCommitted: + // Use the clause to either insert or ignore on conflict + db = db.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "batch_hash"}}, + DoNothing: true, + }) if err := db.Create(l1BatchEvent).Error; err != nil { - return fmt.Errorf("failed to insert batch event, error: %w", err) + return fmt.Errorf("failed to insert or ignore batch event, error: %w", err) } case BatchStatusTypeFinalized: db = db.Where("batch_index = ?", l1BatchEvent.BatchIndex) diff --git a/bridge-history-api/internal/orm/cross_message.go b/bridge-history-api/internal/orm/cross_message.go index 27a9f4c889..fbe4c15eb0 100644 --- a/bridge-history-api/internal/orm/cross_message.go +++ b/bridge-history-api/internal/orm/cross_message.go @@ -5,7 +5,6 @@ import ( "fmt" "time" - "github.com/google/uuid" "github.com/scroll-tech/go-ethereum/common" "gorm.io/gorm" "gorm.io/gorm/clause" @@ -47,9 +46,9 @@ const ( TxStatusTypeSent TxStatusType = iota TxStatusTypeSentTxReverted // Not track message hash, thus will not be processed again anymore. TxStatusTypeRelayed // Terminal status. - // FailedRelayedMessage event: encoded tx failed, cannot retry. e.g., https://sepolia.scrollscan.com/tx/0xfc7d3ea5ec8dc9b664a5a886c3b33d21e665355057601033481a439498efb79a - TxStatusTypeFailedRelayed // Terminal status. - // In some cases, user can retry with a larger gas limit. e.g., https://sepolia.scrollscan.com/tx/0x7323a7ba29492cb47d92206411be99b27896f2823cee0633a596b646b73f1b5b + // Retry: this often occurs due to an out of gas (OOG) issue if the transaction was initiated via the frontend. + TxStatusTypeFailedRelayed + // Retry: this often occurs due to an out of gas (OOG) issue if the transaction was initiated via the frontend. TxStatusTypeRelayTxReverted TxStatusTypeSkipped TxStatusTypeDropped // Terminal status. @@ -254,18 +253,14 @@ func (c *CrossMessage) GetTxsByAddress(ctx context.Context, sender string) ([]*C } // UpdateL1MessageQueueEventsInfo updates the information about L1 message queue events in the database. -func (c *CrossMessage) UpdateL1MessageQueueEventsInfo(ctx context.Context, l1MessageQueueEvents []*MessageQueueEvent, dbTX ...*gorm.DB) error { +func (c *CrossMessage) UpdateL1MessageQueueEventsInfo(ctx context.Context, l1MessageQueueEvents []*MessageQueueEvent) error { // update tx statuses. for _, l1MessageQueueEvent := range l1MessageQueueEvents { db := c.db - if len(dbTX) > 0 && dbTX[0] != nil { - db = dbTX[0] - } db = db.WithContext(ctx) db = db.Model(&CrossMessage{}) // do not over-write terminal statuses. db = db.Where("tx_status != ?", TxStatusTypeRelayed) - db = db.Where("tx_status != ?", TxStatusTypeFailedRelayed) db = db.Where("tx_status != ?", TxStatusTypeDropped) txStatusUpdateFields := make(map[string]interface{}) switch l1MessageQueueEvent.EventType { @@ -298,13 +293,12 @@ func (c *CrossMessage) UpdateL1MessageQueueEventsInfo(ctx context.Context, l1Mes // update tx hashes of replay and refund. for _, l1MessageQueueEvent := range l1MessageQueueEvents { db := c.db - if len(dbTX) > 0 && dbTX[0] != nil { - db = dbTX[0] - } db = db.WithContext(ctx) db = db.Model(&CrossMessage{}) txHashUpdateFields := make(map[string]interface{}) switch l1MessageQueueEvent.EventType { + case MessageQueueEventTypeDequeueTransaction: + continue case MessageQueueEventTypeQueueTransaction: // only replayMessages or enforced txs (whose message hashes would not be found), sentMessages have been filtered out. db = db.Where("message_hash = ?", l1MessageQueueEvent.MessageHash.String()) @@ -314,11 +308,8 @@ func (c *CrossMessage) UpdateL1MessageQueueEventsInfo(ctx context.Context, l1Mes db = db.Where("message_type = ?", MessageTypeL1SentMessage) txHashUpdateFields["l1_refund_tx_hash"] = l1MessageQueueEvent.TxHash.String() } - // Check if there are fields to update to avoid empty update operation (skip message). - if len(txHashUpdateFields) > 0 { - if err := db.Updates(txHashUpdateFields).Error; err != nil { - return fmt.Errorf("failed to update tx hashes of replay and refund in L1 message queue events info, update fields: %v, error: %w", txHashUpdateFields, err) - } + if err := db.Updates(txHashUpdateFields).Error; err != nil { + return fmt.Errorf("failed to update tx hashes of replay and refund in L1 message queue events info, update fields: %v, error: %w", txHashUpdateFields, err) } } return nil @@ -362,14 +353,11 @@ func (c *CrossMessage) UpdateBatchIndexRollupStatusMerkleProofOfL2Messages(ctx c } // InsertOrUpdateL1Messages inserts or updates a list of L1 cross messages into the database. -func (c *CrossMessage) InsertOrUpdateL1Messages(ctx context.Context, messages []*CrossMessage, dbTX ...*gorm.DB) error { +func (c *CrossMessage) InsertOrUpdateL1Messages(ctx context.Context, messages []*CrossMessage) error { if len(messages) == 0 { return nil } db := c.db - if len(dbTX) > 0 && dbTX[0] != nil { - db = dbTX[0] - } db = db.WithContext(ctx) db = db.Model(&CrossMessage{}) // 'tx_status' column is not explicitly assigned during the update to prevent a later status from being overwritten back to "sent". @@ -384,18 +372,14 @@ func (c *CrossMessage) InsertOrUpdateL1Messages(ctx context.Context, messages [] } // InsertOrUpdateL2Messages inserts or updates a list of L2 cross messages into the database. -func (c *CrossMessage) InsertOrUpdateL2Messages(ctx context.Context, messages []*CrossMessage, dbTX ...*gorm.DB) error { +func (c *CrossMessage) InsertOrUpdateL2Messages(ctx context.Context, messages []*CrossMessage) error { if len(messages) == 0 { return nil } db := c.db - if len(dbTX) > 0 && dbTX[0] != nil { - db = dbTX[0] - } db = db.WithContext(ctx) db = db.Model(&CrossMessage{}) // 'tx_status' column is not explicitly assigned during the update to prevent a later status from being overwritten back to "sent". - // The merkle_proof is updated separately in batch status updates and hence is not included here. db = db.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "message_hash"}}, DoUpdates: clause.AssignmentColumns([]string{"sender", "receiver", "token_type", "l2_block_number", "l2_tx_hash", "l1_token_address", "l2_token_address", "token_ids", "token_amounts", "message_type", "block_timestamp", "message_from", "message_to", "message_value", "message_data", "message_nonce"}), @@ -406,31 +390,60 @@ func (c *CrossMessage) InsertOrUpdateL2Messages(ctx context.Context, messages [] return nil } -// InsertFailedGatewayRouterTxs inserts a list of transactions that failed to interact with the gateway router into the database. -// These failed transactions are only fetched once, so they are inserted without checking for duplicates. -// To resolve unique index confliction, a random UUID will be generated and used as the MessageHash. -func (c *CrossMessage) InsertFailedGatewayRouterTxs(ctx context.Context, messages []*CrossMessage, dbTX ...*gorm.DB) error { +// InsertFailedL2GatewayTxs inserts a list of transactions that failed to interact with the L2 gateways into the database. +// To resolve unique index confliction, L2 tx hash is used as the MessageHash. +// The OnConflict clause is used to prevent inserting same failed transactions multiple times. +func (c *CrossMessage) InsertFailedL2GatewayTxs(ctx context.Context, messages []*CrossMessage) error { if len(messages) == 0 { return nil } - db := c.db - if len(dbTX) > 0 && dbTX[0] != nil { - db = dbTX[0] + + for _, message := range messages { + message.MessageHash = message.L2TxHash } + db := c.db db = db.WithContext(ctx) db = db.Model(&CrossMessage{}) + db = db.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "message_hash"}}, + DoNothing: true, + }) + + if err := db.Create(&messages).Error; err != nil { + return fmt.Errorf("failed to insert failed gateway router txs, error: %w", err) + } + return nil +} + +// InsertFailedL1GatewayTxs inserts a list of transactions that failed to interact with the L1 gateways into the database. +// To resolve unique index confliction, L1 tx hash is used as the MessageHash. +// The OnConflict clause is used to prevent inserting same failed transactions multiple times. +func (c *CrossMessage) InsertFailedL1GatewayTxs(ctx context.Context, messages []*CrossMessage) error { + if len(messages) == 0 { + return nil + } + for _, message := range messages { - message.MessageHash = uuid.New().String() + message.MessageHash = message.L1TxHash } - if err := db.Create(messages).Error; err != nil { + + db := c.db + db = db.WithContext(ctx) + db = db.Model(&CrossMessage{}) + db = db.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "message_hash"}}, + DoNothing: true, + }) + + if err := db.Create(&messages).Error; err != nil { return fmt.Errorf("failed to insert failed gateway router txs, error: %w", err) } return nil } // InsertOrUpdateL2RelayedMessagesOfL1Deposits inserts or updates the database with a list of L2 relayed messages related to L1 deposits. -func (c *CrossMessage) InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx context.Context, l2RelayedMessages []*CrossMessage, dbTX ...*gorm.DB) error { +func (c *CrossMessage) InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx context.Context, l2RelayedMessages []*CrossMessage) error { if len(l2RelayedMessages) == 0 { return nil } @@ -459,7 +472,7 @@ func (c *CrossMessage) InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx context.C for _, msg := range mergedL2RelayedMessages { uniqueL2RelayedMessages = append(uniqueL2RelayedMessages, msg) } - // Do not update tx status of successfully or failed relayed messages, + // Do not update tx status of successfully relayed messages, // because if a message is handled, the later relayed message tx would be reverted. // ref: https://github.com/scroll-tech/scroll/blob/v4.3.44/contracts/src/L2/L2ScrollMessenger.sol#L102 // e.g., @@ -476,7 +489,6 @@ func (c *CrossMessage) InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx context.C clause.And( // do not over-write terminal statuses. clause.Neq{Column: "cross_message_v2.tx_status", Value: TxStatusTypeRelayed}, - clause.Neq{Column: "cross_message_v2.tx_status", Value: TxStatusTypeFailedRelayed}, clause.Neq{Column: "cross_message_v2.tx_status", Value: TxStatusTypeDropped}, ), }, @@ -489,7 +501,7 @@ func (c *CrossMessage) InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx context.C } // InsertOrUpdateL1RelayedMessagesOfL2Withdrawals inserts or updates the database with a list of L1 relayed messages related to L2 withdrawals. -func (c *CrossMessage) InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx context.Context, l1RelayedMessages []*CrossMessage, dbTX ...*gorm.DB) error { +func (c *CrossMessage) InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx context.Context, l1RelayedMessages []*CrossMessage) error { if len(l1RelayedMessages) == 0 { return nil } @@ -519,9 +531,6 @@ func (c *CrossMessage) InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx contex uniqueL1RelayedMessages = append(uniqueL1RelayedMessages, msg) } db := c.db - if len(dbTX) > 0 && dbTX[0] != nil { - db = dbTX[0] - } db = db.WithContext(ctx) db = db.Model(&CrossMessage{}) db = db.Clauses(clause.OnConflict{ @@ -532,7 +541,6 @@ func (c *CrossMessage) InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx contex clause.And( // do not over-write terminal statuses. clause.Neq{Column: "cross_message_v2.tx_status", Value: TxStatusTypeRelayed}, - clause.Neq{Column: "cross_message_v2.tx_status", Value: TxStatusTypeFailedRelayed}, clause.Neq{Column: "cross_message_v2.tx_status", Value: TxStatusTypeDropped}, ), }, diff --git a/bridge-history-api/internal/orm/migrate/migrations/00002_batch_event_v2.sql b/bridge-history-api/internal/orm/migrate/migrations/00002_batch_event_v2.sql index ba42500f32..4ffa4e3d55 100644 --- a/bridge-history-api/internal/orm/migrate/migrations/00002_batch_event_v2.sql +++ b/bridge-history-api/internal/orm/migrate/migrations/00002_batch_event_v2.sql @@ -15,6 +15,7 @@ CREATE TABLE batch_event_v2 deleted_at TIMESTAMP(0) DEFAULT NULL ); +CREATE UNIQUE INDEX IF NOT EXISTS unique_idx_be_batch_hash ON batch_event_v2 (batch_hash); CREATE INDEX IF NOT EXISTS idx_be_l1_block_number ON batch_event_v2 (l1_block_number); CREATE INDEX IF NOT EXISTS idx_be_batch_index ON batch_event_v2 (batch_index); CREATE INDEX IF NOT EXISTS idx_be_batch_index_batch_hash ON batch_event_v2 (batch_index, batch_hash); diff --git a/common/version/version.go b/common/version/version.go index c101276330..e56890807b 100644 --- a/common/version/version.go +++ b/common/version/version.go @@ -5,7 +5,7 @@ import ( "runtime/debug" ) -var tag = "v4.3.55" +var tag = "v4.3.56" var commit = func() string { if info, ok := debug.ReadBuildInfo(); ok {