Skip to content

Commit

Permalink
coin-distribution: use single worker (#88)
Browse files Browse the repository at this point in the history
  • Loading branch information
ice-dionysos authored Dec 29, 2023
1 parent 172801f commit e4663d0
Show file tree
Hide file tree
Showing 11 changed files with 188 additions and 430 deletions.
29 changes: 23 additions & 6 deletions coin-distribution/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"syscall"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
Expand Down Expand Up @@ -105,14 +106,13 @@ main:
}

retryAfter := handleRPCError(ctx, err)
if retryAfter > 0 {
log.Error(errors.Wrapf(err, "failed to call ethereum RPC (attempt %v), retrying after %v", attempt, retryAfter.String()))
} else {
if retryAfter == 0 {
log.Error(errors.Wrapf(err, "failed to call ethereum RPC (attempt %v), unrecoverable error", attempt))

return val, multierror.Append(errClientUncoverable, err)
}

log.Error(errors.Wrapf(err, "failed to call ethereum RPC (attempt %v), retrying after %v", attempt, retryAfter.String()))
retryTimer := time.NewTimer(retryAfter)
select {
case <-ctx.Done():
Expand Down Expand Up @@ -149,8 +149,6 @@ func (ec *ethClientImpl) AirdropToWallets(opts *bind.TransactOpts, recipients []
tx.Gas(),
len(recipients),
))
// Wait a little to avoid nonce collision with pending transactions.
time.Sleep(time.Second)
}

return tx, err //nolint:wrapcheck //.
Expand All @@ -177,7 +175,7 @@ func (ec *ethClientImpl) Airdrop(ctx context.Context, chanID *big.Int, gas gasGe
opts := ec.CreateTransactionOpts(ctx, gasPrice, chanID, gasLimit)
tx, err := ec.AirdropToWallets(opts, recipients, amounts)
if err != nil {
return "", err //nolint:wrapcheck //.
return "", err
}

return tx.Hash().String(), nil
Expand All @@ -186,6 +184,25 @@ func (ec *ethClientImpl) Airdrop(ctx context.Context, chanID *big.Int, gas gasGe
return maybeRetryRPCRequest(ctx, fn)
}

func (ec *ethClientImpl) TransactionStatus(ctx context.Context, hash string) (ethTxStatus, error) {
return maybeRetryRPCRequest(ctx, func() (ethTxStatus, error) {
receipt, err := ec.RPC.TransactionReceipt(ctx, common.HexToHash(hash))
if err != nil {
if errors.Is(err, ethereum.NotFound) {
return ethTxStatusPending, nil
}

return "", err //nolint:wrapcheck //.
}

if receipt.Status == types.ReceiptStatusSuccessful {
return ethTxStatusSuccessful, nil
}

return ethTxStatusFailed, nil
})
}

func (ec *ethClientImpl) TransactionsStatus(ctx context.Context, hashes []*string) (statuses map[ethTxStatus][]string, err error) { //nolint:funlen //.
elements := make([]rpc.BatchElem, len(hashes)) //nolint:makezero //.
results := make([]*types.Receipt, len(hashes)) //nolint:makezero //.
Expand Down
8 changes: 6 additions & 2 deletions coin-distribution/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,17 @@ func (*mockedDummyEthClient) TransactionsStatus(context.Context, []*string) (map
return nil, nil //nolint:nilnil //.
}

func (*mockedDummyEthClient) TransactionStatus(context.Context, string) (ethTxStatus, error) {
return ethTxStatusSuccessful, nil
}

func (m *mockedAirDropper) AirdropToWallets(opts *bind.TransactOpts, _ []common.Address, _ []*big.Int) (*types.Transaction, error) {
if m.errBefore > 0 {
m.errBefore--

log.Info(fmt.Sprintf("airdropper: error(s) left: %v", m.errBefore))

return nil, &net.OpError{Err: syscall.ECONNRESET} //nolint:wrapcheck //.
return nil, &net.OpError{Err: syscall.ECONNRESET}
}

log.Info(fmt.Sprintf("airdropper: gas price %v, limit %v", opts.GasPrice.String(), opts.GasLimit))
Expand All @@ -79,7 +83,7 @@ func (m *mockedAirDropper) AirdropToWallets(opts *bind.TransactOpts, _ []common.
big.NewInt(0),
nil,
),
nil //nolint:nilnil //.
nil
}

func (m *mockedGasGetter) GetGasOptions(context.Context) (*big.Int, uint64, error) {
Expand Down
7 changes: 2 additions & 5 deletions coin-distribution/coin_distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func MustStartCoinDistribution(ctx context.Context, _ context.CancelFunc) Client
eth := mustNewEthClient(ctx, cfg.Ethereum.RPC, cfg.Ethereum.PrivateKey, cfg.Ethereum.ContractAddress)

cd := mustCreateCoinDistributionFromConfig(ctx, &cfg, eth)
cd.MustStart(ctx, nil, nil)
cd.MustStart(ctx, nil)

go startPrepareCoinDistributionsForReviewMonitor(ctx, cd.DB)

Expand All @@ -142,22 +142,19 @@ func mustCreateCoinDistributionFromConfig(ctx context.Context, conf *config, eth
cd := &coinDistributer{
Client: ethClient,
Processor: newCoinProcessor(ethClient, db, conf),
Tracker: newCoinTracker(ethClient, db, conf),
DB: db,
}

return cd
}

func (cd *coinDistributer) MustStart(ctx context.Context, notifyProcessed chan<- *batch, notifyTracked chan<- []*string) {
cd.Tracker.Start(ctx, notifyTracked)
func (cd *coinDistributer) MustStart(ctx context.Context, notifyProcessed chan<- *batch) {
cd.Processor.Start(ctx, notifyProcessed)
}

func (cd *coinDistributer) Close() error {
return multierror.Append( //nolint:wrapcheck //.
errors.Wrap(cd.Processor.Close(), "failed to close processor"),
errors.Wrap(cd.Tracker.Close(), "failed to close tracker"),
errors.Wrap(cd.Client.Close(), "failed to close eth client"),
errors.Wrap(cd.DB.Close(), "failed to close db"),
).ErrorOrNil()
Expand Down
40 changes: 8 additions & 32 deletions coin-distribution/coin_distribution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ func TestFullCoinDistribution(t *testing.T) { //nolint:paralleltest,funlen //.

conf := new(config)
conf.Ethereum.ContractAddress = contractAddr
conf.Ethereum.ChainID = 5
conf.Ethereum.ChainID = 97
conf.Ethereum.RPC = rpc
conf.Ethereum.PrivateKey = privateKey
conf.Workers = 2

t.Run("AddPendingEntry", func(t *testing.T) {
db := storage.MustConnect(context.TODO(), ddl, applicationYamlKey)
Expand All @@ -68,39 +67,16 @@ func TestFullCoinDistribution(t *testing.T) { //nolint:paralleltest,funlen //.
defer cd.Close()

chBatches := make(chan *batch, 1)
chTracker := make(chan []*string, 1)
cd.MustStart(context.TODO(), chBatches, chTracker)
cd.MustStart(context.TODO(), chBatches)

t.Logf("waiting for batch to be processed")
var processedBatch *batch
for {
select {
case b := <-chBatches:
t.Logf("batch: %+v processed", b)
for i := range b.Records {
t.Logf("record: %v processed: %v", pointerToString(b.Records[i].EthTX), b.Records[i].EthStatus)
require.Equal(t, ethApiStatusAccepted, b.Records[i].EthStatus)
}
processedBatch = b

case txs := <-chTracker:
require.NotNil(t, processedBatch)
found := false
for i := range txs {
t.Logf("transaction: %v processed", *txs[i])
for recordNum := range processedBatch.Records {
if processedBatch.Records[recordNum].EthTX != nil && *processedBatch.Records[recordNum].EthTX == *txs[i] {
t.Logf("transaction: %+v found in batch", *txs[i])
found = true
}
}
}

require.True(t, found)

return
}
processedBatch := <-chBatches
t.Logf("batch: %+v processed: status %v", processedBatch, processedBatch.Status)
for i := range processedBatch.Records {
t.Logf("record: %v processed: %v", pointerToString(processedBatch.Records[i].EthTX), processedBatch.Records[i].EthStatus)
require.Equal(t, ethApiStatusAccepted, processedBatch.Records[i].EthStatus)
}
require.Equal(t, ethTxStatusSuccessful, processedBatch.Status)
}

func TestDatabaseSetGetValues(t *testing.T) {
Expand Down
3 changes: 0 additions & 3 deletions coin-distribution/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ import (
)

func (cfg *config) EnsureValid() {
if cfg.Workers == 0 {
log.Panic("workers must be > 0")
}
if cfg.Ethereum.ChainID == 0 {
log.Panic("ethereum.chainID must be > 0")
}
Expand Down
27 changes: 8 additions & 19 deletions coin-distribution/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"sync"
stdlibtime "time"

"github.com/alitto/pond"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -98,7 +97,7 @@ const (
applicationYamlKey = "coin-distribution"
requestDeadline = 25 * stdlibtime.Second

batchSize = 1320
batchSize = 700

gasPriceCacheTTL = stdlibtime.Minute

Expand All @@ -114,6 +113,7 @@ const (

ethTxStatusSuccessful ethTxStatus = "SUCCESSFUL"
ethTxStatusFailed ethTxStatus = "FAILED"
ethTxStatusPending ethTxStatus = "PENDING"

configKeyCoinDistributerEnabled = "coin_distributer_enabled"
configKeyCoinDistributerOnDemand = "coin_distributer_forced_execution"
Expand Down Expand Up @@ -141,6 +141,7 @@ type (
ethClient interface {
SuggestGasPrice(ctx context.Context) (*big.Int, error)
TransactionsStatus(ctx context.Context, hashes []*string) (statuses map[ethTxStatus][]string, err error)
TransactionStatus(ctx context.Context, hash string) (status ethTxStatus, err error)
Airdrop(ctx context.Context, chanID *big.Int, gas gasGetter, recipients []common.Address, amounts []*big.Int) (string, error)
io.Closer
}
Expand All @@ -159,28 +160,18 @@ type (
}
batch struct {
ID string
TX string
Status ethTxStatus
Records []*batchRecord
}
databaseConfig struct {
DB *storage.DB
}
coinTracker struct {
*databaseConfig
Client ethClient
Conf *config
Workers *pond.WorkerPool
CancelSignal chan struct{}
}
coinProcessorWorkerTask struct {
Context context.Context
Result chan error
}
coinProcessor struct {
*databaseConfig
Client ethClient
Conf *config
WG *sync.WaitGroup
ProcessSignal []chan coinProcessorWorkerTask
CancelSignal chan struct{}
gasPriceCache struct {
price *big.Int
Expand All @@ -198,7 +189,6 @@ type (
Client ethClient
DB *storage.DB
Processor *coinProcessor
Tracker *coinTracker
}
repository struct {
cfg *config
Expand All @@ -214,9 +204,8 @@ type (
ContractAddress string `yaml:"contractAddress" mapstructure:"contract-address"`
ChainID int64 `yaml:"chainId" mapstructure:"chain-id"`
} `yaml:"ethereum" mapstructure:"ethereum"`
StartHours int `yaml:"startHours" mapstructure:"start-hours"`
EndHours int `yaml:"endHours" mapstructure:"end-hours"`
Workers int64 `yaml:"workers" mapstructure:"workers"`
Development bool `yaml:"development" mapstructure:"development"`
StartHours int `yaml:"startHours" mapstructure:"start-hours"`
EndHours int `yaml:"endHours" mapstructure:"end-hours"`
Development bool `yaml:"development" mapstructure:"development"`
}
)
Loading

0 comments on commit e4663d0

Please sign in to comment.