Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bridge-history): incorrect status update and duplicated failed txs #1088

Merged
merged 15 commits into from
Jan 23, 2024
2 changes: 1 addition & 1 deletion bridge-history-api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/gin-contrib/cors v1.5.0
github.com/gin-gonic/gin v1.9.1
github.com/go-redis/redis/v8 v8.11.5
github.com/google/uuid v1.4.0
github.com/pressly/goose/v3 v3.16.0
github.com/prometheus/client_golang v1.14.0
github.com/scroll-tech/go-ethereum v1.10.14-0.20231130005111-38a3a9c9198c
Expand Down Expand Up @@ -40,6 +39,7 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/huin/goupnp v1.3.0 // indirect
Expand Down
73 changes: 28 additions & 45 deletions bridge-history-api/internal/logic/event_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,39 +76,30 @@ func (b *EventUpdateLogic) GetL2MessageSyncedHeightInDB(ctx context.Context) (ui

// L1InsertOrUpdate inserts or updates l1 messages
func (b *EventUpdateLogic) L1InsertOrUpdate(ctx context.Context, l1FetcherResult *L1FilterResult) error {
err := b.db.Transaction(func(tx *gorm.DB) error {
if txErr := b.crossMessageOrm.InsertOrUpdateL1Messages(ctx, l1FetcherResult.DepositMessages, tx); txErr != nil {
log.Error("failed to insert L1 deposit messages", "err", txErr)
return txErr
}

if txErr := b.crossMessageOrm.InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx, l1FetcherResult.RelayedMessages, tx); txErr != nil {
log.Error("failed to update L1 relayed messages of L2 withdrawals", "err", txErr)
return txErr
}

if txErr := b.batchEventOrm.InsertOrUpdateBatchEvents(ctx, l1FetcherResult.BatchEvents, tx); txErr != nil {
log.Error("failed to insert or update batch events", "err", txErr)
return txErr
}
if err := b.crossMessageOrm.InsertOrUpdateL1Messages(ctx, l1FetcherResult.DepositMessages); err != nil {
log.Error("failed to insert L1 deposit messages", "err", err)
return err
}

if txErr := b.crossMessageOrm.UpdateL1MessageQueueEventsInfo(ctx, l1FetcherResult.MessageQueueEvents, tx); txErr != nil {
log.Error("failed to insert L1 message queue events", "err", txErr)
return txErr
}
if err := b.crossMessageOrm.InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx, l1FetcherResult.RelayedMessages); err != nil {
log.Error("failed to update L1 relayed messages of L2 withdrawals", "err", err)
return err
}

if txErr := b.crossMessageOrm.InsertFailedGatewayRouterTxs(ctx, l1FetcherResult.RevertedTxs, tx); txErr != nil {
log.Error("failed to insert L1 failed gateway router transactions", "err", txErr)
return txErr
}
return nil
})
if err := b.batchEventOrm.InsertOrUpdateBatchEvents(ctx, l1FetcherResult.BatchEvents); err != nil {
log.Error("failed to insert or update batch events", "err", err)
return err
}

if err != nil {
log.Error("failed to update db of L1 events", "err", err)
if err := b.crossMessageOrm.UpdateL1MessageQueueEventsInfo(ctx, l1FetcherResult.MessageQueueEvents); err != nil {
log.Error("failed to insert L1 message queue events", "err", err)
return err
}

if err := b.crossMessageOrm.InsertFailedL1GatewayRouterAndL1MessengerTxs(ctx, l1FetcherResult.RevertedTxs); err != nil {
log.Error("failed to insert failed L1 gateway router and L1 messenger transactions", "err", err)
return err
}
return nil
}

Expand Down Expand Up @@ -186,24 +177,16 @@ func (b *EventUpdateLogic) UpdateL1BatchIndexAndStatus(ctx context.Context, heig

// L2InsertOrUpdate inserts or updates L2 messages
func (b *EventUpdateLogic) L2InsertOrUpdate(ctx context.Context, l2FetcherResult *L2FilterResult) error {
err := b.db.Transaction(func(tx *gorm.DB) error {
if txErr := b.crossMessageOrm.InsertOrUpdateL2Messages(ctx, l2FetcherResult.WithdrawMessages, tx); txErr != nil {
log.Error("failed to insert L2 withdrawal messages", "err", txErr)
return txErr
}
if txErr := b.crossMessageOrm.InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx, l2FetcherResult.RelayedMessages, tx); txErr != nil {
log.Error("failed to update L2 relayed messages of L1 deposits", "err", txErr)
return txErr
}
if txErr := b.crossMessageOrm.InsertFailedGatewayRouterTxs(ctx, l2FetcherResult.OtherRevertedTxs, tx); txErr != nil {
log.Error("failed to insert L2 failed gateway router transactions", "err", txErr)
return txErr
}
return nil
})

if err != nil {
log.Error("failed to update db of L2 events", "err", err)
if err := b.crossMessageOrm.InsertOrUpdateL2Messages(ctx, l2FetcherResult.WithdrawMessages); err != nil {
log.Error("failed to insert L2 withdrawal messages", "err", err)
return err
}
if err := b.crossMessageOrm.InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx, l2FetcherResult.RelayedMessages); err != nil {
log.Error("failed to update L2 relayed messages of L1 deposits", "err", err)
return err
}
if err := b.crossMessageOrm.InsertFailedL2GatewayRouterTxs(ctx, l2FetcherResult.OtherRevertedTxs); err != nil {
log.Error("failed to insert failed L2 gateway router transactions", "err", err)
return err
}
return nil
Expand Down
85 changes: 47 additions & 38 deletions bridge-history-api/internal/orm/cross_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"time"

"github.com/google/uuid"
"github.com/scroll-tech/go-ethereum/common"
"gorm.io/gorm"
"gorm.io/gorm/clause"
Expand Down Expand Up @@ -47,9 +46,9 @@ const (
TxStatusTypeSent TxStatusType = iota
TxStatusTypeSentTxReverted // Not track message hash, thus will not be processed again anymore.
TxStatusTypeRelayed // Terminal status.
// FailedRelayedMessage event: encoded tx failed, cannot retry. e.g., https://sepolia.scrollscan.com/tx/0xfc7d3ea5ec8dc9b664a5a886c3b33d21e665355057601033481a439498efb79a
TxStatusTypeFailedRelayed // Terminal status.
// In some cases, user can retry with a larger gas limit. e.g., https://sepolia.scrollscan.com/tx/0x7323a7ba29492cb47d92206411be99b27896f2823cee0633a596b646b73f1b5b
// Retry: this often occurs due to an out of gas (OOG) issue if the transaction was initiated via the frontend.
TxStatusTypeFailedRelayed
// Retry: this often occurs due to an out of gas (OOG) issue if the transaction was initiated via the frontend.
TxStatusTypeRelayTxReverted
TxStatusTypeSkipped
TxStatusTypeDropped // Terminal status.
Expand Down Expand Up @@ -254,18 +253,14 @@ func (c *CrossMessage) GetTxsByAddress(ctx context.Context, sender string) ([]*C
}

// UpdateL1MessageQueueEventsInfo updates the information about L1 message queue events in the database.
func (c *CrossMessage) UpdateL1MessageQueueEventsInfo(ctx context.Context, l1MessageQueueEvents []*MessageQueueEvent, dbTX ...*gorm.DB) error {
func (c *CrossMessage) UpdateL1MessageQueueEventsInfo(ctx context.Context, l1MessageQueueEvents []*MessageQueueEvent) error {
// update tx statuses.
for _, l1MessageQueueEvent := range l1MessageQueueEvents {
db := c.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&CrossMessage{})
// do not over-write terminal statuses.
db = db.Where("tx_status != ?", TxStatusTypeRelayed)
db = db.Where("tx_status != ?", TxStatusTypeFailedRelayed)
db = db.Where("tx_status != ?", TxStatusTypeDropped)
txStatusUpdateFields := make(map[string]interface{})
switch l1MessageQueueEvent.EventType {
Expand Down Expand Up @@ -298,9 +293,6 @@ func (c *CrossMessage) UpdateL1MessageQueueEventsInfo(ctx context.Context, l1Mes
// update tx hashes of replay and refund.
for _, l1MessageQueueEvent := range l1MessageQueueEvents {
db := c.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&CrossMessage{})
txHashUpdateFields := make(map[string]interface{})
Expand Down Expand Up @@ -362,14 +354,11 @@ func (c *CrossMessage) UpdateBatchIndexRollupStatusMerkleProofOfL2Messages(ctx c
}

// InsertOrUpdateL1Messages inserts or updates a list of L1 cross messages into the database.
func (c *CrossMessage) InsertOrUpdateL1Messages(ctx context.Context, messages []*CrossMessage, dbTX ...*gorm.DB) error {
func (c *CrossMessage) InsertOrUpdateL1Messages(ctx context.Context, messages []*CrossMessage) error {
if len(messages) == 0 {
return nil
}
db := c.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&CrossMessage{})
// 'tx_status' column is not explicitly assigned during the update to prevent a later status from being overwritten back to "sent".
Expand All @@ -384,18 +373,14 @@ func (c *CrossMessage) InsertOrUpdateL1Messages(ctx context.Context, messages []
}

// InsertOrUpdateL2Messages inserts or updates a list of L2 cross messages into the database.
func (c *CrossMessage) InsertOrUpdateL2Messages(ctx context.Context, messages []*CrossMessage, dbTX ...*gorm.DB) error {
func (c *CrossMessage) InsertOrUpdateL2Messages(ctx context.Context, messages []*CrossMessage) error {
if len(messages) == 0 {
return nil
}
db := c.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&CrossMessage{})
// 'tx_status' column is not explicitly assigned during the update to prevent a later status from being overwritten back to "sent".
// The merkle_proof is updated separately in batch status updates and hence is not included here.
db = db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "message_hash"}},
DoUpdates: clause.AssignmentColumns([]string{"sender", "receiver", "token_type", "l2_block_number", "l2_tx_hash", "l1_token_address", "l2_token_address", "token_ids", "token_amounts", "message_type", "block_timestamp", "message_from", "message_to", "message_value", "message_data", "message_nonce"}),
Expand All @@ -406,31 +391,60 @@ func (c *CrossMessage) InsertOrUpdateL2Messages(ctx context.Context, messages []
return nil
}

// InsertFailedGatewayRouterTxs inserts a list of transactions that failed to interact with the gateway router into the database.
// These failed transactions are only fetched once, so they are inserted without checking for duplicates.
// To resolve unique index confliction, a random UUID will be generated and used as the MessageHash.
func (c *CrossMessage) InsertFailedGatewayRouterTxs(ctx context.Context, messages []*CrossMessage, dbTX ...*gorm.DB) error {
// InsertFailedL2GatewayRouterTxs inserts a list of transactions that failed to interact with the L2 gateway router into the database.
// To resolve unique index confliction, L2 tx hash is used as the MessageHash.
// The OnConflict clause is used to prevent inserting same failed transactions multiple times.
func (c *CrossMessage) InsertFailedL2GatewayRouterTxs(ctx context.Context, messages []*CrossMessage) error {
if len(messages) == 0 {
return nil
}
db := c.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]

for _, message := range messages {
message.MessageHash = message.L2TxHash
}

db := c.db
db = db.WithContext(ctx)
db = db.Model(&CrossMessage{})
db = db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "message_hash"}},
DoNothing: true,
})

if err := db.Create(&messages).Error; err != nil {
return fmt.Errorf("failed to insert failed gateway router txs, error: %w", err)
}
return nil
}

// InsertFailedL1GatewayRouterAndL1MessengerTxs inserts a list of transactions that failed to interact with the L1 gateway router and L1 messenger into the database.
// To resolve unique index confliction, L1 tx hash is used as the MessageHash.
// The OnConflict clause is used to prevent inserting same failed transactions multiple times.
func (c *CrossMessage) InsertFailedL1GatewayRouterAndL1MessengerTxs(ctx context.Context, messages []*CrossMessage) error {
if len(messages) == 0 {
return nil
}

for _, message := range messages {
message.MessageHash = uuid.New().String()
message.MessageHash = message.L1TxHash
}
if err := db.Create(messages).Error; err != nil {

db := c.db
db = db.WithContext(ctx)
db = db.Model(&CrossMessage{})
db = db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "message_hash"}},
DoNothing: true,
})

if err := db.Create(&messages).Error; err != nil {
return fmt.Errorf("failed to insert failed gateway router txs, error: %w", err)
}
return nil
}

// InsertOrUpdateL2RelayedMessagesOfL1Deposits inserts or updates the database with a list of L2 relayed messages related to L1 deposits.
func (c *CrossMessage) InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx context.Context, l2RelayedMessages []*CrossMessage, dbTX ...*gorm.DB) error {
func (c *CrossMessage) InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx context.Context, l2RelayedMessages []*CrossMessage) error {
if len(l2RelayedMessages) == 0 {
return nil
}
Expand Down Expand Up @@ -459,7 +473,7 @@ func (c *CrossMessage) InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx context.C
for _, msg := range mergedL2RelayedMessages {
uniqueL2RelayedMessages = append(uniqueL2RelayedMessages, msg)
}
// Do not update tx status of successfully or failed relayed messages,
// Do not update tx status of "relayed" messages,
// because if a message is handled, the later relayed message tx would be reverted.
// ref: https://github.com/scroll-tech/scroll/blob/v4.3.44/contracts/src/L2/L2ScrollMessenger.sol#L102
// e.g.,
Expand All @@ -476,7 +490,6 @@ func (c *CrossMessage) InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx context.C
clause.And(
// do not over-write terminal statuses.
clause.Neq{Column: "cross_message_v2.tx_status", Value: TxStatusTypeRelayed},
clause.Neq{Column: "cross_message_v2.tx_status", Value: TxStatusTypeFailedRelayed},
clause.Neq{Column: "cross_message_v2.tx_status", Value: TxStatusTypeDropped},
),
},
Expand All @@ -489,7 +502,7 @@ func (c *CrossMessage) InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx context.C
}

// InsertOrUpdateL1RelayedMessagesOfL2Withdrawals inserts or updates the database with a list of L1 relayed messages related to L2 withdrawals.
func (c *CrossMessage) InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx context.Context, l1RelayedMessages []*CrossMessage, dbTX ...*gorm.DB) error {
func (c *CrossMessage) InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx context.Context, l1RelayedMessages []*CrossMessage) error {
if len(l1RelayedMessages) == 0 {
return nil
}
Expand Down Expand Up @@ -519,9 +532,6 @@ func (c *CrossMessage) InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx contex
uniqueL1RelayedMessages = append(uniqueL1RelayedMessages, msg)
}
db := c.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&CrossMessage{})
db = db.Clauses(clause.OnConflict{
Expand All @@ -532,7 +542,6 @@ func (c *CrossMessage) InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx contex
clause.And(
// do not over-write terminal statuses.
clause.Neq{Column: "cross_message_v2.tx_status", Value: TxStatusTypeRelayed},
clause.Neq{Column: "cross_message_v2.tx_status", Value: TxStatusTypeFailedRelayed},
clause.Neq{Column: "cross_message_v2.tx_status", Value: TxStatusTypeDropped},
),
},
Expand Down
2 changes: 1 addition & 1 deletion common/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"runtime/debug"
)

var tag = "v4.3.55"
var tag = "v4.3.56"

var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {
Expand Down
Loading