From 4037263fab979bac465b41e84537c2434330f69a Mon Sep 17 00:00:00 2001 From: Ares <75481906+ice-ares@users.noreply.github.com> Date: Wed, 20 Dec 2023 12:58:32 +0200 Subject: [PATCH] coin distribution preparation --- coin-distribution/DDL.sql | 9 +- coin-distribution/contract.go | 13 +- coin-distribution/eligibility.go | 44 ++- coin-distribution/pending_review.go | 81 ++++-- miner/.testdata/application.yaml | 6 +- miner/contract.go | 66 ++++- miner/ethereum_distribution.go | 420 ++++++++++++++++++++++++++++ miner/metrics.go | 25 +- miner/metrics_test.go | 2 + miner/miner.go | 95 ++++++- miner/referral_lifecycle.go | 6 +- model/model.go | 34 ++- 12 files changed, 739 insertions(+), 62 deletions(-) create mode 100644 miner/ethereum_distribution.go diff --git a/coin-distribution/DDL.sql b/coin-distribution/DDL.sql index cc1589e..8023a0a 100644 --- a/coin-distribution/DDL.sql +++ b/coin-distribution/DDL.sql @@ -37,7 +37,14 @@ CREATE TABLE IF NOT EXISTS global ( WITH (FILLFACTOR = 70); INSERT INTO global (key,value) VALUES ('coin_distributer_enabled','true'), - ('coin_collector_enabled','true') + ('coin_collector_enabled','true'), + ('coin_collector_forced_execution','false'), + ('coin_collector_start_date','2023-12-20T10:54:20.657949659Z'), + ('coin_collector_end_date','2024-10-24T10:54:20.657949659Z'), + ('coin_collector_min_mining_streaks_required','0'), + ('coin_collector_start_hour','0'), + ('coin_collector_min_balance_required','0'), + ('coin_collector_denied_countries','') ON CONFLICT(key) DO NOTHING; CREATE TABLE IF NOT EXISTS coin_distributions_by_earner ( diff --git a/coin-distribution/contract.go b/coin-distribution/contract.go index 2fbd34c..e8b7917 100644 --- a/coin-distribution/contract.go +++ b/coin-distribution/contract.go @@ -27,9 +27,20 @@ type ( CheckHealth(ctx context.Context) error ReviewCoinDistributions(ctx context.Context, reviewerUserID string, decision string) error NotifyCoinDistributionCollectionCycleEnded(ctx context.Context) error - GetCollectorStatus(ctx context.Context) (latestCollectingDate *time.Time, collectorEnabled bool, err error) + GetCollectorSettings(ctx context.Context) (*CollectorSettings, error) CollectCoinDistributionsForReview(ctx context.Context, records []*ByEarnerForReview) error } + CollectorSettings struct { + DeniedCountries map[string]struct{} + LatestDate *time.Time + StartDate *time.Time + EndDate *time.Time + MinBalanceRequired float64 + StartHour int + MinMiningStreaksRequired uint64 + Enabled bool + ForcedExecution bool + } CoinDistributionsForReview struct { Distributions []*PendingReview `json:"distributions"` diff --git a/coin-distribution/eligibility.go b/coin-distribution/eligibility.go index a303a49..afc971e 100644 --- a/coin-distribution/eligibility.go +++ b/coin-distribution/eligibility.go @@ -3,6 +3,7 @@ package coindistribution import ( + "strings" stdlibtime "time" "github.com/ice-blockchain/eskimo/users" @@ -24,8 +25,7 @@ func CalculateEthereumDistributionICEBalance( return standardBalance } - //TODO: should this be fractional or natural? - return standardBalance / (float64(delta.Nanoseconds()) / float64(ethereumDistributionFrequencyMax.Nanoseconds())) + return standardBalance / float64(int64(delta/ethereumDistributionFrequencyMax)+1) } func IsEligibleForEthereumDistribution( @@ -37,7 +37,7 @@ func IsEligibleForEthereumDistribution( kycState model.KYCState, miningSessionDuration, ethereumDistributionFrequencyMin, ethereumDistributionFrequencyMax stdlibtime.Duration) bool { var countryAllowed bool - if _, countryDenied := distributionDeniedCountries[country]; country != "" && !countryDenied { + if _, countryDenied := distributionDeniedCountries[strings.ToLower(country)]; country != "" && !countryDenied { countryAllowed = true } @@ -49,6 +49,44 @@ func IsEligibleForEthereumDistribution( kycState.KYCStepPassedCorrectly(users.QuizKYCStep) } +//nolint:funlen // . +func IsEligibleForCoinDistributionNow(id int64, + now, lastEthereumCoinDistributionProcessedAt, coinDistributionStartDate *time.Time, + ethereumDistributionFrequencyMin, ethereumDistributionFrequencyMax stdlibtime.Duration) bool { + var ( + reservationIsTodayAndIsOutsideOfFirstCycle, secondReservationIsWithinFirstDistributionCycle bool + truncatedLastEthereumCoinDistributionProcessedAt *time.Time + ethereumDistributionStartDate = coinDistributionStartDate.Truncate(ethereumDistributionFrequencyMin) //nolint:lll // . + ethereumDistributionDayAfterStartDate = ethereumDistributionStartDate.Add(ethereumDistributionFrequencyMin) //nolint:lll // . + ethereumDistributionFirstCycleEndDate = ethereumDistributionDayAfterStartDate.Add(ethereumDistributionFrequencyMax) //nolint:lll // . + userReservedDayForEthereumCoinDistributionIndex = id % int64(ethereumDistributionFrequencyMax/ethereumDistributionFrequencyMin) //nolint:lll // . + today = now.Truncate(ethereumDistributionFrequencyMin) + neverDoneItBeforeAndTodayIsNotEthereumDistributionStartDateButReservationIsToday = userReservedDayForEthereumCoinDistributionIndex == int64((today.Sub(ethereumDistributionDayAfterStartDate)%ethereumDistributionFrequencyMax)/ethereumDistributionFrequencyMin) //nolint:lll // . + ) + if !lastEthereumCoinDistributionProcessedAt.IsNil() { + truncatedLastEthereumCoinDistributionProcessedAt = time.New(lastEthereumCoinDistributionProcessedAt.Truncate(ethereumDistributionFrequencyMin)) + reservationIsTodayAndIsOutsideOfFirstCycle = today.Sub(*truncatedLastEthereumCoinDistributionProcessedAt.Time)%ethereumDistributionFrequencyMax == 0 //nolint:lll // . + secondReservationIsWithinFirstDistributionCycle = userReservedDayForEthereumCoinDistributionIndex == int64((truncatedLastEthereumCoinDistributionProcessedAt.Add(ethereumDistributionFrequencyMin).Sub(ethereumDistributionDayAfterStartDate)%ethereumDistributionFrequencyMax)/ethereumDistributionFrequencyMin) //nolint:lll // . + } + switch { + case lastEthereumCoinDistributionProcessedAt.IsNil() && today.Equal(ethereumDistributionStartDate): + return true + case !lastEthereumCoinDistributionProcessedAt.IsNil() && today.Equal(ethereumDistributionStartDate): + return false + case lastEthereumCoinDistributionProcessedAt.IsNil() && !today.Equal(ethereumDistributionStartDate): + return neverDoneItBeforeAndTodayIsNotEthereumDistributionStartDateButReservationIsToday + case !lastEthereumCoinDistributionProcessedAt.IsNil() && !today.Equal(ethereumDistributionStartDate): + switch { + case today.Before(ethereumDistributionFirstCycleEndDate): + return secondReservationIsWithinFirstDistributionCycle + default: + return reservationIsTodayAndIsOutsideOfFirstCycle + } + default: + return false + } +} + func isEthereumAddressValid(ethAddress string) bool { if ethAddress == "" { return false diff --git a/coin-distribution/pending_review.go b/coin-distribution/pending_review.go index 0ebcafa..48b5b9b 100644 --- a/coin-distribution/pending_review.go +++ b/coin-distribution/pending_review.go @@ -52,7 +52,7 @@ func (r *repository) GetCoinDistributionsForReview(ctx context.Context, arg *Get AND %[1]v ORDER BY %[2]v LIMIT $2 OFFSET $1`, strings.Join(append(conditions, "1=1"), " AND "), strings.Join(append(arg.orderBy(), "internal_id asc"), ", ")) - result, err := storage.Select[struct { + result, err := storage.ExecMany[struct { *PendingReview Day stdlibtime.Time InternalID uint64 @@ -71,7 +71,7 @@ func (r *repository) GetCoinDistributionsForReview(ctx context.Context, arg *Get FROM coin_distributions_pending_review WHERE 1=1 AND %[1]v`, strings.Join(append(conditions, "1=1"), " AND ")) - total, err := storage.Get[struct { + total, err := storage.ExecOne[struct { Rows uint64 Ice uint64 }](ctx, r.db, sql, whereArgs...) @@ -172,7 +172,7 @@ func (r *repository) ReviewCoinDistributions(ctx context.Context, reviewerUserID switch strings.ToLower(decision) { case "approve": return storage.DoInTransaction(ctx, r.db, func(conn storage.QueryExecer) error { - if _, err := storage.Get[struct{ Bogus bool }](ctx, conn, sqlToCheckIfAnythingNeedsApproving); err != nil { + if _, err := storage.ExecOne[struct{ Bogus bool }](ctx, conn, sqlToCheckIfAnythingNeedsApproving); err != nil { if storage.IsErr(err, storage.ErrNotFound) { err = nil } @@ -188,7 +188,7 @@ func (r *repository) ReviewCoinDistributions(ctx context.Context, reviewerUserID }) case "deny": return storage.DoInTransaction(ctx, r.db, func(conn storage.QueryExecer) error { - if _, err := storage.Get[struct{ Bogus bool }](ctx, conn, sqlToCheckIfAnythingNeedsApproving); err != nil { + if _, err := storage.ExecOne[struct{ Bogus bool }](ctx, conn, sqlToCheckIfAnythingNeedsApproving); err != nil { if storage.IsErr(err, storage.ErrNotFound) { err = nil } @@ -210,6 +210,9 @@ func (r *repository) ReviewCoinDistributions(ctx context.Context, reviewerUserID } func (r *repository) CollectCoinDistributionsForReview(ctx context.Context, records []*ByEarnerForReview) error { + if len(records) == 0 { + return nil + } const columns = 9 values := make([]string, 0, len(records)) args := make([]any, 0, len(records)*columns) @@ -229,7 +232,12 @@ func (r *repository) CollectCoinDistributionsForReview(ctx context.Context, reco sql := fmt.Sprintf(`INSERT INTO coin_distributions_by_earner(created_at,day,internal_id,balance,username,referred_by_username,user_id,earner_user_id,eth_address) VALUES %v ON CONFLICT (day, user_id, earner_user_id) DO UPDATE - SET balance = EXCLUDED.balance`, strings.Join(values, ",\n")) + SET + created_at = EXCLUDED.created_at, + balance = EXCLUDED.balance, + username = EXCLUDED.username, + referred_by_username = EXCLUDED.referred_by_username, + eth_address = EXCLUDED.eth_address`, strings.Join(values, ",\n")) _, err := storage.Exec(ctx, r.db, sql, args...) return errors.Wrapf(err, "failed to insert into coin_distributions_by_earner %#v", records) @@ -246,32 +254,71 @@ func generateValuesSQLParams(index, columns int) string { func (r *repository) NotifyCoinDistributionCollectionCycleEnded(ctx context.Context) error { sql := `INSERT INTO global(key,value) - VALUES ('latest_collecting_date',$1), - ('new_coin_distributions_pending','true') + VALUES ('coin_collector_latest_collecting_date',$1), + ('new_coin_distributions_pending','true'), + ('coin_collector_forced_execution','false') ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value` _, err := storage.Exec(ctx, r.db, sql, time.Now().Format(stdlibtime.DateOnly)) - return errors.Wrap(err, "failed to update global.value for latest_collecting_date to now and mark new_coin_distributions_pending") + return errors.Wrap(err, "failed to update global.value for coin_collector_latest_collecting_date to now and mark new_coin_distributions_pending") } -func (r *repository) GetCollectorStatus(ctx context.Context) (latestCollectingDate *time.Time, collectorEnabled bool, err error) { - sql := `SELECT (SELECT value::timestamp FROM global WHERE key = $1) AS latest_collecting_date, - coalesce((SELECT value::bool FROM global WHERE key = $2),false) AS coin_collector_enabled` +func (r *repository) GetCollectorSettings(ctx context.Context) (*CollectorSettings, error) { + sql := `SELECT (SELECT value::timestamp FROM global WHERE key = $1) AS coin_collector_latest_collecting_date, + (SELECT value::timestamp FROM global WHERE key = $2) AS coin_collector_start_date, + (SELECT value::timestamp FROM global WHERE key = $3) AS coin_collector_end_date, + coalesce((SELECT value::bool FROM global WHERE key = $4),false) AS coin_collector_enabled, + coalesce((SELECT value::bool FROM global WHERE key = $5),false) AS coin_collector_forced_execution, + coalesce((SELECT value::int FROM global WHERE key = $6),0) AS coin_collector_min_mining_streaks_required, + coalesce((SELECT value::int FROM global WHERE key = $7),0) AS coin_collector_start_hour, + coalesce((SELECT value::int FROM global WHERE key = $8),0) AS coin_collector_min_balance_required, + coalesce((SELECT value FROM global WHERE key = $9),'') AS coin_collector_denied_countries` val, err := storage.ExecOne[struct { - LatestCollectingDate *time.Time - CoinCollectorEnabled bool - }](ctx, r.db, sql, "latest_collecting_date", "coin_collector_enabled") + CoinCollectorLatestCollectingDate *time.Time + CoinCollectorStartDate *time.Time + CoinCollectorEndDate *time.Time + CoinCollectorDeniedCountries string + CoinCollectorMinMiningStreaksRequired int + CoinCollectorStartHour int + CoinCollectorMinBalanceRequired int + CoinCollectorEnabled bool + CoinCollectorForcedExecution bool + }](ctx, r.db, sql, + "coin_collector_latest_collecting_date", + "coin_collector_start_date", + "coin_collector_end_date", + "coin_collector_enabled", + "coin_collector_forced_execution", + "coin_collector_min_mining_streaks_required", + "coin_collector_start_hour", + "coin_collector_min_balance_required", + "coin_collector_denied_countries") if err != nil { - return nil, false, errors.Wrap(err, "failed to select info about latest_collecting_date, coin_collector_enabled") + return nil, errors.Wrap(err, "failed to select info about GetCollectorSettings") + } + countries := strings.Split(strings.ToLower(val.CoinCollectorDeniedCountries), ",") + mappedCountries := make(map[string]struct{}, len(countries)) + for ix := range countries { + mappedCountries[countries[ix]] = struct{}{} } - return val.LatestCollectingDate, val.CoinCollectorEnabled, nil + return &CollectorSettings{ + DeniedCountries: mappedCountries, + LatestDate: val.CoinCollectorLatestCollectingDate, + StartDate: val.CoinCollectorStartDate, + EndDate: val.CoinCollectorEndDate, + MinBalanceRequired: float64(val.CoinCollectorMinBalanceRequired), + StartHour: val.CoinCollectorStartHour, + MinMiningStreaksRequired: uint64(val.CoinCollectorMinMiningStreaksRequired), + Enabled: val.CoinCollectorEnabled, + ForcedExecution: val.CoinCollectorForcedExecution, + }, nil } func tryPrepareCoinDistributionsForReview(ctx context.Context, db *storage.DB) error { return storage.DoInTransaction(ctx, db, func(conn storage.QueryExecer) error { - if _, err := storage.Get[struct{ Bogus bool }](ctx, conn, "SELECT true AS bogus FROM global WHERE key = 'new_coin_distributions_pending' FOR UPDATE SKIP LOCKED"); err != nil { + if _, err := storage.ExecOne[struct{ Bogus bool }](ctx, conn, "SELECT true AS bogus FROM global WHERE key = 'new_coin_distributions_pending' FOR UPDATE SKIP LOCKED"); err != nil { if storage.IsErr(err, storage.ErrNotFound) { err = nil } diff --git a/miner/.testdata/application.yaml b/miner/.testdata/application.yaml index 1dcb0b9..7bf89ae 100644 --- a/miner/.testdata/application.yaml +++ b/miner/.testdata/application.yaml @@ -8,4 +8,8 @@ tokenomics: miningSessionDuration: max: 24h extraBonuses: - duration: 24h \ No newline at end of file + duration: 24h +miner: + ethereumDistributionFrequency: + min: 24h + max: 672h \ No newline at end of file diff --git a/miner/contract.go b/miner/contract.go index fed9db9..af4198f 100644 --- a/miner/contract.go +++ b/miner/contract.go @@ -10,6 +10,7 @@ import ( stdlibtime "time" dwh "github.com/ice-blockchain/freezer/bookkeeper/storage" + coindistribution "github.com/ice-blockchain/freezer/coin-distribution" "github.com/ice-blockchain/freezer/model" "github.com/ice-blockchain/freezer/tokenomics" messagebroker "github.com/ice-blockchain/wintr/connectors/message_broker" @@ -55,6 +56,11 @@ type ( model.MiningSessionSoloEndedAtField model.MiningSessionSoloPreviouslyEndedAtField model.ExtraBonusStartedAtField + model.ReferralsCountChangeGuardUpdatedAtField + model.KYCState + model.MiningBlockchainAccountAddressField + model.CountryField + model.UsernameField model.LatestDeviceField model.UserIDField UpdatedUser @@ -67,7 +73,6 @@ type ( model.PreStakingAllocationField model.ExtraBonusField model.UTCOffsetField - model.ReferralsCountChangeGuardUpdatedAtField } UpdatedUser struct { // This is public only because we have to embed it, and it has to be if so. @@ -76,6 +81,11 @@ type ( model.ResurrectSoloUsedAtField model.ResurrectT0UsedAtField model.ResurrectTMinus1UsedAtField + model.SoloLastEthereumCoinDistributionProcessedAtField + model.ForT0LastEthereumCoinDistributionProcessedAtField + model.ForTMinus1LastEthereumCoinDistributionProcessedAtField + model.BalanceT1EthereumPendingField + model.BalanceT2EthereumPendingField model.DeserializedUsersKey model.IDT0Field model.IDTMinus1Field @@ -92,6 +102,12 @@ type ( model.BalanceT2Field model.BalanceForT0Field model.BalanceForTMinus1Field + model.BalanceSoloEthereumField + model.BalanceT0EthereumField + model.BalanceT1EthereumField + model.BalanceT2EthereumField + model.BalanceForT0EthereumField + model.BalanceForTMinus1EthereumField model.SlashingRateSoloField model.SlashingRateT0Field model.SlashingRateT1Field @@ -112,9 +128,21 @@ type ( model.MiningSessionSoloEndedAtField model.MiningSessionSoloPreviouslyEndedAtField model.ResurrectSoloUsedAtField + model.SoloLastEthereumCoinDistributionProcessedAtField model.UserIDField + model.CountryField + model.UsernameField + model.MiningBlockchainAccountAddressField + model.KYCState model.IDT0Field model.DeserializedUsersKey + model.BalanceTotalStandardField + model.BalanceSoloEthereumField + model.BalanceT0EthereumField + model.BalanceT1EthereumField + model.BalanceT2EthereumField + model.PreStakingAllocationField + model.PreStakingBonusField } referralCountGuardUpdatedUser struct { @@ -128,20 +156,30 @@ type ( } miner struct { - mb messagebroker.Client - db storage.DB - dwhClient dwh.Client - cancel context.CancelFunc - telemetry *telemetry - wg *sync.WaitGroup - extraBonusStartDate *time.Time - extraBonusIndicesDistribution map[uint16]map[uint16]uint16 + coinDistributionStartedSignaler chan struct{} + coinDistributionEndedSignaler chan struct{} + stopCoinDistributionCollectionWorkerManager chan struct{} + coinDistributionWorkerMX *sync.Mutex + coinDistributionRepository coindistribution.Repository + mb messagebroker.Client + db storage.DB + dwhClient dwh.Client + cancel context.CancelFunc + telemetry *telemetry + wg *sync.WaitGroup + extraBonusStartDate *time.Time + extraBonusIndicesDistribution map[uint16]map[uint16]uint16 } config struct { - disableAdvancedTeam *atomic.Pointer[[]string] - tokenomics.Config `mapstructure:",squash"` //nolint:tagliatelle // Nope. - Workers int64 `yaml:"workers"` - BatchSize int64 `yaml:"batchSize"` - Development bool `yaml:"development"` + disableAdvancedTeam *atomic.Pointer[[]string] + coinDistributionCollectorSettings *atomic.Pointer[coindistribution.CollectorSettings] + tokenomics.Config `mapstructure:",squash"` //nolint:tagliatelle // Nope. + EthereumDistributionFrequency struct { + Min stdlibtime.Duration `yaml:"min"` + Max stdlibtime.Duration `yaml:"max"` + } `yaml:"ethereumDistributionFrequency" mapstructure:"ethereumDistributionFrequency"` + Workers int64 `yaml:"workers"` + BatchSize int64 `yaml:"batchSize"` + Development bool `yaml:"development"` } ) diff --git a/miner/ethereum_distribution.go b/miner/ethereum_distribution.go new file mode 100644 index 0000000..db6d2f1 --- /dev/null +++ b/miner/ethereum_distribution.go @@ -0,0 +1,420 @@ +// SPDX-License-Identifier: ice License 1.0 + +package miner + +import ( + "context" + "sync" + stdlibtime "time" + + "github.com/pkg/errors" + + coindistribution "github.com/ice-blockchain/freezer/coin-distribution" + "github.com/ice-blockchain/freezer/model" + "github.com/ice-blockchain/freezer/tokenomics" + "github.com/ice-blockchain/wintr/log" + "github.com/ice-blockchain/wintr/time" +) + +const ( + ethereumDistributionDryRunModeEnabled = true +) + +func (ref *referral) username() string { + if ref != nil && ref.Username != "" { + return ref.Username + } + + return "icenetwork/bogus" +} + +func (ref *referral) isEligibleForSelfForEthereumDistribution(now *time.Time) bool { + coinDistributionCollectorSettings := cfg.coinDistributionCollectorSettings.Load() + + return ref != nil && + ref.ID != 0 && + coindistribution.IsEligibleForCoinDistributionNow( + ref.ID, + now, + ref.SoloLastEthereumCoinDistributionProcessedAt, + cfg.coinDistributionCollectorSettings.Load().StartDate, + cfg.EthereumDistributionFrequency.Min, + cfg.EthereumDistributionFrequency.Max) && + coindistribution.IsEligibleForEthereumDistribution( + coinDistributionCollectorSettings.MinMiningStreaksRequired, + ref.BalanceTotalStandard-ref.BalanceSoloEthereum-ref.BalanceT0Ethereum-ref.BalanceT1Ethereum-ref.BalanceT2Ethereum, + coinDistributionCollectorSettings.MinBalanceRequired, + ref.MiningBlockchainAccountAddress, + ref.Country, + coinDistributionCollectorSettings.DeniedCountries, + now, + ref.MiningSessionSoloStartedAt, + ref.MiningSessionSoloEndedAt, + coinDistributionCollectorSettings.EndDate, + ref.KYCState, + cfg.MiningSessionDuration.Max, + cfg.EthereumDistributionFrequency.Min, + cfg.EthereumDistributionFrequency.Max) +} + +func (ref *referral) isEligibleForReferralForEthereumDistribution(now *time.Time) bool { + coinDistributionCollectorSettings := cfg.coinDistributionCollectorSettings.Load() + + return ref != nil && + ref.ID != 0 && + coindistribution.IsEligibleForEthereumDistribution( + 0, + 0, + 0, + "skip", + ref.Country, + coinDistributionCollectorSettings.DeniedCountries, + now, + ref.MiningSessionSoloStartedAt, + ref.MiningSessionSoloEndedAt, + coinDistributionCollectorSettings.EndDate, + ref.KYCState, + cfg.MiningSessionDuration.Max, + cfg.EthereumDistributionFrequency.Min, + cfg.EthereumDistributionFrequency.Max) +} + +func (u *user) isEligibleForSelfForEthereumDistribution(now *time.Time) bool { + coinDistributionCollectorSettings := cfg.coinDistributionCollectorSettings.Load() + + return u != nil && + u.ID != 0 && + coindistribution.IsEligibleForCoinDistributionNow( + u.ID, + now, + u.SoloLastEthereumCoinDistributionProcessedAt, + cfg.coinDistributionCollectorSettings.Load().StartDate, + cfg.EthereumDistributionFrequency.Min, + cfg.EthereumDistributionFrequency.Max) && + coindistribution.IsEligibleForEthereumDistribution( + coinDistributionCollectorSettings.MinMiningStreaksRequired, + u.BalanceTotalStandard-u.BalanceSoloEthereum-u.BalanceT0Ethereum-u.BalanceT1Ethereum-u.BalanceT2Ethereum, + coinDistributionCollectorSettings.MinBalanceRequired, + u.MiningBlockchainAccountAddress, + u.Country, + coinDistributionCollectorSettings.DeniedCountries, + now, + u.MiningSessionSoloStartedAt, + u.MiningSessionSoloEndedAt, + coinDistributionCollectorSettings.EndDate, + u.KYCState, + cfg.MiningSessionDuration.Max, + cfg.EthereumDistributionFrequency.Min, + cfg.EthereumDistributionFrequency.Max) +} + +func (u *user) isEligibleForT0ForEthereumDistribution(now *time.Time) bool { + return u != nil && + u.ID != 0 && + coindistribution.IsEligibleForCoinDistributionNow( + u.ID, + now, + u.ForT0LastEthereumCoinDistributionProcessedAt, + cfg.coinDistributionCollectorSettings.Load().StartDate, + cfg.EthereumDistributionFrequency.Min, + cfg.EthereumDistributionFrequency.Max) && + u.isEligibleForReferralForEthereumDistribution(now) +} + +func (u *user) isEligibleForTMinus1ForEthereumDistribution(now *time.Time) bool { + return u != nil && + u.ID != 0 && + coindistribution.IsEligibleForCoinDistributionNow( + u.ID, + now, + u.ForTMinus1LastEthereumCoinDistributionProcessedAt, + cfg.coinDistributionCollectorSettings.Load().StartDate, + cfg.EthereumDistributionFrequency.Min, + cfg.EthereumDistributionFrequency.Max) && + u.isEligibleForReferralForEthereumDistribution(now) +} + +func (u *user) isEligibleForReferralForEthereumDistribution(now *time.Time) bool { + coinDistributionCollectorSettings := cfg.coinDistributionCollectorSettings.Load() + return coindistribution.IsEligibleForEthereumDistribution( + 0, + 0, + 0, + "skip", + u.Country, + coinDistributionCollectorSettings.DeniedCountries, + now, + u.MiningSessionSoloStartedAt, + u.MiningSessionSoloEndedAt, + coinDistributionCollectorSettings.EndDate, + u.KYCState, + cfg.MiningSessionDuration.Max, + cfg.EthereumDistributionFrequency.Min, + cfg.EthereumDistributionFrequency.Max) +} + +//nolint:funlen // . +func (u *user) processEthereumCoinDistribution( + now *time.Time, t0, tMinus1 *referral, +) (records []*coindistribution.ByEarnerForReview, balanceDistributedForT0, balanceDistributedForTMinus1 float64) { + if !isCoinDistributionCollectorEnabled(now) { + if u.BalanceT1EthereumPending != nil { + u.BalanceT1Ethereum += float64(*u.BalanceT1EthereumPending) + } + if u.BalanceT2EthereumPending != nil { + u.BalanceT2Ethereum += float64(*u.BalanceT2EthereumPending) + } + u.BalanceT1EthereumPending = new(model.FlexibleFloat64) + u.BalanceT2EthereumPending = new(model.FlexibleFloat64) + u.SoloLastEthereumCoinDistributionProcessedAt = nil + u.ForT0LastEthereumCoinDistributionProcessedAt = nil + u.ForTMinus1LastEthereumCoinDistributionProcessedAt = nil + + return nil, 0, 0 + } + u.BalanceT1EthereumPending = nil + u.BalanceT2EthereumPending = nil + var ( + t0CD *coindistribution.ByEarnerForReview + forT0CD *coindistribution.ByEarnerForReview + forTMinus1CD *coindistribution.ByEarnerForReview + soloCD = &coindistribution.ByEarnerForReview{ + CreatedAt: now, + Username: u.Username, + ReferredByUsername: t0.username(), + UserID: u.UserID, + EarnerUserID: u.UserID, + EthAddress: u.MiningBlockchainAccountAddress, + InternalID: u.ID, + Balance: 0, + } + ) + records = append(make([]*coindistribution.ByEarnerForReview, 0, 1+1+1+1), soloCD) + if t0 != nil { + t0CD = &coindistribution.ByEarnerForReview{ + CreatedAt: now, + UserID: u.UserID, + EarnerUserID: t0.UserID, + Balance: 0, + } + forT0CD = &coindistribution.ByEarnerForReview{ + CreatedAt: now, + UserID: t0.UserID, + EarnerUserID: u.UserID, + Balance: 0, + } + records = append(records, t0CD, forT0CD) + } + if tMinus1 != nil { + forTMinus1CD = &coindistribution.ByEarnerForReview{ + CreatedAt: now, + UserID: tMinus1.UserID, + EarnerUserID: u.UserID, + Balance: 0, + } + records = append(records, forTMinus1CD) + } + + if u.isEligibleForSelfForEthereumDistribution(now) { + // Amount I've earned for myself. + soloCD.Balance = u.processEthereumCoinDistributionForSolo(now) + + if t0.isEligibleForReferralForEthereumDistribution(now) { + // Amount my T0 earned for me. + t0CD.Balance = u.processEthereumCoinDistributionForT0(now) + } + + if !ethereumDistributionDryRunModeEnabled { + u.SoloLastEthereumCoinDistributionProcessedAt = now + } + } else { + u.SoloLastEthereumCoinDistributionProcessedAt = nil + } + + if u.isEligibleForT0ForEthereumDistribution(now) && t0.isEligibleForSelfForEthereumDistribution(now) { + // Amount I've earned for my T0. + balanceDistributedForT0 = u.processEthereumCoinDistributionForForT0(t0, now) + forT0CD.Balance = balanceDistributedForT0 + + if !ethereumDistributionDryRunModeEnabled { + u.ForT0LastEthereumCoinDistributionProcessedAt = now + } else { + balanceDistributedForT0 = 0 + } + } else { + u.ForT0LastEthereumCoinDistributionProcessedAt = nil + } + + if u.isEligibleForTMinus1ForEthereumDistribution(now) && tMinus1.isEligibleForSelfForEthereumDistribution(now) { + // Amount I've earned for my T-1. + balanceDistributedForTMinus1 = u.processEthereumCoinDistributionForForTMinus1(tMinus1, now) + forTMinus1CD.Balance = balanceDistributedForTMinus1 + + if !ethereumDistributionDryRunModeEnabled { + u.ForTMinus1LastEthereumCoinDistributionProcessedAt = now + } else { + balanceDistributedForTMinus1 = 0 + } + } else { + u.ForTMinus1LastEthereumCoinDistributionProcessedAt = nil + } + + return records, balanceDistributedForT0, balanceDistributedForTMinus1 +} + +func (u *user) processEthereumCoinDistributionForSolo(now *time.Time) float64 { + standard, _ := tokenomics.ApplyPreStaking(u.BalanceSolo, u.PreStakingAllocation, u.PreStakingBonus) + ethIce := coindistribution.CalculateEthereumDistributionICEBalance(standard-u.BalanceSoloEthereum, cfg.EthereumDistributionFrequency.Min, cfg.EthereumDistributionFrequency.Max, now, cfg.coinDistributionCollectorSettings.Load().EndDate) //nolint:lll // . + if !ethereumDistributionDryRunModeEnabled { + u.BalanceSoloEthereum += ethIce + } + + return ethIce +} + +func (u *user) processEthereumCoinDistributionForT0(now *time.Time) float64 { + standard, _ := tokenomics.ApplyPreStaking(u.BalanceT0, u.PreStakingAllocation, u.PreStakingBonus) + ethIce := coindistribution.CalculateEthereumDistributionICEBalance(standard-u.BalanceT0Ethereum, cfg.EthereumDistributionFrequency.Min, cfg.EthereumDistributionFrequency.Max, now, cfg.coinDistributionCollectorSettings.Load().EndDate) //nolint:lll // . + if !ethereumDistributionDryRunModeEnabled { + u.BalanceT0Ethereum += ethIce + } + + return ethIce +} + +// The double `For` is intended, cuz it's ForXX, where XX can be Solo/T0/ForT1/ForTMinus1. +func (u *user) processEthereumCoinDistributionForForT0(t0 *referral, now *time.Time) float64 { + standard, _ := tokenomics.ApplyPreStaking(u.BalanceForT0, t0.PreStakingAllocation, t0.PreStakingBonus) + ethIce := coindistribution.CalculateEthereumDistributionICEBalance(standard-u.BalanceForT0Ethereum, cfg.EthereumDistributionFrequency.Min, cfg.EthereumDistributionFrequency.Max, now, cfg.coinDistributionCollectorSettings.Load().EndDate) //nolint:lll // . + if !ethereumDistributionDryRunModeEnabled { + u.BalanceForT0Ethereum += ethIce + } + + return ethIce +} + +// The double `For` is intended, cuz it's ForXX, where XX can be Solo/T0/ForT1/ForTMinus1. +func (u *user) processEthereumCoinDistributionForForTMinus1(tMinus1 *referral, now *time.Time) float64 { + standard, _ := tokenomics.ApplyPreStaking(u.BalanceForTMinus1, tMinus1.PreStakingAllocation, tMinus1.PreStakingBonus) + ethIce := coindistribution.CalculateEthereumDistributionICEBalance(standard-u.BalanceForTMinus1Ethereum, cfg.EthereumDistributionFrequency.Min, cfg.EthereumDistributionFrequency.Max, now, cfg.coinDistributionCollectorSettings.Load().EndDate) //nolint:lll // . + if !ethereumDistributionDryRunModeEnabled { + u.BalanceForTMinus1Ethereum += ethIce + } + + return ethIce +} + +func isCoinDistributionCollectorEnabled(now *time.Time) bool { + coinDistributionCollectorSettings := cfg.coinDistributionCollectorSettings.Load() + + return coinDistributionCollectorSettings.Enabled && + (coinDistributionCollectorSettings.ForcedExecution || + (now.Hour() >= coinDistributionCollectorSettings.StartHour && + now.After(*coinDistributionCollectorSettings.StartDate.Time) && + (coinDistributionCollectorSettings.LatestDate.IsNil() || + !now.Truncate(cfg.EthereumDistributionFrequency.Min).Equal(coinDistributionCollectorSettings.LatestDate.Truncate(cfg.EthereumDistributionFrequency.Min))))) +} + +func (m *miner) startCoinDistributionCollectionWorkerManager(ctx context.Context) { + defer func() { m.stopCoinDistributionCollectionWorkerManager <- struct{}{} }() + + for ctx.Err() == nil { + select { + case <-m.coinDistributionStartedSignaler: + log.Info("started collecting coin distributions") + m.coinDistributionWorkerMX.Lock() + workersStarted := int64(1) + outerStarted: + for ctx.Err() == nil { + select { + case <-m.coinDistributionStartedSignaler: + workersStarted++ + if workersStarted == cfg.Workers { + break outerStarted + } + case <-ctx.Done(): + m.coinDistributionWorkerMX.Unlock() + return + } + } + workersEnded := int64(0) + outerEnded: + for ctx.Err() == nil { + select { + case <-m.coinDistributionEndedSignaler: + workersEnded++ + if workersEnded == cfg.Workers { + break outerEnded + } + case <-ctx.Done(): + m.coinDistributionWorkerMX.Unlock() + return + } + } + m.notifyCoinDistributionCollectionCycleEnded(ctx) + m.coinDistributionWorkerMX.Unlock() + log.Info("stopped collecting coin distributions") + case <-ctx.Done(): + return + } + } +} + +func (m *miner) notifyCoinDistributionCollectionCycleEnded(ctx context.Context) { + for ctx.Err() == nil { + reqCtx, cancel := context.WithTimeout(ctx, requestDeadline) + if err := m.coinDistributionRepository.NotifyCoinDistributionCollectionCycleEnded(reqCtx); err != nil { + cancel() + log.Error(errors.Wrap(err, "failed to NotifyCoinDistributionCollectionCycleEnded")) + } else { + cancel() + + break + } + } + for ctx.Err() == nil { + reqCtx, cancel := context.WithTimeout(ctx, requestDeadline) + if settings, err := m.coinDistributionRepository.GetCollectorSettings(reqCtx); err != nil { + cancel() + log.Error(errors.Wrap(err, "failed to GetCollectorSettings")) + } else { + cancel() + cfg.coinDistributionCollectorSettings.Store(settings) + + break + } + } +} + +func (m *miner) mustInitCoinDistributionCollector(ctx context.Context) { + settings, err := m.coinDistributionRepository.GetCollectorSettings(ctx) + log.Panic(err) + cfg.coinDistributionCollectorSettings.Store(settings) + m.coinDistributionStartedSignaler = make(chan struct{}, cfg.Workers) + m.coinDistributionEndedSignaler = make(chan struct{}, cfg.Workers) + m.stopCoinDistributionCollectionWorkerManager = make(chan struct{}) + m.coinDistributionWorkerMX = new(sync.Mutex) + + go m.startCoinDistributionCollectionWorkerManager(ctx) + go m.startSynchronizingCoinDistributionCollectorSettings(ctx) +} + +func (m *miner) startSynchronizingCoinDistributionCollectorSettings(ctx context.Context) { + ticker := stdlibtime.NewTicker(30 * stdlibtime.Second) //nolint:gomnd // . + defer ticker.Stop() + + for { + select { + case <-ticker.C: + reqCtx, cancel := context.WithTimeout(ctx, requestDeadline) + if settings, err := m.coinDistributionRepository.GetCollectorSettings(reqCtx); err != nil { + log.Error(errors.Wrap(err, "failed to GetCollectorSettings")) + } else { + cfg.coinDistributionCollectorSettings.Store(settings) + } + cancel() + case <-ctx.Done(): + return + } + } +} diff --git a/miner/metrics.go b/miner/metrics.go index d8315ec..253e1b9 100644 --- a/miner/metrics.go +++ b/miner/metrics.go @@ -9,6 +9,8 @@ import ( stdlibtime "time" "github.com/rcrowley/go-metrics" + + "github.com/ice-blockchain/wintr/log" ) func init() { @@ -16,9 +18,10 @@ func init() { } type telemetry struct { - registry metrics.Registry - steps [8]string - cfg config + registry metrics.Registry + steps [9]string + currentStepName string + cfg config } func (t *telemetry) mustInit(cfg config) *telemetry { @@ -28,17 +31,15 @@ func (t *telemetry) mustInit(cfg config) *telemetry { ) t.cfg = cfg t.registry = metrics.NewRegistry() - t.steps = [8]string{"mine[full iteration]", "mine", "get_users", "get_referrals", "send_messages", "get_history", "insert_history", "update_users"} + t.steps = [9]string{"mine[full iteration]", "mine", "get_users", "get_referrals", "send_messages", "get_history", "insert_history", "collect_coin_distributions", "update_users"} //nolint:lll // . for ix := range &t.steps { if ix > 1 { t.steps[ix] = fmt.Sprintf("[%v]mine.%v", ix-1, t.steps[ix]) } - if err := t.registry.Register(t.steps[ix], metrics.NewCustomTimer(metrics.NewHistogram(metrics.NewExpDecaySample(reservoirSize, decayAlpha)), metrics.NewMeter())); err != nil { //nolint:lll // . - panic(err) - } + log.Panic(t.registry.Register(t.steps[ix], metrics.NewCustomTimer(metrics.NewHistogram(metrics.NewExpDecaySample(reservoirSize, decayAlpha)), metrics.NewMeter()))) //nolint:lll // . } - go metrics.LogScaled(t.registry, 60*stdlibtime.Minute, stdlibtime.Millisecond, t) + go metrics.LogScaled(t.registry, 15*stdlibtime.Minute, stdlibtime.Millisecond, t) //nolint:gomnd // . return t } @@ -114,6 +115,10 @@ func (t *telemetry) shouldSynchronizeBalanceFunc(workerNumber, totalBatches, ite } } -func (*telemetry) Printf(format string, args ...interface{}) { - stdlog.Printf(strings.ReplaceAll(format, "timer ", ""), args...) +func (t *telemetry) Printf(format string, args ...interface{}) { + const prefixMarker = "timer " + if strings.HasPrefix(format, prefixMarker) { + t.currentStepName = fmt.Sprintf(format[len(prefixMarker):strings.IndexRune(format, '\n')], args...) + } + stdlog.Printf("["+t.currentStepName+"]"+strings.ReplaceAll(format, prefixMarker, ""), args...) } diff --git a/miner/metrics_test.go b/miner/metrics_test.go index 96dacc2..37929a2 100644 --- a/miner/metrics_test.go +++ b/miner/metrics_test.go @@ -172,6 +172,8 @@ func slowTelemetry(workers int64) *telemetry { tel.collectElapsed(4, stdlibtime.Now().Add(-20*stdlibtime.Second)) tel.collectElapsed(5, stdlibtime.Now().Add(-10*stdlibtime.Second)) tel.collectElapsed(6, stdlibtime.Now().Add(-1*stdlibtime.Second)) + tel.collectElapsed(7, stdlibtime.Now().Add(-1*stdlibtime.Second)) + tel.collectElapsed(8, stdlibtime.Now().Add(-1*stdlibtime.Second)) return tel } diff --git a/miner/miner.go b/miner/miner.go index c2841f0..f8101c8 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -18,6 +18,7 @@ import ( balancesynchronizer "github.com/ice-blockchain/freezer/balance-synchronizer" dwh "github.com/ice-blockchain/freezer/bookkeeper/storage" + coindistribution "github.com/ice-blockchain/freezer/coin-distribution" extrabonusnotifier "github.com/ice-blockchain/freezer/extra-bonus-notifier" "github.com/ice-blockchain/freezer/model" "github.com/ice-blockchain/freezer/tokenomics" @@ -36,17 +37,19 @@ func init() { func MustStartMining(ctx context.Context, cancel context.CancelFunc) Client { mi := &miner{ - mb: messagebroker.MustConnect(context.Background(), parentApplicationYamlKey), - db: storage.MustConnect(context.Background(), parentApplicationYamlKey, int(cfg.Workers)), - dwhClient: dwh.MustConnect(context.Background(), applicationYamlKey), - wg: new(sync.WaitGroup), - telemetry: new(telemetry).mustInit(cfg), + coinDistributionRepository: coindistribution.NewRepository(ctx, cancel), + mb: messagebroker.MustConnect(context.Background(), parentApplicationYamlKey), + db: storage.MustConnect(context.Background(), parentApplicationYamlKey, int(cfg.Workers)), + dwhClient: dwh.MustConnect(context.Background(), applicationYamlKey), + wg: new(sync.WaitGroup), + telemetry: new(telemetry).mustInit(cfg), } go mi.startDisableAdvancedTeamCfgSyncer(ctx) mi.wg.Add(int(cfg.Workers)) mi.cancel = cancel mi.extraBonusStartDate = extrabonusnotifier.MustGetExtraBonusStartDate(ctx, mi.db) mi.extraBonusIndicesDistribution = extrabonusnotifier.MustGetExtraBonusIndicesDistribution(ctx, mi.db) + mi.mustInitCoinDistributionCollector(ctx) for workerNumber := int64(0); workerNumber < cfg.Workers; workerNumber++ { go func(wn int64) { @@ -61,15 +64,20 @@ func MustStartMining(ctx context.Context, cancel context.CancelFunc) Client { func (m *miner) Close() error { m.cancel() m.wg.Wait() + <-m.stopCoinDistributionCollectionWorkerManager return multierror.Append( errors.Wrap(m.mb.Close(), "failed to close mb"), errors.Wrap(m.db.Close(), "failed to close db"), errors.Wrap(m.dwhClient.Close(), "failed to close dwh"), + errors.Wrap(m.coinDistributionRepository.Close(), "failed to close coinDistributionRepository"), ).ErrorOrNil() } func (m *miner) CheckHealth(ctx context.Context) error { + if err := m.coinDistributionRepository.CheckHealth(ctx); err != nil { + return err + } if err := m.dwhClient.Ping(ctx); err != nil { return err } @@ -132,7 +140,9 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { t0Referrals, tMinus1Referrals = make(map[int64]*referral, batchSize), make(map[int64]*referral, batchSize) t1ReferralsToIncrementActiveValue, t2ReferralsToIncrementActiveValue = make(map[int64]int32, batchSize), make(map[int64]int32, batchSize) t1ReferralsThatStoppedMining, t2ReferralsThatStoppedMining = make(map[int64]uint32, batchSize), make(map[int64]uint32, batchSize) + balanceT1EthereumIncr, balanceT2EthereumIncr = make(map[int64]float64, batchSize), make(map[int64]float64, batchSize) referralsThatStoppedMining = make([]*referralThatStoppedMining, 0, batchSize) + coinDistributions = make([]*coindistribution.ByEarnerForReview, 0, 4*batchSize) msgResponder = make(chan error, 3*batchSize) msgs = make([]*messagebroker.Message, 0, 3*batchSize) errs = make([]error, 0, 3*batchSize) @@ -144,10 +154,24 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { userGlobalRanks = make([]redis.Z, 0, batchSize) historyColumns, historyInsertMetadata = dwh.InsertDDL(int(batchSize)) shouldSynchronizeBalanceFunc = func(batchNumberArg uint64) bool { return false } + startedCoinDistributionCollecting = isCoinDistributionCollectorEnabled(now) ) + if startedCoinDistributionCollecting { + m.coinDistributionStartedSignaler <- struct{}{} + } resetVars := func(success bool) { if success && len(userKeys) == int(batchSize) && len(userResults) == 0 { go m.telemetry.collectElapsed(0, *lastIterationStartedAt.Time) + if !startedCoinDistributionCollecting && iteration%2 == 1 && isCoinDistributionCollectorEnabled(now) { + m.coinDistributionStartedSignaler <- struct{}{} + startedCoinDistributionCollecting = true + } + if startedCoinDistributionCollecting && iteration%2 == 0 && isCoinDistributionCollectorEnabled(now) { + m.coinDistributionEndedSignaler <- struct{}{} + m.coinDistributionWorkerMX.Lock() + m.coinDistributionWorkerMX.Unlock() + startedCoinDistributionCollecting = true + } lastIterationStartedAt = time.Now() iteration++ if batchNumber < 1 { @@ -175,6 +199,7 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { histories = histories[:0] userGlobalRanks = userGlobalRanks[:0] referralsThatStoppedMining = referralsThatStoppedMining[:0] + coinDistributions = coinDistributions[:0] for k := range t0Referrals { delete(t0Referrals, k) } @@ -193,6 +218,12 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { for k := range t2ReferralsToIncrementActiveValue { delete(t2ReferralsToIncrementActiveValue, k) } + for k := range balanceT1EthereumIncr { + delete(balanceT1EthereumIncr, k) + } + for k := range balanceT2EthereumIncr { + delete(balanceT2EthereumIncr, k) + } } for ctx.Err() == nil { /****************************************************************************************************************************************************** @@ -328,8 +359,17 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { } if usr.IDTMinus1 != t0Ref.IDT0 { updatedUser.IDTMinus1 = t0Ref.IDT0 + tMinus1Ref = tMinus1Referrals[updatedUser.IDTMinus1] } } + userCoinDistributions, balanceDistributedForT0, balanceDistributedForTMinus1 := updatedUser.processEthereumCoinDistribution(now, t0Ref, tMinus1Ref) + coinDistributions = append(coinDistributions, userCoinDistributions...) + if balanceDistributedForT0 > 0 { + balanceT1EthereumIncr[t0Ref.ID] += balanceDistributedForT0 + } + if balanceDistributedForTMinus1 > 0 { + balanceT2EthereumIncr[tMinus1Ref.ID] += balanceDistributedForTMinus1 + } updatedUsers = append(updatedUsers, &updatedUser.UpdatedUser) } else { extraBonusOnlyUpdatedUsr := extrabonusnotifier.UpdatedUser{ @@ -421,7 +461,25 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { } /****************************************************************************************************************************************************** - 7. Persisting the mining progress for the users. + 7. Processing Ethereum Coin Distributions for eligible users. + ******************************************************************************************************************************************************/ + + before = time.Now() + reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline) + if err := m.coinDistributionRepository.CollectCoinDistributionsForReview(reqCtx, coinDistributions); err != nil { + log.Error(errors.Wrapf(err, "[miner] failed to CollectCoinDistributionsForReview for batchNumber:%v,workerNumber:%v", batchNumber, workerNumber)) + reqCancel() + resetVars(false) + + continue + } + reqCancel() + if len(coinDistributions) > 0 { + go m.telemetry.collectElapsed(7, *before.Time) + } + + /****************************************************************************************************************************************************** + 8. Persisting the mining progress for the users. ******************************************************************************************************************************************************/ for _, usr := range referralsThatStoppedMining { @@ -434,8 +492,10 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { } var pipeliner redis.Pipeliner - if len(t1ReferralsToIncrementActiveValue)+len(t2ReferralsToIncrementActiveValue)+len(referralsCountGuardOnlyUpdatedUsers)+len(t1ReferralsThatStoppedMining)+len(t2ReferralsThatStoppedMining)+len(extraBonusOnlyUpdatedUsers)+len(referralsUpdated)+len(userGlobalRanks) > 0 { + var transactional bool + if len(balanceT1EthereumIncr)+len(balanceT2EthereumIncr)+len(t1ReferralsToIncrementActiveValue)+len(t2ReferralsToIncrementActiveValue)+len(referralsCountGuardOnlyUpdatedUsers)+len(t1ReferralsThatStoppedMining)+len(t2ReferralsThatStoppedMining)+len(extraBonusOnlyUpdatedUsers)+len(referralsUpdated)+len(userGlobalRanks) > 0 { pipeliner = m.db.TxPipeline() + transactional = true } else { pipeliner = m.db.Pipeline() } @@ -488,6 +548,22 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { return err } } + for idT0, amount := range balanceT1EthereumIncr { + if amount == 0 { + continue + } + if err := pipeliner.HIncrByFloat(reqCtx, model.SerializedUsersKey(idT0), "balance_t1_ethereum_pending", amount).Err(); err != nil { + return err + } + } + for idTMinus1, amount := range balanceT2EthereumIncr { + if amount == 0 { + continue + } + if err := pipeliner.HIncrByFloat(reqCtx, model.SerializedUsersKey(idTMinus1), "balance_t2_ethereum_pending", amount).Err(); err != nil { + return err + } + } return nil }); err != nil { @@ -513,9 +589,8 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { continue } } - - if len(t1ReferralsThatStoppedMining)+len(t2ReferralsThatStoppedMining)+len(updatedUsers)+len(extraBonusOnlyUpdatedUsers)+len(referralsUpdated)+len(userGlobalRanks) > 0 { - go m.telemetry.collectElapsed(7, *before.Time) + if transactional || len(updatedUsers) > 0 { + go m.telemetry.collectElapsed(8, *before.Time) } batchNumber++ diff --git a/miner/referral_lifecycle.go b/miner/referral_lifecycle.go index 2b892ce..b7c761f 100644 --- a/miner/referral_lifecycle.go +++ b/miner/referral_lifecycle.go @@ -10,15 +10,15 @@ func changeT0AndTMinus1Referrals(usr *user) (IDT0Changed, IDTMinus1Changed bool) if usr.IDT0 <= 0 { usr.IDT0 *= -1 usr.IDTMinus1 *= -1 - usr.BalanceT0, usr.BalanceForT0, usr.SlashingRateT0, usr.SlashingRateForT0 = 0, 0, 0, 0 - usr.BalanceForTMinus1, usr.SlashingRateForTMinus1 = 0, 0 + usr.BalanceT0, usr.BalanceForT0, usr.BalanceForT0Ethereum, usr.SlashingRateT0, usr.SlashingRateForT0 = 0, 0, 0, 0, 0 + usr.BalanceForTMinus1, usr.BalanceForTMinus1Ethereum, usr.SlashingRateForTMinus1 = 0, 0, 0 usr.ResurrectT0UsedAt, usr.ResurrectTMinus1UsedAt = new(time.Time), new(time.Time) if usr.IDT0 != 0 { IDT0Changed = true } } else if usr.IDTMinus1 <= 0 { usr.IDTMinus1 *= -1 - usr.BalanceForTMinus1, usr.SlashingRateForTMinus1 = 0, 0 + usr.BalanceForTMinus1, usr.BalanceForTMinus1Ethereum, usr.SlashingRateForTMinus1 = 0, 0, 0 usr.ResurrectTMinus1UsedAt = new(time.Time) if usr.IDTMinus1 != 0 { IDTMinus1Changed = true diff --git a/model/model.go b/model/model.go index 2c88e1f..629a08a 100644 --- a/model/model.go +++ b/model/model.go @@ -201,7 +201,7 @@ type ( BalanceT1Ethereum float64 `redis:"balance_t1_ethereum"` } BalanceT1EthereumPendingField struct { - BalanceT1EthereumPending float64 `redis:"balance_t1_ethereum_pending"` + BalanceT1EthereumPending *FlexibleFloat64 `redis:"balance_t1_ethereum_pending,omitempty"` } BalanceT2Field struct { BalanceT2 float64 `redis:"balance_t2"` @@ -210,7 +210,7 @@ type ( BalanceT2Ethereum float64 `redis:"balance_t2_ethereum"` } BalanceT2EthereumPendingField struct { - BalanceT2EthereumPending float64 `redis:"balance_t2_ethereum_pending"` + BalanceT2EthereumPending *FlexibleFloat64 `redis:"balance_t2_ethereum_pending,omitempty"` } BalanceForT0Field struct { BalanceForT0 float64 `redis:"balance_for_t0"` @@ -301,6 +301,36 @@ type ( } ) +type ( + FlexibleFloat64 float64 +) + +func (ff *FlexibleFloat64) UnmarshalBinary(data []byte) error { + return ff.UnmarshalText(data) +} + +func (ff *FlexibleFloat64) MarshalBinary() ([]byte, error) { + return ff.MarshalText() +} + +func (ff *FlexibleFloat64) MarshalText() ([]byte, error) { + if ff == nil { + return nil, nil + } + + return []byte(fmt.Sprint(*ff)), nil +} + +func (ff *FlexibleFloat64) UnmarshalText(text []byte) error { + val, err := strconv.ParseFloat(string(text), 64) + if err != nil { + return errors.Wrapf(err, "failed to ParseFloat `%v`", text) + } + *ff = FlexibleFloat64(val) + + return nil +} + func (k *DeserializedUsersKey) Key() string { if k == nil || k.ID == 0 { return ""