From f0d03defe787613a24294000ee9fb7a85abd261c Mon Sep 17 00:00:00 2001 From: ice-myles <96409608+ice-myles@users.noreply.github.com> Date: Thu, 5 Dec 2024 16:27:11 +0300 Subject: [PATCH 1/2] startPrepareCoinDistributionsForReviewMonitor for docx tenant. Fixed prepare_coin_distributions_for_review procedure. --- coin-distribution/DDL.sql | 2 +- coin-distribution/contract.go | 2 ++ coin-distribution/pending_review.go | 8 ++++++-- miner/miner.go | 2 +- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/coin-distribution/DDL.sql b/coin-distribution/DDL.sql index b904fa1..9c12a05 100644 --- a/coin-distribution/DDL.sql +++ b/coin-distribution/DDL.sql @@ -193,7 +193,7 @@ BEGIN string_agg(distinct eth_address,'') AS eth_address, verified from coin_distributions_by_earner - group by day,user_id) AS X; + group by day,user_id,verified) AS X; delete from coin_distributions_by_earner where 1=1; diff --git a/coin-distribution/contract.go b/coin-distribution/contract.go index 7d97f00..7996f5c 100644 --- a/coin-distribution/contract.go +++ b/coin-distribution/contract.go @@ -124,6 +124,8 @@ const ( configKeyCoinDistributerMsgOnline = "coin_distributer_msg_sent_online_date" configKeyCoinDistributerMsgOffline = "coin_distributer_msg_sent_offline_date" configKeyCoinDistributerMsgFinished = "coin_distributer_msg_sent_finished_date" + + doctorXTenant = "doctorx" ) // . diff --git a/coin-distribution/pending_review.go b/coin-distribution/pending_review.go index 92dbb22..3ab6f3f 100644 --- a/coin-distribution/pending_review.go +++ b/coin-distribution/pending_review.go @@ -16,7 +16,7 @@ import ( "github.com/ice-blockchain/wintr/time" ) -func NewRepository(ctx context.Context, _ context.CancelFunc) Repository { +func NewRepository(ctx context.Context, _ context.CancelFunc, tenantName string) Repository { var localCfg config appcfg.MustLoadFromKey(applicationYamlKey, &localCfg) if localCfg.AlertSlackWebhook == "" { @@ -28,9 +28,13 @@ func NewRepository(ctx context.Context, _ context.CancelFunc) Repository { if localCfg.ReviewURL == "" { log.Panic("`review-url` is missing") } + db := storage.MustConnect(ctx, ddl, applicationYamlKey) + if tenantName == doctorXTenant { + go startPrepareCoinDistributionsForReviewMonitor(ctx, db) + } return &repository{ - db: storage.MustConnect(ctx, ddl, applicationYamlKey), + db: db, cfg: &localCfg, } } diff --git a/miner/miner.go b/miner/miner.go index 625e1e2..4d5b53a 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -62,7 +62,7 @@ func init() { func MustStartMining(ctx context.Context, cancel context.CancelFunc) Client { mi := &miner{ - coinDistributionRepository: coindistribution.NewRepository(context.Background(), func() {}), + coinDistributionRepository: coindistribution.NewRepository(context.Background(), func() {}, cfg.Tenant), mb: messagebroker.MustConnect(context.Background(), parentApplicationYamlKey), db: storage.MustConnect(context.Background(), parentApplicationYamlKey, int(cfg.Workers)), dwhClient: dwh.MustConnect(context.Background(), applicationYamlKey), From 109d68cbb86b2e846f4aa9c8b4798993097713cd Mon Sep 17 00:00:00 2001 From: ice-myles <96409608+ice-myles@users.noreply.github.com> Date: Thu, 5 Dec 2024 16:41:40 +0300 Subject: [PATCH 2/2] Move startPrepareCoinDistributionsForReviewMonitor to repository. --- coin-distribution/coin_distribution.go | 10 ++++++---- coin-distribution/contract.go | 8 +++++--- coin-distribution/pending_review.go | 9 +++------ miner/miner.go | 3 ++- 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/coin-distribution/coin_distribution.go b/coin-distribution/coin_distribution.go index 23b491d..f6b6c72 100644 --- a/coin-distribution/coin_distribution.go +++ b/coin-distribution/coin_distribution.go @@ -147,7 +147,7 @@ func MustStartCoinDistribution(ctx context.Context, _ context.CancelFunc) Client cd := mustCreateCoinDistributionFromConfig(ctx, &cfg, eth) cd.MustStart(ctx, nil) - go startPrepareCoinDistributionsForReviewMonitor(ctx, cd.DB) + go cd.repository.StartPrepareCoinDistributionsForReviewMonitor(ctx) return cd } @@ -155,9 +155,10 @@ func MustStartCoinDistribution(ctx context.Context, _ context.CancelFunc) Client func mustCreateCoinDistributionFromConfig(ctx context.Context, conf *config, ethClient ethClient) *coinDistributer { db := storage.MustConnect(ctx, ddl, applicationYamlKey) cd := &coinDistributer{ - Client: ethClient, - Processor: newCoinProcessor(ethClient, db, conf), - DB: db, + Client: ethClient, + Processor: newCoinProcessor(ethClient, db, conf), + DB: db, + repository: NewRepository(ctx, nil), } return cd @@ -172,6 +173,7 @@ func (cd *coinDistributer) Close() error { errors.Wrap(cd.Processor.Close(), "failed to close processor"), errors.Wrap(cd.Client.Close(), "failed to close eth client"), errors.Wrap(cd.DB.Close(), "failed to close db"), + errors.Wrap(cd.repository.Close(), "failed to close repository"), ).ErrorOrNil() } diff --git a/coin-distribution/contract.go b/coin-distribution/contract.go index 7996f5c..8336e20 100644 --- a/coin-distribution/contract.go +++ b/coin-distribution/contract.go @@ -37,6 +37,7 @@ type ( NotifyCoinDistributionCollectionCycleEnded(ctx context.Context) error GetCollectorSettings(ctx context.Context) (*CollectorSettings, error) CollectCoinDistributionsForReview(ctx context.Context, records []*ByEarnerForReview) error + StartPrepareCoinDistributionsForReviewMonitor(ctx context.Context) } CollectorSettings struct { DeniedCountries map[string]struct{} @@ -193,9 +194,10 @@ type ( AirDropper airDropper } coinDistributer struct { - Client ethClient - DB *storage.DB - Processor *coinProcessor + Client ethClient + DB *storage.DB + Processor *coinProcessor + repository Repository } repository struct { cfg *config diff --git a/coin-distribution/pending_review.go b/coin-distribution/pending_review.go index 3ab6f3f..5963dda 100644 --- a/coin-distribution/pending_review.go +++ b/coin-distribution/pending_review.go @@ -16,7 +16,7 @@ import ( "github.com/ice-blockchain/wintr/time" ) -func NewRepository(ctx context.Context, _ context.CancelFunc, tenantName string) Repository { +func NewRepository(ctx context.Context, _ context.CancelFunc) Repository { var localCfg config appcfg.MustLoadFromKey(applicationYamlKey, &localCfg) if localCfg.AlertSlackWebhook == "" { @@ -29,9 +29,6 @@ func NewRepository(ctx context.Context, _ context.CancelFunc, tenantName string) log.Panic("`review-url` is missing") } db := storage.MustConnect(ctx, ddl, applicationYamlKey) - if tenantName == doctorXTenant { - go startPrepareCoinDistributionsForReviewMonitor(ctx, db) - } return &repository{ db: db, @@ -383,7 +380,7 @@ func tryPrepareCoinDistributionsForReview(ctx context.Context, db *storage.DB) e }) } -func startPrepareCoinDistributionsForReviewMonitor(ctx context.Context, db *storage.DB) { +func (r *repository) StartPrepareCoinDistributionsForReviewMonitor(ctx context.Context) { ticker := stdlibtime.NewTicker(30 * stdlibtime.Second) //nolint:gomnd // . defer ticker.Stop() @@ -391,7 +388,7 @@ func startPrepareCoinDistributionsForReviewMonitor(ctx context.Context, db *stor select { case <-ticker.C: reqCtx, cancel := context.WithTimeout(ctx, 10*stdlibtime.Minute) //nolint:gomnd // . - log.Error(errors.Wrap(tryPrepareCoinDistributionsForReview(reqCtx, db), "failed to tryPrepareCoinDistributionsForReview")) + log.Error(errors.Wrap(tryPrepareCoinDistributionsForReview(reqCtx, r.db), "failed to tryPrepareCoinDistributionsForReview")) cancel() case <-ctx.Done(): return diff --git a/miner/miner.go b/miner/miner.go index 4d5b53a..e783840 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -62,7 +62,7 @@ func init() { func MustStartMining(ctx context.Context, cancel context.CancelFunc) Client { mi := &miner{ - coinDistributionRepository: coindistribution.NewRepository(context.Background(), func() {}, cfg.Tenant), + coinDistributionRepository: coindistribution.NewRepository(context.Background(), func() {}), mb: messagebroker.MustConnect(context.Background(), parentApplicationYamlKey), db: storage.MustConnect(context.Background(), parentApplicationYamlKey, int(cfg.Workers)), dwhClient: dwh.MustConnect(context.Background(), applicationYamlKey), @@ -77,6 +77,7 @@ func MustStartMining(ctx context.Context, cancel context.CancelFunc) Client { mi.mustInitCoinDistributionCollector(ctx) if isTenantInDistributionMode() { mi.usersRepository = users.New(context.Background(), nil) + go mi.coinDistributionRepository.StartPrepareCoinDistributionsForReviewMonitor(ctx) } for workerNumber := int64(0); workerNumber < cfg.Workers; workerNumber++ {