From d1757de0bc659806ad81b96565194082db2e980c Mon Sep 17 00:00:00 2001 From: ice-myles <96409608+ice-myles@users.noreply.github.com> Date: Wed, 13 Dec 2023 23:42:05 +0300 Subject: [PATCH] Balance recalculations based on balanceForXX balances without any clickhouse usage. (#54) --- bookkeeper/storage/contract.go | 22 -- bookkeeper/storage/storage.go | 130 --------- miner/.testdata/DDL.sql | 10 + miner/adoption_range_test.go | 151 ---------- miner/contract.go | 11 +- miner/miner.go | 127 ++++++--- miner/recalculate_balance.go | 497 ++++++++------------------------- 7 files changed, 227 insertions(+), 721 deletions(-) delete mode 100644 miner/adoption_range_test.go diff --git a/bookkeeper/storage/contract.go b/bookkeeper/storage/contract.go index 1903dc9..8351c58 100644 --- a/bookkeeper/storage/contract.go +++ b/bookkeeper/storage/contract.go @@ -25,28 +25,6 @@ type ( Insert(ctx context.Context, columns *Columns, input InsertMetadata, usrs []*model.User) error SelectBalanceHistory(ctx context.Context, id int64, createdAts []stdlibtime.Time) ([]*BalanceHistory, error) SelectTotalCoins(ctx context.Context, createdAts []stdlibtime.Time) ([]*TotalCoins, error) - GetAdjustUserInformation(ctx context.Context, userIDs []string, limit, offset int64) ([]*AdjustUserInfo, error) - } - AdjustUserInfo struct { - MiningSessionSoloStartedAt *time.Time - MiningSessionSoloEndedAt *time.Time - MiningSessionSoloLastStartedAt *time.Time - MiningSessionSoloPreviouslyEndedAt *time.Time - CreatedAt *time.Time - ResurrectSoloUsedAt *time.Time - UserID string - ID int64 - SlashingRateSolo float64 - SlashingRateT1 float64 - SlashingRateT2 float64 - BalanceSolo float64 - BalanceT0 float64 - BalanceT1Pending float64 - BalanceT1PendingApplied float64 - BalanceT2Pending float64 - BalanceT2PendingApplied float64 - PrestakingAllocation uint16 - PrestakingBonus uint16 } BalanceHistory struct { CreatedAt *time.Time diff --git a/bookkeeper/storage/storage.go b/bookkeeper/storage/storage.go index a6ada51..dac92ca 100644 --- a/bookkeeper/storage/storage.go +++ b/bookkeeper/storage/storage.go @@ -14,7 +14,6 @@ import ( "github.com/ClickHouse/ch-go/chpool" "github.com/ClickHouse/ch-go/proto" "github.com/hashicorp/go-multierror" - "github.com/pkg/errors" "go.uber.org/zap" "github.com/ice-blockchain/freezer/model" @@ -441,135 +440,6 @@ func (db *db) SelectBalanceHistory(ctx context.Context, id int64, createdAts []s return res, nil } -func (db *db) GetAdjustUserInformation(ctx context.Context, userIDs []string, limit, offset int64) ([]*AdjustUserInfo, error) { - const ( - maxIDCount = 25_000 - ) - var ( - res = make([]*AdjustUserInfo, 0, len(userIDs)) - counter = 0 - ) - var userIDArray []string - for _, id := range userIDs { - userIDArray = append(userIDArray, id) - counter++ - if counter >= maxIDCount { // Hack not to have 'Max query size exceeded' error. - result, err := db.getAdjustUserInformation(ctx, userIDArray, limit, offset) - if err != nil { - return nil, errors.Wrapf(err, "can't get adjust user information for userIDs:%#v", userIDArray) - } - res = append(res, result...) - userIDArray = userIDArray[:0] - counter = 0 - - continue - } - } - if len(userIDArray) > 0 { - result, err := db.getAdjustUserInformation(ctx, userIDArray, limit, offset) - if err != nil { - return nil, errors.Wrapf(err, "can't get adjust user information for userIDs:%#v", userIDArray) - } - res = append(res, result...) 
- } - - return res, nil -} - -func (db *db) getAdjustUserInformation(ctx context.Context, userIDArray []string, limit, offset int64) ([]*AdjustUserInfo, error) { - var ( - id = make(proto.ColInt64, 0, len(userIDArray)) - userID = &proto.ColStr{Buf: make([]byte, 0, 40*len(userIDArray)), Pos: make([]proto.Position, 0, len(userIDArray))} - miningSessionSoloStartedAt = proto.ColDateTime64{Data: make([]proto.DateTime64, 0, len(userIDArray)), Location: stdlibtime.UTC} - miningSessionSoloEndedAt = proto.ColDateTime64{Data: make([]proto.DateTime64, 0, len(userIDArray)), Location: stdlibtime.UTC} - miningSessionSoloPreviouslyEndedAt = proto.ColDateTime64{Data: make([]proto.DateTime64, 0, len(userIDArray)), Location: stdlibtime.UTC} - slashingRateSolo = make(proto.ColFloat64, 0, len(userIDArray)) - createdAt = proto.ColDateTime{Data: make([]proto.DateTime, 0), Location: stdlibtime.UTC} - resurrectSoloUsedAt = proto.ColDateTime64{Data: make([]proto.DateTime64, 0, len(userIDArray)), Location: stdlibtime.UTC} - balanceSolo = make(proto.ColFloat64, 0, len(userIDArray)) - balanceT1Pending = make(proto.ColFloat64, 0, len(userIDArray)) - balanceT1PendingApplied = make(proto.ColFloat64, 0, len(userIDArray)) - balanceT2Pending = make(proto.ColFloat64, 0, len(userIDArray)) - balanceT2PendingApplied = make(proto.ColFloat64, 0, len(userIDArray)) - res = make([]*AdjustUserInfo, 0, len(userIDArray)) - ) - if err := db.pools[atomic.AddUint64(&db.currentIndex, 1)%uint64(len(db.pools))].Do(ctx, ch.Query{ - Body: fmt.Sprintf(`SELECT id, - user_id, - mining_session_solo_started_at, - mining_session_solo_ended_at, - mining_session_solo_previously_ended_at, - slashing_rate_solo, - created_at, - resurrect_solo_used_at, - balance_solo, - balance_t1_pending, - balance_t1_pending_applied, - balance_t2_pending, - balance_t2_pending_applied - FROM %[1]v - WHERE id IN [%[2]v] - ORDER BY id ASC, created_at ASC - LIMIT %[3]v, %[4]v - `, tableName, strings.Join(userIDArray, ","), offset, limit), - Result: append(make(proto.Results, 0, 13), - proto.ResultColumn{Name: "id", Data: &id}, - proto.ResultColumn{Name: "user_id", Data: userID}, - proto.ResultColumn{Name: "mining_session_solo_started_at", Data: &miningSessionSoloStartedAt}, - proto.ResultColumn{Name: "mining_session_solo_ended_at", Data: &miningSessionSoloEndedAt}, - proto.ResultColumn{Name: "mining_session_solo_previously_ended_at", Data: &miningSessionSoloPreviouslyEndedAt}, - proto.ResultColumn{Name: "slashing_rate_solo", Data: &slashingRateSolo}, - proto.ResultColumn{Name: "created_at", Data: &createdAt}, - proto.ResultColumn{Name: "resurrect_solo_used_at", Data: &resurrectSoloUsedAt}, - proto.ResultColumn{Name: "balance_solo", Data: &balanceSolo}, - proto.ResultColumn{Name: "balance_t1_pending", Data: &balanceT1Pending}, - proto.ResultColumn{Name: "balance_t1_pending_applied", Data: &balanceT1PendingApplied}, - proto.ResultColumn{Name: "balance_t2_pending", Data: &balanceT2Pending}, - proto.ResultColumn{Name: "balance_t2_pending_applied", Data: &balanceT2PendingApplied}, - ), - OnResult: func(_ context.Context, block proto.Block) error { - for ix := 0; ix < block.Rows; ix++ { - res = append(res, &AdjustUserInfo{ - ID: (&id).Row(ix), - UserID: userID.Row(ix), - MiningSessionSoloStartedAt: time.New((&miningSessionSoloStartedAt).Row(ix)), - MiningSessionSoloEndedAt: time.New((&miningSessionSoloEndedAt).Row(ix)), - MiningSessionSoloPreviouslyEndedAt: time.New((&miningSessionSoloPreviouslyEndedAt).Row(ix)), - SlashingRateSolo: (&slashingRateSolo).Row(ix), - CreatedAt: 
time.New((&createdAt).Row(ix)), - ResurrectSoloUsedAt: time.New((&resurrectSoloUsedAt).Row(ix)), - BalanceSolo: (&balanceSolo).Row(ix), - BalanceT1Pending: (&balanceT1Pending).Row(ix), - BalanceT1PendingApplied: (&balanceT1PendingApplied).Row(ix), - BalanceT2Pending: (&balanceT2Pending).Row(ix), - BalanceT2PendingApplied: (&balanceT2PendingApplied).Row(ix), - }) - } - (&id).Reset() - userID.Reset() - (&miningSessionSoloStartedAt).Reset() - (&miningSessionSoloEndedAt).Reset() - (&miningSessionSoloPreviouslyEndedAt).Reset() - (&slashingRateSolo).Reset() - (&createdAt).Reset() - (&resurrectSoloUsedAt).Reset() - (&balanceSolo).Reset() - (&balanceT1Pending).Reset() - (&balanceT1PendingApplied).Reset() - (&balanceT2Pending).Reset() - (&balanceT2PendingApplied).Reset() - - return nil - }, - Secret: "", - InitialUser: "", - }); err != nil { - return nil, err - } - - return res, nil -} - func (db *db) SelectTotalCoins(ctx context.Context, createdAts []stdlibtime.Time) ([]*TotalCoins, error) { var ( createdAt = proto.ColDateTime{Data: make([]proto.DateTime, 0, len(createdAts)), Location: stdlibtime.UTC} diff --git a/miner/.testdata/DDL.sql b/miner/.testdata/DDL.sql index 16423e7..42204d4 100644 --- a/miner/.testdata/DDL.sql +++ b/miner/.testdata/DDL.sql @@ -18,3 +18,13 @@ CREATE TABLE IF NOT EXISTS balance_recalculation_metrics ( worker BIGINT NOT NULL PRIMARY KEY ) WITH (fillfactor = 70); --************************************************************************************************************************************ +-- balance_recalculation_dry_run +DROP TABLE IF EXISTS balance_recalculation_dry_run; +CREATE TABLE IF NOT EXISTS balance_recalculation_dry_run ( + diff_t1_balance DOUBLE PRECISION NOT NULL, + diff_t2_balance DOUBLE PRECISION NOT NULL, + diff_t1_active_counts DOUBLE PRECISION NOT NULL, + diff_t2_active_counts DOUBLE PRECISION NOT NULL, + user_id text PRIMARY KEY + ) WITH (fillfactor = 70); +--************************************************************************************************************************************ diff --git a/miner/adoption_range_test.go b/miner/adoption_range_test.go deleted file mode 100644 index b5143d3..0000000 --- a/miner/adoption_range_test.go +++ /dev/null @@ -1,151 +0,0 @@ -// SPDX-License-Identifier: ice License 1.0 - -package miner - -import ( - "testing" - stdlibtime "time" - - "github.com/stretchr/testify/assert" - - "github.com/ice-blockchain/freezer/tokenomics" - "github.com/ice-blockchain/wintr/time" -) - -func TestGetAdoptionsRange_1AdoptionPerRange(t *testing.T) { - var adoptions []*tokenomics.Adoption[float64] - adoptions = append(adoptions, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-58 * stdlibtime.Minute)), - BaseMiningRate: 16.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-55 * stdlibtime.Minute)), - BaseMiningRate: 8.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Minute)), - BaseMiningRate: 4.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-10 * stdlibtime.Minute)), - BaseMiningRate: 2.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 1.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 0.5, - Milestone: 1, - TotalActiveUsers: 1, - }, 
&tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 0.25, - Milestone: 1, - TotalActiveUsers: 1, - }) - startedAt := time.New(time.Now().Add(-1 * stdlibtime.Minute)) - endedAt := time.Now() - ranges := splitByAdoptionTimeRanges(adoptions, startedAt, endedAt) - assert.Equal(t, 2., ranges[0].BaseMiningRate) - assert.Equal(t, 2., ranges[1].BaseMiningRate) -} - -func TestGetAdoptionsRange_2AdoptionsPerRange(t *testing.T) { - var adoptions []*tokenomics.Adoption[float64] - adoptions = append(adoptions, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-58 * stdlibtime.Minute)), - BaseMiningRate: 16.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-55 * stdlibtime.Minute)), - BaseMiningRate: 8.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Minute)), - BaseMiningRate: 4.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-10 * stdlibtime.Minute)), - BaseMiningRate: 2.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Second)), - BaseMiningRate: 1.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 0.5, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 0.25, - Milestone: 1, - TotalActiveUsers: 1, - }) - startedAt := time.New(time.Now().Add(-1 * stdlibtime.Minute)) - endedAt := time.Now() - ranges := splitByAdoptionTimeRanges(adoptions, startedAt, endedAt) - assert.Equal(t, 2., ranges[0].BaseMiningRate) - assert.Equal(t, 1., ranges[1].BaseMiningRate) - assert.Equal(t, 1., ranges[2].BaseMiningRate) -} - -func TestGetAdoptionsRange_AdoptionDemotionPerRange(t *testing.T) { - var adoptions []*tokenomics.Adoption[float64] - adoptions = append(adoptions, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Second)), - BaseMiningRate: 16.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-55 * stdlibtime.Minute)), - BaseMiningRate: 8.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Minute)), - BaseMiningRate: 4.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-1 * stdlibtime.Minute)), - BaseMiningRate: 2.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 1.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 0.5, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 0.25, - Milestone: 1, - TotalActiveUsers: 1, - }) - startedAt := time.New(time.Now().Add(-2 * stdlibtime.Minute)) - endedAt := time.Now() - ranges := splitByAdoptionTimeRanges(adoptions, startedAt, endedAt) - assert.Equal(t, 4., ranges[0].BaseMiningRate) - assert.Equal(t, 2., ranges[1].BaseMiningRate) - assert.Equal(t, 16., ranges[2].BaseMiningRate) - assert.Equal(t, 16., ranges[3].BaseMiningRate) -} diff --git a/miner/contract.go b/miner/contract.go index a335a42..c2473de 100644 --- a/miner/contract.go +++ b/miner/contract.go @@ -42,7 +42,7 @@ const ( 
applicationYamlKey = "miner" parentApplicationYamlKey = "tokenomics" requestDeadline = 30 * stdlibtime.Second - balanceBugFixEnabled = false + balanceBugFixEnabled = true ) // . @@ -122,8 +122,6 @@ type ( model.SlashingRateT2Field model.FirstRecalculatedBalanceT1Field model.FirstRecalculatedBalanceT2Field - model.FirstRecalculatedSlashingRateT1Field - model.FirstRecalculatedSlashingRateT2Field model.DeserializedBackupUsersKey model.ActiveT1ReferralsField model.ActiveT2ReferralsField @@ -141,6 +139,13 @@ type ( model.DeserializedUsersKey } + recalculateReferral struct { + model.BalanceForT0Field + model.BalanceForTMinus1Field + model.UserIDField + model.DeserializedUsersKey + } + referralCountGuardUpdatedUser struct { model.ReferralsCountChangeGuardUpdatedAtField model.DeserializedUsersKey diff --git a/miner/miner.go b/miner/miner.go index 058beec..b2b1542 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -181,12 +181,13 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { referralsUpdated = make([]*referralUpdated, 0, batchSize) histories = make([]*model.User, 0, batchSize) userGlobalRanks = make([]redis.Z, 0, batchSize) + balanceRecalculationDryRunItems = make([]*balanceRecalculationDryRun, 0, batchSize) backupedUsers = make(map[int64]*backupUserUpdated, batchSize) backupUsersUpdated = make([]*backupUserUpdated, 0, batchSize) recalculatedTiersBalancesUsers = make(map[int64]*user, batchSize) historyColumns, historyInsertMetadata = dwh.InsertDDL(int(batchSize)) shouldSynchronizeBalanceFunc = func(batchNumberArg uint64) bool { return false } - recalculationHistory *historyData + recalculationInfo *recalculationData allAdoptions []*tokenomics.Adoption[float64] ) resetVars := func(success bool) { @@ -231,6 +232,7 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { extraBonusOnlyUpdatedUsers = extraBonusOnlyUpdatedUsers[:0] referralsCountGuardOnlyUpdatedUsers = referralsCountGuardOnlyUpdatedUsers[:0] referralsUpdated = referralsUpdated[:0] + balanceRecalculationDryRunItems = balanceRecalculationDryRunItems[:0] histories = histories[:0] userGlobalRanks = userGlobalRanks[:0] referralsThatStoppedMining = referralsThatStoppedMining[:0] @@ -369,7 +371,7 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { if balanceBugFixEnabled { if !balanceBackupMode { reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline) - recalculationHistory, err = m.gatherHistoryAndReferralsInformation(reqCtx, userResults) + recalculationInfo, err = m.gatherReferralsInformation(reqCtx, userResults) if err != nil { log.Error(errors.New("tiers diff balances error"), workerNumber, err) } @@ -414,44 +416,62 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { } } } else { - if recalculatedUsr := m.recalculateUser(usr, allAdoptions, recalculationHistory); recalculatedUsr != nil { - diffT1ActiveValue := recalculationHistory.T1ActiveCounts[usr.UserID] - usr.ActiveT1Referrals - diffT2ActiveValue := recalculationHistory.T2ActiveCounts[usr.UserID] - usr.ActiveT2Referrals - - oldBalanceT1 := usr.BalanceT1 - oldBalanceT2 := usr.BalanceT2 + if !backupExists && recalculationInfo != nil { + if _, ok := recalculationInfo.NeedToBeRecalculatedUsers[usr.UserID]; ok { + balanceT1, balanceT2 := 0.0, 0.0 + if recalculationInfo.T1Referrals != nil { + if referralsT1, ok := recalculationInfo.T1Referrals[usr.UserID]; ok { + for _, ref := range referralsT1 { + if _, ok := recalculationInfo.Referrals[ref]; ok { + balanceT1 += recalculationInfo.Referrals[ref].BalanceForT0 
+ } + } + } + } + if recalculationInfo.T2Referrals != nil { + if referralsT2, ok := recalculationInfo.T2Referrals[usr.UserID]; ok { + for _, ref := range referralsT2 { + if _, ok := recalculationInfo.Referrals[ref]; ok { + balanceT2 += recalculationInfo.Referrals[ref].BalanceForTMinus1 + } + } + } + } - if diffT1ActiveValue < 0 && diffT1ActiveValue*-1 > usr.ActiveT1Referrals { - diffT1ActiveValue = -usr.ActiveT1Referrals - } - if diffT2ActiveValue < 0 && diffT2ActiveValue*-1 > usr.ActiveT2Referrals { - diffT2ActiveValue = -usr.ActiveT2Referrals - } + diffT1ActiveValue := recalculationInfo.T1ActiveCounts[usr.UserID] - usr.ActiveT1Referrals + diffT2ActiveValue := recalculationInfo.T2ActiveCounts[usr.UserID] - usr.ActiveT2Referrals - oldSlashingT1Rate := usr.SlashingRateT1 - oldSlashingT2Rate := usr.SlashingRateT2 + oldBalanceT1 := usr.BalanceT1 + oldBalanceT2 := usr.BalanceT2 - if false { - t1ReferralsToIncrementActiveValue[usr.ID] += diffT1ActiveValue - t2ReferralsToIncrementActiveValue[usr.ID] += diffT2ActiveValue + if diffT1ActiveValue < 0 && diffT1ActiveValue*-1 > usr.ActiveT1Referrals { + diffT1ActiveValue = -usr.ActiveT1Referrals + } + if diffT2ActiveValue < 0 && diffT2ActiveValue*-1 > usr.ActiveT2Referrals { + diffT2ActiveValue = -usr.ActiveT2Referrals + } + if false { + t1ReferralsToIncrementActiveValue[usr.ID] += diffT1ActiveValue + t2ReferralsToIncrementActiveValue[usr.ID] += diffT2ActiveValue - usr.BalanceT1 = recalculatedUsr.BalanceT1 - usr.BalanceT2 = recalculatedUsr.BalanceT2 + usr.BalanceT1 = balanceT1 + usr.BalanceT2 = balanceT2 + } - usr.SlashingRateT1 = recalculatedUsr.SlashingRateT1 - usr.SlashingRateT2 = recalculatedUsr.SlashingRateT2 - } - if !backupExists { metrics.AffectedUsers += 1 - if recalculatedUsr.BalanceT1-oldBalanceT1 >= 0 { - metrics.T1BalancePositive += recalculatedUsr.BalanceT1 - oldBalanceT1 + + t1BalanceDiff := balanceT1 - oldBalanceT1 + t2BalanceDiff := balanceT2 - oldBalanceT2 + + if t1BalanceDiff >= 0 { + metrics.T1BalancePositive += t1BalanceDiff } else { - metrics.T1BalanceNegative += recalculatedUsr.BalanceT1 - oldBalanceT1 + metrics.T1BalanceNegative += t1BalanceDiff } - if recalculatedUsr.BalanceT2-oldBalanceT2 >= 0 { - metrics.T2BalancePositive += recalculatedUsr.BalanceT2 - oldBalanceT2 + if t2BalanceDiff >= 0 { + metrics.T2BalancePositive += t2BalanceDiff } else { - metrics.T2BalanceNegative += recalculatedUsr.BalanceT2 - oldBalanceT2 + metrics.T2BalanceNegative += t2BalanceDiff } if diffT1ActiveValue < 0 { metrics.T1ActiveCountsNegative += int64(diffT1ActiveValue) @@ -463,20 +483,49 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { } else { metrics.T2ActiveCountsPositive += int64(diffT2ActiveValue) } + if !backupExists { + metrics.AffectedUsers += 1 + if t1BalanceDiff >= 0 { + metrics.T1BalancePositive += t1BalanceDiff + } else { + metrics.T1BalanceNegative += t1BalanceDiff + } + if t2BalanceDiff >= 0 { + metrics.T2BalancePositive += t2BalanceDiff + } else { + metrics.T2BalanceNegative += t2BalanceDiff + } + if diffT1ActiveValue < 0 { + metrics.T1ActiveCountsNegative += int64(diffT1ActiveValue) + } else { + metrics.T1ActiveCountsPositive += int64(diffT1ActiveValue) + } + if diffT2ActiveValue < 0 { + metrics.T2ActiveCountsNegative += int64(diffT2ActiveValue) + } else { + metrics.T2ActiveCountsPositive += int64(diffT2ActiveValue) + } + } + + balanceRecalculationDryRunItems = append(balanceRecalculationDryRunItems, &balanceRecalculationDryRun{ + T1BalanceDiff: t1BalanceDiff, + T2BalanceDiff: t2BalanceDiff, + T1ActiveCountsDiff: 
diffT1ActiveValue, + T2ActiveCountsDiff: diffT2ActiveValue, + UserID: usr.UserID, + }) backupUsersUpdated = append(backupUsersUpdated, &backupUserUpdated{ DeserializedBackupUsersKey: model.DeserializedBackupUsersKey{ID: usr.ID}, UserIDField: usr.UserIDField, BalanceT1Field: model.BalanceT1Field{BalanceT1: oldBalanceT1}, BalanceT2Field: model.BalanceT2Field{BalanceT2: oldBalanceT2}, - SlashingRateT1Field: model.SlashingRateT1Field{SlashingRateT1: oldSlashingT1Rate}, - SlashingRateT2Field: model.SlashingRateT2Field{SlashingRateT2: oldSlashingT2Rate}, + SlashingRateT1Field: model.SlashingRateT1Field{SlashingRateT1: usr.SlashingRateT1}, + SlashingRateT2Field: model.SlashingRateT2Field{SlashingRateT2: usr.SlashingRateT2}, ActiveT1ReferralsField: model.ActiveT1ReferralsField{ActiveT1Referrals: usr.ActiveT1Referrals}, ActiveT2ReferralsField: model.ActiveT2ReferralsField{ActiveT2Referrals: usr.ActiveT2Referrals}, - FirstRecalculatedBalanceT1Field: model.FirstRecalculatedBalanceT1Field{FirstRecalculatedBalanceT1: recalculatedUsr.BalanceT1}, - FirstRecalculatedBalanceT2Field: model.FirstRecalculatedBalanceT2Field{FirstRecalculatedBalanceT2: recalculatedUsr.BalanceT2}, - FirstRecalculatedSlashingRateT1Field: model.FirstRecalculatedSlashingRateT1Field{FirstRecalculatedSlashingRateT1: recalculatedUsr.SlashingRateT1}, - FirstRecalculatedSlashingRateT2Field: model.FirstRecalculatedSlashingRateT2Field{FirstRecalculatedSlashingRateT2: recalculatedUsr.SlashingRateT2}, + FirstRecalculatedBalanceT1Field: model.FirstRecalculatedBalanceT1Field{FirstRecalculatedBalanceT1: balanceT1}, + FirstRecalculatedBalanceT2Field: model.FirstRecalculatedBalanceT2Field{FirstRecalculatedBalanceT2: balanceT2}, FirstRecalculatedActiveT1ReferralsField: model.FirstRecalculatedActiveT1ReferralsField{FirstRecalculatedActiveT1Referrals: usr.ActiveT1Referrals + diffT1ActiveValue}, FirstRecalculatedActiveT2ReferralsField: model.FirstRecalculatedActiveT2ReferralsField{FirstRecalculatedActiveT2Referrals: usr.ActiveT2Referrals + diffT2ActiveValue}, }) @@ -732,6 +781,10 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { continue } } + if err := m.insertBalanceRecalculationDryRunBatch(ctx, balanceRecalculationDryRunItems); err != nil { + log.Error(err, fmt.Sprintf("can't insert balance recalcualtion dry run information for users:%#v, workerNumber:%v", userResults, workerNumber)) + } + if len(t1ReferralsThatStoppedMining)+len(t2ReferralsThatStoppedMining)+len(updatedUsers)+len(extraBonusOnlyUpdatedUsers)+len(referralsUpdated)+len(userGlobalRanks) > 0 { go m.telemetry.collectElapsed(7, *before.Time) } diff --git a/miner/recalculate_balance.go b/miner/recalculate_balance.go index efde3f6..db3e610 100644 --- a/miner/recalculate_balance.go +++ b/miner/recalculate_balance.go @@ -4,15 +4,14 @@ package miner import ( "context" - "sort" - stdlibtime "time" + "fmt" + "strings" "github.com/pkg/errors" "github.com/redis/go-redis/v9" "github.com/ice-blockchain/eskimo/users" "github.com/ice-blockchain/freezer/model" - "github.com/ice-blockchain/freezer/tokenomics" storagePG "github.com/ice-blockchain/wintr/connectors/storage/v2" "github.com/ice-blockchain/wintr/connectors/storage/v3" "github.com/ice-blockchain/wintr/log" @@ -34,29 +33,9 @@ type ( ID string } - splittedAdoptionByRange struct { - TimePoint *time.Time - BaseMiningRate float64 - } - - historyRangeTime struct { - MiningSessionSoloStartedAt *time.Time - MiningSessionSoloEndedAt *time.Time - MiningSessionSoloLastStartedAt *time.Time - MiningSessionSoloPreviouslyEndedAt 
*time.Time - CreatedAt *time.Time - ResurrectSoloUsedAt *time.Time - SlashingRateSolo float64 - BalanceSolo float64 - BalanceT1Pending float64 - BalanceT1PendingApplied float64 - BalanceT2Pending float64 - BalanceT2PendingApplied float64 - } - - historyData struct { + recalculationData struct { + Referrals map[string]*recalculateReferral NeedToBeRecalculatedUsers map[string]struct{} - HistoryTimeRanges map[string][]*historyRangeTime T1Referrals, T2Referrals map[string][]string T1ActiveCounts, T2ActiveCounts map[string]int32 } @@ -76,6 +55,14 @@ type ( AffectedUsers int64 Worker int64 } + + balanceRecalculationDryRun struct { + T1BalanceDiff float64 + T2BalanceDiff float64 + T1ActiveCountsDiff int32 + T2ActiveCountsDiff int32 + UserID string + } ) func (m *miner) getUsers(ctx context.Context, users []*user) (map[string]*pgUserCreated, error) { @@ -111,7 +98,7 @@ func (m *miner) getUsers(ctx context.Context, users []*user) (map[string]*pgUser } func (m *miner) collectTiers(ctx context.Context, needToBeRecalculatedUsers map[string]struct{}) ( - t1Referrals, t2Referrals map[string][]string, t1ActiveCounts, t2ActiveCounts map[string]int32, err error, + referralsUserKeys []string, t1Referrals, t2Referrals map[string][]string, t1ActiveCounts, t2ActiveCounts map[string]int32, err error, ) { var ( userIDs = make([]string, 0, len(needToBeRecalculatedUsers)) @@ -123,6 +110,7 @@ func (m *miner) collectTiers(ctx context.Context, needToBeRecalculatedUsers map[ } t1ActiveCounts, t2ActiveCounts = make(map[string]int32, len(needToBeRecalculatedUsers)), make(map[string]int32, len(needToBeRecalculatedUsers)) t1Referrals, t2Referrals = make(map[string][]string), make(map[string][]string) + referralsUserKeysMap := make(map[string]struct{}) for { sql := `SELECT * FROM( SELECT @@ -160,7 +148,7 @@ func (m *miner) collectTiers(ctx context.Context, needToBeRecalculatedUsers map[ LIMIT $3 OFFSET $4` rows, err := storagePG.Select[pgUser](ctx, m.dbPG, sql, now.Time, userIDs, maxLimit, offset) if err != nil { - return nil, nil, nil, nil, errors.Wrap(err, "can't get referrals from pg for showing actual data") + return nil, nil, nil, nil, nil, errors.Wrap(err, "can't get referrals from pg for showing actual data") } if len(rows) == 0 { break @@ -170,11 +158,13 @@ func (m *miner) collectTiers(ctx context.Context, needToBeRecalculatedUsers map[ if row.ReferredBy != "bogus" && row.ReferredBy != "icenetwork" && row.ID != "bogus" && row.ID != "icenetwork" { if row.ReferralType == "T1" { t1Referrals[row.ReferredBy] = append(t1Referrals[row.ReferredBy], row.ID) + referralsUserKeysMap[model.SerializedUsersKey(row.ID)] = struct{}{} if row.Active != nil && *row.Active { t1ActiveCounts[row.ReferredBy]++ } } else if row.ReferralType == "T2" { t2Referrals[row.ReferredBy] = append(t2Referrals[row.ReferredBy], row.ID) + referralsUserKeysMap[model.SerializedUsersKey(row.ID)] = struct{}{} if row.Active != nil && *row.Active { t2ActiveCounts[row.ReferredBy]++ } @@ -184,93 +174,14 @@ func (m *miner) collectTiers(ctx context.Context, needToBeRecalculatedUsers map[ } } } - - return t1Referrals, t2Referrals, t1ActiveCounts, t2ActiveCounts, nil -} - -func splitByAdoptionTimeRanges(adoptions []*tokenomics.Adoption[float64], startedAt, endedAt *time.Time) []splittedAdoptionByRange { - var result []splittedAdoptionByRange - - currentMBR := adoptions[0].BaseMiningRate - lastAchievedAt := adoptions[0].AchievedAt - currentAchievedAtIdx := 0 - - for idx, adptn := range adoptions { - if adptn.AchievedAt.IsNil() { - continue - } - if 
adptn.AchievedAt.Before(*startedAt.Time) { - currentMBR = adptn.BaseMiningRate - } - if (adptn.AchievedAt.After(*startedAt.Time) || adptn.AchievedAt.Equal(*startedAt.Time)) && - adptn.AchievedAt.Before(*endedAt.Time) { - result = append(result, splittedAdoptionByRange{ - TimePoint: adptn.AchievedAt, - BaseMiningRate: adptn.BaseMiningRate, - }) - } - if adptn.AchievedAt.After(*lastAchievedAt.Time) { - currentAchievedAtIdx = idx - lastAchievedAt = adptn.AchievedAt - } - } - result = append(result, - splittedAdoptionByRange{ - TimePoint: startedAt, - BaseMiningRate: currentMBR, - }, - ) - if endedAt.After(*adoptions[currentAchievedAtIdx].AchievedAt.Time) { - result = append(result, - splittedAdoptionByRange{ - TimePoint: endedAt, - BaseMiningRate: adoptions[currentAchievedAtIdx].BaseMiningRate, - }) - } else { - result = append(result, - splittedAdoptionByRange{ - TimePoint: endedAt, - BaseMiningRate: currentMBR, - }) - } - sort.Slice(result, func(i, j int) bool { - return result[i].TimePoint.Before(*result[j].TimePoint.Time) - }) - - return result -} - -func calculateTimeBounds(refTimeRange, usrRange *historyRangeTime) (*time.Time, *time.Time) { - if refTimeRange.MiningSessionSoloStartedAt.After(*usrRange.MiningSessionSoloEndedAt.Time) || refTimeRange.MiningSessionSoloEndedAt.Before(*usrRange.MiningSessionSoloStartedAt.Time) || refTimeRange.SlashingRateSolo > 0 { - return nil, nil - } - var startedAt, endedAt *time.Time - if refTimeRange.MiningSessionSoloStartedAt.After(*usrRange.MiningSessionSoloStartedAt.Time) || refTimeRange.MiningSessionSoloStartedAt.Equal(*usrRange.MiningSessionSoloStartedAt.Time) { - startedAt = refTimeRange.MiningSessionSoloStartedAt - } else { - startedAt = usrRange.MiningSessionSoloStartedAt - } - if refTimeRange.MiningSessionSoloEndedAt.Before(*usrRange.MiningSessionSoloEndedAt.Time) || refTimeRange.MiningSessionSoloEndedAt.Equal(*usrRange.MiningSessionSoloEndedAt.Time) { - endedAt = refTimeRange.MiningSessionSoloEndedAt - } else { - endedAt = usrRange.MiningSessionSoloEndedAt + for key, _ := range referralsUserKeysMap { + referralsUserKeys = append(referralsUserKeys, key) } - return startedAt, endedAt -} - -func initializeEmptyUser(updatedUser, usr *user) *user { - var newUser user - newUser.ID = usr.ID - newUser.UserID = usr.UserID - newUser.IDT0 = usr.IDT0 - newUser.IDTMinus1 = usr.IDTMinus1 - newUser.BalanceLastUpdatedAt = nil - - return &newUser + return referralsUserKeys, t1Referrals, t2Referrals, t1ActiveCounts, t2ActiveCounts, nil } -func getInternalIDs(ctx context.Context, db storage.DB, keys ...string) ([]string, error) { +func getInternalIDsBatch(ctx context.Context, db storage.DB, keys ...string) ([]string, error) { if cmdResults, err := db.Pipelined(ctx, func(pipeliner redis.Pipeliner) error { if err := pipeliner.MGet(ctx, keys...).Err(); err != nil { return err @@ -287,7 +198,7 @@ func getInternalIDs(ctx context.Context, db storage.DB, keys ...string) ([]strin if val == nil { continue } - results = append(results, val.(string)) + results = append(results, model.SerializedUsersKey(val.(string))) } } @@ -295,14 +206,68 @@ func getInternalIDs(ctx context.Context, db storage.DB, keys ...string) ([]strin } } -func (m *miner) gatherHistoryAndReferralsInformation(ctx context.Context, users []*user) (history *historyData, err error) { +func getInternalIDs(ctx context.Context, db storage.DB, keys ...string) (result []string, err error) { + var batchKeys []string + for _, key := range keys { + batchKeys = append(batchKeys, key) + if len(batchKeys) >= 
int(cfg.BatchSize) { + res, err := getInternalIDsBatch(ctx, db, batchKeys...) + if err != nil { + return nil, err + } + result = append(result, res...) + batchKeys = batchKeys[:0] + } + } + if len(batchKeys) > 0 { + res, err := getInternalIDsBatch(ctx, db, batchKeys...) + if err != nil { + return nil, err + } + result = append(result, res...) + } + + return result, nil +} + +func getReferralsBatch(ctx context.Context, db storage.DB, keys ...string) ([]*recalculateReferral, error) { + referrals := make([]*recalculateReferral, 0) + if err := storage.Bind[recalculateReferral](ctx, db, keys, &referrals); err != nil { + return nil, errors.Wrapf(err, "failed to get referrals for:%v", keys) + } + + return referrals, nil +} + +func getReferrals(ctx context.Context, db storage.DB, keys ...string) (result []*recalculateReferral, err error) { + var batchKeys []string + for _, key := range keys { + batchKeys = append(batchKeys, key) + if len(batchKeys) >= int(cfg.BatchSize) { + referrals, err := getReferralsBatch(ctx, db, batchKeys...) + if err != nil { + return nil, err + } + result = append(result, referrals...) + batchKeys = batchKeys[:0] + } + } + if len(batchKeys) > 0 { + referrals, err := getReferralsBatch(ctx, db, batchKeys...) + if err != nil { + return nil, err + } + result = append(result, referrals...) + } + + return result, nil +} + +func (m *miner) gatherReferralsInformation(ctx context.Context, users []*user) (history *recalculationData, err error) { if len(users) == 0 { return nil, nil } - var ( - needToBeRecalculatedUsers = make(map[string]struct{}, len(users)) - historyTimeRanges = make(map[string][]*historyRangeTime) - ) + needToBeRecalculatedUsers := make(map[string]struct{}, len(users)) usrs, err := m.getUsers(ctx, users) if err != nil { return nil, errors.Wrapf(err, "can't get CreatedAt information for users:%#v", usrs) @@ -321,63 +286,29 @@ func (m *miner) gatherHistoryAndReferralsInformation(ctx context.Context, users if len(needToBeRecalculatedUsers) == 0 { return nil, nil } - t1Referrals, t2Referrals, t1ActiveCounts, t2ActiveCounts, err := m.collectTiers(ctx, needToBeRecalculatedUsers) + referralsUserKeys, t1Referrals, t2Referrals, t1ActiveCounts, t2ActiveCounts, err := m.collectTiers(ctx, needToBeRecalculatedUsers) if err != nil { return nil, errors.Wrap(err, "can't get active users for users") } if len(t1Referrals) == 0 && len(t2Referrals) == 0 { return nil, nil } - userKeys := make([]string, 0, len(t1Referrals)+len(t2Referrals)+len(needToBeRecalculatedUsers)) - for _, values := range t1Referrals { - for _, val := range values { - userKeys = append(userKeys, model.SerializedUsersKey(val)) - } - } - for _, values := range t2Referrals { - for _, val := range values { - userKeys = append(userKeys, model.SerializedUsersKey(val)) - } - } - for key, _ := range needToBeRecalculatedUsers { - userKeys = append(userKeys, model.SerializedUsersKey(key)) - } - userResults, err := getInternalIDs(ctx, m.db, userKeys...) + internalIDKeys, err := getInternalIDs(ctx, m.db, referralsUserKeys...) 
if err != nil { - return nil, errors.Wrapf(err, "failed to get internal ids for:%v", userKeys) + return nil, errors.Wrapf(err, "can't get internal ids for:%#v", referralsUserKeys) } - offset := int64(0) - for { - historyInformation, err := m.dwhClient.GetAdjustUserInformation(ctx, userResults, maxLimit, offset) - if err != nil { - return nil, errors.Wrapf(err, "can't get adjust user information for ids:#%v", userResults) - } - if len(historyInformation) == 0 { - break - } - offset += maxLimit - for _, info := range historyInformation { - historyTimeRanges[info.UserID] = append(historyTimeRanges[info.UserID], &historyRangeTime{ - MiningSessionSoloPreviouslyEndedAt: info.MiningSessionSoloPreviouslyEndedAt, - MiningSessionSoloStartedAt: info.MiningSessionSoloStartedAt, - MiningSessionSoloEndedAt: info.MiningSessionSoloEndedAt, - ResurrectSoloUsedAt: info.ResurrectSoloUsedAt, - CreatedAt: info.CreatedAt, - SlashingRateSolo: info.SlashingRateSolo, - BalanceT1Pending: info.BalanceT1Pending, - BalanceT1PendingApplied: info.BalanceT1PendingApplied, - BalanceT2Pending: info.BalanceT2Pending, - BalanceT2PendingApplied: info.BalanceT2PendingApplied, - }) - } + referrals, err := getReferrals(ctx, m.db, internalIDKeys...) + if err != nil { + return nil, errors.Wrapf(err, "failed to get referrals for:%v", users) } - if len(historyTimeRanges) == 0 { - return nil, nil + referralsCollection := make(map[string]*recalculateReferral, len(referrals)) + for _, ref := range referrals { + referralsCollection[ref.UserID] = ref } - return &historyData{ + return &recalculationData{ + Referrals: referralsCollection, NeedToBeRecalculatedUsers: needToBeRecalculatedUsers, - HistoryTimeRanges: historyTimeRanges, T1Referrals: t1Referrals, T2Referrals: t2Referrals, T1ActiveCounts: t1ActiveCounts, @@ -385,223 +316,6 @@ func (m *miner) gatherHistoryAndReferralsInformation(ctx context.Context, users }, nil } -func (m *miner) recalculateUser(usr *user, adoptions []*tokenomics.Adoption[float64], history *historyData) *user { - if history == nil || history.HistoryTimeRanges == nil || (history.T1Referrals == nil && history.T2Referrals == nil) || adoptions == nil { - return nil - } - if _, ok := history.NeedToBeRecalculatedUsers[usr.UserID]; !ok { - return nil - } - if _, ok := history.HistoryTimeRanges[usr.UserID]; ok { - var ( - isResurrected bool - slashingLastEndedAt *time.Time - lastMiningSessionSoloEndedAt *time.Time - previousUserStartedAt, previousUserEndedAt *time.Time - now = time.Now() - ) - clonedUser1 := *usr - updatedUser := &clonedUser1 - updatedUser.BalanceT1 = 0 - updatedUser.BalanceT2 = 0 - updatedUser.BalanceLastUpdatedAt = nil - - for _, usrRange := range history.HistoryTimeRanges[usr.UserID] { - if updatedUser == nil { - updatedUser = initializeEmptyUser(updatedUser, usr) - } - lastMiningSessionSoloEndedAt = usrRange.MiningSessionSoloEndedAt - - updatedUser.BalanceT1Pending = usrRange.BalanceT1Pending - updatedUser.BalanceT1PendingApplied = usrRange.BalanceT1PendingApplied - updatedUser.BalanceT2Pending = usrRange.BalanceT2Pending - updatedUser.BalanceT2PendingApplied = usrRange.BalanceT2PendingApplied - /****************************************************************************************************************************************************** - 1. Resurrection check & handling. 
- ******************************************************************************************************************************************************/ - if !usrRange.ResurrectSoloUsedAt.IsNil() && usrRange.ResurrectSoloUsedAt.Unix() > 0 && !isResurrected { - var resurrectDelta float64 - if timeSpent := usrRange.MiningSessionSoloStartedAt.Sub(*usrRange.MiningSessionSoloPreviouslyEndedAt.Time); cfg.Development { - resurrectDelta = timeSpent.Minutes() - } else { - resurrectDelta = timeSpent.Hours() - } - updatedUser.BalanceT1 += updatedUser.SlashingRateT1 * resurrectDelta - updatedUser.BalanceT2 += updatedUser.SlashingRateT2 * resurrectDelta - updatedUser.SlashingRateT1 = 0 - updatedUser.SlashingRateT2 = 0 - - isResurrected = true - } - /****************************************************************************************************************************************************** - 2. Slashing calculations. - ******************************************************************************************************************************************************/ - if usrRange.SlashingRateSolo > 0 { - if slashingLastEndedAt.IsNil() { - slashingLastEndedAt = usrRange.MiningSessionSoloEndedAt - } - updatedUser.BalanceLastUpdatedAt = slashingLastEndedAt - updatedUser.ResurrectSoloUsedAt = nil - updatedUser, _, _ = mine(0., usrRange.CreatedAt, updatedUser, nil, nil) - slashingLastEndedAt = usrRange.CreatedAt - - continue - } - if !slashingLastEndedAt.IsNil() && usrRange.MiningSessionSoloStartedAt.Sub(*slashingLastEndedAt.Time).Nanoseconds() > 0 { - updatedUser.BalanceLastUpdatedAt = slashingLastEndedAt - updatedUser.ResurrectSoloUsedAt = nil - now := usrRange.MiningSessionSoloStartedAt - updatedUser, _, _ = mine(0., now, updatedUser, nil, nil) - slashingLastEndedAt = nil - } - /****************************************************************************************************************************************************** - 3. Saving time range state for the next range for streaks case. - ******************************************************************************************************************************************************/ - if previousUserStartedAt != nil && previousUserStartedAt.Equal(*usrRange.MiningSessionSoloStartedAt.Time) && - previousUserEndedAt != nil && (usrRange.MiningSessionSoloEndedAt.After(*previousUserEndedAt.Time) || - usrRange.MiningSessionSoloEndedAt.Equal(*previousUserEndedAt.Time)) { - - previousUserStartedAt = usrRange.MiningSessionSoloStartedAt - - usrRange.MiningSessionSoloStartedAt = previousUserEndedAt - previousUserEndedAt = usrRange.MiningSessionSoloEndedAt - } else { - previousUserStartedAt = usrRange.MiningSessionSoloStartedAt - previousUserEndedAt = usrRange.MiningSessionSoloEndedAt - } - /****************************************************************************************************************************************************** - 4. T1 Balance calculation for the current user time range. 
- ******************************************************************************************************************************************************/ - if _, ok := history.T1Referrals[usr.UserID]; ok { - for _, refID := range history.T1Referrals[usr.UserID] { - if _, ok := history.HistoryTimeRanges[refID]; ok { - var previousT1MiningSessionStartedAt, previousT1MiningSessionEndedAt *time.Time - for _, timeRange := range history.HistoryTimeRanges[refID] { - if timeRange.SlashingRateSolo > 0 { - continue - } - if previousT1MiningSessionStartedAt != nil && previousT1MiningSessionStartedAt.Equal(*timeRange.MiningSessionSoloStartedAt.Time) && - previousT1MiningSessionEndedAt != nil && (timeRange.MiningSessionSoloEndedAt.After(*previousT1MiningSessionEndedAt.Time) || - timeRange.MiningSessionSoloEndedAt.Equal(*previousT1MiningSessionEndedAt.Time)) { - - previousT1MiningSessionStartedAt = timeRange.MiningSessionSoloStartedAt - timeRange.MiningSessionSoloStartedAt = previousT1MiningSessionEndedAt - previousT1MiningSessionEndedAt = timeRange.MiningSessionSoloEndedAt - } else { - previousT1MiningSessionStartedAt = timeRange.MiningSessionSoloStartedAt - previousT1MiningSessionEndedAt = timeRange.MiningSessionSoloEndedAt - } - startedAt, endedAt := calculateTimeBounds(timeRange, usrRange) - if startedAt == nil && endedAt == nil { - continue - } - - adoptionRanges := splitByAdoptionTimeRanges(adoptions, startedAt, endedAt) - - var previousTimePoint *time.Time - for _, adoptionRange := range adoptionRanges { - if previousTimePoint == nil { - previousTimePoint = adoptionRange.TimePoint - - continue - } - if previousTimePoint.Equal(*adoptionRange.TimePoint.Time) { - continue - } - updatedUser.ActiveT1Referrals = 1 - updatedUser.ActiveT2Referrals = 0 - updatedUser.MiningSessionSoloStartedAt = previousTimePoint - updatedUser.MiningSessionSoloEndedAt = time.New(adoptionRange.TimePoint.Add(1 * stdlibtime.Nanosecond)) - updatedUser.BalanceLastUpdatedAt = nil - updatedUser.ResurrectSoloUsedAt = nil - now := adoptionRange.TimePoint - - updatedUser, _, _ = mine(adoptionRange.BaseMiningRate, now, updatedUser, nil, nil) - - previousTimePoint = adoptionRange.TimePoint - } - } - } - } - } - /****************************************************************************************************************************************************** - 5. T2 Balance calculation for the current user time range. 
- ******************************************************************************************************************************************************/ - if _, ok := history.T2Referrals[usr.UserID]; ok { - for _, refID := range history.T2Referrals[usr.UserID] { - if _, ok := history.HistoryTimeRanges[refID]; ok { - var previousT2MiningSessionStartedAt, previousT2MiningSessionEndedAt *time.Time - for _, timeRange := range history.HistoryTimeRanges[refID] { - if timeRange.SlashingRateSolo > 0 { - continue - } - if previousT2MiningSessionStartedAt != nil && previousT2MiningSessionStartedAt.Equal(*timeRange.MiningSessionSoloStartedAt.Time) && - previousT2MiningSessionEndedAt != nil && (timeRange.MiningSessionSoloEndedAt.After(*previousT2MiningSessionEndedAt.Time) || - timeRange.MiningSessionSoloEndedAt.Equal(*previousT2MiningSessionEndedAt.Time)) { - - previousT2MiningSessionStartedAt = timeRange.MiningSessionSoloStartedAt - timeRange.MiningSessionSoloStartedAt = previousT2MiningSessionEndedAt - previousT2MiningSessionEndedAt = timeRange.MiningSessionSoloEndedAt - } else { - previousT2MiningSessionEndedAt = timeRange.MiningSessionSoloEndedAt - previousT2MiningSessionStartedAt = timeRange.MiningSessionSoloStartedAt - } - startedAt, endedAt := calculateTimeBounds(timeRange, usrRange) - if startedAt == nil && endedAt == nil { - continue - } - - adoptionRanges := splitByAdoptionTimeRanges(adoptions, startedAt, endedAt) - - var previousTimePoint *time.Time - for _, adoptionRange := range adoptionRanges { - if previousTimePoint == nil { - previousTimePoint = adoptionRange.TimePoint - - continue - } - if previousTimePoint.Equal(*adoptionRange.TimePoint.Time) { - continue - } - updatedUser.ActiveT1Referrals = 0 - updatedUser.ActiveT2Referrals = 1 - updatedUser.MiningSessionSoloPreviouslyEndedAt = usr.MiningSessionSoloPreviouslyEndedAt - updatedUser.MiningSessionSoloStartedAt = previousTimePoint - updatedUser.MiningSessionSoloEndedAt = time.New(adoptionRange.TimePoint.Add(1 * stdlibtime.Nanosecond)) - updatedUser.BalanceLastUpdatedAt = nil - updatedUser.ResurrectSoloUsedAt = nil - now := adoptionRange.TimePoint - - updatedUser, _, _ = mine(adoptionRange.BaseMiningRate, now, updatedUser, nil, nil) - - previousTimePoint = adoptionRange.TimePoint - } - } - } - } - } - } - if !lastMiningSessionSoloEndedAt.IsNil() { - if timeDiff := now.Sub(*lastMiningSessionSoloEndedAt.Time); cfg.Development { - if timeDiff >= 60*stdlibtime.Minute { - updatedUser = nil - } - } else { - if timeDiff >= 60*stdlibtime.Hour*24 { - updatedUser = nil - } - } - } - if updatedUser == nil { - updatedUser = initializeEmptyUser(updatedUser, usr) - } - - return updatedUser - } - - return nil -} - func (m *miner) getBalanceRecalculationMetrics(ctx context.Context, workerNumber int64) (brm *balanceRecalculationMetrics, err error) { sql := `SELECT * FROM balance_recalculation_metrics WHERE worker = $1` res, err := storagePG.Get[balanceRecalculationMetrics](ctx, m.dbPG, sql, workerNumber) @@ -653,3 +367,30 @@ func (b *balanceRecalculationMetrics) reset() { b.T2ActiveCountsNegative = 0 b.StartedAt = time.Now() } + +func (m *miner) insertBalanceRecalculationDryRunBatch(ctx context.Context, infos []*balanceRecalculationDryRun) error { + if len(infos) == 0 { + return nil + } + paramCounter := 1 + params := []any{} + sqlParams := []string{} + for _, info := range infos { + sqlParams = append(sqlParams, fmt.Sprintf("($%v, $%v, $%v, $%v, $%v)", paramCounter, paramCounter+1, paramCounter+2, paramCounter+3, paramCounter+4)) + paramCounter += 5 + 
params = append(params, info.T1BalanceDiff, info.T2BalanceDiff, info.T1ActiveCountsDiff, info.T2ActiveCountsDiff, info.UserID) + } + + sql := fmt.Sprintf(`INSERT INTO balance_recalculation_dry_run( + diff_t1_balance, + diff_t2_balance, + diff_t1_active_counts, + diff_t2_active_counts, + user_id + ) + VALUES %v + ON CONFLICT(user_id) DO NOTHING`, strings.Join(sqlParams, ",")) + _, err := storagePG.Exec(ctx, m.dbPG, sql, params...) + + return errors.Wrap(err, "failed to insert dry run info") +}
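
Note (editorial, not part of the applied diff): the patch drops the ClickHouse history replay and instead derives the tier balances directly from Redis-stored referral balances — BalanceT1 becomes the sum of each T1 referral's balanceForT0, BalanceT2 the sum of each T2 referral's balanceForTMinus1 — and records the resulting diffs into balance_recalculation_dry_run. Below is a minimal, self-contained sketch of that summation only; the types referral and recalcData are simplified stand-ins for the patch's recalculateReferral and recalculationData (which embed model fields), not the repository's actual definitions.

// Minimal sketch, assuming simplified types; mirrors the summation loop
// the patch adds to miner.go, including the existence checks that skip
// referrals missing from the Referrals map.
package main

import "fmt"

// referral carries the two fields the recalculation reads per referral.
type referral struct {
	BalanceForT0      float64
	BalanceForTMinus1 float64
}

// recalcData is a simplified stand-in for recalculationData in the patch.
type recalcData struct {
	Referrals   map[string]*referral // keyed by user_id
	T1Referrals map[string][]string  // user_id -> its T1 referrals' user_ids
	T2Referrals map[string][]string  // user_id -> its T2 referrals' user_ids
}

// recalculatedTierBalances sums balanceForT0 over T1 referrals and
// balanceForTMinus1 over T2 referrals for the given user.
func recalculatedTierBalances(d *recalcData, userID string) (balanceT1, balanceT2 float64) {
	for _, ref := range d.T1Referrals[userID] {
		if r, ok := d.Referrals[ref]; ok {
			balanceT1 += r.BalanceForT0
		}
	}
	for _, ref := range d.T2Referrals[userID] {
		if r, ok := d.Referrals[ref]; ok {
			balanceT2 += r.BalanceForTMinus1
		}
	}

	return balanceT1, balanceT2
}

func main() {
	d := &recalcData{
		Referrals: map[string]*referral{
			"r1": {BalanceForT0: 10, BalanceForTMinus1: 3},
			"r2": {BalanceForT0: 5, BalanceForTMinus1: 2},
		},
		T1Referrals: map[string][]string{"u1": {"r1", "r2"}},
		T2Referrals: map[string][]string{"u1": {"r2"}},
	}
	t1, t2 := recalculatedTierBalances(d, "u1")
	fmt.Println(t1, t2) // 15 2
}

In the patch itself these sums feed the dry-run metrics (t1BalanceDiff, t2BalanceDiff) and the batched INSERT ... ON CONFLICT(user_id) DO NOTHING into balance_recalculation_dry_run, while the live balances remain untouched behind the `if false` guard.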