Skip to content

Commit

Permalink
startPrepareCoinDistributionsForReviewMonitor for docx tenant. (#227)
Browse files Browse the repository at this point in the history
startPrepareCoinDistributionsForReviewMonitor for docx tenant. Fixed
prepare_coin_distributions_for_review procedure.
  • Loading branch information
ice-myles authored Dec 5, 2024
1 parent 128a55e commit dea923e
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 11 deletions.
2 changes: 1 addition & 1 deletion coin-distribution/DDL.sql
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ BEGIN
string_agg(distinct eth_address,'') AS eth_address,
verified
from coin_distributions_by_earner
group by day,user_id) AS X;
group by day,user_id,verified) AS X;

delete from coin_distributions_by_earner where 1=1;

Expand Down
10 changes: 6 additions & 4 deletions coin-distribution/coin_distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,18 @@ func MustStartCoinDistribution(ctx context.Context, _ context.CancelFunc) Client
cd := mustCreateCoinDistributionFromConfig(ctx, &cfg, eth)
cd.MustStart(ctx, nil)

go startPrepareCoinDistributionsForReviewMonitor(ctx, cd.DB)
go cd.repository.StartPrepareCoinDistributionsForReviewMonitor(ctx)

return cd
}

func mustCreateCoinDistributionFromConfig(ctx context.Context, conf *config, ethClient ethClient) *coinDistributer {
db := storage.MustConnect(ctx, ddl, applicationYamlKey)
cd := &coinDistributer{
Client: ethClient,
Processor: newCoinProcessor(ethClient, db, conf),
DB: db,
Client: ethClient,
Processor: newCoinProcessor(ethClient, db, conf),
DB: db,
repository: NewRepository(ctx, nil),
}

return cd
Expand All @@ -172,6 +173,7 @@ func (cd *coinDistributer) Close() error {
errors.Wrap(cd.Processor.Close(), "failed to close processor"),
errors.Wrap(cd.Client.Close(), "failed to close eth client"),
errors.Wrap(cd.DB.Close(), "failed to close db"),
errors.Wrap(cd.repository.Close(), "failed to close repository"),
).ErrorOrNil()
}

Expand Down
10 changes: 7 additions & 3 deletions coin-distribution/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type (
NotifyCoinDistributionCollectionCycleEnded(ctx context.Context) error
GetCollectorSettings(ctx context.Context) (*CollectorSettings, error)
CollectCoinDistributionsForReview(ctx context.Context, records []*ByEarnerForReview) error
StartPrepareCoinDistributionsForReviewMonitor(ctx context.Context)
}
CollectorSettings struct {
DeniedCountries map[string]struct{}
Expand Down Expand Up @@ -124,6 +125,8 @@ const (
configKeyCoinDistributerMsgOnline = "coin_distributer_msg_sent_online_date"
configKeyCoinDistributerMsgOffline = "coin_distributer_msg_sent_offline_date"
configKeyCoinDistributerMsgFinished = "coin_distributer_msg_sent_finished_date"

doctorXTenant = "doctorx"
)

// .
Expand Down Expand Up @@ -191,9 +194,10 @@ type (
AirDropper airDropper
}
coinDistributer struct {
Client ethClient
DB *storage.DB
Processor *coinProcessor
Client ethClient
DB *storage.DB
Processor *coinProcessor
repository Repository
}
repository struct {
cfg *config
Expand Down
7 changes: 4 additions & 3 deletions coin-distribution/pending_review.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ func NewRepository(ctx context.Context, _ context.CancelFunc) Repository {
if localCfg.ReviewURL == "" {
log.Panic("`review-url` is missing")
}
db := storage.MustConnect(ctx, ddl, applicationYamlKey)

return &repository{
db: storage.MustConnect(ctx, ddl, applicationYamlKey),
db: db,
cfg: &localCfg,
}
}
Expand Down Expand Up @@ -379,15 +380,15 @@ func tryPrepareCoinDistributionsForReview(ctx context.Context, db *storage.DB) e
})
}

func startPrepareCoinDistributionsForReviewMonitor(ctx context.Context, db *storage.DB) {
func (r *repository) StartPrepareCoinDistributionsForReviewMonitor(ctx context.Context) {
ticker := stdlibtime.NewTicker(30 * stdlibtime.Second) //nolint:gomnd // .
defer ticker.Stop()

for {
select {
case <-ticker.C:
reqCtx, cancel := context.WithTimeout(ctx, 10*stdlibtime.Minute) //nolint:gomnd // .
log.Error(errors.Wrap(tryPrepareCoinDistributionsForReview(reqCtx, db), "failed to tryPrepareCoinDistributionsForReview"))
log.Error(errors.Wrap(tryPrepareCoinDistributionsForReview(reqCtx, r.db), "failed to tryPrepareCoinDistributionsForReview"))
cancel()
case <-ctx.Done():
return
Expand Down
1 change: 1 addition & 0 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func MustStartMining(ctx context.Context, cancel context.CancelFunc) Client {
mi.mustInitCoinDistributionCollector(ctx)
if isTenantInDistributionMode() {
mi.usersRepository = users.New(context.Background(), nil)
go mi.coinDistributionRepository.StartPrepareCoinDistributionsForReviewMonitor(ctx)
}

for workerNumber := int64(0); workerNumber < cfg.Workers; workerNumber++ {
Expand Down

0 comments on commit dea923e

Please sign in to comment.