From d4d0b2eb1d3f9d35fc262bcfce37aefa696cef53 Mon Sep 17 00:00:00 2001 From: ice-myles <96409608+ice-myles@users.noreply.github.com> Date: Fri, 15 Dec 2023 11:44:32 +0300 Subject: [PATCH] Revert balances recalculations commits + moving t2 balance from old tminus1 to new tminus1 (#39) --- bookkeeper/storage/storage_test.go | 38 --- miner/.testdata/DDL.sql | 30 --- miner/contract.go | 24 -- miner/miner.go | 267 +------------------ miner/recalculate_balance.go | 396 ----------------------------- model/model.go | 63 ----- tokenomics/adoption.go | 4 +- 7 files changed, 12 insertions(+), 810 deletions(-) delete mode 100644 miner/.testdata/DDL.sql delete mode 100644 miner/recalculate_balance.go diff --git a/bookkeeper/storage/storage_test.go b/bookkeeper/storage/storage_test.go index 7b69b69..13babb3 100644 --- a/bookkeeper/storage/storage_test.go +++ b/bookkeeper/storage/storage_test.go @@ -4,7 +4,6 @@ package storage import ( "context" - "fmt" "sort" "testing" stdlibtime "time" @@ -116,40 +115,3 @@ func TestStorage(t *testing.T) { sort.SliceStable(h2, func(ii, jj int) bool { return h2[ii].CreatedAt.Before(*h2[jj].CreatedAt.Time) }) assert.EqualValues(t, []*BalanceHistory{}, h2) } - -func TestStorage_SelectAdjustUserInformation_NoError_On_LongValues(t *testing.T) { - cl := MustConnect(context.Background(), "self") - defer func() { - if err := recover(); err != nil { - cl.Close() - panic(err) - } - cl.Close() - }() - require.NoError(t, cl.Ping(context.Background())) - - limit := int64(1000) - offset := int64(0) - userIDs := make([]string, 0, 1_000_000) - for ix := 1_000_000; ix < 2_000_000; ix++ { - userIDs = append(userIDs, fmt.Sprint(ix)) - } - _, err := cl.GetAdjustUserInformation(context.Background(), userIDs, limit, offset) - require.NoError(t, err) - - userIDs = nil - userIDs = make([]string, 0, 1_000_000) - for ix := 10_000_000; ix < 11_000_000; ix++ { - userIDs = append(userIDs, fmt.Sprint(ix)) - } - _, err = cl.GetAdjustUserInformation(context.Background(), userIDs, limit, offset) - require.NoError(t, err) - - userIDs = nil - userIDs = make([]string, 0, 1_000_000) - for ix := 54_000_000; ix < 55_000_000; ix++ { - userIDs = append(userIDs, fmt.Sprint(ix)) - } - _, err = cl.GetAdjustUserInformation(context.Background(), userIDs, limit, offset) - require.NoError(t, err) -} diff --git a/miner/.testdata/DDL.sql b/miner/.testdata/DDL.sql deleted file mode 100644 index 42204d4..0000000 --- a/miner/.testdata/DDL.sql +++ /dev/null @@ -1,30 +0,0 @@ --- SPDX-License-Identifier: ice License 1.0 ---************************************************************************************************************************************ --- balance_recalculation_metrics -DROP TABLE IF EXISTS balance_recalculation_metrics; -CREATE TABLE IF NOT EXISTS balance_recalculation_metrics ( - started_at timestamp NOT NULL, - ended_at timestamp NOT NULL, - t1_balance_positive DOUBLE PRECISION NOT NULL, - t1_balance_negative DOUBLE PRECISION NOT NULL, - t2_balance_positive DOUBLE PRECISION NOT NULL, - t2_balance_negative DOUBLE PRECISION NOT NULL, - t1_active_counts_positive BIGINT NOT NULL, - t1_active_counts_negative BIGINT NOT NULL, - t2_active_counts_positive BIGINT NOT NULL, - t2_active_counts_negative BIGINT NOT NULL, - iterations_num BIGINT NOT NULL, - affected_users BIGINT NOT NULL, - worker BIGINT NOT NULL PRIMARY KEY - ) WITH (fillfactor = 70); ---************************************************************************************************************************************ --- balance_recalculation_dry_run -DROP TABLE IF EXISTS balance_recalculation_dry_run; -CREATE TABLE IF NOT EXISTS balance_recalculation_dry_run ( - diff_t1_balance DOUBLE PRECISION NOT NULL, - diff_t2_balance DOUBLE PRECISION NOT NULL, - diff_t1_active_counts DOUBLE PRECISION NOT NULL, - diff_t2_active_counts DOUBLE PRECISION NOT NULL, - user_id text PRIMARY KEY - ) WITH (fillfactor = 70); ---************************************************************************************************************************************ diff --git a/miner/contract.go b/miner/contract.go index 9bd76a6..5fd4dc7 100644 --- a/miner/contract.go +++ b/miner/contract.go @@ -4,7 +4,6 @@ package miner import ( "context" - _ "embed" "io" "sync" "sync/atomic" @@ -14,7 +13,6 @@ import ( "github.com/ice-blockchain/freezer/model" "github.com/ice-blockchain/freezer/tokenomics" messagebroker "github.com/ice-blockchain/wintr/connectors/message_broker" - storagePG "github.com/ice-blockchain/wintr/connectors/storage/v2" "github.com/ice-blockchain/wintr/connectors/storage/v3" "github.com/ice-blockchain/wintr/time" ) @@ -49,9 +47,6 @@ const ( var ( //nolint:gochecknoglobals // Singleton & global config mounted only during bootstrap. cfg config - - //go:embed .testdata/DDL.sql - eskimoDDL string ) type ( @@ -63,7 +58,6 @@ type ( model.ExtraBonusStartedAtField model.LatestDeviceField model.UserIDField - model.UsernameField UpdatedUser model.BalanceSoloPendingField model.BalanceT1PendingField @@ -114,22 +108,6 @@ type ( model.IDTMinus1Field } - backupUserUpdated struct { - model.BalancesBackupUsedAtField - model.UserIDField - model.BalanceT1Field - model.BalanceT2Field - model.SlashingRateT1Field - model.SlashingRateT2Field - model.FirstRecalculatedBalanceT1Field - model.FirstRecalculatedBalanceT2Field - model.DeserializedBackupUsersKey - model.ActiveT1ReferralsField - model.ActiveT2ReferralsField - model.FirstRecalculatedActiveT1ReferralsField - model.FirstRecalculatedActiveT2ReferralsField - } - referral struct { model.MiningSessionSoloStartedAtField model.MiningSessionSoloEndedAtField @@ -160,14 +138,12 @@ type ( miner struct { mb messagebroker.Client db storage.DB - dbPG *storagePG.DB dwhClient dwh.Client cancel context.CancelFunc telemetry *telemetry wg *sync.WaitGroup extraBonusStartDate *time.Time extraBonusIndicesDistribution map[uint16]map[uint16]uint16 - recalculationBalanceStartDate *time.Time } config struct { disableAdvancedTeam *atomic.Pointer[[]string] diff --git a/miner/miner.go b/miner/miner.go index 037d84d..c2841f0 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -23,7 +23,6 @@ import ( "github.com/ice-blockchain/freezer/tokenomics" appCfg "github.com/ice-blockchain/wintr/config" messagebroker "github.com/ice-blockchain/wintr/connectors/message_broker" - storagePG "github.com/ice-blockchain/wintr/connectors/storage/v2" "github.com/ice-blockchain/wintr/connectors/storage/v3" "github.com/ice-blockchain/wintr/log" "github.com/ice-blockchain/wintr/time" @@ -39,7 +38,6 @@ func MustStartMining(ctx context.Context, cancel context.CancelFunc) Client { mi := &miner{ mb: messagebroker.MustConnect(context.Background(), parentApplicationYamlKey), db: storage.MustConnect(context.Background(), parentApplicationYamlKey, int(cfg.Workers)), - dbPG: storagePG.MustConnect(ctx, eskimoDDL, applicationYamlKey), dwhClient: dwh.MustConnect(context.Background(), applicationYamlKey), wg: new(sync.WaitGroup), telemetry: new(telemetry).mustInit(cfg), @@ -49,9 +47,6 @@ func MustStartMining(ctx context.Context, cancel context.CancelFunc) Client { mi.cancel = cancel mi.extraBonusStartDate = extrabonusnotifier.MustGetExtraBonusStartDate(ctx, mi.db) mi.extraBonusIndicesDistribution = extrabonusnotifier.MustGetExtraBonusIndicesDistribution(ctx, mi.db) - if balanceBugFixEnabled { - mi.recalculationBalanceStartDate = mustGetRecalculationBalancesStartDate(ctx, mi.db) - } for workerNumber := int64(0); workerNumber < cfg.Workers; workerNumber++ { go func(wn int64) { @@ -70,7 +65,6 @@ func (m *miner) Close() error { return multierror.Append( errors.Wrap(m.mb.Close(), "failed to close mb"), errors.Wrap(m.db.Close(), "failed to close db"), - errors.Wrap(m.dbPG.Close(), "failed to close db pg"), errors.Wrap(m.dwhClient.Close(), "failed to close dwh"), ).ErrorOrNil() } @@ -116,38 +110,6 @@ func (m *miner) checkDBHealth(ctx context.Context) error { return nil } -func mustGetRecalculationBalancesStartDate(ctx context.Context, db storage.DB) (recalculationBalancesStartDate *time.Time) { - recalculationBalancesStartDateString, err := db.Get(ctx, "recalculation_balances_start_date").Result() - if err != nil && errors.Is(err, redis.Nil) { - err = nil - } - log.Panic(errors.Wrap(err, "failed to get recalculation_balances_start_date")) - if recalculationBalancesStartDateString != "" { - recalculationBalancesStartDate = new(time.Time) - log.Panic(errors.Wrapf(recalculationBalancesStartDate.UnmarshalText([]byte(recalculationBalancesStartDateString)), "failed to parse recalculation_balances_start_date `%v`", recalculationBalancesStartDateString)) //nolint:lll // . - recalculationBalancesStartDate = time.New(recalculationBalancesStartDate.UTC()) - - return - } - recalculationBalancesStartDate = time.Now() - set, sErr := db.SetNX(ctx, "recalculation_balances_start_date", recalculationBalancesStartDate, 0).Result() - log.Panic(errors.Wrap(sErr, "failed to set recalculation_balances_start_date")) - if !set { - return mustGetRecalculationBalancesStartDate(ctx, db) - } - - return recalculationBalancesStartDate -} - -func mustGetBalancesBackupMode(ctx context.Context, db storage.DB) (result bool, err error) { - balancesBackupModeString, err := db.Get(ctx, "balances_backup_mode").Result() - if err != nil && errors.Is(err, redis.Nil) { - err = nil - } - - return balancesBackupModeString == "true", err -} - func (m *miner) mine(ctx context.Context, workerNumber int64) { dwhClient := dwh.MustConnect(context.Background(), applicationYamlKey) defer func() { @@ -165,9 +127,8 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { currentAdoption = m.getAdoption(ctx, m.db, workerNumber) workers = cfg.Workers batchSize = cfg.BatchSize - metrics = new(balanceRecalculationMetrics) - userKeys, userBackupKeys, userHistoryKeys, referralKeys = make([]string, 0, batchSize), make([]string, 0, batchSize), make([]string, 0, batchSize), make([]string, 0, 2*batchSize) - userResults, backupUserResults, referralResults = make([]*user, 0, batchSize), make([]*backupUserUpdated, 0, batchSize), make([]*referral, 0, 2*batchSize) + userKeys, userHistoryKeys, referralKeys = make([]string, 0, batchSize), make([]string, 0, batchSize), make([]string, 0, 2*batchSize) + userResults, referralResults = make([]*user, 0, batchSize), make([]*referral, 0, 2*batchSize) t0Referrals, tMinus1Referrals = make(map[int64]*referral, batchSize), make(map[int64]*referral, batchSize) t1ReferralsToIncrementActiveValue, t2ReferralsToIncrementActiveValue = make(map[int64]int32, batchSize), make(map[int64]int32, batchSize) t1ReferralsThatStoppedMining, t2ReferralsThatStoppedMining = make(map[int64]uint32, batchSize), make(map[int64]uint32, batchSize) @@ -181,14 +142,8 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { referralsUpdated = make([]*referralUpdated, 0, batchSize) histories = make([]*model.User, 0, batchSize) userGlobalRanks = make([]redis.Z, 0, batchSize) - balanceRecalculationDryRunItems = make([]*balanceRecalculationDryRun, 0, batchSize) - backupedUsers = make(map[int64]*backupUserUpdated, batchSize) - backupUsersUpdated = make([]*backupUserUpdated, 0, batchSize) - recalculatedTiersBalancesUsers = make(map[int64]*user, batchSize) historyColumns, historyInsertMetadata = dwh.InsertDDL(int(batchSize)) shouldSynchronizeBalanceFunc = func(batchNumberArg uint64) bool { return false } - recalculationInfo *recalculationData - allAdoptions []*tokenomics.Adoption[float64] ) resetVars := func(success bool) { if success && len(userKeys) == int(batchSize) && len(userResults) == 0 { @@ -199,21 +154,6 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { panic("unexpected batch number: " + fmt.Sprint(batchNumber)) } totalBatches = uint64(batchNumber - 1) - if balanceBugFixEnabled { - metricsExists, err := m.getBalanceRecalculationMetrics(ctx, workerNumber) - if err != nil { - log.Error(err, "can't get balance recalculation metrics for worker:", workerNumber) - } - if metricsExists == nil && err == nil { - metrics.IterationsNum = int64(totalBatches) - metrics.EndedAt = time.Now() - metrics.Worker = workerNumber - if err := m.insertBalanceRecalculationMetrics(ctx, metrics); err != nil { - log.Error(err, "can't insert balance recalculation metrics for worker:", workerNumber) - } - metrics.reset() - } - } if totalBatches != 0 && iteration > 2 { shouldSynchronizeBalanceFunc = m.telemetry.shouldSynchronizeBalanceFunc(uint64(workerNumber), totalBatches, iteration) } @@ -225,19 +165,16 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { if batchNumber == 0 || currentAdoption == nil { currentAdoption = m.getAdoption(ctx, m.db, workerNumber) } - userKeys, userBackupKeys, userHistoryKeys, referralKeys = userKeys[:0], userBackupKeys[:0], userHistoryKeys[:0], referralKeys[:0] + userKeys, userHistoryKeys, referralKeys = userKeys[:0], userHistoryKeys[:0], referralKeys[:0] userResults, referralResults = userResults[:0], referralResults[:0] msgs, errs = msgs[:0], errs[:0] updatedUsers = updatedUsers[:0] extraBonusOnlyUpdatedUsers = extraBonusOnlyUpdatedUsers[:0] referralsCountGuardOnlyUpdatedUsers = referralsCountGuardOnlyUpdatedUsers[:0] referralsUpdated = referralsUpdated[:0] - balanceRecalculationDryRunItems = balanceRecalculationDryRunItems[:0] histories = histories[:0] userGlobalRanks = userGlobalRanks[:0] referralsThatStoppedMining = referralsThatStoppedMining[:0] - allAdoptions = allAdoptions[:0] - backupUsersUpdated = backupUsersUpdated[:0] for k := range t0Referrals { delete(t0Referrals, k) } @@ -256,14 +193,7 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { for k := range t2ReferralsToIncrementActiveValue { delete(t2ReferralsToIncrementActiveValue, k) } - for k := range backupedUsers { - delete(backupedUsers, k) - } - for k := range recalculatedTiersBalancesUsers { - delete(recalculatedTiersBalancesUsers, k) - } } - metrics.StartedAt = time.Now() for ctx.Err() == nil { /****************************************************************************************************************************************************** 1. Fetching a new batch of users. @@ -271,7 +201,6 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { if len(userKeys) == 0 { for ix := batchNumber * batchSize; ix < (batchNumber+1)*batchSize; ix++ { userKeys = append(userKeys, model.SerializedUsersKey((workers*ix)+workerNumber)) - userBackupKeys = append(userBackupKeys, model.SerializedBackupUsersKey((workers*ix)+workerNumber)) } } before := time.Now() @@ -287,30 +216,6 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { if len(userKeys) > 0 { go m.telemetry.collectElapsed(2, *before.Time) } - if balanceBugFixEnabled { - reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline) - if err := storage.Bind[backupUserUpdated](reqCtx, m.db, userBackupKeys, &backupUserResults); err != nil { - log.Error(errors.Wrapf(err, "[miner] failed to get backuped users for batchNumber:%v,workerNumber:%v", batchNumber, workerNumber)) - reqCancel() - now = time.Now() - - continue - } - } - reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline) - balanceBackupMode, err := mustGetBalancesBackupMode(reqCtx, m.db) - if err != nil { - log.Error(errors.Wrapf(err, "[miner] failed to get backup flag for batchNumber:%v,workerNumber:%v", batchNumber, workerNumber)) - reqCancel() - - continue - } - reqCancel() - if balanceBugFixEnabled { - for _, usr := range backupUserResults { - backupedUsers[usr.ID] = usr - } - } /****************************************************************************************************************************************************** 2. Fetching T0 & T-1 referrals of the fetched users. @@ -366,152 +271,11 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { t0Referrals[ref.ID] = ref } } - if balanceBugFixEnabled { - if !balanceBackupMode { - reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline) - recalculationInfo, err = m.gatherReferralsInformation(reqCtx, userResults) - if err != nil { - log.Error(errors.New("tiers diff balances error"), workerNumber, err) - } - reqCancel() - reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline) - allAdoptions, err = tokenomics.GetAllAdoptions[float64](reqCtx, m.db) - if err != nil { - log.Error(errors.New("can't get all adoptions"), workerNumber, err) - } - reqCancel() - } - } shouldSynchronizeBalance := shouldSynchronizeBalanceFunc(uint64(batchNumber)) for _, usr := range userResults { if usr.UserID == "" { continue } - backupedUsr, backupExists := backupedUsers[usr.ID] - if balanceBugFixEnabled { - if balanceBackupMode { - if backupExists && backupedUsr.BalancesBackupUsedAt.IsNil() { - diffT1ActiveValue := backupedUsr.ActiveT1Referrals - usr.ActiveT1Referrals - diffT2ActiveValue := backupedUsr.ActiveT2Referrals - usr.ActiveT2Referrals - if diffT1ActiveValue < 0 && diffT1ActiveValue*-1 > usr.ActiveT1Referrals { - diffT1ActiveValue = -usr.ActiveT1Referrals - } - if diffT2ActiveValue < 0 && diffT2ActiveValue*-1 > usr.ActiveT2Referrals { - diffT2ActiveValue = -usr.ActiveT2Referrals - } - - t1ReferralsToIncrementActiveValue[usr.ID] += diffT1ActiveValue - t2ReferralsToIncrementActiveValue[usr.ID] += diffT2ActiveValue - - usr.BalanceT1 = backupedUsr.BalanceT1 - usr.BalanceT2 = backupedUsr.BalanceT2 - usr.SlashingRateT1 = backupedUsr.SlashingRateT1 - usr.SlashingRateT2 = backupedUsr.SlashingRateT2 - usr.ActiveT1Referrals = backupedUsr.ActiveT1Referrals - usr.ActiveT2Referrals = backupedUsr.ActiveT2Referrals - - backupedUsr.BalancesBackupUsedAt = time.Now() - backupUsersUpdated = append(backupUsersUpdated, backupedUsr) - } - } else { - if !backupExists && recalculationInfo != nil { - if _, ok := recalculationInfo.NeedToBeRecalculatedUsers[usr.UserID]; ok { - balanceT1, balanceT2 := 0.0, 0.0 - if recalculationInfo.T1Referrals != nil { - if referralsT1, ok := recalculationInfo.T1Referrals[usr.UserID]; ok { - for _, ref := range referralsT1 { - if _, ok := recalculationInfo.Referrals[ref]; ok { - balanceT1 += recalculationInfo.Referrals[ref].BalanceForT0 - } - } - } - } - if recalculationInfo.T2Referrals != nil { - if referralsT2, ok := recalculationInfo.T2Referrals[usr.UserID]; ok { - for _, ref := range referralsT2 { - if _, ok := recalculationInfo.Referrals[ref]; ok { - balanceT2 += recalculationInfo.Referrals[ref].BalanceForTMinus1 - } - } - } - } - - diffT1ActiveValue := recalculationInfo.T1ActiveCounts[usr.UserID] - usr.ActiveT1Referrals - diffT2ActiveValue := recalculationInfo.T2ActiveCounts[usr.UserID] - usr.ActiveT2Referrals - - oldBalanceT1 := usr.BalanceT1 - oldBalanceT2 := usr.BalanceT2 - - if diffT1ActiveValue < 0 && diffT1ActiveValue*-1 > usr.ActiveT1Referrals { - diffT1ActiveValue = -usr.ActiveT1Referrals - } - if diffT2ActiveValue < 0 && diffT2ActiveValue*-1 > usr.ActiveT2Referrals { - diffT2ActiveValue = -usr.ActiveT2Referrals - } - - t1ReferralsToIncrementActiveValue[usr.ID] += diffT1ActiveValue - t2ReferralsToIncrementActiveValue[usr.ID] += diffT2ActiveValue - - usr.BalanceT1 = balanceT1 - usr.BalanceT2 = balanceT2 - usr.SlashingRateT1 = 0 - usr.SlashingRateT2 = 0 - usr.ActiveT1Referrals = recalculationInfo.T1ActiveCounts[usr.UserID] - usr.ActiveT2Referrals = recalculationInfo.T2ActiveCounts[usr.UserID] - - t1BalanceDiff := balanceT1 - oldBalanceT1 - t2BalanceDiff := balanceT2 - oldBalanceT2 - - metrics.AffectedUsers += 1 - - if t1BalanceDiff >= 0 { - metrics.T1BalancePositive += t1BalanceDiff - } else { - metrics.T1BalanceNegative += t1BalanceDiff - } - if t2BalanceDiff >= 0 { - metrics.T2BalancePositive += t2BalanceDiff - } else { - metrics.T2BalanceNegative += t2BalanceDiff - } - if diffT1ActiveValue < 0 { - metrics.T1ActiveCountsNegative += int64(diffT1ActiveValue) - } else { - metrics.T1ActiveCountsPositive += int64(diffT1ActiveValue) - } - if diffT2ActiveValue < 0 { - metrics.T2ActiveCountsNegative += int64(diffT2ActiveValue) - } else { - metrics.T2ActiveCountsPositive += int64(diffT2ActiveValue) - } - - balanceRecalculationDryRunItems = append(balanceRecalculationDryRunItems, &balanceRecalculationDryRun{ - T1BalanceDiff: t1BalanceDiff, - T2BalanceDiff: t2BalanceDiff, - T1ActiveCountsDiff: diffT1ActiveValue, - T2ActiveCountsDiff: diffT2ActiveValue, - UserID: usr.UserID, - }) - - backupUsersUpdated = append(backupUsersUpdated, &backupUserUpdated{ - DeserializedBackupUsersKey: model.DeserializedBackupUsersKey{ID: usr.ID}, - UserIDField: usr.UserIDField, - BalanceT1Field: model.BalanceT1Field{BalanceT1: oldBalanceT1}, - BalanceT2Field: model.BalanceT2Field{BalanceT2: oldBalanceT2}, - SlashingRateT1Field: model.SlashingRateT1Field{SlashingRateT1: usr.SlashingRateT1}, - SlashingRateT2Field: model.SlashingRateT2Field{SlashingRateT2: usr.SlashingRateT2}, - ActiveT1ReferralsField: model.ActiveT1ReferralsField{ActiveT1Referrals: usr.ActiveT1Referrals}, - ActiveT2ReferralsField: model.ActiveT2ReferralsField{ActiveT2Referrals: usr.ActiveT2Referrals}, - FirstRecalculatedBalanceT1Field: model.FirstRecalculatedBalanceT1Field{FirstRecalculatedBalanceT1: balanceT1}, - FirstRecalculatedBalanceT2Field: model.FirstRecalculatedBalanceT2Field{FirstRecalculatedBalanceT2: balanceT2}, - FirstRecalculatedActiveT1ReferralsField: model.FirstRecalculatedActiveT1ReferralsField{FirstRecalculatedActiveT1Referrals: usr.ActiveT1Referrals + diffT1ActiveValue}, - FirstRecalculatedActiveT2ReferralsField: model.FirstRecalculatedActiveT2ReferralsField{FirstRecalculatedActiveT2Referrals: usr.ActiveT2Referrals + diffT2ActiveValue}, - }) - } - } - } - } - var t0Ref, tMinus1Ref *referral if usr.IDT0 > 0 { t0Ref = t0Referrals[usr.IDT0] @@ -541,13 +305,11 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { updatedUser.ExtraBonusDaysClaimNotAvailable = 0 updatedUser.ExtraBonusLastClaimAvailableAt = nil } - if !balanceBugFixEnabled || balanceBackupMode { - if userStoppedMining := didUserStoppedMining(now, usr); userStoppedMining != nil { - referralsCountGuardOnlyUpdatedUsers = append(referralsCountGuardOnlyUpdatedUsers, userStoppedMining) - } - if userStoppedMining := didReferralJustStopMining(now, usr, t0Ref, tMinus1Ref); userStoppedMining != nil { - referralsThatStoppedMining = append(referralsThatStoppedMining, userStoppedMining) - } + if userStoppedMining := didUserStoppedMining(now, usr); userStoppedMining != nil { + referralsCountGuardOnlyUpdatedUsers = append(referralsCountGuardOnlyUpdatedUsers, userStoppedMining) + } + if userStoppedMining := didReferralJustStopMining(now, usr, t0Ref, tMinus1Ref); userStoppedMining != nil { + referralsThatStoppedMining = append(referralsThatStoppedMining, userStoppedMining) } if dayOffStarted := didANewDayOffJustStart(now, usr); dayOffStarted != nil { msgs = append(msgs, dayOffStartedMessage(reqCtx, dayOffStarted)) @@ -672,11 +434,12 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { } var pipeliner redis.Pipeliner - if len(t1ReferralsToIncrementActiveValue)+len(t2ReferralsToIncrementActiveValue)+len(referralsCountGuardOnlyUpdatedUsers)+len(t1ReferralsThatStoppedMining)+len(t2ReferralsThatStoppedMining)+len(extraBonusOnlyUpdatedUsers)+len(referralsUpdated)+len(userGlobalRanks)+len(backupUsersUpdated) > 0 { + if len(t1ReferralsToIncrementActiveValue)+len(t2ReferralsToIncrementActiveValue)+len(referralsCountGuardOnlyUpdatedUsers)+len(t1ReferralsThatStoppedMining)+len(t2ReferralsThatStoppedMining)+len(extraBonusOnlyUpdatedUsers)+len(referralsUpdated)+len(userGlobalRanks) > 0 { pipeliner = m.db.TxPipeline() } else { pipeliner = m.db.Pipeline() } + before = time.Now() reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline) if responses, err := pipeliner.Pipelined(reqCtx, func(pipeliner redis.Pipeliner) error { @@ -720,13 +483,6 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { return err } } - if balanceBugFixEnabled { - for _, value := range backupUsersUpdated { - if err := pipeliner.HSet(reqCtx, value.Key(), storage.SerializeValue(value)...).Err(); err != nil { - return err - } - } - } if len(userGlobalRanks) > 0 { if err := pipeliner.ZAdd(reqCtx, "top_miners", userGlobalRanks...).Err(); err != nil { return err @@ -757,9 +513,6 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { continue } } - if err := m.insertBalanceRecalculationDryRunBatch(ctx, balanceRecalculationDryRunItems); err != nil { - log.Error(err, fmt.Sprintf("can't insert balance recalcualtion dry run information for users:%#v, workerNumber:%v", userResults, workerNumber)) - } if len(t1ReferralsThatStoppedMining)+len(t2ReferralsThatStoppedMining)+len(updatedUsers)+len(extraBonusOnlyUpdatedUsers)+len(referralsUpdated)+len(userGlobalRanks) > 0 { go m.telemetry.collectElapsed(7, *before.Time) diff --git a/miner/recalculate_balance.go b/miner/recalculate_balance.go deleted file mode 100644 index db3e610..0000000 --- a/miner/recalculate_balance.go +++ /dev/null @@ -1,396 +0,0 @@ -// SPDX-License-Identifier: ice License 1.0 - -package miner - -import ( - "context" - "fmt" - "strings" - - "github.com/pkg/errors" - "github.com/redis/go-redis/v9" - - "github.com/ice-blockchain/eskimo/users" - "github.com/ice-blockchain/freezer/model" - storagePG "github.com/ice-blockchain/wintr/connectors/storage/v2" - "github.com/ice-blockchain/wintr/connectors/storage/v3" - "github.com/ice-blockchain/wintr/log" - "github.com/ice-blockchain/wintr/time" -) - -const ( - maxLimit int64 = 10000 -) - -type ( - pgUser struct { - Active *users.NotExpired - ID, ReferredBy string - ReferralType string - } - pgUserCreated struct { - CreatedAt *time.Time - ID string - } - - recalculationData struct { - Referrals map[string]*recalculateReferral - NeedToBeRecalculatedUsers map[string]struct{} - T1Referrals, T2Referrals map[string][]string - T1ActiveCounts, T2ActiveCounts map[string]int32 - } - - balanceRecalculationMetrics struct { - StartedAt *time.Time - EndedAt *time.Time - T1BalancePositive float64 - T1BalanceNegative float64 - T2BalancePositive float64 - T2BalanceNegative float64 - T1ActiveCountsPositive int64 - T1ActiveCountsNegative int64 - T2ActiveCountsPositive int64 - T2ActiveCountsNegative int64 - IterationsNum int64 - AffectedUsers int64 - Worker int64 - } - - balanceRecalculationDryRun struct { - T1BalanceDiff float64 - T2BalanceDiff float64 - T1ActiveCountsDiff int32 - T2ActiveCountsDiff int32 - UserID string - } -) - -func (m *miner) getUsers(ctx context.Context, users []*user) (map[string]*pgUserCreated, error) { - var ( - userIDs []string - offset int64 = 0 - result = make(map[string]*pgUserCreated, len(users)) - ) - for _, val := range users { - userIDs = append(userIDs, val.UserID) - } - for { - sql := `SELECT - id, - created_at - FROM users - WHERE id = ANY($1) - LIMIT $2 OFFSET $3` - rows, err := storagePG.Select[pgUserCreated](ctx, m.dbPG, sql, userIDs, maxLimit, offset) - if err != nil { - return nil, errors.Wrapf(err, "can't get users from pg for: %#v", userIDs) - } - if len(rows) == 0 { - break - } - offset += maxLimit - for _, row := range rows { - result[row.ID] = row - } - } - - return result, nil -} - -func (m *miner) collectTiers(ctx context.Context, needToBeRecalculatedUsers map[string]struct{}) ( - referralsUserKeys []string, t1Referrals, t2Referrals map[string][]string, t1ActiveCounts, t2ActiveCounts map[string]int32, err error, -) { - var ( - userIDs = make([]string, 0, len(needToBeRecalculatedUsers)) - offset int64 = 0 - now = time.Now() - ) - for key := range needToBeRecalculatedUsers { - userIDs = append(userIDs, key) - } - t1ActiveCounts, t2ActiveCounts = make(map[string]int32, len(needToBeRecalculatedUsers)), make(map[string]int32, len(needToBeRecalculatedUsers)) - t1Referrals, t2Referrals = make(map[string][]string), make(map[string][]string) - referralsUserKeysMap := make(map[string]struct{}) - for { - sql := `SELECT * FROM( - SELECT - id, - referred_by, - 'T1' AS referral_type, - (CASE - WHEN COALESCE(last_mining_ended_at, to_timestamp(1)) > $1 - THEN COALESCE(last_mining_ended_at, to_timestamp(1)) - ELSE NULL - END) AS active - FROM users - WHERE referred_by = ANY($2) - AND referred_by != id - AND username != id - UNION ALL - SELECT - t2.id AS id, - t0.id AS referred_by, - 'T2' AS referral_type, - (CASE - WHEN COALESCE(t2.last_mining_ended_at, to_timestamp(1)) > $1 - THEN COALESCE(t2.last_mining_ended_at, to_timestamp(1)) - ELSE NULL - END) AS active - FROM users t0 - JOIN users t1 - ON t1.referred_by = t0.id - JOIN users t2 - ON t2.referred_by = t1.id - WHERE t0.id = ANY($2) - AND t2.referred_by != t2.id - AND t2.username != t2.id - ) X - LIMIT $3 OFFSET $4` - rows, err := storagePG.Select[pgUser](ctx, m.dbPG, sql, now.Time, userIDs, maxLimit, offset) - if err != nil { - return nil, nil, nil, nil, nil, errors.Wrap(err, "can't get referrals from pg for showing actual data") - } - if len(rows) == 0 { - break - } - offset += maxLimit - for _, row := range rows { - if row.ReferredBy != "bogus" && row.ReferredBy != "icenetwork" && row.ID != "bogus" && row.ID != "icenetwork" { - if row.ReferralType == "T1" { - t1Referrals[row.ReferredBy] = append(t1Referrals[row.ReferredBy], row.ID) - referralsUserKeysMap[model.SerializedUsersKey(row.ID)] = struct{}{} - if row.Active != nil && *row.Active { - t1ActiveCounts[row.ReferredBy]++ - } - } else if row.ReferralType == "T2" { - t2Referrals[row.ReferredBy] = append(t2Referrals[row.ReferredBy], row.ID) - referralsUserKeysMap[model.SerializedUsersKey(row.ID)] = struct{}{} - if row.Active != nil && *row.Active { - t2ActiveCounts[row.ReferredBy]++ - } - } else { - log.Panic("wrong tier type") - } - } - } - } - for key, _ := range referralsUserKeysMap { - referralsUserKeys = append(referralsUserKeys, key) - } - - return referralsUserKeys, t1Referrals, t2Referrals, t1ActiveCounts, t2ActiveCounts, nil -} - -func getInternalIDsBatch(ctx context.Context, db storage.DB, keys ...string) ([]string, error) { - if cmdResults, err := db.Pipelined(ctx, func(pipeliner redis.Pipeliner) error { - if err := pipeliner.MGet(ctx, keys...).Err(); err != nil { - return err - } - - return nil - }); err != nil { - return nil, err - } else { - results := make([]string, 0, len(cmdResults)) - for _, cmdResult := range cmdResults { - sliceResult := cmdResult.(*redis.SliceCmd) - for _, val := range sliceResult.Val() { - if val == nil { - continue - } - results = append(results, model.SerializedUsersKey(val.(string))) - } - } - - return results, nil - } -} - -func getInternalIDs(ctx context.Context, db storage.DB, keys ...string) (result []string, err error) { - var batchKeys []string - for _, key := range keys { - batchKeys = append(batchKeys, key) - if len(batchKeys) >= int(cfg.BatchSize) { - res, err := getInternalIDsBatch(ctx, db, batchKeys...) - if err != nil { - return nil, err - } - result = append(result, res...) - batchKeys = batchKeys[:0] - } - } - if len(batchKeys) > 0 { - res, err := getInternalIDsBatch(ctx, db, batchKeys...) - if err != nil { - return nil, err - } - result = append(result, res...) - } - - return result, nil -} - -func getReferralsBatch(ctx context.Context, db storage.DB, keys ...string) ([]*recalculateReferral, error) { - referrals := make([]*recalculateReferral, 0) - if err := storage.Bind[recalculateReferral](ctx, db, keys, &referrals); err != nil { - return nil, errors.Wrapf(err, "failed to get referrals for:%v", keys) - } - - return referrals, nil -} - -func getReferrals(ctx context.Context, db storage.DB, keys ...string) (result []*recalculateReferral, err error) { - var batchKeys []string - for _, key := range keys { - batchKeys = append(batchKeys, key) - if len(batchKeys) >= int(cfg.BatchSize) { - referrals, err := getReferralsBatch(ctx, db, batchKeys...) - if err != nil { - return nil, err - } - result = append(result, referrals...) - batchKeys = batchKeys[:0] - } - } - if len(batchKeys) > 0 { - referrals, err := getReferralsBatch(ctx, db, batchKeys...) - if err != nil { - return nil, err - } - result = append(result, referrals...) - } - - return result, nil -} - -func (m *miner) gatherReferralsInformation(ctx context.Context, users []*user) (history *recalculationData, err error) { - if len(users) == 0 { - return nil, nil - } - needToBeRecalculatedUsers := make(map[string]struct{}, len(users)) - usrs, err := m.getUsers(ctx, users) - if err != nil { - return nil, errors.Wrapf(err, "can't get CreatedAt information for users:%#v", usrs) - } - for _, usr := range users { - if usr.UserID == "" || usr.Username == "" { - continue - } - if _, ok := usrs[usr.UserID]; ok { - if usrs[usr.UserID].CreatedAt == nil || usrs[usr.UserID].CreatedAt.After(*m.recalculationBalanceStartDate.Time) { - continue - } - } - needToBeRecalculatedUsers[usr.UserID] = struct{}{} - } - if len(needToBeRecalculatedUsers) == 0 { - return nil, nil - } - referralsUserKeys, t1Referrals, t2Referrals, t1ActiveCounts, t2ActiveCounts, err := m.collectTiers(ctx, needToBeRecalculatedUsers) - if err != nil { - return nil, errors.Wrap(err, "can't get active users for users") - } - if len(t1Referrals) == 0 && len(t2Referrals) == 0 { - return nil, nil - } - internalIDKeys, err := getInternalIDs(ctx, m.db, referralsUserKeys...) - if err != nil { - return nil, errors.Wrapf(err, "can't get internal ids for:%#v", referralsUserKeys) - } - referrals, err := getReferrals(ctx, m.db, internalIDKeys...) - if err != nil { - return nil, errors.Wrapf(err, "failed to get referrals for:%v", users) - } - referralsCollection := make(map[string]*recalculateReferral, len(referrals)) - for _, ref := range referrals { - referralsCollection[ref.UserID] = ref - } - - return &recalculationData{ - Referrals: referralsCollection, - NeedToBeRecalculatedUsers: needToBeRecalculatedUsers, - T1Referrals: t1Referrals, - T2Referrals: t2Referrals, - T1ActiveCounts: t1ActiveCounts, - T2ActiveCounts: t2ActiveCounts, - }, nil -} - -func (m *miner) getBalanceRecalculationMetrics(ctx context.Context, workerNumber int64) (brm *balanceRecalculationMetrics, err error) { - sql := `SELECT * FROM balance_recalculation_metrics WHERE worker = $1` - res, err := storagePG.Get[balanceRecalculationMetrics](ctx, m.dbPG, sql, workerNumber) - if err != nil { - if err == storagePG.ErrNotFound { - return nil, nil - } - - return nil, errors.Wrapf(err, "failed to get balance recalculation metrics:%v", res) - } - - return res, nil -} - -func (m *miner) insertBalanceRecalculationMetrics(ctx context.Context, brm *balanceRecalculationMetrics) error { - sql := `INSERT INTO balance_recalculation_metrics( - worker, - started_at, - ended_at, - t1_balance_positive, - t1_balance_negative, - t2_balance_positive, - t2_balance_negative, - t1_active_counts_positive, - t1_active_counts_negative, - t2_active_counts_positive, - t2_active_counts_negative, - iterations_num, - affected_users - ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)` - _, err := storagePG.Exec(ctx, m.dbPG, sql, brm.Worker, brm.StartedAt.Time, brm.EndedAt.Time, brm.T1BalancePositive, brm.T1BalanceNegative, brm.T2BalancePositive, brm.T2BalanceNegative, - brm.T1ActiveCountsPositive, brm.T1ActiveCountsNegative, brm.T2ActiveCountsPositive, brm.T2ActiveCountsNegative, brm.IterationsNum, brm.AffectedUsers) - - return errors.Wrapf(err, "failed to insert metrics for worker:%v, params:%#v", brm.Worker, brm) -} - -func (b *balanceRecalculationMetrics) reset() { - b.EndedAt = nil - b.AffectedUsers = 0 - b.IterationsNum = 0 - b.T1BalancePositive = 0 - b.T1BalanceNegative = 0 - b.T2BalancePositive = 0 - b.T2BalanceNegative = 0 - b.T1ActiveCountsPositive = 0 - b.T1ActiveCountsNegative = 0 - b.T2ActiveCountsPositive = 0 - b.T2ActiveCountsNegative = 0 - b.StartedAt = time.Now() -} - -func (m *miner) insertBalanceRecalculationDryRunBatch(ctx context.Context, infos []*balanceRecalculationDryRun) error { - if len(infos) == 0 { - return nil - } - paramCounter := 1 - params := []any{} - sqlParams := []string{} - for _, info := range infos { - sqlParams = append(sqlParams, fmt.Sprintf("($%v, $%v, $%v, $%v, $%v)", paramCounter, paramCounter+1, paramCounter+2, paramCounter+3, paramCounter+4)) - paramCounter += 5 - params = append(params, info.T1BalanceDiff, info.T2BalanceDiff, info.T1ActiveCountsDiff, info.T2ActiveCountsDiff, info.UserID) - } - - sql := fmt.Sprintf(`INSERT INTO balance_recalculation_dry_run( - diff_t1_balance, - diff_t2_balance, - diff_t1_active_counts, - diff_t2_active_counts, - user_id - ) - VALUES %v - ON CONFLICT(user_id) DO NOTHING`, strings.Join(sqlParams, ",")) - _, err := storagePG.Exec(ctx, m.dbPG, sql, params...) - - return errors.Wrap(err, "failed to insert dry run info") -} diff --git a/model/model.go b/model/model.go index d0e2544..1d83f8a 100644 --- a/model/model.go +++ b/model/model.go @@ -102,9 +102,6 @@ type ( ExtraBonusLastClaimAvailableAtField struct { ExtraBonusLastClaimAvailableAt *time.Time `redis:"extra_bonus_last_claim_available_at,omitempty"` } - BalancesBackupUsedAtField struct { - BalancesBackupUsedAt *time.Time `redis:"balance_backup_used_at,omitempty"` - } ReferralsCountChangeGuardUpdatedAtField struct { ReferralsCountChangeGuardUpdatedAt *time.Time `redis:"referrals_count_change_guard_updated_at,omitempty"` } @@ -168,12 +165,6 @@ type ( BalanceT2Field struct { BalanceT2 float64 `redis:"balance_t2"` } - FirstRecalculatedBalanceT1Field struct { - FirstRecalculatedBalanceT1 float64 `redis:"first_recalculated_balance_t1"` - } - FirstRecalculatedBalanceT2Field struct { - FirstRecalculatedBalanceT2 float64 `redis:"first_recalculated_balance_t2"` - } BalanceForT0Field struct { BalanceForT0 float64 `redis:"balance_for_t0"` } @@ -192,12 +183,6 @@ type ( SlashingRateT2Field struct { SlashingRateT2 float64 `redis:"slashing_rate_t2"` } - FirstRecalculatedSlashingRateT1Field struct { - FirstRecalculatedSlashingRateT1 float64 `redis:"first_recalculated_slashing_rate_t1"` - } - FirstRecalculatedSlashingRateT2Field struct { - FirstRecalculatedSlashingRateT2 float64 `redis:"first_recalculated_slashing_rate_t2"` - } SlashingRateForT0Field struct { SlashingRateForT0 float64 `redis:"slashing_rate_for_t0"` } @@ -234,12 +219,6 @@ type ( ActiveT2ReferralsField struct { ActiveT2Referrals int32 `redis:"active_t2_referrals,omitempty"` } - FirstRecalculatedActiveT1ReferralsField struct { - FirstRecalculatedActiveT1Referrals int32 `redis:"first_recalculated_active_t1_referrals,omitempty"` - } - FirstRecalculatedActiveT2ReferralsField struct { - FirstRecalculatedActiveT2Referrals int32 `redis:"first_recalculated_active_t2_referrals,omitempty"` - } NewsSeenField struct { NewsSeen uint16 `redis:"news_seen"` } @@ -267,9 +246,6 @@ type ( KYCStepBlockedField struct { KYCStepBlocked users.KYCStep `json:"kycStepBlocked" redis:"kyc_step_blocked"` } - DeserializedBackupUsersKey struct { - ID int64 `redis:"-"` - } ) func (k *DeserializedUsersKey) Key() string { @@ -311,45 +287,6 @@ func SerializedUsersKey(val any) string { } } -func (k *DeserializedBackupUsersKey) Key() string { - if k == nil || k.ID == 0 { - return "" - } - - return SerializedBackupUsersKey(k.ID) -} - -func (k *DeserializedBackupUsersKey) SetKey(val string) { - if val == "" || val == "backup:" { - return - } - if val[0] == 'b' { - val = val[7:] - } - var err error - k.ID, err = strconv.ParseInt(val, 10, 64) - log.Panic(err) -} - -func SerializedBackupUsersKey(val any) string { - switch typedVal := val.(type) { - case string: - if typedVal == "" { - return "" - } - - return "backup:" + typedVal - case int64: - if typedVal == 0 { - return "" - } - - return "backup:" + strconv.FormatInt(typedVal, 10) - default: - panic(fmt.Sprintf("%#v cannot be used as backup key", val)) - } -} - type ( TimeSlice []*time.Time ) diff --git a/tokenomics/adoption.go b/tokenomics/adoption.go index 5e2e47e..d9a808b 100644 --- a/tokenomics/adoption.go +++ b/tokenomics/adoption.go @@ -30,7 +30,7 @@ func (r *repository) GetAdoptionSummary(ctx context.Context) (as *AdoptionSummar if as.TotalActiveUsers, err = r.db.Get(ctx, r.totalActiveUsersKey(*time.Now().Time)).Uint64(); err != nil && !errors.Is(err, redis.Nil) { return nil, errors.Wrap(err, "failed to get current totalActiveUsers") } - if as.Milestones, err = GetAllAdoptions[string](ctx, r.db); err != nil { + if as.Milestones, err = getAllAdoptions[string](ctx, r.db); err != nil { return nil, errors.Wrap(err, "failed to get all adoption milestones") } @@ -213,7 +213,7 @@ func getAdoption(ctx context.Context, db storage.DB, milestone uint64) (*Adoptio return resp[0], nil } -func GetAllAdoptions[DENOM ~string | ~float64](ctx context.Context, db storage.DB) ([]*Adoption[DENOM], error) { +func getAllAdoptions[DENOM ~string | ~float64](ctx context.Context, db storage.DB) ([]*Adoption[DENOM], error) { const max = 20 // We try to get 20 just to be sure we get all of them. We're never going to have more than 20 milestones. keys := make([]string, max) for ix := uint64(1); ix <= max; ix++ {