diff --git a/coin-distribution/client.go b/coin-distribution/client.go index 8e40ac6..79d7854 100644 --- a/coin-distribution/client.go +++ b/coin-distribution/client.go @@ -13,6 +13,7 @@ import ( "syscall" "time" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -105,14 +106,13 @@ main: } retryAfter := handleRPCError(ctx, err) - if retryAfter > 0 { - log.Error(errors.Wrapf(err, "failed to call ethereum RPC (attempt %v), retrying after %v", attempt, retryAfter.String())) - } else { + if retryAfter == 0 { log.Error(errors.Wrapf(err, "failed to call ethereum RPC (attempt %v), unrecoverable error", attempt)) return val, multierror.Append(errClientUncoverable, err) } + log.Error(errors.Wrapf(err, "failed to call ethereum RPC (attempt %v), retrying after %v", attempt, retryAfter.String())) retryTimer := time.NewTimer(retryAfter) select { case <-ctx.Done(): @@ -149,8 +149,6 @@ func (ec *ethClientImpl) AirdropToWallets(opts *bind.TransactOpts, recipients [] tx.Gas(), len(recipients), )) - // Wait a little to avoid nonce collision with pending transactions. - time.Sleep(time.Second) } return tx, err //nolint:wrapcheck //. @@ -177,7 +175,7 @@ func (ec *ethClientImpl) Airdrop(ctx context.Context, chanID *big.Int, gas gasGe opts := ec.CreateTransactionOpts(ctx, gasPrice, chanID, gasLimit) tx, err := ec.AirdropToWallets(opts, recipients, amounts) if err != nil { - return "", err //nolint:wrapcheck //. + return "", err } return tx.Hash().String(), nil @@ -186,6 +184,25 @@ func (ec *ethClientImpl) Airdrop(ctx context.Context, chanID *big.Int, gas gasGe return maybeRetryRPCRequest(ctx, fn) } +func (ec *ethClientImpl) TransactionStatus(ctx context.Context, hash string) (ethTxStatus, error) { + return maybeRetryRPCRequest(ctx, func() (ethTxStatus, error) { + receipt, err := ec.RPC.TransactionReceipt(ctx, common.HexToHash(hash)) + if err != nil { + if errors.Is(err, ethereum.NotFound) { + return ethTxStatusPending, nil + } + + return "", err //nolint:wrapcheck //. + } + + if receipt.Status == types.ReceiptStatusSuccessful { + return ethTxStatusSuccessful, nil + } + + return ethTxStatusFailed, nil + }) +} + func (ec *ethClientImpl) TransactionsStatus(ctx context.Context, hashes []*string) (statuses map[ethTxStatus][]string, err error) { //nolint:funlen //. elements := make([]rpc.BatchElem, len(hashes)) //nolint:makezero //. results := make([]*types.Receipt, len(hashes)) //nolint:makezero //. diff --git a/coin-distribution/client_test.go b/coin-distribution/client_test.go index 05c6c45..806db18 100644 --- a/coin-distribution/client_test.go +++ b/coin-distribution/client_test.go @@ -60,13 +60,17 @@ func (*mockedDummyEthClient) TransactionsStatus(context.Context, []*string) (map return nil, nil //nolint:nilnil //. } +func (*mockedDummyEthClient) TransactionStatus(context.Context, string) (ethTxStatus, error) { + return ethTxStatusSuccessful, nil +} + func (m *mockedAirDropper) AirdropToWallets(opts *bind.TransactOpts, _ []common.Address, _ []*big.Int) (*types.Transaction, error) { if m.errBefore > 0 { m.errBefore-- log.Info(fmt.Sprintf("airdropper: error(s) left: %v", m.errBefore)) - return nil, &net.OpError{Err: syscall.ECONNRESET} //nolint:wrapcheck //. + return nil, &net.OpError{Err: syscall.ECONNRESET} } log.Info(fmt.Sprintf("airdropper: gas price %v, limit %v", opts.GasPrice.String(), opts.GasLimit)) @@ -79,7 +83,7 @@ func (m *mockedAirDropper) AirdropToWallets(opts *bind.TransactOpts, _ []common. big.NewInt(0), nil, ), - nil //nolint:nilnil //. + nil } func (m *mockedGasGetter) GetGasOptions(context.Context) (*big.Int, uint64, error) { diff --git a/coin-distribution/coin_distribution.go b/coin-distribution/coin_distribution.go index b486651..cd7c71c 100644 --- a/coin-distribution/coin_distribution.go +++ b/coin-distribution/coin_distribution.go @@ -130,7 +130,7 @@ func MustStartCoinDistribution(ctx context.Context, _ context.CancelFunc) Client eth := mustNewEthClient(ctx, cfg.Ethereum.RPC, cfg.Ethereum.PrivateKey, cfg.Ethereum.ContractAddress) cd := mustCreateCoinDistributionFromConfig(ctx, &cfg, eth) - cd.MustStart(ctx, nil, nil) + cd.MustStart(ctx, nil) go startPrepareCoinDistributionsForReviewMonitor(ctx, cd.DB) @@ -142,22 +142,19 @@ func mustCreateCoinDistributionFromConfig(ctx context.Context, conf *config, eth cd := &coinDistributer{ Client: ethClient, Processor: newCoinProcessor(ethClient, db, conf), - Tracker: newCoinTracker(ethClient, db, conf), DB: db, } return cd } -func (cd *coinDistributer) MustStart(ctx context.Context, notifyProcessed chan<- *batch, notifyTracked chan<- []*string) { - cd.Tracker.Start(ctx, notifyTracked) +func (cd *coinDistributer) MustStart(ctx context.Context, notifyProcessed chan<- *batch) { cd.Processor.Start(ctx, notifyProcessed) } func (cd *coinDistributer) Close() error { return multierror.Append( //nolint:wrapcheck //. errors.Wrap(cd.Processor.Close(), "failed to close processor"), - errors.Wrap(cd.Tracker.Close(), "failed to close tracker"), errors.Wrap(cd.Client.Close(), "failed to close eth client"), errors.Wrap(cd.DB.Close(), "failed to close db"), ).ErrorOrNil() diff --git a/coin-distribution/coin_distribution_test.go b/coin-distribution/coin_distribution_test.go index 1b0399f..2a6cdca 100644 --- a/coin-distribution/coin_distribution_test.go +++ b/coin-distribution/coin_distribution_test.go @@ -41,10 +41,9 @@ func TestFullCoinDistribution(t *testing.T) { //nolint:paralleltest,funlen //. conf := new(config) conf.Ethereum.ContractAddress = contractAddr - conf.Ethereum.ChainID = 5 + conf.Ethereum.ChainID = 97 conf.Ethereum.RPC = rpc conf.Ethereum.PrivateKey = privateKey - conf.Workers = 2 t.Run("AddPendingEntry", func(t *testing.T) { db := storage.MustConnect(context.TODO(), ddl, applicationYamlKey) @@ -68,39 +67,16 @@ func TestFullCoinDistribution(t *testing.T) { //nolint:paralleltest,funlen //. defer cd.Close() chBatches := make(chan *batch, 1) - chTracker := make(chan []*string, 1) - cd.MustStart(context.TODO(), chBatches, chTracker) + cd.MustStart(context.TODO(), chBatches) t.Logf("waiting for batch to be processed") - var processedBatch *batch - for { - select { - case b := <-chBatches: - t.Logf("batch: %+v processed", b) - for i := range b.Records { - t.Logf("record: %v processed: %v", pointerToString(b.Records[i].EthTX), b.Records[i].EthStatus) - require.Equal(t, ethApiStatusAccepted, b.Records[i].EthStatus) - } - processedBatch = b - - case txs := <-chTracker: - require.NotNil(t, processedBatch) - found := false - for i := range txs { - t.Logf("transaction: %v processed", *txs[i]) - for recordNum := range processedBatch.Records { - if processedBatch.Records[recordNum].EthTX != nil && *processedBatch.Records[recordNum].EthTX == *txs[i] { - t.Logf("transaction: %+v found in batch", *txs[i]) - found = true - } - } - } - - require.True(t, found) - - return - } + processedBatch := <-chBatches + t.Logf("batch: %+v processed: status %v", processedBatch, processedBatch.Status) + for i := range processedBatch.Records { + t.Logf("record: %v processed: %v", pointerToString(processedBatch.Records[i].EthTX), processedBatch.Records[i].EthStatus) + require.Equal(t, ethApiStatusAccepted, processedBatch.Records[i].EthStatus) } + require.Equal(t, ethTxStatusSuccessful, processedBatch.Status) } func TestDatabaseSetGetValues(t *testing.T) { diff --git a/coin-distribution/config.go b/coin-distribution/config.go index bf6174f..2008b69 100644 --- a/coin-distribution/config.go +++ b/coin-distribution/config.go @@ -10,9 +10,6 @@ import ( ) func (cfg *config) EnsureValid() { - if cfg.Workers == 0 { - log.Panic("workers must be > 0") - } if cfg.Ethereum.ChainID == 0 { log.Panic("ethereum.chainID must be > 0") } diff --git a/coin-distribution/contract.go b/coin-distribution/contract.go index 942d8e9..cbee323 100644 --- a/coin-distribution/contract.go +++ b/coin-distribution/contract.go @@ -12,7 +12,6 @@ import ( "sync" stdlibtime "time" - "github.com/alitto/pond" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -98,7 +97,7 @@ const ( applicationYamlKey = "coin-distribution" requestDeadline = 25 * stdlibtime.Second - batchSize = 1320 + batchSize = 700 gasPriceCacheTTL = stdlibtime.Minute @@ -114,6 +113,7 @@ const ( ethTxStatusSuccessful ethTxStatus = "SUCCESSFUL" ethTxStatusFailed ethTxStatus = "FAILED" + ethTxStatusPending ethTxStatus = "PENDING" configKeyCoinDistributerEnabled = "coin_distributer_enabled" configKeyCoinDistributerOnDemand = "coin_distributer_forced_execution" @@ -141,6 +141,7 @@ type ( ethClient interface { SuggestGasPrice(ctx context.Context) (*big.Int, error) TransactionsStatus(ctx context.Context, hashes []*string) (statuses map[ethTxStatus][]string, err error) + TransactionStatus(ctx context.Context, hash string) (status ethTxStatus, err error) Airdrop(ctx context.Context, chanID *big.Int, gas gasGetter, recipients []common.Address, amounts []*big.Int) (string, error) io.Closer } @@ -159,28 +160,18 @@ type ( } batch struct { ID string + TX string + Status ethTxStatus Records []*batchRecord } databaseConfig struct { DB *storage.DB } - coinTracker struct { - *databaseConfig - Client ethClient - Conf *config - Workers *pond.WorkerPool - CancelSignal chan struct{} - } - coinProcessorWorkerTask struct { - Context context.Context - Result chan error - } coinProcessor struct { *databaseConfig Client ethClient Conf *config WG *sync.WaitGroup - ProcessSignal []chan coinProcessorWorkerTask CancelSignal chan struct{} gasPriceCache struct { price *big.Int @@ -198,7 +189,6 @@ type ( Client ethClient DB *storage.DB Processor *coinProcessor - Tracker *coinTracker } repository struct { cfg *config @@ -214,9 +204,8 @@ type ( ContractAddress string `yaml:"contractAddress" mapstructure:"contract-address"` ChainID int64 `yaml:"chainId" mapstructure:"chain-id"` } `yaml:"ethereum" mapstructure:"ethereum"` - StartHours int `yaml:"startHours" mapstructure:"start-hours"` - EndHours int `yaml:"endHours" mapstructure:"end-hours"` - Workers int64 `yaml:"workers" mapstructure:"workers"` - Development bool `yaml:"development" mapstructure:"development"` + StartHours int `yaml:"startHours" mapstructure:"start-hours"` + EndHours int `yaml:"endHours" mapstructure:"end-hours"` + Development bool `yaml:"development" mapstructure:"development"` } ) diff --git a/coin-distribution/processor.go b/coin-distribution/processor.go index 37f7fdd..b7d7e56 100644 --- a/coin-distribution/processor.go +++ b/coin-distribution/processor.go @@ -23,37 +23,19 @@ func newCoinProcessor(client ethClient, db *storage.DB, conf *config) *coinProce Conf: conf, WG: new(sync.WaitGroup), CancelSignal: make(chan struct{}), - ProcessSignal: make([]chan coinProcessorWorkerTask, conf.Workers), databaseConfig: &databaseConfig{DB: db}, } proc.gasPriceCache.mu = new(sync.RWMutex) proc.gasPriceCache.time = time.New(stdlibtime.Time{}) - for workerNumber := int64(0); workerNumber < proc.Conf.Workers; workerNumber++ { - proc.ProcessSignal[workerNumber] = make(chan coinProcessorWorkerTask, 1) - } - return proc } func (proc *coinProcessor) Start(ctx context.Context, notify chan<- *batch) { - proc.WG.Add(int(proc.Conf.Workers)) - - log.Info(fmt.Sprintf("starting [%d] worker(s) ...", proc.Conf.Workers)) - - for workerNumber := int64(0); workerNumber < proc.Conf.Workers; workerNumber++ { - log.Info(fmt.Sprintf("starting worker [%v]", workerNumber)) - go func(wn int64) { - defer proc.WG.Done() - proc.Worker(ctx, notify, wn) - }(workerNumber) - } - - log.Info("starting controller ...") proc.WG.Add(1) go func() { defer proc.WG.Done() - proc.Controller(ctx) + proc.Controller(ctx, notify) }() } @@ -98,7 +80,7 @@ func (proc *coinProcessor) GetGasPrice(ctx context.Context) (value *big.Int, err return value, nil } -func (proc *coinProcessor) BatchMarkAccepted(ctx context.Context, _ int64, data *batch, txHash string) error { +func (proc *coinProcessor) BatchMarkAccepted(ctx context.Context, data *batch, txHash string) error { const stmt = ` update pending_coin_distributions set @@ -115,7 +97,7 @@ where return errors.Wrapf(err, "failed to mark batch %v with TX %v as accepted", data.ID, txHash) } -func (proc *coinProcessor) BatchMarkRejected(ctx context.Context, _ int64, data *batch) error { +func (proc *coinProcessor) BatchMarkRejected(ctx context.Context, data *batch) error { const stmt = ` update pending_coin_distributions set @@ -130,7 +112,7 @@ where return errors.Wrapf(err, "failed to mark batch %v with as rejected", data.ID) } -func (proc *coinProcessor) BatchPrepareFetch(ctx context.Context, workerNumber int64) (*batch, error) { //nolint:funlen //. +func (proc *coinProcessor) BatchPrepareFetch(ctx context.Context) (*batch, error) { //nolint:funlen //. const stmt = ` with records as ( select @@ -138,11 +120,10 @@ with records as ( from pending_coin_distributions where - eth_status = 'NEW' and - (internal_id % 10) = $1 + eth_status = 'NEW' order by created_at ASC - limit $2 + limit $1 for update skip locked ) update pending_coin_distributions up @@ -155,7 +136,7 @@ where returning up.* ` - result, err := storage.ExecMany[batchRecord](ctx, proc.DB, stmt, workerNumber, batchSize) + result, err := storage.ExecMany[batchRecord](ctx, proc.DB, stmt, batchSize) if err != nil { return nil, errors.Wrap(err, "failed to fetch pending coin distributions") } else if len(result) == 0 { @@ -168,8 +149,41 @@ returning up.* }, nil } +func (ct *coinProcessor) DeleteTransactions(ctx context.Context, hash string) error { + const stmt = `delete from pending_coin_distributions where eth_status = 'ACCEPTED' and eth_tx = $1` + + r, err := storage.Exec(ctx, ct.DB, stmt, hash) + if err != nil { + return errors.Wrap(err, "failed to delete transactions") + } + + log.Info(fmt.Sprintf("transaction: %v: deleted: %v", hash, r)) + + return nil +} + +func (ct *coinProcessor) RejectTransaction(ctx context.Context, hash string) error { + const stmt = ` +update pending_coin_distributions +set + eth_status = 'REJECTED' +where + eth_status = 'ACCEPTED' and + eth_tx = $1 +` + + r, err := storage.Exec(ctx, ct.DB, stmt, hash) + if err != nil { + return errors.Wrap(err, "failed to update transactions") + } + + log.Info(fmt.Sprintf("transaction: %v: rejected: %v", hash, r)) + + return nil +} + func (proc *coinProcessor) GetGasOptions(ctx context.Context) (price *big.Int, limit uint64, err error) { - gasOverride, _ := proc.GetGasPriceOverride(ctx) + gasOverride, err := proc.GetGasPriceOverride(ctx) if err != nil { return nil, 0, err } @@ -191,11 +205,10 @@ func (proc *coinProcessor) GetGasOptions(ctx context.Context) (price *big.Int, l return price, limit, nil } -func (proc *coinProcessor) Distribute(ctx context.Context, num int64, data *batch) (string, error) { +func (proc *coinProcessor) Distribute(ctx context.Context, data *batch) (string, error) { recipients, amounts := data.Prepare() for recordNum := range data.Records { - log.Info(fmt.Sprintf("worker [%v]: batch %v: distributing %v iceflakes to address %v for user %q", - num, + log.Info(fmt.Sprintf("batch %v: distributing %v iceflakes to address %v for user %q", data.ID, data.Records[recordNum].Iceflakes, data.Records[recordNum].EthAddress, @@ -205,36 +218,36 @@ func (proc *coinProcessor) Distribute(ctx context.Context, num int64, data *batc txHash, err := proc.Client.Airdrop(ctx, big.NewInt(proc.Conf.Ethereum.ChainID), proc, recipients, amounts) if err != nil { - log.Error(errors.Wrapf(err, "worker [%v]: batch %v: failed to run contract", num, data.ID)) + log.Error(errors.Wrapf(err, "batch %v: failed to run contract", data.ID)) return "", errors.Wrapf(err, "failed to run contract on batch %v", data.ID) } - log.Info(fmt.Sprintf("worker [%v]: batch %v: transaction hash: %v", num, data.ID, txHash)) + log.Info(fmt.Sprintf("batch %v: transaction hash: %v", data.ID, txHash)) return txHash, nil } -func (proc *coinProcessor) Do(ctx context.Context, num int64) (*batch, error) { - data, err := proc.BatchPrepareFetch(ctx, num) +func (proc *coinProcessor) Do(ctx context.Context) (*batch, error) { + data, err := proc.BatchPrepareFetch(ctx) if err != nil { return nil, err } - txHash, err := proc.Distribute(ctx, num, data) + txHash, err := proc.Distribute(ctx, data) if err != nil { - err = errors.Wrapf(err, "worker [%v]: failed to distribute batch", num) + err = errors.Wrapf(err, "failed to distribute batch") log.Error(err) - if err2 := proc.BatchMarkRejected(ctx, num, data); err2 != nil { - log.Error(errors.Wrapf(err2, "worker [%v]: failed to mark batch %v as rejected", num, data.ID)) + if err2 := proc.BatchMarkRejected(ctx, data); err2 != nil { + log.Error(errors.Wrapf(err2, "failed to mark batch %v as rejected", data.ID)) } - proc.MustDisable(err.Error()) return data, err } - if err = proc.BatchMarkAccepted(ctx, num, data, txHash); err != nil { - log.Error(errors.Wrapf(err, "worker [%v]: failed to mark batch %v as accepted", num, data.ID)) + data.TX = txHash + if err = proc.BatchMarkAccepted(ctx, data, txHash); err != nil { + log.Error(errors.Wrapf(err, "failed to mark batch %v as accepted", data.ID)) return data, err } @@ -268,8 +281,8 @@ func (proc *coinProcessor) GetAction(ctx context.Context) workerAction { return workerActionRun } -func (proc *coinProcessor) Controller(ctx context.Context) { - const tickInternal = stdlibtime.Second * 30 +func (proc *coinProcessor) Controller(ctx context.Context, notify chan<- *batch) { + const tickInternal = stdlibtime.Minute log.Info("controller started") defer log.Info("controller stopped") @@ -285,7 +298,6 @@ func (proc *coinProcessor) Controller(ctx context.Context) { select { case signals <- struct{}{}: default: - log.Warn("controller: signal channel is full") } } }() @@ -335,7 +347,7 @@ func (proc *coinProcessor) Controller(ctx context.Context) { } log.Info(fmt.Sprintf("controller: running action %v", action)) - err := proc.RunWorkers(ctx) + err := proc.RunDistribution(ctx, notify) if err != nil { log.Error(errors.Wrapf(err, "controller: worker(s) failed")) proc.MustDisable(err.Error()) @@ -343,111 +355,72 @@ func (proc *coinProcessor) Controller(ctx context.Context) { log.Error(errors.Wrapf(sendCurrentCoinDistributionsFinishedBeingSentToEthereumSlackMessage(ctx), "failed to sendCurrentCoinDistributionsFinishedBeingSentToEthereumSlackMessage")) } + log.Info(fmt.Sprintf("controller: action %v finished", action)) } } } -func (proc *coinProcessor) RunWorkers(ctx context.Context) error { - taskContext, cancel := context.WithCancel(ctx) - defer cancel() +func (proc *coinProcessor) WaitForTransaction(ctx context.Context, hash string) (ethTxStatus, error) { + now := time.Now() - reports := make(chan error, proc.Conf.Workers) - task := coinProcessorWorkerTask{Context: taskContext, Result: reports} - - notified := 0 - for workerNumber := int64(0); workerNumber < proc.Conf.Workers; workerNumber++ { - select { - case proc.ProcessSignal[workerNumber] <- task: - log.Info(fmt.Sprintf("controller: worker [%v]: notified", workerNumber)) - notified++ - - default: - log.Info(fmt.Sprintf("controller: worker [%v]: channel is full", workerNumber)) + for ctx.Err() == nil { + status, err := proc.Client.TransactionStatus(ctx, hash) + if err != nil { + return "", errors.Wrapf(err, "failed to get transaction status: %v", hash) } - } - log.Info(fmt.Sprintf("controller: waiting for %v worker(s) to finish ...", notified)) - for i := 0; i < notified; i++ { - err := <-reports - switch { - case err == nil || errors.Is(err, errNotEnoughData): - // Expected, do nothing. + if status == ethTxStatusPending { + stdlibtime.Sleep(stdlibtime.Second * 3) - case errors.Is(err, errClientUncoverable): - log.Error(errors.Wrap(err, "controller: unrecoverable error detected, stopping workers")) - cancel() + continue + } - return err + log.Info(fmt.Sprintf("transaction %v: status: %v, duration: %v", hash, status, stdlibtime.Since(*now.Time))) - default: - log.Error(errors.Wrapf(err, "controller: failed to process batch")) - } + return status, nil } - log.Info("controller: all workers finished") - - return nil + return "", ctx.Err() } -func (proc *coinProcessor) Worker(ctx context.Context, notify chan<- *batch, num int64) { //nolint:funlen //. - log.Info(fmt.Sprintf("worker [%v]: started", num)) - defer log.Info(fmt.Sprintf("worker [%v]: stopped", num)) - - signals := make(chan coinProcessorWorkerTask, 1) - go func() { - for val := range proc.ProcessSignal[num] { - select { - case signals <- val: - default: - log.Warn(fmt.Sprintf("worker [%v]: signal channel is full", num)) +func (proc *coinProcessor) RunDistribution(ctx context.Context, notify chan<- *batch) error { + for it := 1; ctx.Err() == nil; it++ { + log.Info(fmt.Sprintf("distribution: iteration %v", it)) + b, err := proc.Do(context.WithoutCancel(ctx)) + if err != nil { + if errors.Is(err, errNotEnoughData) { + err = nil } - } - }() -main: - for { - select { - case <-ctx.Done(): - log.Info(fmt.Sprintf("worker [%v]: %v", num, ctx.Err())) - - return - - case <-proc.CancelSignal: - log.Info(fmt.Sprintf("worker [%v]: exit signal", num)) - - return + return err + } - case task := <-signals: - data, err := proc.Do(task.Context, num) - if data != nil { - sendNotify(notify, data) - } - if err != nil { - if !errors.Is(err, errNotEnoughData) { - err = errors.Wrapf(err, "worker [%v]: failed to process batch %v", num, data.ID) - log.Error(err) - } + status, err := proc.WaitForTransaction(ctx, b.TX) + if err != nil { + return err + } - select { - case task.Result <- err: - case <-task.Context.Done(): - log.Warn(fmt.Sprintf("worker [%v]: cannot send error report: %v: %v", num, err, task.Context.Err())) + b.Status = status + sendNotify(notify, b) - continue main - } + switch status { + case ethTxStatusSuccessful: + err = proc.DeleteTransactions(ctx, b.TX) - continue - } + case ethTxStatusFailed: + err = proc.RejectTransaction(ctx, b.TX) + proc.MustDisable(fmt.Sprintf("transaction %v failed", b.TX)) - select { - case signals <- task: - case <-task.Context.Done(): - log.Info(fmt.Sprintf("worker [%v]: action context: %v", num, task.Context.Err())) + default: + log.Panic(fmt.Sprintf("unexpected transaction status: %v", status)) + } - continue main - } + if err != nil { + return errors.Wrapf(err, "failed to update transaction status: %v", b.TX) } } + + return ctx.Err() } // isInTimeWindow checks if current hour is in time window [startHour, endHour]. @@ -471,9 +444,6 @@ func (proc *coinProcessor) isBlocked() bool { func (proc *coinProcessor) Close() error { close(proc.CancelSignal) - for workerNumber := int64(0); workerNumber < proc.Conf.Workers; workerNumber++ { - close(proc.ProcessSignal[workerNumber]) - } log.Info("waiting for workers to stop ...") proc.WG.Wait() diff --git a/coin-distribution/processor_test.go b/coin-distribution/processor_test.go index 6f90b19..9522f04 100644 --- a/coin-distribution/processor_test.go +++ b/coin-distribution/processor_test.go @@ -17,6 +17,8 @@ import ( ) func maybeSkipTest(t *testing.T) { + t.Helper() + run := os.Getenv("TEST_RUN_WITH_DATABASE") if run != "yes" { t.Skip("TEST_RUN_WITH_DATABASE is not set to 'yes'") @@ -70,12 +72,12 @@ func TestBatchPrepareFetch(t *testing.T) { //nolint:paralleltest //. helperTruncatePendingTransactions(ctx, t, proc.DB) t.Run("NotEnoughData", func(t *testing.T) { //nolint:paralleltest //. - _, err := proc.BatchPrepareFetch(ctx, 1) + _, err := proc.BatchPrepareFetch(ctx) require.ErrorIs(t, err, errNotEnoughData) }) t.Run("Fetch", func(t *testing.T) { //nolint:paralleltest //. helperAddNewPendingTransaction(ctx, t, proc, 100) - b, err := proc.BatchPrepareFetch(ctx, 1) + b, err := proc.BatchPrepareFetch(ctx) require.NoError(t, err) require.Len(t, b.Records, 100) for _, r := range b.Records { @@ -119,7 +121,7 @@ func TestGetGasPrice(t *testing.T) { //nolint:tparallel //. func TestProcessorDistributeAccepted(t *testing.T) { //nolint:paralleltest //. maybeSkipTest(t) ctx := context.TODO() - proc := newCoinProcessor(new(mockedDummyEthClient), storage.MustConnect(ctx, ddl, applicationYamlKey), &config{Workers: 2}) + proc := newCoinProcessor(new(mockedDummyEthClient), storage.MustConnect(ctx, ddl, applicationYamlKey), &config{}) require.NotNil(t, proc) defer proc.Close() @@ -142,7 +144,7 @@ func TestProcessorDistributeRejected(t *testing.T) { //nolint:paralleltest //. ctx := context.TODO() proc := newCoinProcessor(&mockedDummyEthClient{dropErr: errors.New("drop error")}, //nolint:goerr113 //. storage.MustConnect(ctx, ddl, applicationYamlKey), - &config{Workers: 2}, + &config{}, ) require.NotNil(t, proc) defer proc.Close() @@ -198,7 +200,6 @@ func TestProcessorTriggerOnDemand(t *testing.T) { //nolint:paralleltest //. proc := newCoinProcessor(&mockedDummyEthClient{}, storage.MustConnect(ctx, ddl, applicationYamlKey), &config{ - Workers: 2, StartHours: now.Hour() - 2, EndHours: now.Hour() - 1, }, diff --git a/coin-distribution/tracker.go b/coin-distribution/tracker.go deleted file mode 100644 index 1896198..0000000 --- a/coin-distribution/tracker.go +++ /dev/null @@ -1,215 +0,0 @@ -// SPDX-License-Identifier: ice License 1.0 - -package coindistribution - -import ( - "context" - "fmt" - "runtime" - stdlibtime "time" - - "github.com/alitto/pond" - "github.com/hashicorp/go-multierror" - "github.com/pkg/errors" - - "github.com/ice-blockchain/wintr/connectors/storage/v2" - "github.com/ice-blockchain/wintr/log" -) - -func newCoinTracker(client ethClient, db *storage.DB, conf *config) *coinTracker { - const workersMin = 4 - workersMax := runtime.NumCPU() * 4 //nolint:gomnd //. - - return &coinTracker{ - Client: client, - Conf: conf, - CancelSignal: make(chan struct{}), - Workers: pond.New(workersMax, 0, pond.MinWorkers(workersMin)), - databaseConfig: &databaseConfig{DB: db}, - } -} - -func (ct *coinTracker) DeleteTransactions(ctx context.Context, hashes []string) error { - const stmt = `delete from pending_coin_distributions where eth_status = 'ACCEPTED' and eth_tx = any($1)` - - r, err := storage.Exec(ctx, ct.DB, stmt, hashes) - if err != nil { - return errors.Wrap(err, "failed to delete transactions") - } - - log.Info(fmt.Sprintf("deleted %d transaction(s): %v", r, hashes)) - - return nil -} - -func (ct *coinTracker) RejectTransactions(ctx context.Context, hashes []string) error { - const stmt = ` -update pending_coin_distributions -set - eth_status = 'REJECTED' -where - eth_status = 'ACCEPTED' and - eth_tx = any($1) -` - - r, err := storage.Exec(ctx, ct.DB, stmt, hashes) - if err != nil { - return errors.Wrap(err, "failed to update transactions") - } - - log.Info(fmt.Sprintf("rejected %d transaction(s): %v", r, hashes)) - - return nil -} - -func (ct *coinTracker) ProcessTransactionsHashes(ctx context.Context, hashes []*string) (err error) { - statuses, err := ct.Client.TransactionsStatus(ctx, hashes) - if err != nil { - return errors.Wrap(err, "failed to get transaction statuses") - } - - if _, ok := statuses[ethTxStatusSuccessful]; ok { - if deleteErr := ct.DeleteTransactions(ctx, statuses[ethTxStatusSuccessful]); deleteErr != nil { - err = multierror.Append(err, deleteErr) - } - delete(statuses, ethTxStatusSuccessful) - } - - if _, ok := statuses[ethTxStatusFailed]; ok { - if rejectErr := ct.RejectTransactions(ctx, statuses[ethTxStatusFailed]); rejectErr != nil { - err = multierror.Append(err, rejectErr) - } - log.Info("some transactions failed, disabling coinDistributer") - msg := "rejected" - if err != nil { - msg = err.Error() - } - ct.MustDisable(fmt.Sprintf("some transactions failed: %v", msg)) - delete(statuses, ethTxStatusFailed) - } - - for status, hashes := range statuses { - log.Warn(fmt.Sprintf("found %d unexpected transaction(s) with status %v: %v", len(hashes), status, hashes)) - } - - return err //nolint:wrapcheck //. -} - -func (ct *coinTracker) StartChecker(ctx context.Context, notify chan<- []*string) { - const checkInterval = stdlibtime.Minute - - ticker := stdlibtime.NewTicker(checkInterval) - defer ticker.Stop() - - signals := make(chan struct{}, 1) - signals <- struct{}{} - - go func() { - for range ticker.C { - select { - case signals <- struct{}{}: - default: - log.Warn("checker: signal channel is full") - } - } - }() - - hadWork := false - for { - select { - case <-ctx.Done(): - log.Info(fmt.Sprintf("stopping checker due to context: %v", ctx.Err())) - - return - - case <-ct.CancelSignal: - log.Info("stopping checker due to cancel signal") - - return - - case <-signals: - if hadWork && !ct.HasPendingTransactions(ctx, ethApiStatusAccepted) { - hadWork = false - log.Error(sendAllCurrentCoinDistributionsWereCommittedInEthereumSlackMessage(ctx), - "failed to sendAllCurrentCoinDistributionsWereCommittedInEthereumSlackMessage") - } - - hasWork, err := ct.Do(ctx, notify) - if err != nil { - log.Error(errors.Wrap(err, "failed to check accepted transactions")) - } else { - hadWork = hadWork || hasWork - } - } - } -} - -func (ct *coinTracker) Do(ctx context.Context, notify chan<- []*string) (submitted bool, err error) { - const limit = 100 - var offset uint - - for ctx.Err() == nil { - hashes, err := ct.FetchTransactions(ctx, limit, offset) - if err != nil { - return false, errors.Wrap(err, "failed to fetch transactions") - } - if len(hashes) == 0 { - log.Debug("no transactions found to check") - - return submitted, nil - } - - ct.Workers.Submit(func() { - err = ct.ProcessTransactionsHashes(ctx, hashes) - if err != nil { - log.Error(errors.Wrap(err, "failed to process transactions")) - - return - } - - sendNotify(notify, hashes) - }) - - submitted = true - offset += uint(len(hashes)) - } - - return submitted, nil -} - -func (ct *coinTracker) FetchTransactions(ctx context.Context, limit, offset uint) ([]*string, error) { - const stmt = ` -select - distinct on (eth_tx) eth_tx -from - pending_coin_distributions -where - eth_status = 'ACCEPTED' and - eth_tx is not null -order by eth_tx, created_at asc -limit $1 -offset $2 -` - - rows, err := storage.ExecMany[string](ctx, ct.DB, stmt, limit, offset) - if err != nil { - return nil, errors.Wrap(err, "failed to fetch transactions") - } else if len(rows) == 0 { - return []*string{}, nil - } - - log.Info(fmt.Sprintf("found %d accepted transaction(s) with limit %v and offset %v", len(rows), limit, offset)) - - return rows, nil -} - -func (ct *coinTracker) Start(ctx context.Context, notify chan<- []*string) { - go ct.StartChecker(ctx, notify) -} - -func (ct *coinTracker) Close() error { - close(ct.CancelSignal) - ct.Workers.StopAndWait() - - return nil -} diff --git a/go.mod b/go.mod index f70df07..5ddd40e 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/redis/go-redis/v9 v9.3.1 github.com/stretchr/testify v1.8.4 github.com/swaggo/swag v1.16.2 - github.com/testcontainers/testcontainers-go v0.26.0 + github.com/testcontainers/testcontainers-go v0.27.0 go.uber.org/zap v1.26.0 golang.org/x/exp v0.0.0-20231226003508-02704c960a9b ) @@ -80,7 +80,7 @@ require ( github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect - github.com/gballet/go-verkle v0.1.1-0.20231031103413-a67434b50f46 // indirect + github.com/gballet/go-verkle v0.1.1-0.20231130133252-5a8a683fa581 // indirect github.com/georgysavva/scany/v2 v2.0.0 // indirect github.com/getsentry/sentry-go v0.25.0 // indirect github.com/gin-contrib/sse v0.1.0 // indirect @@ -179,7 +179,7 @@ require ( github.com/supranational/blst v0.3.11 // indirect github.com/swaggo/files v1.0.1 // indirect github.com/swaggo/gin-swagger v1.6.0 // indirect - github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect + github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect github.com/tklauser/go-sysconf v0.3.13 // indirect github.com/tklauser/numcpus v0.7.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect diff --git a/go.sum b/go.sum index a2b2780..e2aabc4 100644 --- a/go.sum +++ b/go.sum @@ -74,6 +74,9 @@ github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d/go.mod h1:8EPpV github.com/chenzhuoyu/iasm v0.9.0/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog= github.com/chenzhuoyu/iasm v0.9.1 h1:tUHQJXo3NhBqw6s33wkGn9SP3bvrWLdlVIJ3hQBL7P0= github.com/chenzhuoyu/iasm v0.9.1/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudflare/circl v1.3.6 h1:/xbKIqSHbZXHwkhbrhrt2YOHIwYJlXH94E3tI/gDlUg= github.com/cloudflare/circl v1.3.6/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA= @@ -156,14 +159,15 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff h1:tY80oXqGNY4FhTFhk+o9oFHGINQ/+vhlm8HFzi6znCI= github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff/go.mod h1:x7DCsMOv1taUwEWCzT4cmDeAkigA5/QCwUodaVOe8Ww= -github.com/gballet/go-verkle v0.1.1-0.20231031103413-a67434b50f46 h1:BAIP2GihuqhwdILrV+7GJel5lyPV3u1+PgzrWLc0TkE= -github.com/gballet/go-verkle v0.1.1-0.20231031103413-a67434b50f46/go.mod h1:QNpY22eby74jVhqH4WhDLDwxc/vqsern6pW+u2kbkpc= +github.com/gballet/go-verkle v0.1.1-0.20231130133252-5a8a683fa581 h1:nKbTGOGvrSo4ei3AlYN+Cbnp23AzgOl7nezMsZEIyDI= +github.com/gballet/go-verkle v0.1.1-0.20231130133252-5a8a683fa581/go.mod h1:OzHSBt37xRRHc27lb9PaCldBnJYQZP8KcMdYyOB2dtU= github.com/georgysavva/scany/v2 v2.0.0 h1:RGXqxDv4row7/FYoK8MRXAZXqoWF/NM+NP0q50k3DKU= github.com/georgysavva/scany/v2 v2.0.0/go.mod h1:sigOdh+0qb/+aOs3TVhehVT10p8qJL7K/Zhyz8vWo38= github.com/getsentry/sentry-go v0.25.0 h1:q6Eo+hS+yoJlTO3uu/azhQadsD8V+jQn2D8VvX1eOyI= @@ -208,6 +212,7 @@ github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= @@ -260,6 +265,7 @@ github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw= github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk= +github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20231229022155-5aaadb5f27d9 h1:jmZrGBc6nAoZ1WUm6rDBnykKw8oXMmGCi1gc3nseeG4= github.com/google/pprof v0.0.0-20231229022155-5aaadb5f27d9/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= @@ -296,6 +302,7 @@ github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXei github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= +github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ice-blockchain/eskimo v1.233.0 h1:WnszLYF5BgoYP13g8STi1JWeW3jEP7isUEL4vyvdf7s= github.com/ice-blockchain/eskimo v1.233.0/go.mod h1:mxLeKAV454IQWMhmQyYASdMaztKpdRxAOopza2zFBag= github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb h1:8TnFP3mc7O+tc44kv2e0/TpZKnEVUaKH+UstwfBwRkk= @@ -378,20 +385,25 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= -github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA= -github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c= github.com/onsi/ginkgo/v2 v2.13.2 h1:Bi2gGVkfn6gQcjNjZJVO8Gf0FHzMPf2phUei9tejVMs= github.com/onsi/ginkgo/v2 v2.13.2/go.mod h1:XStQ8QcGwLyF4HdfcZB8SFOS/MWCgDuXMSBe6zrvLgM= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro= github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg= github.com/onsi/gomega v1.29.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -477,9 +489,11 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= @@ -495,8 +509,8 @@ github.com/swaggo/gin-swagger v1.6.0 h1:y8sxvQ3E20/RCyrXeFfg60r6H0Z+SwpTjMYsMm+z github.com/swaggo/gin-swagger v1.6.0/go.mod h1:BG00cCEy294xtVpyIAHG6+e2Qzj/xKlRdOqDkvq0uzo= github.com/swaggo/swag v1.16.2 h1:28Pp+8DkQoV+HLzLx8RGJZXNGKbFqnuvSbAAtoxiY04= github.com/swaggo/swag v1.16.2/go.mod h1:6YzXnDcpr0767iOejs318CwYkCQqyGer6BizOg03f+E= -github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= -github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= +github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d h1:vfofYNRScrDdvS342BElfbETmL1Aiz3i2t0zfRj16Hs= +github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d/go.mod h1:RRCYJbIwD5jmqPI9XoAFR0OcDxqUctll6zUj/+B4S48= github.com/testcontainers/testcontainers-go v0.15.0 h1:3Ex7PUGFv0b2bBsdOv6R42+SK2qoZnWBd21LvZYhUtQ= github.com/testcontainers/testcontainers-go v0.15.0/go.mod h1:PkohMRH2X8Hib0IWtifVexDfLPVT+tb5E9hsf7cW12w= github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4= @@ -584,10 +598,12 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= +golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= @@ -611,13 +627,16 @@ golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -633,8 +652,8 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= @@ -649,6 +668,7 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA= @@ -657,6 +677,7 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= google.golang.org/api v0.154.0 h1:X7QkVKZBskztmpPKWQXgjJRPA2dJYrL6r+sYPRLj050= @@ -707,6 +728,7 @@ gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXL gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=