From 957efd13799cbb56d8720c11d6ff568bb969aae3 Mon Sep 17 00:00:00 2001 From: ice-myles <96409608+ice-myles@users.noreply.github.com> Date: Mon, 18 Dec 2023 15:47:07 +0300 Subject: [PATCH] Balance tminus1 recalculations optimization. (#63) Decreased clickhouse requests. Fixed timeout issues. Added flag to remove debug information. --- go.mod | 2 +- go.sum | 4 +- miner/contract.go | 9 +- miner/miner.go | 289 ++++++++++++++++++++++------------- miner/recalculate_balance.go | 11 +- model/model.go | 43 ++++++ 6 files changed, 241 insertions(+), 117 deletions(-) diff --git a/go.mod b/go.mod index 16f915f..a88a4f0 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 github.com/goccy/go-json v0.10.2 github.com/hashicorp/go-multierror v1.1.1 - github.com/ice-blockchain/eskimo v1.225.0 + github.com/ice-blockchain/eskimo v1.226.0 github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb github.com/ice-blockchain/wintr v1.127.0 github.com/imroc/req/v3 v3.42.2 diff --git a/go.sum b/go.sum index e925f7d..f35561f 100644 --- a/go.sum +++ b/go.sum @@ -208,8 +208,8 @@ github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mO github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= -github.com/ice-blockchain/eskimo v1.225.0 h1:aA+PL0vXbD52dRc9QZMZoNDkMyVd0r8qAwUQNMjrmno= -github.com/ice-blockchain/eskimo v1.225.0/go.mod h1:Du7qkQbFadYFb9YddPfGrErk0tX3zEX7ckIzxBZXwQY= +github.com/ice-blockchain/eskimo v1.226.0 h1:mZw2efPMUUQVGw3Eh43G/XDE5JTmaX/50gZzaRv9cBk= +github.com/ice-blockchain/eskimo v1.226.0/go.mod h1:Btj5U8KPf2+dkGtajQER1SjfXv40t8RqaihVNd4FE1s= github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb h1:8TnFP3mc7O+tc44kv2e0/TpZKnEVUaKH+UstwfBwRkk= github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb/go.mod h1:ZsQU7i3mxhgBBu43Oev7WPFbIjP4TniN/b1UPNGbrq8= github.com/ice-blockchain/wintr v1.127.0 h1:YuGfLCGu91mLtsH0AcdNKnDERZPD6+3er93T/m7vF2Q= diff --git a/miner/contract.go b/miner/contract.go index 1a4dacc..88151cb 100644 --- a/miner/contract.go +++ b/miner/contract.go @@ -51,6 +51,8 @@ const ( balanceT2BugfixDryRunEnabled = false balanceT2BugfixEnabled = false + + clearBugfixDebugInfoEnabled = false ) // . @@ -139,7 +141,6 @@ type ( recalculated struct { model.RecalculatedBalanceForTMinus1AtField - model.RecalculatedBalanceT2AtField model.DeserializedRecalculatedUsersKey } @@ -153,6 +154,12 @@ type ( ID, IDT0, IDTMinus1 int64 } + dryrunUser struct { + model.DeserializedDryRunUsersKey + model.IDTMinus1Field + model.RecalculatedBalanceForTMinus1AtField + } + miner struct { mb messagebroker.Client db storage.DB diff --git a/miner/miner.go b/miner/miner.go index e8700bf..e78da4e 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -123,39 +123,41 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { log.Error(dwhClient.Close()) }() var ( - batchNumber int64 - totalBatches uint64 - iteration uint64 - now, lastIterationStartedAt = time.Now(), time.Now() - currentAdoption = m.getAdoption(ctx, m.db, workerNumber) - workers = cfg.Workers - batchSize = cfg.BatchSize - userKeys, userHistoryKeys, referralKeys, referralIDTMinus1KeysOnly, userRecalculatedKeys = make([]string, 0, batchSize), make([]string, 0, batchSize), make([]string, 0, 2*batchSize), make([]string, 0, batchSize), make([]string, 0, batchSize) - userResults, referralResults, userRecalculatedResults = make([]*user, 0, batchSize), make([]*referral, 0, 2*batchSize), make([]*recalculated, 0, batchSize) - t0Referrals, tMinus1Referrals = make(map[int64]*referral, batchSize), make(map[int64]*referral, batchSize) - t1ReferralsToIncrementActiveValue, t2ReferralsToIncrementActiveValue = make(map[int64]int32, batchSize), make(map[int64]int32, batchSize) - t1ReferralsThatStoppedMining, t2ReferralsThatStoppedMining = make(map[int64]uint32, batchSize), make(map[int64]uint32, batchSize) - history = make(map[int64][]*dwh.AdjustUserInfo, 2*batchSize) - recalculatedUsers = make(map[int64]*recalculated, batchSize) - recalculatedUsersUpdated = make([]*recalculated, 0, batchSize) - referralsThatStoppedMining = make([]*referralThatStoppedMining, 0, batchSize) - msgResponder = make(chan error, 3*batchSize) - msgs = make([]*messagebroker.Message, 0, 3*batchSize) - errs = make([]error, 0, 3*batchSize) - updatedUsers = make([]*UpdatedUser, 0, batchSize) - extraBonusOnlyUpdatedUsers = make([]*extrabonusnotifier.UpdatedUser, 0, batchSize) - referralsCountGuardOnlyUpdatedUsers = make([]*referralCountGuardUpdatedUser, 0, batchSize) - referralsUpdated = make([]*referralUpdated, 0, batchSize) - histories = make([]*model.User, 0, batchSize) - userGlobalRanks = make([]redis.Z, 0, batchSize) - balanceTMinus1RecalculationDryRunItems = make([]*balanceTMinus1RecalculationDryRun, 0, batchSize) - balanceT2RecalculationDryRunItems = make([]*balanceT2RecalculationDryRun, 0, batchSize) - balances = make(map[int64]float64, 0) - historyColumns, historyInsertMetadata = dwh.InsertDDL(int(batchSize)) - shouldSynchronizeBalanceFunc = func(batchNumberArg uint64) bool { return false } - allAdoptions []*tokenomics.Adoption[float64] - referralsCollection = make(map[string]*recalculateReferral, 0) - t2Referrals = make(map[string][]string, 0) + batchNumber int64 + totalBatches uint64 + iteration uint64 + now, lastIterationStartedAt = time.Now(), time.Now() + currentAdoption = m.getAdoption(ctx, m.db, workerNumber) + workers = cfg.Workers + batchSize = cfg.BatchSize + userKeys, userHistoryKeys, userInitialBalanceKeys, referralKeys, recalculatedKeys = make([]string, 0, batchSize), make([]string, 0, batchSize), make([]string, 0, 2*batchSize), make([]string, 0, batchSize), make([]string, 0, batchSize) + dryRunKeys = make([]string, 0, batchSize) + userResults, referralResults, userRecalculatedResults, dryrunResults = make([]*user, 0, batchSize), make([]*referral, 0, 2*batchSize), make([]*recalculated, 0, batchSize), make([]*dryrunUser, 0, batchSize) + t0Referrals, tMinus1Referrals = make(map[int64]*referral, batchSize), make(map[int64]*referral, batchSize) + t1ReferralsToIncrementActiveValue, t2ReferralsToIncrementActiveValue = make(map[int64]int32, batchSize), make(map[int64]int32, batchSize) + t1ReferralsThatStoppedMining, t2ReferralsThatStoppedMining = make(map[int64]uint32, batchSize), make(map[int64]uint32, batchSize) + history = make(map[int64][]*dwh.AdjustUserInfo, 2*batchSize) + recalculatedUsers, dryRunUsers = make(map[int64]*recalculated, batchSize), make(map[int64]*dryrunUser, batchSize) + recalculatedUsersUpdated, dryRunUsersUpdated = make([]*recalculated, 0, batchSize), make([]*dryrunUser, 0, batchSize) + referralsThatStoppedMining = make([]*referralThatStoppedMining, 0, batchSize) + msgResponder = make(chan error, 3*batchSize) + msgs = make([]*messagebroker.Message, 0, 3*batchSize) + errs = make([]error, 0, 3*batchSize) + updatedUsers = make([]*UpdatedUser, 0, batchSize) + extraBonusOnlyUpdatedUsers = make([]*extrabonusnotifier.UpdatedUser, 0, batchSize) + referralsCountGuardOnlyUpdatedUsers = make([]*referralCountGuardUpdatedUser, 0, batchSize) + referralsUpdated = make([]*referralUpdated, 0, batchSize) + histories = make([]*model.User, 0, batchSize) + userGlobalRanks = make([]redis.Z, 0, batchSize) + balanceTMinus1RecalculationDryRunItems = make([]*balanceTMinus1RecalculationDryRun, 0, batchSize) + balanceT2RecalculationDryRunItems = make([]*balanceT2RecalculationDryRun, 0, batchSize) + balances = make(map[int64]float64, 0) + historyColumns, historyInsertMetadata = dwh.InsertDDL(int(batchSize)) + shouldSynchronizeBalanceFunc = func(batchNumberArg uint64) bool { return false } + allAdoptions []*tokenomics.Adoption[float64] + referralsCollection = make(map[string]*recalculateReferral, 0) + t2Referrals = make(map[string][]string, 0) + clickhouseKeysMap = make(map[string]struct{}, batchSize) ) resetVars := func(success bool) { if success && len(userKeys) == int(batchSize) && len(userResults) == 0 { @@ -177,8 +179,8 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { if batchNumber == 0 || currentAdoption == nil { currentAdoption = m.getAdoption(ctx, m.db, workerNumber) } - userKeys, userHistoryKeys, referralKeys, referralIDTMinus1KeysOnly, userRecalculatedKeys = userKeys[:0], userHistoryKeys[:0], referralKeys[:0], referralIDTMinus1KeysOnly[:0], userRecalculatedKeys[:0] - userResults, referralResults, userRecalculatedResults, recalculatedUsersUpdated = userResults[:0], referralResults[:0], userRecalculatedResults[:0], recalculatedUsersUpdated[:0] + userKeys, userHistoryKeys, referralKeys, recalculatedKeys, dryRunKeys = userKeys[:0], userHistoryKeys[:0], referralKeys[:0], recalculatedKeys[:0], dryRunKeys[:0] + userResults, referralResults, userRecalculatedResults, recalculatedUsersUpdated, dryrunResults = userResults[:0], referralResults[:0], userRecalculatedResults[:0], recalculatedUsersUpdated[:0], dryrunResults[:0] msgs, errs = msgs[:0], errs[:0] updatedUsers = updatedUsers[:0] extraBonusOnlyUpdatedUsers = extraBonusOnlyUpdatedUsers[:0] @@ -190,6 +192,7 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { allAdoptions = allAdoptions[:0] balanceTMinus1RecalculationDryRunItems = balanceTMinus1RecalculationDryRunItems[:0] balanceT2RecalculationDryRunItems = balanceT2RecalculationDryRunItems[:0] + userInitialBalanceKeys = userInitialBalanceKeys[:0] for k := range t0Referrals { delete(t0Referrals, k) } @@ -223,6 +226,9 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { for k := range recalculatedUsers { delete(recalculatedUsers, k) } + for k := range clickhouseKeysMap { + delete(clickhouseKeysMap, k) + } } for ctx.Err() == nil { /****************************************************************************************************************************************************** @@ -231,8 +237,16 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { if len(userKeys) == 0 { for ix := batchNumber * batchSize; ix < (batchNumber+1)*batchSize; ix++ { userKeys = append(userKeys, model.SerializedUsersKey((workers*ix)+workerNumber)) - if balanceForTMinusBugfixEnabled || balanceT2BugfixEnabled { - userRecalculatedKeys = append(userRecalculatedKeys, model.SerializedRecalculatedUsersKey((workers*ix)+workerNumber)) + if balanceForTMinusBugfixEnabled { + if balanceForTMinusBugfixDryRunEnabled { + dryRunKeys = append(dryRunKeys, model.SerializedDryRunUsersKey((workers*ix)+workerNumber)) + } else { + recalculatedKeys = append(recalculatedKeys, model.SerializedRecalculatedUsersKey((workers*ix)+workerNumber)) + } + } + if clearBugfixDebugInfoEnabled { + dryRunKeys = append(dryRunKeys, model.SerializedDryRunUsersKey((workers*ix)+workerNumber)) + recalculatedKeys = append(recalculatedKeys, model.SerializedRecalculatedUsersKey((workers*ix)+workerNumber)) } } } @@ -245,14 +259,17 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { continue } - reqCancel() - if balanceForTMinusBugfixEnabled || balanceT2BugfixEnabled { - reqCtx, reqCancel := context.WithTimeout(context.Background(), requestDeadline) - if err := storage.Bind[recalculated](reqCtx, m.db, userRecalculatedKeys, &userRecalculatedResults); err != nil { - log.Error(errors.Wrapf(err, "[miner] failed to get recalculated users for batchNumber:%v,workerNumber:%v", batchNumber, workerNumber)) + if clearBugfixDebugInfoEnabled { + _, err := m.db.Del(reqCtx, recalculatedKeys...).Result() + if err != nil { + log.Error(errors.Wrap(err, fmt.Sprintf("can't remove recalculated keys:%#v", recalculatedKeys))) + } + _, err = m.db.Del(reqCtx, dryRunKeys...).Result() + if err != nil { + log.Error(errors.Wrap(err, fmt.Sprintf("can't remove dry run keys:%#v", dryRunKeys))) } - reqCancel() } + reqCancel() if len(userKeys) > 0 { go m.telemetry.collectElapsed(2, *before.Time) } @@ -261,11 +278,51 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { 2. Fetching T0 & T-1 referrals of the fetched users. ******************************************************************************************************************************************************/ - if balanceForTMinusBugfixEnabled || balanceForTMinusBugfixDryRunEnabled { - var err error - balances, err = dwhClient.GetBaseBalanceForTMinus1(ctx, userHistoryKeys, int64(len(userKeys)), 0) - if err != nil { - log.Error(errors.Wrapf(err, "[miner] failed to fetch base balances for tminus1 batchNumber:%v,workerNumber:%v", batchNumber, workerNumber)) + if balanceForTMinusBugfixEnabled { + if balanceForTMinusBugfixDryRunEnabled { + if len(dryRunKeys) > 0 { + reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline) + if err := storage.Bind[dryrunUser](reqCtx, m.db, dryRunKeys, &dryrunResults); err != nil { + log.Error(errors.Wrapf(err, "[miner] failed to get dry run users for batchNumber:%v,workerNumber:%v", batchNumber, workerNumber)) + } + reqCancel() + for _, usr := range dryrunResults { + dryRunUsers[usr.ID] = usr + } + for _, usr := range userResults { + if _, ok := dryRunUsers[usr.ID]; !ok && usr.IDTMinus1 != 0 { + userInitialBalanceKeys = append(userInitialBalanceKeys, fmt.Sprint(usr.ID)) + clickhouseKeysMap[fmt.Sprint(usr.ID)] = struct{}{} + } + } + } + } else { + if len(recalculatedKeys) > 0 { + reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline) + if err := storage.Bind[recalculated](reqCtx, m.db, recalculatedKeys, &userRecalculatedResults); err != nil { + log.Error(errors.Wrapf(err, "[miner] failed to get user recalculated users for batchNumber:%v,workerNumber:%v", batchNumber, workerNumber)) + } + reqCancel() + for _, usr := range userRecalculatedResults { + recalculatedUsers[usr.ID] = usr + } + for _, usr := range userResults { + if _, ok := recalculatedUsers[usr.ID]; !ok && usr.IDTMinus1 != 0 { + userInitialBalanceKeys = append(userInitialBalanceKeys, fmt.Sprint(usr.ID)) + clickhouseKeysMap[fmt.Sprint(usr.ID)] = struct{}{} + } + } + } + } + + if len(userInitialBalanceKeys) > 0 { + var err error + reqCtx, reqCancel := context.WithTimeout(context.Background(), requestDeadline) + balances, err = dwhClient.GetBaseBalanceForTMinus1(reqCtx, userInitialBalanceKeys, int64(len(userKeys)), 0) + if err != nil { + log.Error(errors.Wrapf(err, "[miner] failed to fetch base balances for tminus1 batchNumber:%v,workerNumber:%v", batchNumber, workerNumber)) + } + reqCancel() } } @@ -279,34 +336,44 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { if usr.IDT0 < 0 { t0Referrals[-usr.IDT0] = nil } + idTMinus1Key := usr.IDTMinus1 if usr.IDTMinus1 > 0 { tMinus1Referrals[usr.IDTMinus1] = nil } if usr.IDTMinus1 < 0 { tMinus1Referrals[-usr.IDTMinus1] = nil + idTMinus1Key *= -1 + } + if balanceForTMinusBugfixEnabled { + if balanceForTMinusBugfixDryRunEnabled { + if _, ok := dryRunUsers[usr.ID]; !ok { + clickhouseKeysMap[fmt.Sprint(idTMinus1Key)] = struct{}{} + } + } else { + if _, ok := recalculatedUsers[usr.ID]; !ok { + clickhouseKeysMap[fmt.Sprint(idTMinus1Key)] = struct{}{} + } + } } } for idT0 := range t0Referrals { referralKeys = append(referralKeys, model.SerializedUsersKey(idT0)) } - for idTMinus1 := range tMinus1Referrals { - referralKeys = append(referralKeys, model.SerializedUsersKey(idTMinus1)) - - if balanceForTMinusBugfixEnabled || balanceForTMinusBugfixDryRunEnabled { - key := idTMinus1 - if idTMinus1 < 0 { - key *= -1 - } - referralIDTMinus1KeysOnly = append(referralIDTMinus1KeysOnly, fmt.Sprint(key)) - } - } if balanceForTMinusBugfixEnabled || balanceForTMinusBugfixDryRunEnabled { var err error - reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline) - history, err = gatherHistory(reqCtx, dwhClient, userResults, referralIDTMinus1KeysOnly) - if err != nil { - log.Error(err, fmt.Sprintf("can't gather history for: %#v", userResults)) + if len(clickhouseKeysMap) > 0 { + var clickhouseKeys []string + for key, _ := range clickhouseKeysMap { + clickhouseKeys = append(clickhouseKeys, key) + } + reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline) + history, err = gatherHistory(reqCtx, dwhClient, clickhouseKeys) + if err != nil { + log.Error(err, fmt.Sprintf("can't gather history for: %#v", userResults)) + } + reqCancel() } + reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline) allAdoptions, err = tokenomics.GetAllAdoptions[float64](reqCtx, m.db) if err != nil { log.Error(err, fmt.Sprintf("can't gather adoptions for workerNumber: %v", workerNumber)) @@ -336,12 +403,6 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { go m.telemetry.collectElapsed(3, *before.Time) } - if balanceForTMinusBugfixEnabled || balanceForTMinusBugfixDryRunEnabled || balanceT2BugfixEnabled || balanceT2BugfixDryRunEnabled { - for _, usr := range userRecalculatedResults { - recalculatedUsers[usr.ID] = usr - } - } - /****************************************************************************************************************************************************** 3. Mining for the users. ******************************************************************************************************************************************************/ @@ -375,21 +436,41 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { if isAdvancedTeamDisabled(usr.LatestDevice) { usr.ActiveT2Referrals = 0 } - if balanceForTMinusBugfixEnabled || balanceForTMinusBugfixDryRunEnabled { - if recalculatedVal, ok := recalculatedUsers[usr.ID]; !ok || (recalculatedVal != nil && recalculatedVal.RecalculatedBalanceForTMinus1At.IsNil()) { - if recalculatedUsr := m.recalculateBalanceTMinus1(usr, allAdoptions, history, balances); recalculatedUsr != nil { - tMinus1ID := "" - if tMinus1Ref != nil { - tMinus1ID = tMinus1Ref.UserID + if balanceForTMinusBugfixEnabled && history != nil && allAdoptions != nil && balances != nil { + if balanceForTMinusBugfixDryRunEnabled { + if _, ok := dryRunUsers[usr.ID]; !ok { + if recalculatedUsr := m.recalculateBalanceTMinus1(usr, allAdoptions, history, balances); recalculatedUsr != nil { + tMinus1ID := "" + if tMinus1Ref != nil { + tMinus1ID = tMinus1Ref.UserID + } + balanceTMinus1RecalculationDryRunItems = append(balanceTMinus1RecalculationDryRunItems, &balanceTMinus1RecalculationDryRun{ + OldTMinus1Balance: usr.BalanceForTMinus1, + NewTMinus1Balance: recalculatedUsr.BalanceForTMinus1, + UserID: usr.UserID, + TMinus1ID: tMinus1ID, + }) + + dryRunUsersUpdated = append(dryRunUsersUpdated, &dryrunUser{ + DeserializedDryRunUsersKey: model.DeserializedDryRunUsersKey{ID: usr.ID}, + RecalculatedBalanceForTMinus1AtField: model.RecalculatedBalanceForTMinus1AtField{RecalculatedBalanceForTMinus1At: now}, + }) } - balanceTMinus1RecalculationDryRunItems = append(balanceTMinus1RecalculationDryRunItems, &balanceTMinus1RecalculationDryRun{ - OldTMinus1Balance: usr.BalanceForTMinus1, - NewTMinus1Balance: recalculatedUsr.BalanceForTMinus1, - UserID: usr.UserID, - TMinus1ID: tMinus1ID, - }) - - if !balanceForTMinusBugfixDryRunEnabled { + } + } else { + if _, ok := recalculatedUsers[usr.ID]; !ok { + if recalculatedUsr := m.recalculateBalanceTMinus1(usr, allAdoptions, history, balances); recalculatedUsr != nil { + tMinus1ID := "" + if tMinus1Ref != nil { + tMinus1ID = tMinus1Ref.UserID + } + balanceTMinus1RecalculationDryRunItems = append(balanceTMinus1RecalculationDryRunItems, &balanceTMinus1RecalculationDryRun{ + OldTMinus1Balance: usr.BalanceForTMinus1, + NewTMinus1Balance: recalculatedUsr.BalanceForTMinus1, + UserID: usr.UserID, + TMinus1ID: tMinus1ID, + }) + usr.BalanceForTMinus1 = recalculatedUsr.BalanceForTMinus1 recalculatedUsersUpdated = append(recalculatedUsersUpdated, &recalculated{ @@ -401,29 +482,22 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { } } if (balanceT2BugfixEnabled || balanceT2BugfixDryRunEnabled) && !balanceForTMinusBugfixEnabled { - if recalculatedVal, ok := recalculatedUsers[usr.ID]; !ok || (recalculatedVal != nil && recalculatedVal.RecalculatedBalanceT2At.IsNil()) { - balanceT2 := 0.0 - if referralsT2, ok := t2Referrals[usr.UserID]; ok { - for _, ref := range referralsT2 { - if _, ok := referralsCollection[ref]; ok { - balanceT2 += referralsCollection[ref].BalanceForTMinus1 - } + balanceT2 := 0.0 + if referralsT2, ok := t2Referrals[usr.UserID]; ok { + for _, ref := range referralsT2 { + if _, ok := referralsCollection[ref]; ok { + balanceT2 += referralsCollection[ref].BalanceForTMinus1 } } - balanceT2RecalculationDryRunItems = append(balanceT2RecalculationDryRunItems, &balanceT2RecalculationDryRun{ - OldT2Balance: usr.BalanceT2, - NewT2Balance: balanceT2, - UserID: usr.UserID, - }) - - if !balanceT2BugfixDryRunEnabled { - usr.BalanceT2 = balanceT2 - - recalculatedUsersUpdated = append(recalculatedUsersUpdated, &recalculated{ - DeserializedRecalculatedUsersKey: model.DeserializedRecalculatedUsersKey{ID: usr.ID}, - RecalculatedBalanceT2AtField: model.RecalculatedBalanceT2AtField{RecalculatedBalanceT2At: now}, - }) - } + } + balanceT2RecalculationDryRunItems = append(balanceT2RecalculationDryRunItems, &balanceT2RecalculationDryRun{ + OldT2Balance: usr.BalanceT2, + NewT2Balance: balanceT2, + UserID: usr.UserID, + }) + + if !balanceT2BugfixDryRunEnabled { + usr.BalanceT2 = balanceT2 } } @@ -569,7 +643,7 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { } var pipeliner redis.Pipeliner - if len(recalculatedUsersUpdated)+len(t1ReferralsToIncrementActiveValue)+len(t2ReferralsToIncrementActiveValue)+len(referralsCountGuardOnlyUpdatedUsers)+len(t1ReferralsThatStoppedMining)+len(t2ReferralsThatStoppedMining)+len(extraBonusOnlyUpdatedUsers)+len(referralsUpdated)+len(userGlobalRanks) > 0 { + if len(dryRunUsersUpdated)+len(recalculatedUsersUpdated)+len(t1ReferralsToIncrementActiveValue)+len(t2ReferralsToIncrementActiveValue)+len(referralsCountGuardOnlyUpdatedUsers)+len(t1ReferralsThatStoppedMining)+len(t2ReferralsThatStoppedMining)+len(extraBonusOnlyUpdatedUsers)+len(referralsUpdated)+len(userGlobalRanks) > 0 { pipeliner = m.db.TxPipeline() } else { pipeliner = m.db.Pipeline() @@ -613,6 +687,11 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { return err } } + for _, value := range dryRunUsersUpdated { + if err := pipeliner.HSet(reqCtx, value.Key(), storage.SerializeValue(value)...).Err(); err != nil { + return err + } + } for _, value := range extraBonusOnlyUpdatedUsers { if err := pipeliner.HSet(reqCtx, value.Key(), storage.SerializeValue(value)...).Err(); err != nil { return err diff --git a/miner/recalculate_balance.go b/miner/recalculate_balance.go index d02ef25..3b35b2d 100644 --- a/miner/recalculate_balance.go +++ b/miner/recalculate_balance.go @@ -276,15 +276,10 @@ func calculateTimeBounds(refTimeRange, usrRange *dwh.AdjustUserInfo) (*time.Time return startedAt, endedAt } -func gatherHistory(ctx context.Context, dwhClient dwh.Client, users []*user, referralIDTMinus1Keys []string) (history map[int64][]*dwh.AdjustUserInfo, err error) { - if len(users) == 0 { +func gatherHistory(ctx context.Context, dwhClient dwh.Client, keys []string) (history map[int64][]*dwh.AdjustUserInfo, err error) { + if len(keys) == 0 { return nil, nil } - var keys []string - for _, usr := range users { - keys = append(keys, fmt.Sprint(usr.ID)) - } - keys = append(keys, referralIDTMinus1Keys...) offset := int64(0) historyTimeRanges := make(map[int64][]*dwh.AdjustUserInfo, 0) for { @@ -308,7 +303,7 @@ func gatherHistory(ctx context.Context, dwhClient dwh.Client, users []*user, ref } func (m *miner) recalculateBalanceTMinus1(usr *user, adoptions []*tokenomics.Adoption[float64], history map[int64][]*dwh.AdjustUserInfo, baseTMinus1Balances map[int64]float64) *user { - if history == nil || adoptions == nil { + if adoptions == nil || history == nil || baseTMinus1Balances == nil || len(adoptions) == 0 || len(history) == 0 || len(baseTMinus1Balances) == 0 { return nil } startTime, err := stdlibtime.Parse(timeLayout, startRecalculationsFrom) diff --git a/model/model.go b/model/model.go index 3642aec..82eea2a 100644 --- a/model/model.go +++ b/model/model.go @@ -308,6 +308,10 @@ type ( DeserializedRecalculatedUsersKey struct { ID int64 `redis:"-"` } + + DeserializedDryRunUsersKey struct { + ID int64 `redis:"-"` + } ) func (k *DeserializedUsersKey) Key() string { @@ -532,3 +536,42 @@ func SerializedRecalculatedUsersKey(val any) string { panic(fmt.Sprintf("%#v cannot be used as recalculated key", val)) } } + +func (k *DeserializedDryRunUsersKey) Key() string { + if k == nil || k.ID == 0 { + return "" + } + + return SerializedDryRunUsersKey(k.ID) +} + +func (k *DeserializedDryRunUsersKey) SetKey(val string) { + if val == "" || val == "dryrun:" { + return + } + if val[0] == 'd' { + val = val[7:] + } + var err error + k.ID, err = strconv.ParseInt(val, 10, 64) + log.Panic(err) +} + +func SerializedDryRunUsersKey(val any) string { + switch typedVal := val.(type) { + case string: + if typedVal == "" { + return "" + } + + return "dryrun:" + typedVal + case int64: + if typedVal == 0 { + return "" + } + + return "dryrun:" + strconv.FormatInt(typedVal, 10) + default: + panic(fmt.Sprintf("%#v cannot be used as dryrun key", val)) + } +}