Skip to content

Commit

Permalink
Merge pull request #53 from mysteriumnetwork/reconnectable-ethclient
Browse files Browse the repository at this point in the history
Add reconnactable ethereum client
  • Loading branch information
soffokl authored Apr 14, 2020
2 parents 45a9b2d + 239e10b commit 777ab18
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 31 deletions.
68 changes: 37 additions & 31 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
)

// DefaultBackoff is the default backoff for the client
var DefaultBackoff = time.Second * 3
const DefaultBackoff = time.Second * 3

// ProviderChannel represents the provider channel
type ProviderChannel struct {
Expand All @@ -48,36 +48,42 @@ type ProviderChannel struct {
Timelock *big.Int
}

type ethClientGetter interface {
Client() *ethclient.Client
}

// Blockchain contains all the useful blockchain utilities for the payment off chain messaging
type Blockchain struct {
client *ethclient.Client
ethClient ethClientGetter
bcTimeout time.Duration
nonceFunc nonceFunc
}

type nonceFunc func(ctx context.Context, account common.Address) (uint64, error)

// NewBlockchain returns a new instance of blockchain
func NewBlockchain(c *ethclient.Client, timeout time.Duration) *Blockchain {
func NewBlockchain(ethClient ethClientGetter, timeout time.Duration) *Blockchain {
return &Blockchain{
client: c,
ethClient: ethClient,
bcTimeout: timeout,
nonceFunc: c.PendingNonceAt,
nonceFunc: func(ctx context.Context, account common.Address) (uint64, error) {
return ethClient.Client().PendingNonceAt(ctx, account)
},
}
}

// NewBlockchainWithCustomNonceTracker returns a new instance of blockchain with the provided nonce tracking func
func NewBlockchainWithCustomNonceTracker(c *ethclient.Client, timeout time.Duration, nonceFunc nonceFunc) *Blockchain {
func NewBlockchainWithCustomNonceTracker(ethClient ethClientGetter, timeout time.Duration, nonceFunc nonceFunc) *Blockchain {
return &Blockchain{
client: c,
ethClient: ethClient,
bcTimeout: timeout,
nonceFunc: nonceFunc,
}
}

// GetAccountantFee fetches the accountant fee from blockchain
func (bc *Blockchain) GetAccountantFee(accountantAddress common.Address) (uint16, error) {
caller, err := bindings.NewAccountantImplementationCaller(accountantAddress, bc.client)
caller, err := bindings.NewAccountantImplementationCaller(accountantAddress, bc.ethClient.Client())
if err != nil {
return 0, errors.Wrap(err, "could not create accountant implementation caller")
}
Expand All @@ -97,7 +103,7 @@ func (bc *Blockchain) GetAccountantFee(accountantAddress common.Address) (uint16

// CalculateAccountantFee calls blockchain for calculation of accountant fee
func (bc *Blockchain) CalculateAccountantFee(accountantAddress common.Address, value uint64) (*big.Int, error) {
caller, err := bindings.NewAccountantImplementationCaller(accountantAddress, bc.client)
caller, err := bindings.NewAccountantImplementationCaller(accountantAddress, bc.ethClient.Client())
if err != nil {
return nil, errors.Wrap(err, "could not create accountant implementation caller")
}
Expand Down Expand Up @@ -132,7 +138,7 @@ func (bc *Blockchain) IsRegisteredAsProvider(accountantAddress, registryAddress,
// SubscribeToMystTokenTransfers subscribes to myst token transfers
func (bc *Blockchain) SubscribeToMystTokenTransfers(mystSCAddress common.Address) (chan *bindings.MystTokenTransfer, func(), error) {
sink := make(chan *bindings.MystTokenTransfer)
mtc, err := bindings.NewMystTokenFilterer(mystSCAddress, bc.client)
mtc, err := bindings.NewMystTokenFilterer(mystSCAddress, bc.ethClient.Client())
if err != nil {
return sink, nil, err
}
Expand All @@ -155,7 +161,7 @@ func (bc *Blockchain) SubscribeToMystTokenTransfers(mystSCAddress common.Address
// SubscribeToConsumerBalanceEvent subscribes to balance change events in blockchain
func (bc *Blockchain) SubscribeToConsumerBalanceEvent(channel, mystSCAddress common.Address, timeout time.Duration) (chan *bindings.MystTokenTransfer, func(), error) {
sink := make(chan *bindings.MystTokenTransfer)
mtc, err := bindings.NewMystTokenFilterer(mystSCAddress, bc.client)
mtc, err := bindings.NewMystTokenFilterer(mystSCAddress, bc.ethClient.Client())
if err != nil {
return sink, nil, err
}
Expand Down Expand Up @@ -188,7 +194,7 @@ func (bc *Blockchain) GetProviderChannel(accountantAddress common.Address, addre
if err != nil {
return ProviderChannel{}, errors.Wrap(err, "could not calculate provider channel address")
}
caller, err := bindings.NewAccountantImplementationCaller(accountantAddress, bc.client)
caller, err := bindings.NewAccountantImplementationCaller(accountantAddress, bc.ethClient.Client())
if err != nil {
return ProviderChannel{}, errors.Wrap(err, "could not create accountant caller")
}
Expand Down Expand Up @@ -231,7 +237,7 @@ func (bc *Blockchain) SubscribeToPromiseSettledEvent(providerID, accountantID co

// IsRegistered checks wether the given identity is registered or not
func (bc *Blockchain) IsRegistered(registryAddress, addressToCheck common.Address) (bool, error) {
caller, err := bindings.NewRegistryCaller(registryAddress, bc.client)
caller, err := bindings.NewRegistryCaller(registryAddress, bc.ethClient.Client())
if err != nil {
return false, errors.Wrap(err, "could not create registry caller")
}
Expand All @@ -247,7 +253,7 @@ func (bc *Blockchain) IsRegistered(registryAddress, addressToCheck common.Addres

// GetMystBalance returns myst balance
func (bc *Blockchain) GetMystBalance(mystAddress, identity common.Address) (*big.Int, error) {
c, err := bindings.NewMystTokenCaller(mystAddress, bc.client)
c, err := bindings.NewMystTokenCaller(mystAddress, bc.ethClient.Client())
if err != nil {
return nil, err
}
Expand All @@ -262,7 +268,7 @@ func (bc *Blockchain) GetMystBalance(mystAddress, identity common.Address) (*big
// GetRegistrationFee returns fee required by registry
func (bc *Blockchain) GetRegistrationFee(registryAddress common.Address) (*big.Int, error) {
// TODO to reduce amount of blockchain calls, it could get registration fee from cache (updated once in a day)
c, err := bindings.NewRegistryCaller(registryAddress, bc.client)
c, err := bindings.NewRegistryCaller(registryAddress, bc.ethClient.Client())
if err != nil {
return nil, errors.Wrap(err, "could not get registration fee")
}
Expand Down Expand Up @@ -304,12 +310,12 @@ func (bc *Blockchain) EstimateGas(msg ethereum.CallMsg) (uint64, error) {
parent := context.Background()
ctx, cancel := context.WithTimeout(parent, bc.bcTimeout)
defer cancel()
return bc.client.EstimateGas(ctx, msg)
return bc.ethClient.Client().EstimateGas(ctx, msg)
}

// RegisterIdentity registers the given identity on blockchain
func (bc *Blockchain) RegisterIdentity(rr RegistrationRequest) (*types.Transaction, error) {
transactor, err := bindings.NewRegistryTransactor(rr.RegistryAddress, bc.client)
transactor, err := bindings.NewRegistryTransactor(rr.RegistryAddress, bc.ethClient.Client())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -348,7 +354,7 @@ type TransferRequest struct {

// TransferMyst transfers myst
func (bc *Blockchain) TransferMyst(req TransferRequest) (tx *types.Transaction, err error) {
transactor, err := bindings.NewMystTokenTransactor(req.MystAddress, bc.client)
transactor, err := bindings.NewMystTokenTransactor(req.MystAddress, bc.ethClient.Client())
if err != nil {
return tx, err
}
Expand All @@ -369,7 +375,7 @@ func (bc *Blockchain) TransferMyst(req TransferRequest) (tx *types.Transaction,

// IsAccountantRegistered checks if given accountant is registered and returns true or false.
func (bc *Blockchain) IsAccountantRegistered(registryAddress, acccountantID common.Address) (bool, error) {
caller, err := bindings.NewRegistryCaller(registryAddress, bc.client)
caller, err := bindings.NewRegistryCaller(registryAddress, bc.ethClient.Client())
if err != nil {
return false, err
}
Expand All @@ -384,7 +390,7 @@ func (bc *Blockchain) IsAccountantRegistered(registryAddress, acccountantID comm

// GetAccountantOperator returns operator address of given accountant
func (bc *Blockchain) GetAccountantOperator(accountantID common.Address) (common.Address, error) {
caller, err := bindings.NewAccountantImplementationCaller(accountantID, bc.client)
caller, err := bindings.NewAccountantImplementationCaller(accountantID, bc.ethClient.Client())
if err != nil {
return common.Address{}, err
}
Expand All @@ -407,7 +413,7 @@ type SettleAndRebalanceRequest struct {

// SettleAndRebalance is settling given accountant issued promise
func (bc *Blockchain) SettleAndRebalance(req SettleAndRebalanceRequest) (*types.Transaction, error) {
transactor, err := bindings.NewAccountantImplementationTransactor(req.AccountantID, bc.client)
transactor, err := bindings.NewAccountantImplementationTransactor(req.AccountantID, bc.ethClient.Client())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -442,7 +448,7 @@ func toBytes32(arr []byte) (res [32]byte) {

// GetProviderChannelByID returns the given provider channel information
func (bc *Blockchain) GetProviderChannelByID(acc common.Address, chID []byte) (ProviderChannel, error) {
caller, err := bindings.NewAccountantImplementationCaller(acc, bc.client)
caller, err := bindings.NewAccountantImplementationCaller(acc, bc.ethClient.Client())
if err != nil {
return ProviderChannel{}, err
}
Expand All @@ -465,7 +471,7 @@ type ConsumersAccountant struct {

// GetConsumerChannelsAccountant returns the consumer channels accountant
func (bc *Blockchain) GetConsumerChannelsAccountant(channelAddress common.Address) (ConsumersAccountant, error) {
c, err := bindings.NewChannelImplementationCaller(channelAddress, bc.client)
c, err := bindings.NewChannelImplementationCaller(channelAddress, bc.ethClient.Client())
if err != nil {
return ConsumersAccountant{}, err
}
Expand All @@ -478,7 +484,7 @@ func (bc *Blockchain) GetConsumerChannelsAccountant(channelAddress common.Addres

// GetConsumerChannelOperator returns the consumer channel operator/identity
func (bc *Blockchain) GetConsumerChannelOperator(channelAddress common.Address) (common.Address, error) {
c, err := bindings.NewChannelImplementationCaller(channelAddress, bc.client)
c, err := bindings.NewChannelImplementationCaller(channelAddress, bc.ethClient.Client())
if err != nil {
return common.Address{}, err
}
Expand All @@ -491,7 +497,7 @@ func (bc *Blockchain) GetConsumerChannelOperator(channelAddress common.Address)

// SubscribeToIdentityRegistrationEvents subscribes to identity registration events
func (bc *Blockchain) SubscribeToIdentityRegistrationEvents(registryAddress common.Address, accountantIDs []common.Address) (sink chan *bindings.RegistryRegisteredIdentity, cancel func(), err error) {
filterer, err := bindings.NewRegistryFilterer(registryAddress, bc.client)
filterer, err := bindings.NewRegistryFilterer(registryAddress, bc.ethClient.Client())
if err != nil {
return sink, cancel, errors.Wrap(err, "could not create registry filterer")
}
Expand All @@ -513,7 +519,7 @@ func (bc *Blockchain) SubscribeToIdentityRegistrationEvents(registryAddress comm

// SubscribeToConsumerChannelBalanceUpdate subscribes to consumer channel balance update events
func (bc *Blockchain) SubscribeToConsumerChannelBalanceUpdate(mystSCAddress common.Address, channelAddresses []common.Address) (sink chan *bindings.MystTokenTransfer, cancel func(), err error) {
filterer, err := bindings.NewMystTokenFilterer(mystSCAddress, bc.client)
filterer, err := bindings.NewMystTokenFilterer(mystSCAddress, bc.ethClient.Client())
if err != nil {
return sink, cancel, errors.Wrap(err, "could not create myst token filterer")
}
Expand All @@ -536,7 +542,7 @@ func (bc *Blockchain) SubscribeToConsumerChannelBalanceUpdate(mystSCAddress comm

// SubscribeToProviderChannelBalanceUpdate subscribes to provider channel balance update events
func (bc *Blockchain) SubscribeToProviderChannelBalanceUpdate(accountantAddress common.Address, channelAddresses [][32]byte) (sink chan *bindings.AccountantImplementationChannelBalanceUpdated, cancel func(), err error) {
filterer, err := bindings.NewAccountantImplementationFilterer(accountantAddress, bc.client)
filterer, err := bindings.NewAccountantImplementationFilterer(accountantAddress, bc.ethClient.Client())
if err != nil {
return sink, cancel, errors.Wrap(err, "could not create accountant implementation filterer")
}
Expand Down Expand Up @@ -566,7 +572,7 @@ type SettleRequest struct {

// SettlePromise is settling the given consumer issued promise
func (bc *Blockchain) SettlePromise(req SettleRequest) (*types.Transaction, error) {
transactor, err := bindings.NewChannelImplementationTransactor(req.ChannelID, bc.client)
transactor, err := bindings.NewChannelImplementationTransactor(req.ChannelID, bc.ethClient.Client())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -603,7 +609,7 @@ func (bc *Blockchain) getNonce(identity common.Address) (uint64, error) {

// SubscribeToChannelOpenedEvents subscribes to provider channel opened events
func (bc *Blockchain) SubscribeToChannelOpenedEvents(accountantAddress common.Address) (sink chan *bindings.AccountantImplementationChannelOpened, cancel func(), err error) {
filterer, err := bindings.NewAccountantImplementationFilterer(accountantAddress, bc.client)
filterer, err := bindings.NewAccountantImplementationFilterer(accountantAddress, bc.ethClient.Client())
if err != nil {
return sink, cancel, errors.Wrap(err, "could not create accountant implementation filterer")
}
Expand All @@ -626,7 +632,7 @@ func (bc *Blockchain) SubscribeToChannelOpenedEvents(accountantAddress common.Ad

// SubscribeToPromiseSettledEventByChannelID subscribes to promise settled events
func (bc *Blockchain) SubscribeToPromiseSettledEventByChannelID(accountantID common.Address, providerAddresses [][32]byte) (sink chan *bindings.AccountantImplementationPromiseSettled, cancel func(), err error) {
caller, err := bindings.NewAccountantImplementationFilterer(accountantID, bc.client)
caller, err := bindings.NewAccountantImplementationFilterer(accountantID, bc.ethClient.Client())
if err != nil {
return sink, cancel, errors.Wrap(err, "could not create accountant caller")
}
Expand All @@ -653,7 +659,7 @@ func (bc *Blockchain) SubscribeToPromiseSettledEventByChannelID(accountantID com
func (bc *Blockchain) NetworkID() (*big.Int, error) {
ctx, cancel := context.WithTimeout(context.Background(), bc.bcTimeout)
defer cancel()
return bc.client.NetworkID(ctx)
return bc.ethClient.Client().NetworkID(ctx)
}

// ConsumerChannel represents the consumer channel
Expand Down
69 changes: 69 additions & 0 deletions client/ethclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (C) 2020 The "MysteriumNetwork/payments" Authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package client

import (
"fmt"
"sync"

"github.com/ethereum/go-ethereum/ethclient"
)

// NewReconnectableEthClient creates new ethereum client that can reconnect.
func NewReconnectableEthClient(address string) (*ReconnectableEthClient, error) {
ec, err := ethclient.Dial(address)
if err != nil {
return nil, fmt.Errorf("ethereum client failed to connect: %w", err)
}

return &ReconnectableEthClient{
address: address,
client: ec,
}, nil
}

// ReconnectableEthClient is a ethereum client that can reconnect.
type ReconnectableEthClient struct {
address string
mu sync.Mutex
client *ethclient.Client
}

// Client returns the currently connected ethereum client.
func (c *ReconnectableEthClient) Client() *ethclient.Client {
c.mu.Lock()
defer c.mu.Unlock()

return c.client
}

// Reconnect creates new ethereum client and replaces the current one.
func (c *ReconnectableEthClient) Reconnect() error {
c.mu.Lock()
defer c.mu.Unlock()

client, err := ethclient.Dial(c.address)
if err != nil {
return fmt.Errorf("ethereum client failed to dial: %w", err)
}

c.client.Close()
c.client = client

return nil
}
39 changes: 39 additions & 0 deletions client/ethclient_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (C) 2020 The "MysteriumNetwork/payments" Authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package client

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestReconnectableEthClientCreatesNewClient(t *testing.T) {
client, err := NewReconnectableEthClient("http://127.0.0.1:1234")
assert.Nil(t, err)

c1 := client.Client()
c2 := client.Client()
assert.Equal(t, c1, c2)

err = client.Reconnect()
assert.Nil(t, err)

c3 := client.Client()
assert.NotEqual(t, c1, c3)
}

0 comments on commit 777ab18

Please sign in to comment.