Skip to content

Commit

Permalink
Decentralize parachain relayer with linear timeout (#1266)
Browse files Browse the repository at this point in the history
* Decentralize relayer with linear timeout

* Fix modulo calculation

* Rename as TotalRelayerCount

* Restore start-relayer

* Make SleepInterval configurable
  • Loading branch information
yrong authored Aug 19, 2024
1 parent 96565f2 commit 6cbf2a8
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 36 deletions.
31 changes: 31 additions & 0 deletions relayer/chain/parachain/schedule_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package parachain_test

import (
"fmt"
"testing"
)

// 100 messages distrubuted to 10 relayers, check waitingPeriod for each relayer
func TestModuloSchedule(t *testing.T) {
message_count := 100
total_count := 10
var waitingPeriod uint64
for nonce := 1; nonce < message_count; nonce++ {
for id := 0; id < total_count; id++ {
waitingPeriod = uint64((nonce + total_count - id) % total_count)
fmt.Printf("algorithm 1: relay %d waiting for nonce %d for %d\n", id, nonce, waitingPeriod)
}
}
for nonce := 1; nonce < message_count; nonce++ {
for id := 0; id < total_count; id++ {
modNonce := nonce % total_count
if modNonce > id {
waitingPeriod = uint64(modNonce - id)
} else {
waitingPeriod = uint64(id - modNonce)
}
fmt.Printf("algorithm 2: relay %d waiting for nonce %d for %d\n", id, nonce, waitingPeriod)
}
}

}
52 changes: 42 additions & 10 deletions relayer/relays/parachain/beefy-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -22,6 +23,7 @@ import (

type BeefyListener struct {
config *SourceConfig
scheduleConfig *ScheduleConfig
ethereumConn *ethereum.Connection
beefyClientContract *contracts.BeefyClient
relaychainConn *relaychain.Connection
Expand All @@ -33,13 +35,15 @@ type BeefyListener struct {

func NewBeefyListener(
config *SourceConfig,
scheduleConfig *ScheduleConfig,
ethereumConn *ethereum.Connection,
relaychainConn *relaychain.Connection,
parachainConnection *parachain.Connection,
tasks chan<- *Task,
) *BeefyListener {
return &BeefyListener{
config: config,
scheduleConfig: scheduleConfig,
ethereumConn: ethereumConn,
relaychainConn: relaychainConn,
parachainConnection: parachainConnection,
Expand Down Expand Up @@ -154,18 +158,12 @@ func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) er
if err != nil {
return err
}

for _, task := range tasks {
// do final proof generation right before sending. The proof needs to be fresh.
task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header)
paraNonce := (*task.MessageProofs)[0].Message.Nonce
waitingPeriod := (paraNonce + li.scheduleConfig.TotalRelayerCount - li.scheduleConfig.ID) % li.scheduleConfig.TotalRelayerCount
err = li.waitAndSend(ctx, task, waitingPeriod)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case li.tasks <- task:
log.Info("Beefy Listener emitted new task")
return fmt.Errorf("wait task for nonce %d: %w", paraNonce, err)
}
}

Expand Down Expand Up @@ -285,3 +283,37 @@ func (li *BeefyListener) generateProof(ctx context.Context, input *ProofInput, h

return &output, nil
}

func (li *BeefyListener) waitAndSend(ctx context.Context, task *Task, waitingPeriod uint64) error {
paraNonce := (*task.MessageProofs)[0].Message.Nonce
log.Info(fmt.Sprintf("waiting for nonce %d to be picked up by another relayer", paraNonce))
var cnt uint64
var err error
for {
ethInboundNonce, err := li.scanner.findLatestNonce(ctx)
if err != nil {
return err
}
if ethInboundNonce >= paraNonce {
log.Info(fmt.Sprintf("nonce %d picked up by another relayer, just skip", paraNonce))
return nil
}
if cnt == waitingPeriod {
break
}
time.Sleep(time.Duration(li.scheduleConfig.SleepInterval) * time.Second)
cnt++
}
log.Info(fmt.Sprintf("nonce %d is not picked up by any one, submit anyway", paraNonce))
task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case li.tasks <- task:
log.Info("Beefy Listener emitted new task")
}
return nil
}
33 changes: 31 additions & 2 deletions relayer/relays/parachain/config.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package parachain

import (
"errors"
"fmt"

"github.com/snowfork/snowbridge/relayer/config"
)

type Config struct {
Source SourceConfig `mapstructure:"source"`
Sink SinkConfig `mapstructure:"sink"`
Source SourceConfig `mapstructure:"source"`
Sink SinkConfig `mapstructure:"sink"`
Schedule ScheduleConfig `mapstructure:"schedule"`
}

type SourceConfig struct {
Expand All @@ -32,6 +35,25 @@ type SinkContractsConfig struct {
Gateway string `mapstructure:"Gateway"`
}

type ScheduleConfig struct {
// ID of current relayer, starting from 0
ID uint64 `mapstructure:"id"`
// Number of total count of all relayers
TotalRelayerCount uint64 `mapstructure:"totalRelayerCount"`
// Sleep interval(in seconds) to check if message(nonce) has already been relayed
SleepInterval uint64 `mapstructure:"sleepInterval"`
}

func (r ScheduleConfig) Validate() error {
if r.TotalRelayerCount < 1 {
return errors.New("Number of relayer is not set")
}
if r.ID >= r.TotalRelayerCount {
return errors.New("ID of the Number of relayer is not set")
}
return nil
}

type ChannelID [32]byte

func (c Config) Validate() error {
Expand Down Expand Up @@ -66,5 +88,12 @@ func (c Config) Validate() error {
if c.Sink.Contracts.Gateway == "" {
return fmt.Errorf("sink contracts setting [Gateway] is not set")
}

// Relay
err = c.Schedule.Validate()
if err != nil {
return fmt.Errorf("relay config: %w", err)
}

return nil
}
3 changes: 3 additions & 0 deletions relayer/relays/parachain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func NewRelay(config *Config, keypair *secp256k1.Keypair) (*Relay, error) {

beefyListener := NewBeefyListener(
&config.Source,
&config.Schedule,
ethereumConnBeefy,
relaychainConn,
parachainConn,
Expand Down Expand Up @@ -97,5 +98,7 @@ func (relay *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
return err
}

log.Info("Current relay's ID:", relay.config.Schedule.ID)

return nil
}
42 changes: 24 additions & 18 deletions relayer/relays/parachain/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"

"github.com/snowfork/go-substrate-rpc-client/v4/scale"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand Down Expand Up @@ -71,30 +72,13 @@ func (s *Scanner) findTasks(
paraHash types.Hash,
) ([]*Task, error) {
// Fetch latest nonce in ethereum gateway
gatewayAddress := common.HexToAddress(s.config.Contracts.Gateway)
gatewayContract, err := contracts.NewGateway(
gatewayAddress,
s.ethConn.Client(),
)
if err != nil {
return nil, fmt.Errorf("create gateway contract for address '%v': %w", gatewayAddress, err)
}

options := bind.CallOpts{
Pending: true,
Context: ctx,
}
ethInboundNonce, _, err := gatewayContract.ChannelNoncesOf(&options, s.config.ChannelID)
if err != nil {
return nil, fmt.Errorf("fetch nonce from gateway contract for channelID '%v': %w", s.config.ChannelID, err)
}
ethInboundNonce, err := s.findLatestNonce(ctx)
log.WithFields(log.Fields{
"nonce": ethInboundNonce,
"channelID": s.config.ChannelID,
}).Info("Checked latest nonce delivered to ethereum gateway")

// Fetch latest nonce in parachain outbound queue

paraNonceKey, err := types.CreateStorageKey(s.paraConn.Metadata(), "EthereumOutboundQueue", "Nonce", s.config.ChannelID[:], nil)
if err != nil {
return nil, fmt.Errorf("create storage key for parachain outbound queue nonce with channelID '%v': %w", s.config.ChannelID, err)
Expand Down Expand Up @@ -457,3 +441,25 @@ func fetchMessageProof(

return MessageProof{Message: message, Proof: proof}, nil
}

func (s *Scanner) findLatestNonce(ctx context.Context) (uint64, error) {
// Fetch latest nonce in ethereum gateway
gatewayAddress := common.HexToAddress(s.config.Contracts.Gateway)
gatewayContract, err := contracts.NewGateway(
gatewayAddress,
s.ethConn.Client(),
)
if err != nil {
return 0, fmt.Errorf("create gateway contract for address '%v': %w", gatewayAddress, err)
}

options := bind.CallOpts{
Pending: true,
Context: ctx,
}
ethInboundNonce, _, err := gatewayContract.ChannelNoncesOf(&options, s.config.ChannelID)
if err != nil {
return 0, fmt.Errorf("fetch nonce from gateway contract for channelID '%v': %w", s.config.ChannelID, err)
}
return ethInboundNonce, err
}
5 changes: 5 additions & 0 deletions web/packages/test/config/parachain-relay.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,10 @@
"contracts": {
"Gateway": null
}
},
"schedule": {
"id": null,
"totalRelayerCount": 3,
"sleepInterval": 45
}
}
79 changes: 73 additions & 6 deletions web/packages/test/scripts/start-relayer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ config_relayer() {
' \
config/parachain-relay.json >$output_dir/parachain-relay-bridge-hub-02.json

# Configure parachain relay (asset hub)
# Configure parachain relay (asset hub)-0
jq \
--arg k1 "$(address_for GatewayProxy)" \
--arg k2 "$(address_for BeefyClient)" \
Expand All @@ -70,8 +70,49 @@ config_relayer() {
| .sink.ethereum.endpoint = $eth_writer_endpoint
| .sink.ethereum."gas-limit" = $eth_gas_limit
| .source."channel-id" = $channelID
| .schedule.id = 0
' \
config/parachain-relay.json >$output_dir/parachain-relay-asset-hub.json
config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-0.json

# Configure parachain relay (asset hub)-1
jq \
--arg k1 "$(address_for GatewayProxy)" \
--arg k2 "$(address_for BeefyClient)" \
--arg eth_endpoint_ws $eth_endpoint_ws \
--arg eth_writer_endpoint $eth_writer_endpoint \
--arg channelID $ASSET_HUB_CHANNEL_ID \
--arg eth_gas_limit $eth_gas_limit \
'
.source.contracts.Gateway = $k1
| .source.contracts.BeefyClient = $k2
| .sink.contracts.Gateway = $k1
| .source.ethereum.endpoint = $eth_endpoint_ws
| .sink.ethereum.endpoint = $eth_writer_endpoint
| .sink.ethereum."gas-limit" = $eth_gas_limit
| .source."channel-id" = $channelID
| .schedule.id = 1
' \
config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-1.json

# Configure parachain relay (asset hub)-2
jq \
--arg k1 "$(address_for GatewayProxy)" \
--arg k2 "$(address_for BeefyClient)" \
--arg eth_endpoint_ws $eth_endpoint_ws \
--arg eth_writer_endpoint $eth_writer_endpoint \
--arg channelID $ASSET_HUB_CHANNEL_ID \
--arg eth_gas_limit $eth_gas_limit \
'
.source.contracts.Gateway = $k1
| .source.contracts.BeefyClient = $k2
| .sink.contracts.Gateway = $k1
| .source.ethereum.endpoint = $eth_endpoint_ws
| .sink.ethereum.endpoint = $eth_writer_endpoint
| .sink.ethereum."gas-limit" = $eth_gas_limit
| .source."channel-id" = $channelID
| .schedule.id = 2
' \
config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-2.json

# Configure parachain relay (penpal)
jq \
Expand Down Expand Up @@ -172,15 +213,41 @@ start_relayer() {
done
) &

# Launch parachain relay for assethub
# Launch parachain relay 0 for assethub
(
: >"$output_dir"/parachain-relay-asset-hub.log
: >"$output_dir"/parachain-relay-asset-hub-0.log
while :; do
echo "Starting parachain relay (asset-hub) at $(date)"
"${relay_bin}" run parachain \
--config "$output_dir/parachain-relay-asset-hub.json" \
--config "$output_dir/parachain-relay-asset-hub-0.json" \
--ethereum.private-key $parachain_relay_assethub_eth_key \
>>"$output_dir"/parachain-relay-asset-hub.log 2>&1 || true
>>"$output_dir"/parachain-relay-asset-hub-0.log 2>&1 || true
sleep 20
done
) &

# Launch parachain relay 1 for assethub
(
: >"$output_dir"/parachain-relay-asset-hub-1.log
while :; do
echo "Starting parachain relay (asset-hub) at $(date)"
"${relay_bin}" run parachain \
--config "$output_dir/parachain-relay-asset-hub-1.json" \
--ethereum.private-key $parachain_relay_primary_gov_eth_key \
>>"$output_dir"/parachain-relay-asset-hub-1.log 2>&1 || true
sleep 20
done
) &

# Launch parachain relay 2 for assethub
(
: >"$output_dir"/parachain-relay-asset-hub-2.log
while :; do
echo "Starting parachain relay (asset-hub) at $(date)"
"${relay_bin}" run parachain \
--config "$output_dir/parachain-relay-asset-hub-2.json" \
--ethereum.private-key $parachain_relay_secondary_gov_eth_key \
>>"$output_dir"/parachain-relay-asset-hub-2.log 2>&1 || true
sleep 20
done
) &
Expand Down

0 comments on commit 6cbf2a8

Please sign in to comment.