diff --git a/client/client.go b/client/client.go index 07a346a..c781f70 100644 --- a/client/client.go +++ b/client/client.go @@ -36,7 +36,7 @@ import ( ) // DefaultBackoff is the default backoff for the client -var DefaultBackoff = time.Second * 3 +const DefaultBackoff = time.Second * 3 // ProviderChannel represents the provider channel type ProviderChannel struct { @@ -48,9 +48,13 @@ type ProviderChannel struct { Timelock *big.Int } +type ethClientGetter interface { + Client() *ethclient.Client +} + // Blockchain contains all the useful blockchain utilities for the payment off chain messaging type Blockchain struct { - client *ethclient.Client + ethClient ethClientGetter bcTimeout time.Duration nonceFunc nonceFunc } @@ -58,18 +62,20 @@ type Blockchain struct { type nonceFunc func(ctx context.Context, account common.Address) (uint64, error) // NewBlockchain returns a new instance of blockchain -func NewBlockchain(c *ethclient.Client, timeout time.Duration) *Blockchain { +func NewBlockchain(ethClient ethClientGetter, timeout time.Duration) *Blockchain { return &Blockchain{ - client: c, + ethClient: ethClient, bcTimeout: timeout, - nonceFunc: c.PendingNonceAt, + nonceFunc: func(ctx context.Context, account common.Address) (uint64, error) { + return ethClient.Client().PendingNonceAt(ctx, account) + }, } } // NewBlockchainWithCustomNonceTracker returns a new instance of blockchain with the provided nonce tracking func -func NewBlockchainWithCustomNonceTracker(c *ethclient.Client, timeout time.Duration, nonceFunc nonceFunc) *Blockchain { +func NewBlockchainWithCustomNonceTracker(ethClient ethClientGetter, timeout time.Duration, nonceFunc nonceFunc) *Blockchain { return &Blockchain{ - client: c, + ethClient: ethClient, bcTimeout: timeout, nonceFunc: nonceFunc, } @@ -77,7 +83,7 @@ func NewBlockchainWithCustomNonceTracker(c *ethclient.Client, timeout time.Durat // GetAccountantFee fetches the accountant fee from blockchain func (bc *Blockchain) GetAccountantFee(accountantAddress common.Address) (uint16, error) { - caller, err := bindings.NewAccountantImplementationCaller(accountantAddress, bc.client) + caller, err := bindings.NewAccountantImplementationCaller(accountantAddress, bc.ethClient.Client()) if err != nil { return 0, errors.Wrap(err, "could not create accountant implementation caller") } @@ -97,7 +103,7 @@ func (bc *Blockchain) GetAccountantFee(accountantAddress common.Address) (uint16 // CalculateAccountantFee calls blockchain for calculation of accountant fee func (bc *Blockchain) CalculateAccountantFee(accountantAddress common.Address, value uint64) (*big.Int, error) { - caller, err := bindings.NewAccountantImplementationCaller(accountantAddress, bc.client) + caller, err := bindings.NewAccountantImplementationCaller(accountantAddress, bc.ethClient.Client()) if err != nil { return nil, errors.Wrap(err, "could not create accountant implementation caller") } @@ -132,7 +138,7 @@ func (bc *Blockchain) IsRegisteredAsProvider(accountantAddress, registryAddress, // SubscribeToMystTokenTransfers subscribes to myst token transfers func (bc *Blockchain) SubscribeToMystTokenTransfers(mystSCAddress common.Address) (chan *bindings.MystTokenTransfer, func(), error) { sink := make(chan *bindings.MystTokenTransfer) - mtc, err := bindings.NewMystTokenFilterer(mystSCAddress, bc.client) + mtc, err := bindings.NewMystTokenFilterer(mystSCAddress, bc.ethClient.Client()) if err != nil { return sink, nil, err } @@ -155,7 +161,7 @@ func (bc *Blockchain) SubscribeToMystTokenTransfers(mystSCAddress common.Address // SubscribeToConsumerBalanceEvent subscribes to balance change events in blockchain func (bc *Blockchain) SubscribeToConsumerBalanceEvent(channel, mystSCAddress common.Address, timeout time.Duration) (chan *bindings.MystTokenTransfer, func(), error) { sink := make(chan *bindings.MystTokenTransfer) - mtc, err := bindings.NewMystTokenFilterer(mystSCAddress, bc.client) + mtc, err := bindings.NewMystTokenFilterer(mystSCAddress, bc.ethClient.Client()) if err != nil { return sink, nil, err } @@ -188,7 +194,7 @@ func (bc *Blockchain) GetProviderChannel(accountantAddress common.Address, addre if err != nil { return ProviderChannel{}, errors.Wrap(err, "could not calculate provider channel address") } - caller, err := bindings.NewAccountantImplementationCaller(accountantAddress, bc.client) + caller, err := bindings.NewAccountantImplementationCaller(accountantAddress, bc.ethClient.Client()) if err != nil { return ProviderChannel{}, errors.Wrap(err, "could not create accountant caller") } @@ -231,7 +237,7 @@ func (bc *Blockchain) SubscribeToPromiseSettledEvent(providerID, accountantID co // IsRegistered checks wether the given identity is registered or not func (bc *Blockchain) IsRegistered(registryAddress, addressToCheck common.Address) (bool, error) { - caller, err := bindings.NewRegistryCaller(registryAddress, bc.client) + caller, err := bindings.NewRegistryCaller(registryAddress, bc.ethClient.Client()) if err != nil { return false, errors.Wrap(err, "could not create registry caller") } @@ -247,7 +253,7 @@ func (bc *Blockchain) IsRegistered(registryAddress, addressToCheck common.Addres // GetMystBalance returns myst balance func (bc *Blockchain) GetMystBalance(mystAddress, identity common.Address) (*big.Int, error) { - c, err := bindings.NewMystTokenCaller(mystAddress, bc.client) + c, err := bindings.NewMystTokenCaller(mystAddress, bc.ethClient.Client()) if err != nil { return nil, err } @@ -262,7 +268,7 @@ func (bc *Blockchain) GetMystBalance(mystAddress, identity common.Address) (*big // GetRegistrationFee returns fee required by registry func (bc *Blockchain) GetRegistrationFee(registryAddress common.Address) (*big.Int, error) { // TODO to reduce amount of blockchain calls, it could get registration fee from cache (updated once in a day) - c, err := bindings.NewRegistryCaller(registryAddress, bc.client) + c, err := bindings.NewRegistryCaller(registryAddress, bc.ethClient.Client()) if err != nil { return nil, errors.Wrap(err, "could not get registration fee") } @@ -304,12 +310,12 @@ func (bc *Blockchain) EstimateGas(msg ethereum.CallMsg) (uint64, error) { parent := context.Background() ctx, cancel := context.WithTimeout(parent, bc.bcTimeout) defer cancel() - return bc.client.EstimateGas(ctx, msg) + return bc.ethClient.Client().EstimateGas(ctx, msg) } // RegisterIdentity registers the given identity on blockchain func (bc *Blockchain) RegisterIdentity(rr RegistrationRequest) (*types.Transaction, error) { - transactor, err := bindings.NewRegistryTransactor(rr.RegistryAddress, bc.client) + transactor, err := bindings.NewRegistryTransactor(rr.RegistryAddress, bc.ethClient.Client()) if err != nil { return nil, err } @@ -348,7 +354,7 @@ type TransferRequest struct { // TransferMyst transfers myst func (bc *Blockchain) TransferMyst(req TransferRequest) (tx *types.Transaction, err error) { - transactor, err := bindings.NewMystTokenTransactor(req.MystAddress, bc.client) + transactor, err := bindings.NewMystTokenTransactor(req.MystAddress, bc.ethClient.Client()) if err != nil { return tx, err } @@ -369,7 +375,7 @@ func (bc *Blockchain) TransferMyst(req TransferRequest) (tx *types.Transaction, // IsAccountantRegistered checks if given accountant is registered and returns true or false. func (bc *Blockchain) IsAccountantRegistered(registryAddress, acccountantID common.Address) (bool, error) { - caller, err := bindings.NewRegistryCaller(registryAddress, bc.client) + caller, err := bindings.NewRegistryCaller(registryAddress, bc.ethClient.Client()) if err != nil { return false, err } @@ -384,7 +390,7 @@ func (bc *Blockchain) IsAccountantRegistered(registryAddress, acccountantID comm // GetAccountantOperator returns operator address of given accountant func (bc *Blockchain) GetAccountantOperator(accountantID common.Address) (common.Address, error) { - caller, err := bindings.NewAccountantImplementationCaller(accountantID, bc.client) + caller, err := bindings.NewAccountantImplementationCaller(accountantID, bc.ethClient.Client()) if err != nil { return common.Address{}, err } @@ -407,7 +413,7 @@ type SettleAndRebalanceRequest struct { // SettleAndRebalance is settling given accountant issued promise func (bc *Blockchain) SettleAndRebalance(req SettleAndRebalanceRequest) (*types.Transaction, error) { - transactor, err := bindings.NewAccountantImplementationTransactor(req.AccountantID, bc.client) + transactor, err := bindings.NewAccountantImplementationTransactor(req.AccountantID, bc.ethClient.Client()) if err != nil { return nil, err } @@ -442,7 +448,7 @@ func toBytes32(arr []byte) (res [32]byte) { // GetProviderChannelByID returns the given provider channel information func (bc *Blockchain) GetProviderChannelByID(acc common.Address, chID []byte) (ProviderChannel, error) { - caller, err := bindings.NewAccountantImplementationCaller(acc, bc.client) + caller, err := bindings.NewAccountantImplementationCaller(acc, bc.ethClient.Client()) if err != nil { return ProviderChannel{}, err } @@ -465,7 +471,7 @@ type ConsumersAccountant struct { // GetConsumerChannelsAccountant returns the consumer channels accountant func (bc *Blockchain) GetConsumerChannelsAccountant(channelAddress common.Address) (ConsumersAccountant, error) { - c, err := bindings.NewChannelImplementationCaller(channelAddress, bc.client) + c, err := bindings.NewChannelImplementationCaller(channelAddress, bc.ethClient.Client()) if err != nil { return ConsumersAccountant{}, err } @@ -478,7 +484,7 @@ func (bc *Blockchain) GetConsumerChannelsAccountant(channelAddress common.Addres // GetConsumerChannelOperator returns the consumer channel operator/identity func (bc *Blockchain) GetConsumerChannelOperator(channelAddress common.Address) (common.Address, error) { - c, err := bindings.NewChannelImplementationCaller(channelAddress, bc.client) + c, err := bindings.NewChannelImplementationCaller(channelAddress, bc.ethClient.Client()) if err != nil { return common.Address{}, err } @@ -491,7 +497,7 @@ func (bc *Blockchain) GetConsumerChannelOperator(channelAddress common.Address) // SubscribeToIdentityRegistrationEvents subscribes to identity registration events func (bc *Blockchain) SubscribeToIdentityRegistrationEvents(registryAddress common.Address, accountantIDs []common.Address) (sink chan *bindings.RegistryRegisteredIdentity, cancel func(), err error) { - filterer, err := bindings.NewRegistryFilterer(registryAddress, bc.client) + filterer, err := bindings.NewRegistryFilterer(registryAddress, bc.ethClient.Client()) if err != nil { return sink, cancel, errors.Wrap(err, "could not create registry filterer") } @@ -513,7 +519,7 @@ func (bc *Blockchain) SubscribeToIdentityRegistrationEvents(registryAddress comm // SubscribeToConsumerChannelBalanceUpdate subscribes to consumer channel balance update events func (bc *Blockchain) SubscribeToConsumerChannelBalanceUpdate(mystSCAddress common.Address, channelAddresses []common.Address) (sink chan *bindings.MystTokenTransfer, cancel func(), err error) { - filterer, err := bindings.NewMystTokenFilterer(mystSCAddress, bc.client) + filterer, err := bindings.NewMystTokenFilterer(mystSCAddress, bc.ethClient.Client()) if err != nil { return sink, cancel, errors.Wrap(err, "could not create myst token filterer") } @@ -536,7 +542,7 @@ func (bc *Blockchain) SubscribeToConsumerChannelBalanceUpdate(mystSCAddress comm // SubscribeToProviderChannelBalanceUpdate subscribes to provider channel balance update events func (bc *Blockchain) SubscribeToProviderChannelBalanceUpdate(accountantAddress common.Address, channelAddresses [][32]byte) (sink chan *bindings.AccountantImplementationChannelBalanceUpdated, cancel func(), err error) { - filterer, err := bindings.NewAccountantImplementationFilterer(accountantAddress, bc.client) + filterer, err := bindings.NewAccountantImplementationFilterer(accountantAddress, bc.ethClient.Client()) if err != nil { return sink, cancel, errors.Wrap(err, "could not create accountant implementation filterer") } @@ -566,7 +572,7 @@ type SettleRequest struct { // SettlePromise is settling the given consumer issued promise func (bc *Blockchain) SettlePromise(req SettleRequest) (*types.Transaction, error) { - transactor, err := bindings.NewChannelImplementationTransactor(req.ChannelID, bc.client) + transactor, err := bindings.NewChannelImplementationTransactor(req.ChannelID, bc.ethClient.Client()) if err != nil { return nil, err } @@ -603,7 +609,7 @@ func (bc *Blockchain) getNonce(identity common.Address) (uint64, error) { // SubscribeToChannelOpenedEvents subscribes to provider channel opened events func (bc *Blockchain) SubscribeToChannelOpenedEvents(accountantAddress common.Address) (sink chan *bindings.AccountantImplementationChannelOpened, cancel func(), err error) { - filterer, err := bindings.NewAccountantImplementationFilterer(accountantAddress, bc.client) + filterer, err := bindings.NewAccountantImplementationFilterer(accountantAddress, bc.ethClient.Client()) if err != nil { return sink, cancel, errors.Wrap(err, "could not create accountant implementation filterer") } @@ -626,7 +632,7 @@ func (bc *Blockchain) SubscribeToChannelOpenedEvents(accountantAddress common.Ad // SubscribeToPromiseSettledEventByChannelID subscribes to promise settled events func (bc *Blockchain) SubscribeToPromiseSettledEventByChannelID(accountantID common.Address, providerAddresses [][32]byte) (sink chan *bindings.AccountantImplementationPromiseSettled, cancel func(), err error) { - caller, err := bindings.NewAccountantImplementationFilterer(accountantID, bc.client) + caller, err := bindings.NewAccountantImplementationFilterer(accountantID, bc.ethClient.Client()) if err != nil { return sink, cancel, errors.Wrap(err, "could not create accountant caller") } @@ -653,7 +659,7 @@ func (bc *Blockchain) SubscribeToPromiseSettledEventByChannelID(accountantID com func (bc *Blockchain) NetworkID() (*big.Int, error) { ctx, cancel := context.WithTimeout(context.Background(), bc.bcTimeout) defer cancel() - return bc.client.NetworkID(ctx) + return bc.ethClient.Client().NetworkID(ctx) } // ConsumerChannel represents the consumer channel diff --git a/client/ethclient.go b/client/ethclient.go new file mode 100644 index 0000000..8fd21fc --- /dev/null +++ b/client/ethclient.go @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2020 The "MysteriumNetwork/payments" Authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package client + +import ( + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/ethclient" +) + +// NewReconnectableEthClient creates new ethereum client that can reconnect. +func NewReconnectableEthClient(address string) (*ReconnectableEthClient, error) { + ec, err := ethclient.Dial(address) + if err != nil { + return nil, fmt.Errorf("ethereum client failed to connect: %w", err) + } + + return &ReconnectableEthClient{ + address: address, + client: ec, + }, nil +} + +// ReconnectableEthClient is a ethereum client that can reconnect. +type ReconnectableEthClient struct { + address string + mu sync.Mutex + client *ethclient.Client +} + +// Client returns the currently connected ethereum client. +func (c *ReconnectableEthClient) Client() *ethclient.Client { + c.mu.Lock() + defer c.mu.Unlock() + + return c.client +} + +// Reconnect creates new ethereum client and replaces the current one. +func (c *ReconnectableEthClient) Reconnect() error { + c.mu.Lock() + defer c.mu.Unlock() + + client, err := ethclient.Dial(c.address) + if err != nil { + return fmt.Errorf("ethereum client failed to dial: %w", err) + } + + c.client.Close() + c.client = client + + return nil +} diff --git a/client/ethclient_test.go b/client/ethclient_test.go new file mode 100644 index 0000000..ca42dd8 --- /dev/null +++ b/client/ethclient_test.go @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2020 The "MysteriumNetwork/payments" Authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package client + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestReconnectableEthClientCreatesNewClient(t *testing.T) { + client, err := NewReconnectableEthClient("http://127.0.0.1:1234") + assert.Nil(t, err) + + c1 := client.Client() + c2 := client.Client() + assert.Equal(t, c1, c2) + + err = client.Reconnect() + assert.Nil(t, err) + + c3 := client.Client() + assert.NotEqual(t, c1, c3) +}