From 6cbf2a840f694f75daf8b876db34af5de671bad1 Mon Sep 17 00:00:00 2001 From: Ron Date: Mon, 19 Aug 2024 20:49:03 +0800 Subject: [PATCH] Decentralize parachain relayer with linear timeout (#1266) * Decentralize relayer with linear timeout * Fix modulo calculation * Rename as TotalRelayerCount * Restore start-relayer * Make SleepInterval configurable --- relayer/chain/parachain/schedule_test.go | 31 ++++++++ relayer/relays/parachain/beefy-listener.go | 52 +++++++++--- relayer/relays/parachain/config.go | 33 +++++++- relayer/relays/parachain/main.go | 3 + relayer/relays/parachain/scanner.go | 42 +++++----- web/packages/test/config/parachain-relay.json | 5 ++ web/packages/test/scripts/start-relayer.sh | 79 +++++++++++++++++-- 7 files changed, 209 insertions(+), 36 deletions(-) create mode 100644 relayer/chain/parachain/schedule_test.go diff --git a/relayer/chain/parachain/schedule_test.go b/relayer/chain/parachain/schedule_test.go new file mode 100644 index 0000000000..61b7de74f3 --- /dev/null +++ b/relayer/chain/parachain/schedule_test.go @@ -0,0 +1,31 @@ +package parachain_test + +import ( + "fmt" + "testing" +) + +// 100 messages distrubuted to 10 relayers, check waitingPeriod for each relayer +func TestModuloSchedule(t *testing.T) { + message_count := 100 + total_count := 10 + var waitingPeriod uint64 + for nonce := 1; nonce < message_count; nonce++ { + for id := 0; id < total_count; id++ { + waitingPeriod = uint64((nonce + total_count - id) % total_count) + fmt.Printf("algorithm 1: relay %d waiting for nonce %d for %d\n", id, nonce, waitingPeriod) + } + } + for nonce := 1; nonce < message_count; nonce++ { + for id := 0; id < total_count; id++ { + modNonce := nonce % total_count + if modNonce > id { + waitingPeriod = uint64(modNonce - id) + } else { + waitingPeriod = uint64(id - modNonce) + } + fmt.Printf("algorithm 2: relay %d waiting for nonce %d for %d\n", id, nonce, waitingPeriod) + } + } + +} diff --git a/relayer/relays/parachain/beefy-listener.go b/relayer/relays/parachain/beefy-listener.go index dd9bc0f988..d6689a5582 100644 --- a/relayer/relays/parachain/beefy-listener.go +++ b/relayer/relays/parachain/beefy-listener.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -22,6 +23,7 @@ import ( type BeefyListener struct { config *SourceConfig + scheduleConfig *ScheduleConfig ethereumConn *ethereum.Connection beefyClientContract *contracts.BeefyClient relaychainConn *relaychain.Connection @@ -33,6 +35,7 @@ type BeefyListener struct { func NewBeefyListener( config *SourceConfig, + scheduleConfig *ScheduleConfig, ethereumConn *ethereum.Connection, relaychainConn *relaychain.Connection, parachainConnection *parachain.Connection, @@ -40,6 +43,7 @@ func NewBeefyListener( ) *BeefyListener { return &BeefyListener{ config: config, + scheduleConfig: scheduleConfig, ethereumConn: ethereumConn, relaychainConn: relaychainConn, parachainConnection: parachainConnection, @@ -154,18 +158,12 @@ func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) er if err != nil { return err } - for _, task := range tasks { - // do final proof generation right before sending. The proof needs to be fresh. - task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header) + paraNonce := (*task.MessageProofs)[0].Message.Nonce + waitingPeriod := (paraNonce + li.scheduleConfig.TotalRelayerCount - li.scheduleConfig.ID) % li.scheduleConfig.TotalRelayerCount + err = li.waitAndSend(ctx, task, waitingPeriod) if err != nil { - return err - } - select { - case <-ctx.Done(): - return ctx.Err() - case li.tasks <- task: - log.Info("Beefy Listener emitted new task") + return fmt.Errorf("wait task for nonce %d: %w", paraNonce, err) } } @@ -285,3 +283,37 @@ func (li *BeefyListener) generateProof(ctx context.Context, input *ProofInput, h return &output, nil } + +func (li *BeefyListener) waitAndSend(ctx context.Context, task *Task, waitingPeriod uint64) error { + paraNonce := (*task.MessageProofs)[0].Message.Nonce + log.Info(fmt.Sprintf("waiting for nonce %d to be picked up by another relayer", paraNonce)) + var cnt uint64 + var err error + for { + ethInboundNonce, err := li.scanner.findLatestNonce(ctx) + if err != nil { + return err + } + if ethInboundNonce >= paraNonce { + log.Info(fmt.Sprintf("nonce %d picked up by another relayer, just skip", paraNonce)) + return nil + } + if cnt == waitingPeriod { + break + } + time.Sleep(time.Duration(li.scheduleConfig.SleepInterval) * time.Second) + cnt++ + } + log.Info(fmt.Sprintf("nonce %d is not picked up by any one, submit anyway", paraNonce)) + task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header) + if err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case li.tasks <- task: + log.Info("Beefy Listener emitted new task") + } + return nil +} diff --git a/relayer/relays/parachain/config.go b/relayer/relays/parachain/config.go index fcd8a32e40..97a9ebd3dd 100644 --- a/relayer/relays/parachain/config.go +++ b/relayer/relays/parachain/config.go @@ -1,13 +1,16 @@ package parachain import ( + "errors" "fmt" + "github.com/snowfork/snowbridge/relayer/config" ) type Config struct { - Source SourceConfig `mapstructure:"source"` - Sink SinkConfig `mapstructure:"sink"` + Source SourceConfig `mapstructure:"source"` + Sink SinkConfig `mapstructure:"sink"` + Schedule ScheduleConfig `mapstructure:"schedule"` } type SourceConfig struct { @@ -32,6 +35,25 @@ type SinkContractsConfig struct { Gateway string `mapstructure:"Gateway"` } +type ScheduleConfig struct { + // ID of current relayer, starting from 0 + ID uint64 `mapstructure:"id"` + // Number of total count of all relayers + TotalRelayerCount uint64 `mapstructure:"totalRelayerCount"` + // Sleep interval(in seconds) to check if message(nonce) has already been relayed + SleepInterval uint64 `mapstructure:"sleepInterval"` +} + +func (r ScheduleConfig) Validate() error { + if r.TotalRelayerCount < 1 { + return errors.New("Number of relayer is not set") + } + if r.ID >= r.TotalRelayerCount { + return errors.New("ID of the Number of relayer is not set") + } + return nil +} + type ChannelID [32]byte func (c Config) Validate() error { @@ -66,5 +88,12 @@ func (c Config) Validate() error { if c.Sink.Contracts.Gateway == "" { return fmt.Errorf("sink contracts setting [Gateway] is not set") } + + // Relay + err = c.Schedule.Validate() + if err != nil { + return fmt.Errorf("relay config: %w", err) + } + return nil } diff --git a/relayer/relays/parachain/main.go b/relayer/relays/parachain/main.go index 50bb95dcd0..5ce1c1b9c6 100644 --- a/relayer/relays/parachain/main.go +++ b/relayer/relays/parachain/main.go @@ -47,6 +47,7 @@ func NewRelay(config *Config, keypair *secp256k1.Keypair) (*Relay, error) { beefyListener := NewBeefyListener( &config.Source, + &config.Schedule, ethereumConnBeefy, relaychainConn, parachainConn, @@ -97,5 +98,7 @@ func (relay *Relay) Start(ctx context.Context, eg *errgroup.Group) error { return err } + log.Info("Current relay's ID:", relay.config.Schedule.ID) + return nil } diff --git a/relayer/relays/parachain/scanner.go b/relayer/relays/parachain/scanner.go index b71687309d..b884ed7d03 100644 --- a/relayer/relays/parachain/scanner.go +++ b/relayer/relays/parachain/scanner.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "github.com/snowfork/go-substrate-rpc-client/v4/scale" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -71,30 +72,13 @@ func (s *Scanner) findTasks( paraHash types.Hash, ) ([]*Task, error) { // Fetch latest nonce in ethereum gateway - gatewayAddress := common.HexToAddress(s.config.Contracts.Gateway) - gatewayContract, err := contracts.NewGateway( - gatewayAddress, - s.ethConn.Client(), - ) - if err != nil { - return nil, fmt.Errorf("create gateway contract for address '%v': %w", gatewayAddress, err) - } - - options := bind.CallOpts{ - Pending: true, - Context: ctx, - } - ethInboundNonce, _, err := gatewayContract.ChannelNoncesOf(&options, s.config.ChannelID) - if err != nil { - return nil, fmt.Errorf("fetch nonce from gateway contract for channelID '%v': %w", s.config.ChannelID, err) - } + ethInboundNonce, err := s.findLatestNonce(ctx) log.WithFields(log.Fields{ "nonce": ethInboundNonce, "channelID": s.config.ChannelID, }).Info("Checked latest nonce delivered to ethereum gateway") // Fetch latest nonce in parachain outbound queue - paraNonceKey, err := types.CreateStorageKey(s.paraConn.Metadata(), "EthereumOutboundQueue", "Nonce", s.config.ChannelID[:], nil) if err != nil { return nil, fmt.Errorf("create storage key for parachain outbound queue nonce with channelID '%v': %w", s.config.ChannelID, err) @@ -457,3 +441,25 @@ func fetchMessageProof( return MessageProof{Message: message, Proof: proof}, nil } + +func (s *Scanner) findLatestNonce(ctx context.Context) (uint64, error) { + // Fetch latest nonce in ethereum gateway + gatewayAddress := common.HexToAddress(s.config.Contracts.Gateway) + gatewayContract, err := contracts.NewGateway( + gatewayAddress, + s.ethConn.Client(), + ) + if err != nil { + return 0, fmt.Errorf("create gateway contract for address '%v': %w", gatewayAddress, err) + } + + options := bind.CallOpts{ + Pending: true, + Context: ctx, + } + ethInboundNonce, _, err := gatewayContract.ChannelNoncesOf(&options, s.config.ChannelID) + if err != nil { + return 0, fmt.Errorf("fetch nonce from gateway contract for channelID '%v': %w", s.config.ChannelID, err) + } + return ethInboundNonce, err +} diff --git a/web/packages/test/config/parachain-relay.json b/web/packages/test/config/parachain-relay.json index 8af6306073..8fd1c73cb3 100644 --- a/web/packages/test/config/parachain-relay.json +++ b/web/packages/test/config/parachain-relay.json @@ -24,5 +24,10 @@ "contracts": { "Gateway": null } + }, + "schedule": { + "id": null, + "totalRelayerCount": 3, + "sleepInterval": 45 } } diff --git a/web/packages/test/scripts/start-relayer.sh b/web/packages/test/scripts/start-relayer.sh index 5977de9f5e..c4a24876cc 100755 --- a/web/packages/test/scripts/start-relayer.sh +++ b/web/packages/test/scripts/start-relayer.sh @@ -54,7 +54,7 @@ config_relayer() { ' \ config/parachain-relay.json >$output_dir/parachain-relay-bridge-hub-02.json - # Configure parachain relay (asset hub) + # Configure parachain relay (asset hub)-0 jq \ --arg k1 "$(address_for GatewayProxy)" \ --arg k2 "$(address_for BeefyClient)" \ @@ -70,8 +70,49 @@ config_relayer() { | .sink.ethereum.endpoint = $eth_writer_endpoint | .sink.ethereum."gas-limit" = $eth_gas_limit | .source."channel-id" = $channelID + | .schedule.id = 0 ' \ - config/parachain-relay.json >$output_dir/parachain-relay-asset-hub.json + config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-0.json + + # Configure parachain relay (asset hub)-1 + jq \ + --arg k1 "$(address_for GatewayProxy)" \ + --arg k2 "$(address_for BeefyClient)" \ + --arg eth_endpoint_ws $eth_endpoint_ws \ + --arg eth_writer_endpoint $eth_writer_endpoint \ + --arg channelID $ASSET_HUB_CHANNEL_ID \ + --arg eth_gas_limit $eth_gas_limit \ + ' + .source.contracts.Gateway = $k1 + | .source.contracts.BeefyClient = $k2 + | .sink.contracts.Gateway = $k1 + | .source.ethereum.endpoint = $eth_endpoint_ws + | .sink.ethereum.endpoint = $eth_writer_endpoint + | .sink.ethereum."gas-limit" = $eth_gas_limit + | .source."channel-id" = $channelID + | .schedule.id = 1 + ' \ + config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-1.json + + # Configure parachain relay (asset hub)-2 + jq \ + --arg k1 "$(address_for GatewayProxy)" \ + --arg k2 "$(address_for BeefyClient)" \ + --arg eth_endpoint_ws $eth_endpoint_ws \ + --arg eth_writer_endpoint $eth_writer_endpoint \ + --arg channelID $ASSET_HUB_CHANNEL_ID \ + --arg eth_gas_limit $eth_gas_limit \ + ' + .source.contracts.Gateway = $k1 + | .source.contracts.BeefyClient = $k2 + | .sink.contracts.Gateway = $k1 + | .source.ethereum.endpoint = $eth_endpoint_ws + | .sink.ethereum.endpoint = $eth_writer_endpoint + | .sink.ethereum."gas-limit" = $eth_gas_limit + | .source."channel-id" = $channelID + | .schedule.id = 2 + ' \ + config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-2.json # Configure parachain relay (penpal) jq \ @@ -172,15 +213,41 @@ start_relayer() { done ) & - # Launch parachain relay for assethub + # Launch parachain relay 0 for assethub ( - : >"$output_dir"/parachain-relay-asset-hub.log + : >"$output_dir"/parachain-relay-asset-hub-0.log while :; do echo "Starting parachain relay (asset-hub) at $(date)" "${relay_bin}" run parachain \ - --config "$output_dir/parachain-relay-asset-hub.json" \ + --config "$output_dir/parachain-relay-asset-hub-0.json" \ --ethereum.private-key $parachain_relay_assethub_eth_key \ - >>"$output_dir"/parachain-relay-asset-hub.log 2>&1 || true + >>"$output_dir"/parachain-relay-asset-hub-0.log 2>&1 || true + sleep 20 + done + ) & + + # Launch parachain relay 1 for assethub + ( + : >"$output_dir"/parachain-relay-asset-hub-1.log + while :; do + echo "Starting parachain relay (asset-hub) at $(date)" + "${relay_bin}" run parachain \ + --config "$output_dir/parachain-relay-asset-hub-1.json" \ + --ethereum.private-key $parachain_relay_primary_gov_eth_key \ + >>"$output_dir"/parachain-relay-asset-hub-1.log 2>&1 || true + sleep 20 + done + ) & + + # Launch parachain relay 2 for assethub + ( + : >"$output_dir"/parachain-relay-asset-hub-2.log + while :; do + echo "Starting parachain relay (asset-hub) at $(date)" + "${relay_bin}" run parachain \ + --config "$output_dir/parachain-relay-asset-hub-2.json" \ + --ethereum.private-key $parachain_relay_secondary_gov_eth_key \ + >>"$output_dir"/parachain-relay-asset-hub-2.log 2>&1 || true sleep 20 done ) &