diff --git a/.github/workflows/CICD.yaml b/.github/workflows/CICD.yaml index aaee2d9..aef99e0 100644 --- a/.github/workflows/CICD.yaml +++ b/.github/workflows/CICD.yaml @@ -175,7 +175,7 @@ jobs: name: Test strategy: matrix: - package: [ "miner", "extra-bonus-notifier", "tokenomics", "cmd/freezer", "cmd/freezer-refrigerant", "cmd/freezer-miner"] + package: [ "miner", "coin-distribution", "extra-bonus-notifier", "tokenomics", "cmd/freezer", "cmd/freezer-refrigerant", "cmd/freezer-miner", "cmd/freezer-coin-distributer"] if: ${{ (github.event_name == 'pull_request' && github.event.pull_request.draft == false) || github.event_name == 'push' }} runs-on: ubuntu-latest # runs-on: self-hosted-ubuntu-latest-x64 @@ -226,7 +226,7 @@ jobs: name: Benchmark strategy: matrix: - package: [ "miner", "extra-bonus-notifier", "tokenomics", "cmd/freezer", "cmd/freezer-refrigerant", "cmd/freezer-miner"] + package: [ "miner", "coin-distribution", "extra-bonus-notifier", "tokenomics", "cmd/freezer", "cmd/freezer-refrigerant", "cmd/freezer-miner", "cmd/freezer-coin-distributer"] if: ${{ (github.event_name == 'pull_request' && github.event.pull_request.draft == false) || github.event_name == 'push' }} runs-on: ubuntu-latest # runs-on: self-hosted-ubuntu-latest-x64 @@ -276,7 +276,7 @@ jobs: name: Verify Dockerfile strategy: matrix: - service: [ "freezer", "freezer-refrigerant", "freezer-miner"] + service: [ "freezer", "freezer-refrigerant", "freezer-miner", "freezer-coin-distributer"] #those are not supported by golang docker image: linux/riscv64 #platforms: linux/s390x,linux/arm64,linux/amd64,linux/ppc64le #commented because build takes too damn much with the other 3 platforms (~10 mins for each!!!) and we don`t need them atm @@ -422,7 +422,7 @@ jobs: name: Push Docker strategy: matrix: - service: [ "freezer", "freezer-refrigerant", "freezer-miner"] + service: [ "freezer", "freezer-refrigerant", "freezer-miner", "freezer-coin-distributer"] #those are not supported by golang docker image: linux/riscv64 #platforms: linux/s390x,linux/arm64,linux/amd64,linux/ppc64le #commented because build takes too damn much with the other 3 platforms (~10 mins for each!!!) and we don`t need them atm @@ -528,6 +528,12 @@ jobs: cmd: | cd secret-infrastructure yq e -i '.generic-service-chart.applicationImage.tag = strenv(APP_TAG)' helm/freezer-miner/staging/common-values.yaml + - name: Update [staging] application tag version in helm/freezer-coin-distributer/staging/common-values.yaml + uses: mikefarah/yq@master + with: + cmd: | + cd secret-infrastructure + yq e -i '.generic-service-chart.applicationImage.tag = strenv(APP_TAG)' helm/freezer-coin-distributer/staging/common-values.yaml - name: Commit and Push Changes to Application Tag Version run: | cd secret-infrastructure @@ -536,7 +542,8 @@ jobs: git add helm/freezer/staging/common-values.yaml git add helm/freezer-refrigerant/staging/common-values.yaml git add helm/freezer-miner/staging/common-values.yaml - git commit -m "Updated 'freezer' & 'freezer-refrigerant' & 'freezer-miner' tag version (${{env.APP_TAG}}) in application helm chart deployment manifests" + git add helm/freezer-coin-distributer/staging/common-values.yaml + git commit -m "Updated 'freezer' & 'freezer-refrigerant' & 'freezer-miner' & 'freezer-coin-distributer' tag version (${{env.APP_TAG}}) in application helm chart deployment manifests" git push --set-upstream origin master - name: Slack Notification For Success if: ${{ success() }} diff --git a/application.yaml b/application.yaml index 1108ec9..433ecb0 100644 --- a/application.yaml +++ b/application.yaml @@ -180,6 +180,9 @@ miner: batchSize: 100 wintr/connectors/storage/v2: *db coin-distribution: + alert-slack-webhook: https://hooks.slack.com/services/dummy/dummy/dummy + environment: local + review-url: https://some.bogus.example.com/going/somewhere development: true workers: 2 batchSize: 100 diff --git a/cmd/freezer/api/docs.go b/cmd/freezer/api/docs.go index 072c8a0..75d94b2 100644 --- a/cmd/freezer/api/docs.go +++ b/cmd/freezer/api/docs.go @@ -776,6 +776,10 @@ const docTemplate = `{ "type": "string", "example": "1,243.02" }, + "totalMiningBlockchain": { + "type": "string", + "example": "1,243.02" + }, "totalNoPreStakingBonus": { "type": "string", "example": "1,243.02" diff --git a/cmd/freezer/api/swagger.json b/cmd/freezer/api/swagger.json index 3c47460..8762c95 100644 --- a/cmd/freezer/api/swagger.json +++ b/cmd/freezer/api/swagger.json @@ -770,6 +770,10 @@ "type": "string", "example": "1,243.02" }, + "totalMiningBlockchain": { + "type": "string", + "example": "1,243.02" + }, "totalNoPreStakingBonus": { "type": "string", "example": "1,243.02" diff --git a/cmd/freezer/api/swagger.yaml b/cmd/freezer/api/swagger.yaml index 79dd1d4..49f73f2 100644 --- a/cmd/freezer/api/swagger.yaml +++ b/cmd/freezer/api/swagger.yaml @@ -78,6 +78,9 @@ definitions: total: example: 1,243.02 type: string + totalMiningBlockchain: + example: 1,243.02 + type: string totalNoPreStakingBonus: example: 1,243.02 type: string diff --git a/coin-distribution/.testdata/application.yaml b/coin-distribution/.testdata/application.yaml index 97e5abf..7aa657f 100644 --- a/coin-distribution/.testdata/application.yaml +++ b/coin-distribution/.testdata/application.yaml @@ -5,6 +5,9 @@ logger: encoder: console level: debug coin-distribution: + alert-slack-webhook: https://hooks.slack.com/services/dummy/dummy/dummy + environment: local + review-url: https://some.bogus.example.com/going/somewhere startHours: 12 endHours: 17 development: true diff --git a/coin-distribution/DDL.sql b/coin-distribution/DDL.sql index 96db6ff..cc1589e 100644 --- a/coin-distribution/DDL.sql +++ b/coin-distribution/DDL.sql @@ -7,6 +7,15 @@ EXCEPTION WHEN duplicate_object THEN null; END $$; +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'pending_coin_distributions_status') THEN + create type pending_coin_distributions_status AS ENUM ('NEW', 'PENDING', 'ACCEPTED', 'REJECTED'); + END IF; +END +$$; + + CREATE TABLE IF NOT EXISTS pending_coin_distributions ( created_at timestamp NOT NULL, internal_id bigint NOT NULL, @@ -14,15 +23,21 @@ CREATE TABLE IF NOT EXISTS pending_coin_distributions ( iceflakes uint256, user_id text NOT NULL, eth_address text NOT NULL, - PRIMARY KEY(day, user_id)); + eth_status pending_coin_distributions_status NOT NULL DEFAULT 'NEW', + eth_tx text, + PRIMARY KEY(day, user_id)) + WITH (FILLFACTOR = 70); -CREATE INDEX IF NOT EXISTS pending_coin_distributions_worker_number_ix ON pending_coin_distributions ((internal_id % 10), created_at ASC); +CREATE INDEX IF NOT EXISTS pending_coin_distributions_worker_number_ix ON pending_coin_distributions (eth_status, (internal_id % 10), created_at ASC); +CREATE INDEX IF NOT EXISTS pending_coin_distributions_eth_status_tx_ix ON pending_coin_distributions (eth_status, eth_tx); CREATE TABLE IF NOT EXISTS global ( key text NOT NULL primary key, - value text NOT NULL ); + value text NOT NULL ) + WITH (FILLFACTOR = 70); INSERT INTO global (key,value) - VALUES ('coin_distributer_enabled','true') + VALUES ('coin_distributer_enabled','true'), + ('coin_collector_enabled','true') ON CONFLICT(key) DO NOTHING; CREATE TABLE IF NOT EXISTS coin_distributions_by_earner ( @@ -35,7 +50,8 @@ CREATE TABLE IF NOT EXISTS coin_distributions_by_earner ( user_id text NOT NULL, earner_user_id text NOT NULL, eth_address text NOT NULL, - PRIMARY KEY(day, user_id, earner_user_id)); + PRIMARY KEY(day, user_id, earner_user_id)) + WITH (FILLFACTOR = 70); CREATE TABLE IF NOT EXISTS coin_distributions_pending_review ( created_at timestamp NOT NULL, @@ -75,4 +91,77 @@ CREATE TABLE IF NOT EXISTS reviewed_coin_distributions ( eth_address text NOT NULL, reviewer_user_id text NOT NULL, decision text NOT NULL, - PRIMARY KEY(user_id, day, review_day)); \ No newline at end of file + PRIMARY KEY(user_id, day, review_day)); + +create or replace procedure approve_coin_distributions(reviewer_user_id text, nested boolean) +language plpgsql + as $$ +declare + now timestamp := current_timestamp; +BEGIN + insert into pending_coin_distributions(created_at, internal_id, day, iceflakes, user_id, eth_address) + select created_at, internal_id, day, iceflakes, user_id, eth_address + from coin_distributions_pending_review; + + insert into reviewed_coin_distributions(reviewed_at, created_at, internal_id, ice, day, review_day, iceflakes, username, referred_by_username, user_id, eth_address, reviewer_user_id, decision) + select now, created_at, internal_id, ice, day, now::date, iceflakes, username, referred_by_username, user_id, eth_address, reviewer_user_id, 'approve' + from coin_distributions_pending_review; + + delete from coin_distributions_pending_review where 1=1; + + IF nested is false THEN + commit; + END IF; +end; $$; + +create or replace procedure deny_coin_distributions(reviewer_user_id text, nested boolean) +language plpgsql + as $$ +declare + now timestamp := current_timestamp; +BEGIN + insert into reviewed_coin_distributions(reviewed_at, created_at, internal_id, ice, day, review_day, iceflakes, username, referred_by_username, user_id, eth_address, reviewer_user_id, decision) + select now, created_at, internal_id, ice, day, now::date, iceflakes, username, referred_by_username, user_id, eth_address, reviewer_user_id, 'deny' + from coin_distributions_pending_review; + + delete from coin_distributions_pending_review where 1=1; + + INSERT INTO global (key,value) + VALUES ('coin_distributer_enabled','false'), + ('coin_collector_enabled','false') + ON CONFLICT (key) DO UPDATE + SET value = EXCLUDED.value; + + IF nested is false THEN + commit; + END IF; +end; $$; + +create or replace procedure prepare_coin_distributions_for_review(nested boolean) +language plpgsql + as $$ +declare + zeros text := '0000000000000000'; +BEGIN + delete from coin_distributions_by_earner WHERE balance = 0; + + insert into coin_distributions_pending_review(created_at, internal_id, ice, day, iceflakes, username, referred_by_username, user_id, eth_address) + SELECT created_at, internal_id, ice, day, (ice::text||zeros)::uint256 AS iceflakes, username, referred_by_username, user_id, eth_address + FROM (select + min (created_at) filter ( where user_id=earner_user_id ) AS created_at, + min (internal_id) filter ( where user_id=earner_user_id ) AS internal_id, + sum(balance) AS ice, + min (day) filter ( where user_id=earner_user_id ) AS day, + string_agg(username,'') AS username, + string_agg(referred_by_username,'') AS referred_by_username, + user_id, + string_agg(eth_address,'') AS eth_address + from coin_distributions_by_earner + group by day,user_id) AS X; + + delete from coin_distributions_by_earner where 1=1; + + IF nested is false THEN + commit; + END IF; +end; $$; \ No newline at end of file diff --git a/coin-distribution/coin_distribution.go b/coin-distribution/coin_distribution.go index ae7e7ed..1fc8249 100644 --- a/coin-distribution/coin_distribution.go +++ b/coin-distribution/coin_distribution.go @@ -26,25 +26,12 @@ func MustStartCoinDistribution(ctx context.Context, cancel context.CancelFunc) C db: storage.MustConnect(context.Background(), ddl, applicationYamlKey), wg: new(sync.WaitGroup), } - cd.wg.Add(int(cfg.Workers)) cd.cancel = cancel - - for workerNumber := int64(0); workerNumber < cfg.Workers; workerNumber++ { - go func(wn int64) { - defer cd.wg.Done() - cd.distributeCoins(ctx, wn) - }(workerNumber) - } + go startPrepareCoinDistributionsForReviewMonitor(ctx, cd.db) return cd } -func NewRepository(ctx context.Context, _ context.CancelFunc) Repository { - repo := &repository{db: storage.MustConnect(ctx, ddl, applicationYamlKey)} - - return repo -} - func (cd *coinDistributer) Close() error { cd.cancel() cd.wg.Wait() @@ -123,10 +110,3 @@ func (cd *coinDistributer) Disable(rooCtx context.Context) error { return nil } - -func (r *repository) CheckHealth(ctx context.Context) error { - return errors.Wrap(r.db.Ping(ctx), "[health-check] failed to ping DB for coindistribution.repository") -} -func (r *repository) Close() error { - return errors.Wrap(r.db.Close(), "failed to close db") -} diff --git a/coin-distribution/contract.go b/coin-distribution/contract.go index b0f073f..2fbd34c 100644 --- a/coin-distribution/contract.go +++ b/coin-distribution/contract.go @@ -26,6 +26,9 @@ type ( GetCoinDistributionsForReview(ctx context.Context, arg *GetCoinDistributionsForReviewArg) (*CoinDistributionsForReview, error) CheckHealth(ctx context.Context) error ReviewCoinDistributions(ctx context.Context, reviewerUserID string, decision string) error + NotifyCoinDistributionCollectionCycleEnded(ctx context.Context) error + GetCollectorStatus(ctx context.Context) (latestCollectingDate *time.Time, collectorEnabled bool, err error) + CollectCoinDistributionsForReview(ctx context.Context, records []*ByEarnerForReview) error } CoinDistributionsForReview struct { @@ -56,6 +59,17 @@ type ( Ice float64 `json:"ice" db:"-" example:"1000"` IceInternal int64 `json:"-" db:"ice" swaggerignore:"true"` } + + ByEarnerForReview struct { + CreatedAt *time.Time + Username string + ReferredByUsername string + UserID string + EarnerUserID string + EthAddress string + InternalID int64 + Balance float64 + } ) // Private API. @@ -80,13 +94,17 @@ type ( wg *sync.WaitGroup } repository struct { - db *storage.DB + cfg *config + db *storage.DB } config struct { - StartHours int `yaml:"startHours"` - EndHours int `yaml:"endHours"` - Workers int64 `yaml:"workers"` - BatchSize int64 `yaml:"batchSize"` - Development bool `yaml:"development"` + AlertSlackWebhook string `yaml:"alert-slack-webhook" mapstructure:"alert-slack-webhook"` //nolint:tagliatelle // . + Environment string `yaml:"environment" mapstructure:"environment"` + ReviewURL string `yaml:"review-url" mapstructure:"review-url"` + StartHours int `yaml:"startHours"` + EndHours int `yaml:"endHours"` + Workers int64 `yaml:"workers"` + BatchSize int64 `yaml:"batchSize"` + Development bool `yaml:"development"` } ) diff --git a/coin-distribution/pending_review.go b/coin-distribution/pending_review.go index b46b6a7..0ebcafa 100644 --- a/coin-distribution/pending_review.go +++ b/coin-distribution/pending_review.go @@ -10,10 +10,39 @@ import ( "github.com/pkg/errors" + appcfg "github.com/ice-blockchain/wintr/config" "github.com/ice-blockchain/wintr/connectors/storage/v2" "github.com/ice-blockchain/wintr/log" + "github.com/ice-blockchain/wintr/time" ) +func NewRepository(ctx context.Context, _ context.CancelFunc) Repository { + var localCfg config + appcfg.MustLoadFromKey(applicationYamlKey, &localCfg) + if localCfg.AlertSlackWebhook == "" { + log.Panic("`alert-slack-webhook` is missing") + } + if localCfg.Environment == "" { + log.Panic("`environment` is missing") + } + if localCfg.ReviewURL == "" { + log.Panic("`review-url` is missing") + } + + return &repository{ + db: storage.MustConnect(ctx, ddl, applicationYamlKey), + cfg: &localCfg, + } +} + +func (r *repository) CheckHealth(ctx context.Context) error { + return errors.Wrap(r.db.Ping(ctx), "[health-check] failed to ping DB for coindistribution.repository") +} + +func (r *repository) Close() error { + return errors.Wrap(r.db.Close(), "failed to close db") +} + //nolint:funlen // . func (r *repository) GetCoinDistributionsForReview(ctx context.Context, arg *GetCoinDistributionsForReviewArg) (*CoinDistributionsForReview, error) { //nolint:lll // . conditions, whereArgs := arg.where() @@ -137,8 +166,147 @@ func (a *GetCoinDistributionsForReviewArg) totalsWhere() ([]string, []any) { return conditions, args } +//nolint:funlen // . func (r *repository) ReviewCoinDistributions(ctx context.Context, reviewerUserID string, decision string) error { - log.Info(fmt.Sprintf("ReviewCoinDistributions(userID:`%v`, decision:`%v`)", reviewerUserID, decision)) + const sqlToCheckIfAnythingNeedsApproving = "SELECT true AS bogus WHERE exists (select 1 FROM coin_distributions_pending_review LIMIT 1)" + switch strings.ToLower(decision) { + case "approve": + return storage.DoInTransaction(ctx, r.db, func(conn storage.QueryExecer) error { + if _, err := storage.Get[struct{ Bogus bool }](ctx, conn, sqlToCheckIfAnythingNeedsApproving); err != nil { + if storage.IsErr(err, storage.ErrNotFound) { + err = nil + } + + return errors.Wrap(err, "failed to check if any rows in coin_distributions_pending_review exist") + } + if _, err := storage.Exec(ctx, conn, "call approve_coin_distributions($1,true)", reviewerUserID); err != nil { + return errors.Wrap(err, "failed to call approve_coin_distributions") + } + + return errors.Wrap(r.sendCurrentCoinDistributionsAvailableForReviewAreApprovedSlackMessage(ctx), + "failed to sendCurrentCoinDistributionsAvailableForReviewAreApprovedSlackMessage") + }) + case "deny": + return storage.DoInTransaction(ctx, r.db, func(conn storage.QueryExecer) error { + if _, err := storage.Get[struct{ Bogus bool }](ctx, conn, sqlToCheckIfAnythingNeedsApproving); err != nil { + if storage.IsErr(err, storage.ErrNotFound) { + err = nil + } + + return errors.Wrap(err, "failed to check if any rows in coin_distributions_pending_review exist") + } + if _, err := storage.Exec(ctx, conn, "call deny_coin_distributions($1,true)", reviewerUserID); err != nil { + return errors.Wrap(err, "failed to call deny_coin_distributions") + } + + return errors.Wrap(r.sendCurrentCoinDistributionsAvailableForReviewAreDeniedSlackMessage(ctx), + "failed to sendCurrentCoinDistributionsAvailableForReviewAreDeniedSlackMessage") + }) + default: + log.Panic(fmt.Sprintf("unknown decision:`%v`", decision)) + } return ctx.Err() } + +func (r *repository) CollectCoinDistributionsForReview(ctx context.Context, records []*ByEarnerForReview) error { + const columns = 9 + values := make([]string, 0, len(records)) + args := make([]any, 0, len(records)*columns) + for ix, record := range records { + values = append(values, generateValuesSQLParams(ix, columns)) + args = append(args, + record.CreatedAt.Time, + record.CreatedAt.Time, + record.InternalID, + int64(record.Balance*100), + record.Username, + record.ReferredByUsername, + record.UserID, + record.EarnerUserID, + record.EthAddress) + } + sql := fmt.Sprintf(`INSERT INTO coin_distributions_by_earner(created_at,day,internal_id,balance,username,referred_by_username,user_id,earner_user_id,eth_address) + VALUES %v + ON CONFLICT (day, user_id, earner_user_id) DO UPDATE + SET balance = EXCLUDED.balance`, strings.Join(values, ",\n")) + _, err := storage.Exec(ctx, r.db, sql, args...) + + return errors.Wrapf(err, "failed to insert into coin_distributions_by_earner %#v", records) +} + +func generateValuesSQLParams(index, columns int) string { + params := make([]string, 0, columns) + for ii := 1; ii <= columns; ii++ { + params = append(params, fmt.Sprintf("$%v", index*columns+ii)) + } + + return fmt.Sprintf("(%v)", strings.Join(params, ",")) +} + +func (r *repository) NotifyCoinDistributionCollectionCycleEnded(ctx context.Context) error { + sql := `INSERT INTO global(key,value) + VALUES ('latest_collecting_date',$1), + ('new_coin_distributions_pending','true') + ON CONFLICT (key) DO UPDATE + SET value = EXCLUDED.value` + _, err := storage.Exec(ctx, r.db, sql, time.Now().Format(stdlibtime.DateOnly)) + + return errors.Wrap(err, "failed to update global.value for latest_collecting_date to now and mark new_coin_distributions_pending") +} + +func (r *repository) GetCollectorStatus(ctx context.Context) (latestCollectingDate *time.Time, collectorEnabled bool, err error) { + sql := `SELECT (SELECT value::timestamp FROM global WHERE key = $1) AS latest_collecting_date, + coalesce((SELECT value::bool FROM global WHERE key = $2),false) AS coin_collector_enabled` + val, err := storage.ExecOne[struct { + LatestCollectingDate *time.Time + CoinCollectorEnabled bool + }](ctx, r.db, sql, "latest_collecting_date", "coin_collector_enabled") + if err != nil { + return nil, false, errors.Wrap(err, "failed to select info about latest_collecting_date, coin_collector_enabled") + } + + return val.LatestCollectingDate, val.CoinCollectorEnabled, nil +} + +func tryPrepareCoinDistributionsForReview(ctx context.Context, db *storage.DB) error { + return storage.DoInTransaction(ctx, db, func(conn storage.QueryExecer) error { + if _, err := storage.Get[struct{ Bogus bool }](ctx, conn, "SELECT true AS bogus FROM global WHERE key = 'new_coin_distributions_pending' FOR UPDATE SKIP LOCKED"); err != nil { + if storage.IsErr(err, storage.ErrNotFound) { + err = nil + } + + return errors.Wrap(err, "failed to check if we should start preparing new coin distributions for review") + } + + if _, err := storage.Exec(ctx, conn, "call prepare_coin_distributions_for_review(true)"); err != nil { + return errors.Wrap(err, "failed to call prepare_coin_distributions_for_review") + } + + if rowsDeleted, err := storage.Exec(ctx, conn, "DELETE FROM global where key = 'new_coin_distributions_pending'"); err != nil || rowsDeleted != 1 { + if err == nil { + err = errors.Errorf("expected 1 rowsDeleted, actual: %v", rowsDeleted) + } + + return errors.Wrap(err, "failed to del global.key='new_coin_distributions_pending'") + } + + return errors.Wrap(sendNewCoinDistributionsAvailableForReviewSlackMessage(ctx), "failed to sendNewCoinDistributionsAvailableForReviewSlackMessage") + }) +} + +func startPrepareCoinDistributionsForReviewMonitor(ctx context.Context, db *storage.DB) { + ticker := stdlibtime.NewTicker(30 * stdlibtime.Second) //nolint:gomnd // . + defer ticker.Stop() + + for { + select { + case <-ticker.C: + reqCtx, cancel := context.WithTimeout(ctx, 10*stdlibtime.Minute) //nolint:gomnd // . + log.Error(errors.Wrap(tryPrepareCoinDistributionsForReview(reqCtx, db), "failed to tryPrepareCoinDistributionsForReview")) + cancel() + case <-ctx.Done(): + return + } + } +} diff --git a/coin-distribution/slack_alerts.go b/coin-distribution/slack_alerts.go new file mode 100644 index 0000000..2219edc --- /dev/null +++ b/coin-distribution/slack_alerts.go @@ -0,0 +1,76 @@ +// SPDX-License-Identifier: ice License 1.0 + +package coindistribution + +import ( + "bytes" + "context" + "fmt" + "net/http" + + "github.com/goccy/go-json" + "github.com/pkg/errors" +) + +func (r *repository) sendNewCoinDistributionsAvailableForReviewSlackMessage(ctx context.Context) error { + text := fmt.Sprintf(":eyes:`%v` <%v|new coin distributions are available for review> :eyes:", r.cfg.Environment, r.cfg.ReviewURL) + + return errors.Wrap(sendSlackMessage(ctx, text, r.cfg.AlertSlackWebhook), "failed to sendSlackMessage") +} + +func sendNewCoinDistributionsAvailableForReviewSlackMessage(ctx context.Context) error { + text := fmt.Sprintf(":eyes:`%v` <%v|new coin distributions are available for review> :eyes:", cfg.Environment, cfg.ReviewURL) + + return errors.Wrap(sendSlackMessage(ctx, text, cfg.AlertSlackWebhook), "failed to sendSlackMessage") +} + +func (r *repository) sendCurrentCoinDistributionsAvailableForReviewAreApprovedSlackMessage(ctx context.Context) error { + text := fmt.Sprintf(":white_check_mark:`%v` current pending coin distributions are approved and are going to be processed as soon as the coin-distributer comes online :white_check_mark:", r.cfg.Environment) //nolint:lll // . + + return errors.Wrap(sendSlackMessage(ctx, text, r.cfg.AlertSlackWebhook), "failed to sendSlackMessage") +} + +func (r *repository) sendCurrentCoinDistributionsAvailableForReviewAreDeniedSlackMessage(ctx context.Context) error { + text := fmt.Sprintf(":no_entry:`%v` current pending coin distributions are denied and will not be processed :no_entry:", r.cfg.Environment) + + return errors.Wrap(sendSlackMessage(ctx, text, r.cfg.AlertSlackWebhook), "failed to sendSlackMessage") +} + +func sendCurrentCoinDistributionsFinishedBeingSentToEthereumSlackMessage(ctx context.Context) error { + text := fmt.Sprintf(":rocket:`%v` all pending coin distributions have been processed successfully :rocket:", cfg.Environment) + + return errors.Wrap(sendSlackMessage(ctx, text, cfg.AlertSlackWebhook), "failed to sendSlackMessage") +} + +func sendCoinDistributionsProcessingStoppedDueToUnrecoverableFailureSlackMessage(ctx context.Context, reason string) error { + text := fmt.Sprintf(":bangbang:`%v` coin distribution processing stopped due to failure :bangbang:\n:rotating_light: reason: `%v` :rotating_light:", cfg.Environment, reason) //nolint:lll // . + + return errors.Wrap(sendSlackMessage(ctx, text, cfg.AlertSlackWebhook), "failed to sendSlackMessage") +} + +//nolint:funlen // . +func sendSlackMessage(ctx context.Context, text, alertSlackWebhook string) error { + message := struct { + Text string `json:"text,omitempty"` + }{ + Text: text, + } + data, err := json.Marshal(message) + if err != nil { + return errors.Wrapf(err, "failed to Marshal slack message:%#v", message) + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, alertSlackWebhook, bytes.NewBuffer(data)) + if err != nil { + return errors.Wrap(err, "newRequestWithContext failed") + } + + resp, err := new(http.Client).Do(req) + if err != nil { + return errors.Wrap(err, "slack webhook request failed") + } + if resp.StatusCode != http.StatusOK { + return errors.Errorf("unexpected statusCode:%v", resp.StatusCode) + } + + return errors.Wrap(resp.Body.Close(), "failed to close body") +} diff --git a/coin-distribution/trigger.go b/coin-distribution/trigger.go deleted file mode 100644 index a6cfe30..0000000 --- a/coin-distribution/trigger.go +++ /dev/null @@ -1,73 +0,0 @@ -// SPDX-License-Identifier: ice License 1.0 - -package coindistribution - -import ( - "context" - "fmt" - "strings" - stdlibtime "time" - - "github.com/pkg/errors" - - "github.com/ice-blockchain/wintr/connectors/storage/v2" - "github.com/ice-blockchain/wintr/time" -) - -type ( - TriggeringRecord struct { - CreatedAt *time.Time - Username string - ReferredByUsername string - UserID string - EarnerUserID string - EthAddress string - InternalID int64 - Balance float64 - } -) - -func TriggerCoinDistribution(ctx context.Context, db storage.Execer, records []*TriggeringRecord) error { - const columns = 9 - values := make([]string, 0, len(records)) - args := make([]any, 0, len(records)*columns) - for ix, record := range records { - values = append(values, generateValuesSQLParams(ix, columns)) - args = append(args, - record.CreatedAt.Time, - record.CreatedAt.Time, - record.InternalID, - int64(record.Balance*100), - record.Username, - record.ReferredByUsername, - record.UserID, - record.EarnerUserID, - record.EthAddress) - } - sql := fmt.Sprintf(`INSERT INTO coin_distributions_by_earner(created_at,day,internal_id,balance,username,referred_by_username,user_id,earner_user_id,eth_address) - VALUES %v - ON CONFLICT (day, user_id, earner_user_id) DO UPDATE - SET balance = EXCLUDED.balance`, strings.Join(values, ",\n")) - _, err := storage.Exec(ctx, db, sql, args...) - - return errors.Wrapf(err, "failed to insert into coin_distributions_by_earner %#v", records) -} - -func generateValuesSQLParams(index, columns int) string { - params := make([]string, 0, columns) - for ii := 1; ii <= columns; ii++ { - params = append(params, fmt.Sprintf("$%v", index*columns+ii)) - } - - return fmt.Sprintf("(%v)", strings.Join(params, ",")) -} - -func X(ctx context.Context, db storage.Execer) error { - sql := `INSERT INTO global(key,value) VALUES ('latest_processing_date',$1) - ON CONFLICT (key) DO UPDATE - SET value = EXCLUDED.value - WHERE value != EXCLUDED.value` - _, err := storage.Exec(ctx, db, sql, time.Now().Format(stdlibtime.DateOnly)) - - return errors.Wrap(err, "failed to XXX ") -} diff --git a/go.mod b/go.mod index b6a9de7..2d8f320 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 github.com/goccy/go-json v0.10.2 github.com/hashicorp/go-multierror v1.1.1 - github.com/ice-blockchain/eskimo v1.227.0 + github.com/ice-blockchain/eskimo v1.228.0 github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb github.com/ice-blockchain/wintr v1.127.0 github.com/imroc/req/v3 v3.42.2 @@ -129,7 +129,7 @@ require ( github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.6.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/spf13/viper v1.18.1 // indirect + github.com/spf13/viper v1.18.2 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/swaggo/files v1.0.1 // indirect github.com/swaggo/gin-swagger v1.6.0 // indirect @@ -150,7 +150,7 @@ require ( go.uber.org/mock v0.3.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/arch v0.6.0 // indirect - golang.org/x/crypto v0.16.0 // indirect + golang.org/x/crypto v0.17.0 // indirect golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611 // indirect golang.org/x/mod v0.14.0 // indirect golang.org/x/net v0.19.0 // indirect diff --git a/go.sum b/go.sum index d5125c6..b211a5b 100644 --- a/go.sum +++ b/go.sum @@ -208,8 +208,8 @@ github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mO github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= -github.com/ice-blockchain/eskimo v1.227.0 h1:w8TaGCJY7FfbOGOyQG2kp+weDkyJZEXLRujdCtsDr5w= -github.com/ice-blockchain/eskimo v1.227.0/go.mod h1:Btj5U8KPf2+dkGtajQER1SjfXv40t8RqaihVNd4FE1s= +github.com/ice-blockchain/eskimo v1.228.0 h1:+znQ8lsx48scMRngfpBWi+h5scSqKF5TBAKe/mDqIQ4= +github.com/ice-blockchain/eskimo v1.228.0/go.mod h1:Btj5U8KPf2+dkGtajQER1SjfXv40t8RqaihVNd4FE1s= github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb h1:8TnFP3mc7O+tc44kv2e0/TpZKnEVUaKH+UstwfBwRkk= github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb/go.mod h1:ZsQU7i3mxhgBBu43Oev7WPFbIjP4TniN/b1UPNGbrq8= github.com/ice-blockchain/wintr v1.127.0 h1:YuGfLCGu91mLtsH0AcdNKnDERZPD6+3er93T/m7vF2Q= @@ -329,8 +329,8 @@ github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/spf13/viper v1.18.1 h1:rmuU42rScKWlhhJDyXZRKJQHXFX02chSVW1IvkPGiVM= -github.com/spf13/viper v1.18.1/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMVB+yk= +github.com/spf13/viper v1.18.2 h1:LUXCnvUvSM6FXAsj6nnfc8Q2tp1dIgUfY9Kc8GsSOiQ= +github.com/spf13/viper v1.18.2/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMVB+yk= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= @@ -404,8 +404,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= -golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611 h1:qCEDpW1G+vcj3Y7Fy52pEM1AWm3abj8WimGYejI3SC4= golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= diff --git a/tokenomics/balance.go b/tokenomics/balance.go index ee4462b..393ed57 100644 --- a/tokenomics/balance.go +++ b/tokenomics/balance.go @@ -71,6 +71,10 @@ func (r *repository) GetBalanceSummary( //nolint:lll // . model.BalanceT0Field model.BalanceT1Field model.BalanceT2Field + model.BalanceSoloEthereumField + model.BalanceT0EthereumField + model.BalanceT1EthereumField + model.BalanceT2EthereumField model.PreStakingBonusField model.PreStakingAllocationField }](ctx, r.db, model.SerializedUsersKey(id)) @@ -97,6 +101,7 @@ func (r *repository) GetBalanceSummary( //nolint:lll // . T1: fmt.Sprintf(floatToStringFormatter, t1Standard+t1PreStaking), T2: fmt.Sprintf(floatToStringFormatter, t2Standard+t2PreStaking), TotalReferrals: fmt.Sprintf(floatToStringFormatter, t1Standard+t1PreStaking+t2Standard+t2PreStaking), + TotalMiningBlockchain: fmt.Sprintf(floatToStringFormatter, res[0].BalanceSoloEthereum+res[0].BalanceT0Ethereum+res[0].BalanceT1Ethereum+res[0].BalanceT2Ethereum), //nolint:lll // . }, }, nil } diff --git a/tokenomics/contract.go b/tokenomics/contract.go index 1a92a8b..a621b48 100644 --- a/tokenomics/contract.go +++ b/tokenomics/contract.go @@ -79,6 +79,7 @@ type ( T1 DENOM `json:"t1,omitempty" swaggertype:"string" example:"1,243.02"` T2 DENOM `json:"t2,omitempty" swaggertype:"string" example:"1,243.02"` TotalReferrals DENOM `json:"totalReferrals,omitempty" swaggertype:"string" example:"1,243.02"` + TotalMiningBlockchain DENOM `json:"totalMiningBlockchain,omitempty" swaggertype:"string" example:"1,243.02"` UserID string `json:"userId,omitempty" swaggerignore:"true" example:"did:ethr:0x4B73C58370AEfcEf86A6021afCDe5673511376B2"` miningBlockchainAccountAddress string } diff --git a/tokenomics/kyc.go b/tokenomics/kyc.go index cfcfe55..6d6ea1b 100644 --- a/tokenomics/kyc.go +++ b/tokenomics/kyc.go @@ -386,7 +386,7 @@ func (r *repository) overrideKYCStateWithEskimoKYCState(ctx context.Context, use } }). SetRetryCondition(func(resp *req.Response, err error) bool { - return err != nil || (resp.GetStatusCode() != http.StatusOK && resp.GetStatusCode() != http.StatusUnauthorized && resp.GetStatusCode() != http.StatusForbidden) //nolint:lll // . + return err != nil || (resp.GetStatusCode() != http.StatusOK && resp.GetStatusCode() != http.StatusNotFound && resp.GetStatusCode() != http.StatusUnauthorized && resp.GetStatusCode() != http.StatusForbidden) //nolint:lll // . }). AddQueryParam("caller", "freezer-refrigerant"). SetHeader("Authorization", authorization(ctx)).