Skip to content

Commit

Permalink
fixed coin-distribution structure
Browse files Browse the repository at this point in the history
  • Loading branch information
ice-ares committed Nov 30, 2023
1 parent e19be03 commit 2ff6724
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 5 deletions.
2 changes: 2 additions & 0 deletions coin-distribution/.testdata/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ logger:
encoder: console
level: debug
coin-distribution:
startHours: 12
endHours: 17
development: true
workers: 10
batchSize: 100
Expand Down
11 changes: 7 additions & 4 deletions coin-distribution/DDL.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ CREATE TABLE IF NOT EXISTS pending_coin_distributions (

CREATE INDEX IF NOT EXISTS pending_coin_distributions_worker_number_ix ON pending_coin_distributions ((internal_id % 10), created_at ASC);

CREATE TABLE IF NOT EXISTS pending_coin_distribution_configurations (
key text NOT NULL primary key,
value text NOT NULL );
INSERT INTO pending_coin_distribution_configurations(key,value) VALUES ('enabled','true') ON CONFLICT(key) DO NOTHING;

--- Flow:
--infinite loop: -- with 30 sec sleep between iterations if 0 rows returned
--do in transaction:
Expand All @@ -17,7 +22,5 @@ CREATE INDEX IF NOT EXISTS pending_coin_distributions_worker_number_ix ON pendin
-- ORDER BY created_at ASC
-- LIMIT $2
-- FOR UPDATE
--2. call ERC-20 smart contract method to airdrop coins
--3. delete from pending_coin_distributions WHERE user_id = ANY($1)
-- if transaction fails you retry infinitely
-- log.info every successful transaction, log every error
--2. delete from pending_coin_distributions WHERE user_id = ANY($1)
--3. call ERC-20 smart contract method to airdrop coins
66 changes: 65 additions & 1 deletion coin-distribution/coin_distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ package coindistribution

import (
"context"
"fmt"
"sync"
stdlibtime "time"

"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"

appCfg "github.com/ice-blockchain/wintr/config"
"github.com/ice-blockchain/wintr/connectors/storage/v2"
"github.com/ice-blockchain/wintr/log"
"github.com/ice-blockchain/wintr/time"
)

func init() {
Expand Down Expand Up @@ -50,6 +54,66 @@ func (cd *coinDistributer) CheckHealth(ctx context.Context) error {

func (cd *coinDistributer) distributeCoins(ctx context.Context, workerNumber int64) {
for ctx.Err() == nil {
println(workerNumber)
if !cd.isEnabled(ctx) {
log.Info(fmt.Sprintf("coinDistributer[%v] is disabled", workerNumber))
stdlibtime.Sleep(requestDeadline)

continue
}
if currentHour := time.Now().Hour() + 1; (cfg.StartHours < cfg.EndHours && (currentHour < cfg.StartHours || currentHour > cfg.EndHours)) ||
(cfg.StartHours > cfg.EndHours && (currentHour < cfg.StartHours && currentHour > cfg.EndHours)) {
log.Info(fmt.Sprintf("coinDistributer[%v] is disabled until %v-%v", workerNumber, cfg.StartHours, cfg.EndHours))
stdlibtime.Sleep(requestDeadline)

continue
}
reqCtx, cancel := context.WithTimeout(ctx, requestDeadline)
err := storage.DoInTransaction(reqCtx, cd.db, func(conn storage.QueryExecer) error {
// Logic here

return nil
})
cancel()
if err == nil {
log.Info(fmt.Sprintf("TODO: add stuff here"))
} else {
log.Error(errors.Wrapf(err, "TODO: add stuff here"))
}

if false { // if call to ethereum failed or if transaction commit failed
// Send Slack message with as much info as possible

for err = cd.Disable(ctx); err != nil; err = cd.Disable(ctx) {
}
}
}
}

func (cd *coinDistributer) isEnabled(rooCtx context.Context) bool {
ctx, cancel := context.WithTimeout(rooCtx, requestDeadline)
defer cancel()
val, err := storage.Get[struct {
Enabled bool
}](ctx, cd.db, `SELECT value::bool as enabled FROM pending_coin_distribution_configurations WHERE key = 'enabled'`)
if err != nil {
log.Error(errors.Wrap(err, "failed to check if coinDistributer is enabled"))

return false
}

return val.Enabled
}

func (cd *coinDistributer) Disable(rooCtx context.Context) error {
ctx, cancel := context.WithTimeout(rooCtx, requestDeadline)
defer cancel()
rows, err := storage.Exec(ctx, cd.db, `UPDATE pending_coin_distribution_configurations SET value = 'false' WHERE key = 'enabled'`)
if err != nil {
return errors.Wrap(err, "failed to disable coinDistributer")
}
if rows == 0 {
return errors.Wrap(storage.ErrNotFound, "failed to disable coinDistributer")
}

return nil
}
4 changes: 4 additions & 0 deletions coin-distribution/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
_ "embed"
"io"
"sync"
stdlibtime "time"

"github.com/ice-blockchain/wintr/connectors/storage/v2"
)
Expand All @@ -24,6 +25,7 @@ type (

const (
applicationYamlKey = "coin-distribution"
requestDeadline = 25 * stdlibtime.Second
)

// .
Expand All @@ -41,6 +43,8 @@ type (
wg *sync.WaitGroup
}
config struct {
StartHours int `yaml:"startHours"`
EndHours int `yaml:"endHours"`
Workers int64 `yaml:"workers"`
BatchSize int64 `yaml:"batchSize"`
Development bool `yaml:"development"`
Expand Down

0 comments on commit 2ff6724

Please sign in to comment.