Skip to content

Commit

Permalink
remove unused watcher code to increase coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
colinlyguo committed Feb 4, 2024
1 parent 80afd4e commit 341d5be
Show file tree
Hide file tree
Showing 8 changed files with 8 additions and 318 deletions.
12 changes: 0 additions & 12 deletions rollup/cmd/event_watcher/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,27 +67,15 @@ func action(ctx *cli.Context) error {
log.Crit("failed to connect l1 geth", "config file", cfgFile, "error", err)
}

l2client, err := ethclient.Dial(cfg.L2Config.Endpoint)
if err != nil {
log.Crit("failed to connect l2 geth", "config file", cfgFile, "error", err)
}

l1watcher := watcher.NewL1WatcherClient(ctx.Context, l1client, cfg.L1Config.StartHeight, cfg.L1Config.Confirmations,
cfg.L1Config.L1MessageQueueAddress, cfg.L1Config.ScrollChainContractAddress, db, registry)

l2watcher := watcher.NewL2WatcherClient(ctx.Context, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessengerAddress,
cfg.L2Config.L2MessageQueueAddress, cfg.L2Config.WithdrawTrieRootSlot, db, registry)

go utils.Loop(subCtx, 10*time.Second, func() {
if loopErr := l1watcher.FetchContractEvent(); loopErr != nil {
log.Error("Failed to fetch bridge contract", "err", loopErr)
}
})

// Start l2 watcher process
go utils.Loop(subCtx, 2*time.Second, l2watcher.FetchContractEvent)
// Finish start all l2 functions

log.Info("Start event-watcher successfully")

// Catch CTRL-C to ensure a graceful shutdown.
Expand Down
3 changes: 1 addition & 2 deletions rollup/cmd/rollup_relayer/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ func action(ctx *cli.Context) error {
log.Crit("failed to create batchProposer", "config file", cfgFile, "error", err)
}

l2watcher := watcher.NewL2WatcherClient(subCtx, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessengerAddress,
cfg.L2Config.L2MessageQueueAddress, cfg.L2Config.WithdrawTrieRootSlot, db, registry)
l2watcher := watcher.NewL2WatcherClient(subCtx, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessageQueueAddress, cfg.L2Config.WithdrawTrieRootSlot, db, registry)

// Watcher loop to fetch missing blocks
go utils.LoopWithContext(subCtx, 2*time.Second, func(ctx context.Context) {
Expand Down
1 change: 0 additions & 1 deletion rollup/conf/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
"l2_config": {
"confirmations": "0x1",
"endpoint": "https://rpc.scroll.io",
"l2_messenger_address": "0x0000000000000000000000000000000000000000",
"l2_message_queue_address": "0x0000000000000000000000000000000000000000",
"relayer_config": {
"rollup_contract_address": "0x0000000000000000000000000000000000000000",
Expand Down
2 changes: 0 additions & 2 deletions rollup/internal/config/l2.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ type L2Config struct {
Confirmations rpc.BlockNumber `json:"confirmations"`
// l2geth node url.
Endpoint string `json:"endpoint"`
// The messenger contract address deployed on layer 2 chain.
L2MessengerAddress common.Address `json:"l2_messenger_address"`
// The L2MessageQueue contract address deployed on layer 2 chain.
L2MessageQueueAddress common.Address `json:"l2_message_queue_address"`
// The WithdrawTrieRootSlot in L2MessageQueue contract.
Expand Down
8 changes: 0 additions & 8 deletions rollup/internal/controller/watcher/common.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
package watcher

import "github.com/scroll-tech/go-ethereum/common"

const contractEventsBlocksFetchLimit = int64(10)

type relayedMessage struct {
msgHash common.Hash
txHash common.Hash
isSuccessful bool
}
168 changes: 5 additions & 163 deletions rollup/internal/controller/watcher/l2_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,17 @@ import (
"math/big"

"github.com/prometheus/client_golang/prometheus"
geth "github.com/scroll-tech/go-ethereum"
"github.com/scroll-tech/go-ethereum/accounts/abi"
"github.com/scroll-tech/go-ethereum/common"
gethTypes "github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/event"
"github.com/scroll-tech/go-ethereum/log"
rollupTypes "github.com/scroll-tech/go-ethereum/rollup/types"
"github.com/scroll-tech/go-ethereum/rpc"
"gorm.io/gorm"

"scroll-tech/common/types"

bridgeAbi "scroll-tech/rollup/abi"
"scroll-tech/rollup/internal/orm"
"scroll-tech/rollup/internal/utils"
)

// L2WatcherClient provide APIs which support others to subscribe to various event from l2geth
Expand All @@ -31,65 +26,33 @@ type L2WatcherClient struct {

*ethclient.Client

l2BlockOrm *orm.L2Block
l1MessageOrm *orm.L1Message
l2BlockOrm *orm.L2Block

confirmations rpc.BlockNumber

messengerAddress common.Address
messengerABI *abi.ABI

messageQueueAddress common.Address
messageQueueABI *abi.ABI
withdrawTrieRootSlot common.Hash

// The height of the block that the watcher has retrieved event logs
processedMsgHeight uint64

stopped uint64

metrics *l2WatcherMetrics
}

// NewL2WatcherClient take a l2geth instance to generate a l2watcherclient instance
func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmations rpc.BlockNumber, messengerAddress, messageQueueAddress common.Address, withdrawTrieRootSlot common.Hash, db *gorm.DB, reg prometheus.Registerer) *L2WatcherClient {
l1MessageOrm := orm.NewL1Message(db)
var savedHeight uint64
l1msg, err := l1MessageOrm.GetLayer1LatestMessageWithLayer2Hash()
if err != nil || l1msg == nil {
log.Warn("fetch height from db failed", "err", err)
savedHeight = 0
} else {
receipt, err := client.TransactionReceipt(ctx, common.HexToHash(l1msg.Layer2Hash))
if err != nil || receipt == nil {
log.Warn("get tx from l2 failed", "err", err)
savedHeight = 0
} else {
savedHeight = receipt.BlockNumber.Uint64()
}
}

w := L2WatcherClient{
func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmations rpc.BlockNumber, messageQueueAddress common.Address, withdrawTrieRootSlot common.Hash, db *gorm.DB, reg prometheus.Registerer) *L2WatcherClient {
return &L2WatcherClient{
ctx: ctx,
Client: client,

l2BlockOrm: orm.NewL2Block(db),
l1MessageOrm: orm.NewL1Message(db),
processedMsgHeight: savedHeight,
confirmations: confirmations,
l2BlockOrm: orm.NewL2Block(db),

messengerAddress: messengerAddress,
messengerABI: bridgeAbi.L2ScrollMessengerABI,
confirmations: confirmations,

messageQueueAddress: messageQueueAddress,
messageQueueABI: bridgeAbi.L2MessageQueueABI,
withdrawTrieRootSlot: withdrawTrieRootSlot,

stopped: 0,
metrics: initL2WatcherMetrics(reg),
}

return &w
}

const blocksFetchLimit = uint64(10)
Expand Down Expand Up @@ -157,124 +120,3 @@ func (w *L2WatcherClient) getAndStoreBlocks(ctx context.Context, from, to uint64

return nil
}

// FetchContractEvent pull latest event logs from given contract address and save in DB
func (w *L2WatcherClient) FetchContractEvent() {
defer func() {
log.Info("l2 watcher fetchContractEvent", "w.processedMsgHeight", w.processedMsgHeight)
}()

w.metrics.fetchContractEventTotal.Inc()
blockHeight, err := utils.GetLatestConfirmedBlockNumber(w.ctx, w.Client, w.confirmations)
if err != nil {
log.Error("failed to get block number", "err", err)
return
}

fromBlock := int64(w.processedMsgHeight) + 1
toBlock := int64(blockHeight)

for from := fromBlock; from <= toBlock; from += contractEventsBlocksFetchLimit {
to := from + contractEventsBlocksFetchLimit - 1

if to > toBlock {
to = toBlock
}

// warning: uint int conversion...
query := geth.FilterQuery{
FromBlock: big.NewInt(from), // inclusive
ToBlock: big.NewInt(to), // inclusive
Addresses: []common.Address{
w.messengerAddress,
w.messageQueueAddress,
},
Topics: make([][]common.Hash, 1),
}
query.Topics[0] = make([]common.Hash, 4)
query.Topics[0][0] = bridgeAbi.L2SentMessageEventSignature
query.Topics[0][1] = bridgeAbi.L2RelayedMessageEventSignature
query.Topics[0][2] = bridgeAbi.L2FailedRelayedMessageEventSignature
query.Topics[0][3] = bridgeAbi.L2AppendMessageEventSignature

logs, err := w.FilterLogs(w.ctx, query)
if err != nil {
log.Error("failed to get event logs", "err", err)
return
}
if len(logs) == 0 {
w.processedMsgHeight = uint64(to)
w.metrics.fetchContractEventHeight.Set(float64(to))
continue
}
log.Info("received new L2 messages", "fromBlock", from, "toBlock", to, "cnt", len(logs))

relayedMessageEvents, err := w.parseBridgeEventLogs(logs)
if err != nil {
log.Error("failed to parse emitted event log", "err", err)
return
}

relayedMessageCount := int64(len(relayedMessageEvents))
w.metrics.rollupL2MsgsRelayedEventsTotal.Add(float64(relayedMessageCount))
log.Info("L2 events types", "RelayedMessageCount", relayedMessageCount)

// Update relayed message first to make sure we don't forget to update submited message.
// Since, we always start sync from the latest unprocessed message.
for _, msg := range relayedMessageEvents {
var msgStatus types.MsgStatus
if msg.isSuccessful {
msgStatus = types.MsgConfirmed
} else {
msgStatus = types.MsgFailed
}
if err = w.l1MessageOrm.UpdateLayer1StatusAndLayer2Hash(w.ctx, msg.msgHash.String(), msgStatus, msg.txHash.String()); err != nil {
log.Error("Failed to update layer1 status and layer2 hash", "err", err)
return
}
}

w.processedMsgHeight = uint64(to)
w.metrics.fetchContractEventHeight.Set(float64(to))
}
}

func (w *L2WatcherClient) parseBridgeEventLogs(logs []gethTypes.Log) ([]relayedMessage, error) {
// Need use contract abi to parse event Log
// Can only be tested after we have our contracts set up

var relayedMessages []relayedMessage
for _, vLog := range logs {
switch vLog.Topics[0] {
case bridgeAbi.L2RelayedMessageEventSignature:
event := bridgeAbi.L2RelayedMessageEvent{}
err := utils.UnpackLog(w.messengerABI, &event, "RelayedMessage", vLog)
if err != nil {
log.Warn("Failed to unpack layer2 RelayedMessage event", "err", err)
return relayedMessages, err
}

relayedMessages = append(relayedMessages, relayedMessage{
msgHash: event.MessageHash,
txHash: vLog.TxHash,
isSuccessful: true,
})
case bridgeAbi.L2FailedRelayedMessageEventSignature:
event := bridgeAbi.L2FailedRelayedMessageEvent{}
err := utils.UnpackLog(w.messengerABI, &event, "FailedRelayedMessage", vLog)
if err != nil {
log.Warn("Failed to unpack layer2 FailedRelayedMessage event", "err", err)
return relayedMessages, err
}

relayedMessages = append(relayedMessages, relayedMessage{
msgHash: event.MessageHash,
txHash: vLog.TxHash,
isSuccessful: false,
})
log.Error("Unknown event", "topic", vLog.Topics[0], "txHash", vLog.TxHash)
}
}

return relayedMessages, nil
}
Loading

0 comments on commit 341d5be

Please sign in to comment.