diff --git a/coin-distribution/.testdata/application.yaml b/coin-distribution/.testdata/application.yaml index 37f82df..97e5abf 100644 --- a/coin-distribution/.testdata/application.yaml +++ b/coin-distribution/.testdata/application.yaml @@ -5,6 +5,8 @@ logger: encoder: console level: debug coin-distribution: + startHours: 12 + endHours: 17 development: true workers: 10 batchSize: 100 diff --git a/coin-distribution/DDL.sql b/coin-distribution/DDL.sql index 8289650..2f3d785 100644 --- a/coin-distribution/DDL.sql +++ b/coin-distribution/DDL.sql @@ -8,6 +8,11 @@ CREATE TABLE IF NOT EXISTS pending_coin_distributions ( CREATE INDEX IF NOT EXISTS pending_coin_distributions_worker_number_ix ON pending_coin_distributions ((internal_id % 10), created_at ASC); +CREATE TABLE IF NOT EXISTS pending_coin_distribution_configurations ( + key text NOT NULL primary key, + value text NOT NULL ); +INSERT INTO pending_coin_distribution_configurations(key,value) VALUES ('enabled','true') ON CONFLICT(key) DO NOTHING; + --- Flow: --infinite loop: -- with 30 sec sleep between iterations if 0 rows returned --do in transaction: @@ -17,7 +22,5 @@ CREATE INDEX IF NOT EXISTS pending_coin_distributions_worker_number_ix ON pendin -- ORDER BY created_at ASC -- LIMIT $2 -- FOR UPDATE ---2. call ERC-20 smart contract method to airdrop coins ---3. delete from pending_coin_distributions WHERE user_id = ANY($1) --- if transaction fails you retry infinitely --- log.info every successful transaction, log every error +--2. delete from pending_coin_distributions WHERE user_id = ANY($1) +--3. call ERC-20 smart contract method to airdrop coins \ No newline at end of file diff --git a/coin-distribution/coin_distribution.go b/coin-distribution/coin_distribution.go index cffdf46..c2b86c6 100644 --- a/coin-distribution/coin_distribution.go +++ b/coin-distribution/coin_distribution.go @@ -4,13 +4,17 @@ package coindistribution import ( "context" + "fmt" "sync" + stdlibtime "time" "github.com/hashicorp/go-multierror" "github.com/pkg/errors" appCfg "github.com/ice-blockchain/wintr/config" "github.com/ice-blockchain/wintr/connectors/storage/v2" + "github.com/ice-blockchain/wintr/log" + "github.com/ice-blockchain/wintr/time" ) func init() { @@ -50,6 +54,66 @@ func (cd *coinDistributer) CheckHealth(ctx context.Context) error { func (cd *coinDistributer) distributeCoins(ctx context.Context, workerNumber int64) { for ctx.Err() == nil { - println(workerNumber) + if !cd.isEnabled(ctx) { + log.Info(fmt.Sprintf("coinDistributer[%v] is disabled", workerNumber)) + stdlibtime.Sleep(requestDeadline) + + continue + } + if currentHour := time.Now().Hour() + 1; (cfg.StartHours < cfg.EndHours && (currentHour < cfg.StartHours || currentHour > cfg.EndHours)) || + (cfg.StartHours > cfg.EndHours && (currentHour < cfg.StartHours && currentHour > cfg.EndHours)) { + log.Info(fmt.Sprintf("coinDistributer[%v] is disabled until %v-%v", workerNumber, cfg.StartHours, cfg.EndHours)) + stdlibtime.Sleep(requestDeadline) + + continue + } + reqCtx, cancel := context.WithTimeout(ctx, requestDeadline) + err := storage.DoInTransaction(reqCtx, cd.db, func(conn storage.QueryExecer) error { + // Logic here + + return nil + }) + cancel() + if err == nil { + log.Info(fmt.Sprintf("TODO: add stuff here")) + } else { + log.Error(errors.Wrapf(err, "TODO: add stuff here")) + } + + if false { // if call to ethereum failed or if transaction commit failed + // Send Slack message with as much info as possible + + for err = cd.Disable(ctx); err != nil; err = cd.Disable(ctx) { + } + } } } + +func (cd *coinDistributer) isEnabled(rooCtx context.Context) bool { + ctx, cancel := context.WithTimeout(rooCtx, requestDeadline) + defer cancel() + val, err := storage.Get[struct { + Enabled bool + }](ctx, cd.db, `SELECT value::bool as enabled FROM pending_coin_distribution_configurations WHERE key = 'enabled'`) + if err != nil { + log.Error(errors.Wrap(err, "failed to check if coinDistributer is enabled")) + + return false + } + + return val.Enabled +} + +func (cd *coinDistributer) Disable(rooCtx context.Context) error { + ctx, cancel := context.WithTimeout(rooCtx, requestDeadline) + defer cancel() + rows, err := storage.Exec(ctx, cd.db, `UPDATE pending_coin_distribution_configurations SET value = 'false' WHERE key = 'enabled'`) + if err != nil { + return errors.Wrap(err, "failed to disable coinDistributer") + } + if rows == 0 { + return errors.Wrap(storage.ErrNotFound, "failed to disable coinDistributer") + } + + return nil +} diff --git a/coin-distribution/contract.go b/coin-distribution/contract.go index 0d19ff5..455cbbe 100644 --- a/coin-distribution/contract.go +++ b/coin-distribution/contract.go @@ -7,6 +7,7 @@ import ( _ "embed" "io" "sync" + stdlibtime "time" "github.com/ice-blockchain/wintr/connectors/storage/v2" ) @@ -24,6 +25,7 @@ type ( const ( applicationYamlKey = "coin-distribution" + requestDeadline = 25 * stdlibtime.Second ) // . @@ -41,6 +43,8 @@ type ( wg *sync.WaitGroup } config struct { + StartHours int `yaml:"startHours"` + EndHours int `yaml:"endHours"` Workers int64 `yaml:"workers"` BatchSize int64 `yaml:"batchSize"` Development bool `yaml:"development"`