diff --git a/coin-distribution/DDL.sql b/coin-distribution/DDL.sql index b904fa1..9c12a05 100644 --- a/coin-distribution/DDL.sql +++ b/coin-distribution/DDL.sql @@ -193,7 +193,7 @@ BEGIN string_agg(distinct eth_address,'') AS eth_address, verified from coin_distributions_by_earner - group by day,user_id) AS X; + group by day,user_id,verified) AS X; delete from coin_distributions_by_earner where 1=1; diff --git a/coin-distribution/coin_distribution.go b/coin-distribution/coin_distribution.go index 23b491d..f6b6c72 100644 --- a/coin-distribution/coin_distribution.go +++ b/coin-distribution/coin_distribution.go @@ -147,7 +147,7 @@ func MustStartCoinDistribution(ctx context.Context, _ context.CancelFunc) Client cd := mustCreateCoinDistributionFromConfig(ctx, &cfg, eth) cd.MustStart(ctx, nil) - go startPrepareCoinDistributionsForReviewMonitor(ctx, cd.DB) + go cd.repository.StartPrepareCoinDistributionsForReviewMonitor(ctx) return cd } @@ -155,9 +155,10 @@ func MustStartCoinDistribution(ctx context.Context, _ context.CancelFunc) Client func mustCreateCoinDistributionFromConfig(ctx context.Context, conf *config, ethClient ethClient) *coinDistributer { db := storage.MustConnect(ctx, ddl, applicationYamlKey) cd := &coinDistributer{ - Client: ethClient, - Processor: newCoinProcessor(ethClient, db, conf), - DB: db, + Client: ethClient, + Processor: newCoinProcessor(ethClient, db, conf), + DB: db, + repository: NewRepository(ctx, nil), } return cd @@ -172,6 +173,7 @@ func (cd *coinDistributer) Close() error { errors.Wrap(cd.Processor.Close(), "failed to close processor"), errors.Wrap(cd.Client.Close(), "failed to close eth client"), errors.Wrap(cd.DB.Close(), "failed to close db"), + errors.Wrap(cd.repository.Close(), "failed to close repository"), ).ErrorOrNil() } diff --git a/coin-distribution/contract.go b/coin-distribution/contract.go index 7d97f00..8336e20 100644 --- a/coin-distribution/contract.go +++ b/coin-distribution/contract.go @@ -37,6 +37,7 @@ type ( NotifyCoinDistributionCollectionCycleEnded(ctx context.Context) error GetCollectorSettings(ctx context.Context) (*CollectorSettings, error) CollectCoinDistributionsForReview(ctx context.Context, records []*ByEarnerForReview) error + StartPrepareCoinDistributionsForReviewMonitor(ctx context.Context) } CollectorSettings struct { DeniedCountries map[string]struct{} @@ -124,6 +125,8 @@ const ( configKeyCoinDistributerMsgOnline = "coin_distributer_msg_sent_online_date" configKeyCoinDistributerMsgOffline = "coin_distributer_msg_sent_offline_date" configKeyCoinDistributerMsgFinished = "coin_distributer_msg_sent_finished_date" + + doctorXTenant = "doctorx" ) // . @@ -191,9 +194,10 @@ type ( AirDropper airDropper } coinDistributer struct { - Client ethClient - DB *storage.DB - Processor *coinProcessor + Client ethClient + DB *storage.DB + Processor *coinProcessor + repository Repository } repository struct { cfg *config diff --git a/coin-distribution/pending_review.go b/coin-distribution/pending_review.go index 92dbb22..5963dda 100644 --- a/coin-distribution/pending_review.go +++ b/coin-distribution/pending_review.go @@ -28,9 +28,10 @@ func NewRepository(ctx context.Context, _ context.CancelFunc) Repository { if localCfg.ReviewURL == "" { log.Panic("`review-url` is missing") } + db := storage.MustConnect(ctx, ddl, applicationYamlKey) return &repository{ - db: storage.MustConnect(ctx, ddl, applicationYamlKey), + db: db, cfg: &localCfg, } } @@ -379,7 +380,7 @@ func tryPrepareCoinDistributionsForReview(ctx context.Context, db *storage.DB) e }) } -func startPrepareCoinDistributionsForReviewMonitor(ctx context.Context, db *storage.DB) { +func (r *repository) StartPrepareCoinDistributionsForReviewMonitor(ctx context.Context) { ticker := stdlibtime.NewTicker(30 * stdlibtime.Second) //nolint:gomnd // . defer ticker.Stop() @@ -387,7 +388,7 @@ func startPrepareCoinDistributionsForReviewMonitor(ctx context.Context, db *stor select { case <-ticker.C: reqCtx, cancel := context.WithTimeout(ctx, 10*stdlibtime.Minute) //nolint:gomnd // . - log.Error(errors.Wrap(tryPrepareCoinDistributionsForReview(reqCtx, db), "failed to tryPrepareCoinDistributionsForReview")) + log.Error(errors.Wrap(tryPrepareCoinDistributionsForReview(reqCtx, r.db), "failed to tryPrepareCoinDistributionsForReview")) cancel() case <-ctx.Done(): return diff --git a/miner/miner.go b/miner/miner.go index 625e1e2..e783840 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -77,6 +77,7 @@ func MustStartMining(ctx context.Context, cancel context.CancelFunc) Client { mi.mustInitCoinDistributionCollector(ctx) if isTenantInDistributionMode() { mi.usersRepository = users.New(context.Background(), nil) + go mi.coinDistributionRepository.StartPrepareCoinDistributionsForReviewMonitor(ctx) } for workerNumber := int64(0); workerNumber < cfg.Workers; workerNumber++ {