Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

startPrepareCoinDistributionsForReviewMonitor for docx tenant. #227

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion coin-distribution/DDL.sql
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ BEGIN
string_agg(distinct eth_address,'') AS eth_address,
verified
from coin_distributions_by_earner
group by day,user_id) AS X;
group by day,user_id,verified) AS X;

delete from coin_distributions_by_earner where 1=1;

Expand Down
10 changes: 6 additions & 4 deletions coin-distribution/coin_distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,18 @@ func MustStartCoinDistribution(ctx context.Context, _ context.CancelFunc) Client
cd := mustCreateCoinDistributionFromConfig(ctx, &cfg, eth)
cd.MustStart(ctx, nil)

go startPrepareCoinDistributionsForReviewMonitor(ctx, cd.DB)
go cd.repository.StartPrepareCoinDistributionsForReviewMonitor(ctx)

return cd
}

func mustCreateCoinDistributionFromConfig(ctx context.Context, conf *config, ethClient ethClient) *coinDistributer {
db := storage.MustConnect(ctx, ddl, applicationYamlKey)
cd := &coinDistributer{
Client: ethClient,
Processor: newCoinProcessor(ethClient, db, conf),
DB: db,
Client: ethClient,
Processor: newCoinProcessor(ethClient, db, conf),
DB: db,
repository: NewRepository(ctx, nil),
}

return cd
Expand All @@ -172,6 +173,7 @@ func (cd *coinDistributer) Close() error {
errors.Wrap(cd.Processor.Close(), "failed to close processor"),
errors.Wrap(cd.Client.Close(), "failed to close eth client"),
errors.Wrap(cd.DB.Close(), "failed to close db"),
errors.Wrap(cd.repository.Close(), "failed to close repository"),
).ErrorOrNil()
}

Expand Down
10 changes: 7 additions & 3 deletions coin-distribution/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type (
NotifyCoinDistributionCollectionCycleEnded(ctx context.Context) error
GetCollectorSettings(ctx context.Context) (*CollectorSettings, error)
CollectCoinDistributionsForReview(ctx context.Context, records []*ByEarnerForReview) error
StartPrepareCoinDistributionsForReviewMonitor(ctx context.Context)
}
CollectorSettings struct {
DeniedCountries map[string]struct{}
Expand Down Expand Up @@ -124,6 +125,8 @@ const (
configKeyCoinDistributerMsgOnline = "coin_distributer_msg_sent_online_date"
configKeyCoinDistributerMsgOffline = "coin_distributer_msg_sent_offline_date"
configKeyCoinDistributerMsgFinished = "coin_distributer_msg_sent_finished_date"

doctorXTenant = "doctorx"
)

// .
Expand Down Expand Up @@ -191,9 +194,10 @@ type (
AirDropper airDropper
}
coinDistributer struct {
Client ethClient
DB *storage.DB
Processor *coinProcessor
Client ethClient
DB *storage.DB
Processor *coinProcessor
repository Repository
}
repository struct {
cfg *config
Expand Down
7 changes: 4 additions & 3 deletions coin-distribution/pending_review.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ func NewRepository(ctx context.Context, _ context.CancelFunc) Repository {
if localCfg.ReviewURL == "" {
log.Panic("`review-url` is missing")
}
db := storage.MustConnect(ctx, ddl, applicationYamlKey)

return &repository{
db: storage.MustConnect(ctx, ddl, applicationYamlKey),
db: db,
cfg: &localCfg,
}
}
Expand Down Expand Up @@ -379,15 +380,15 @@ func tryPrepareCoinDistributionsForReview(ctx context.Context, db *storage.DB) e
})
}

func startPrepareCoinDistributionsForReviewMonitor(ctx context.Context, db *storage.DB) {
func (r *repository) StartPrepareCoinDistributionsForReviewMonitor(ctx context.Context) {
ticker := stdlibtime.NewTicker(30 * stdlibtime.Second) //nolint:gomnd // .
defer ticker.Stop()

for {
select {
case <-ticker.C:
reqCtx, cancel := context.WithTimeout(ctx, 10*stdlibtime.Minute) //nolint:gomnd // .
log.Error(errors.Wrap(tryPrepareCoinDistributionsForReview(reqCtx, db), "failed to tryPrepareCoinDistributionsForReview"))
log.Error(errors.Wrap(tryPrepareCoinDistributionsForReview(reqCtx, r.db), "failed to tryPrepareCoinDistributionsForReview"))
cancel()
case <-ctx.Done():
return
Expand Down
1 change: 1 addition & 0 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func MustStartMining(ctx context.Context, cancel context.CancelFunc) Client {
mi.mustInitCoinDistributionCollector(ctx)
if isTenantInDistributionMode() {
mi.usersRepository = users.New(context.Background(), nil)
go mi.coinDistributionRepository.StartPrepareCoinDistributionsForReviewMonitor(ctx)
}

for workerNumber := int64(0); workerNumber < cfg.Workers; workerNumber++ {
Expand Down
Loading