diff --git a/bookkeeper/storage/contract.go b/bookkeeper/storage/contract.go index 411b1281..855735df 100644 --- a/bookkeeper/storage/contract.go +++ b/bookkeeper/storage/contract.go @@ -24,28 +24,6 @@ type ( Ping(ctx context.Context) error Insert(ctx context.Context, columns *Columns, input InsertMetadata, usrs []*model.User) error SelectBalanceHistory(ctx context.Context, id int64, createdAts []stdlibtime.Time) ([]*BalanceHistory, error) - GetAdjustUserInformation(ctx context.Context, userIDs []string, limit, offset int64) ([]*AdjustUserInfo, error) - } - AdjustUserInfo struct { - MiningSessionSoloStartedAt *time.Time - MiningSessionSoloEndedAt *time.Time - MiningSessionSoloLastStartedAt *time.Time - MiningSessionSoloPreviouslyEndedAt *time.Time - CreatedAt *time.Time - ResurrectSoloUsedAt *time.Time - UserID string - ID int64 - SlashingRateSolo float64 - SlashingRateT1 float64 - SlashingRateT2 float64 - BalanceSolo float64 - BalanceT0 float64 - BalanceT1Pending float64 - BalanceT1PendingApplied float64 - BalanceT2Pending float64 - BalanceT2PendingApplied float64 - PrestakingAllocation uint16 - PrestakingBonus uint16 } BalanceHistory struct { CreatedAt *time.Time diff --git a/bookkeeper/storage/storage.go b/bookkeeper/storage/storage.go index 205f42a7..108d4c9e 100644 --- a/bookkeeper/storage/storage.go +++ b/bookkeeper/storage/storage.go @@ -14,7 +14,6 @@ import ( "github.com/ClickHouse/ch-go/chpool" "github.com/ClickHouse/ch-go/proto" "github.com/hashicorp/go-multierror" - "github.com/pkg/errors" "go.uber.org/zap" "github.com/ice-blockchain/freezer/model" @@ -443,132 +442,3 @@ func (db *db) SelectBalanceHistory(ctx context.Context, id int64, createdAts []s return res, nil } - -func (db *db) GetAdjustUserInformation(ctx context.Context, userIDs []string, limit, offset int64) ([]*AdjustUserInfo, error) { - const ( - maxIDCount = 25_000 - ) - var ( - res = make([]*AdjustUserInfo, 0, len(userIDs)) - counter = 0 - ) - var userIDArray []string - for _, id := range userIDs { - userIDArray = append(userIDArray, id) - counter++ - if counter >= maxIDCount { // Hack not to have 'Max query size exceeded' error. - result, err := db.getAdjustUserInformation(ctx, userIDArray, limit, offset) - if err != nil { - return nil, errors.Wrapf(err, "can't get adjust user information for userIDs:%#v", userIDArray) - } - res = append(res, result...) - userIDArray = userIDArray[:0] - counter = 0 - - continue - } - } - if len(userIDArray) > 0 { - result, err := db.getAdjustUserInformation(ctx, userIDArray, limit, offset) - if err != nil { - return nil, errors.Wrapf(err, "can't get adjust user information for userIDs:%#v", userIDArray) - } - res = append(res, result...) - } - - return res, nil -} - -func (db *db) getAdjustUserInformation(ctx context.Context, userIDArray []string, limit, offset int64) ([]*AdjustUserInfo, error) { - var ( - id = make(proto.ColInt64, 0, len(userIDArray)) - userID = &proto.ColStr{Buf: make([]byte, 0, 40*len(userIDArray)), Pos: make([]proto.Position, 0, len(userIDArray))} - miningSessionSoloStartedAt = proto.ColDateTime64{Data: make([]proto.DateTime64, 0, len(userIDArray)), Location: stdlibtime.UTC} - miningSessionSoloEndedAt = proto.ColDateTime64{Data: make([]proto.DateTime64, 0, len(userIDArray)), Location: stdlibtime.UTC} - miningSessionSoloPreviouslyEndedAt = proto.ColDateTime64{Data: make([]proto.DateTime64, 0, len(userIDArray)), Location: stdlibtime.UTC} - slashingRateSolo = make(proto.ColFloat64, 0, len(userIDArray)) - createdAt = proto.ColDateTime{Data: make([]proto.DateTime, 0), Location: stdlibtime.UTC} - resurrectSoloUsedAt = proto.ColDateTime64{Data: make([]proto.DateTime64, 0, len(userIDArray)), Location: stdlibtime.UTC} - balanceSolo = make(proto.ColFloat64, 0, len(userIDArray)) - balanceT1Pending = make(proto.ColFloat64, 0, len(userIDArray)) - balanceT1PendingApplied = make(proto.ColFloat64, 0, len(userIDArray)) - balanceT2Pending = make(proto.ColFloat64, 0, len(userIDArray)) - balanceT2PendingApplied = make(proto.ColFloat64, 0, len(userIDArray)) - res = make([]*AdjustUserInfo, 0, len(userIDArray)) - ) - if err := db.pools[atomic.AddUint64(&db.currentIndex, 1)%uint64(len(db.pools))].Do(ctx, ch.Query{ - Body: fmt.Sprintf(`SELECT id, - user_id, - mining_session_solo_started_at, - mining_session_solo_ended_at, - mining_session_solo_previously_ended_at, - slashing_rate_solo, - created_at, - resurrect_solo_used_at, - balance_solo, - balance_t1_pending, - balance_t1_pending_applied, - balance_t2_pending, - balance_t2_pending_applied - FROM %[1]v - WHERE id IN [%[2]v] - ORDER BY created_at ASC - LIMIT %[3]v, %[4]v - `, tableName, strings.Join(userIDArray, ","), offset, limit), - Result: append(make(proto.Results, 0, 13), - proto.ResultColumn{Name: "id", Data: &id}, - proto.ResultColumn{Name: "user_id", Data: userID}, - proto.ResultColumn{Name: "mining_session_solo_started_at", Data: &miningSessionSoloStartedAt}, - proto.ResultColumn{Name: "mining_session_solo_ended_at", Data: &miningSessionSoloEndedAt}, - proto.ResultColumn{Name: "mining_session_solo_previously_ended_at", Data: &miningSessionSoloPreviouslyEndedAt}, - proto.ResultColumn{Name: "slashing_rate_solo", Data: &slashingRateSolo}, - proto.ResultColumn{Name: "created_at", Data: &createdAt}, - proto.ResultColumn{Name: "resurrect_solo_used_at", Data: &resurrectSoloUsedAt}, - proto.ResultColumn{Name: "balance_solo", Data: &balanceSolo}, - proto.ResultColumn{Name: "balance_t1_pending", Data: &balanceT1Pending}, - proto.ResultColumn{Name: "balance_t1_pending_applied", Data: &balanceT1PendingApplied}, - proto.ResultColumn{Name: "balance_t2_pending", Data: &balanceT2Pending}, - proto.ResultColumn{Name: "balance_t2_pending_applied", Data: &balanceT2PendingApplied}, - ), - OnResult: func(_ context.Context, block proto.Block) error { - for ix := 0; ix < block.Rows; ix++ { - res = append(res, &AdjustUserInfo{ - ID: (&id).Row(ix), - UserID: userID.Row(ix), - MiningSessionSoloStartedAt: time.New((&miningSessionSoloStartedAt).Row(ix)), - MiningSessionSoloEndedAt: time.New((&miningSessionSoloEndedAt).Row(ix)), - MiningSessionSoloPreviouslyEndedAt: time.New((&miningSessionSoloPreviouslyEndedAt).Row(ix)), - SlashingRateSolo: (&slashingRateSolo).Row(ix), - CreatedAt: time.New((&createdAt).Row(ix)), - ResurrectSoloUsedAt: time.New((&resurrectSoloUsedAt).Row(ix)), - BalanceSolo: (&balanceSolo).Row(ix), - BalanceT1Pending: (&balanceT1Pending).Row(ix), - BalanceT1PendingApplied: (&balanceT1PendingApplied).Row(ix), - BalanceT2Pending: (&balanceT2Pending).Row(ix), - BalanceT2PendingApplied: (&balanceT2PendingApplied).Row(ix), - }) - } - (&id).Reset() - userID.Reset() - (&miningSessionSoloStartedAt).Reset() - (&miningSessionSoloEndedAt).Reset() - (&miningSessionSoloPreviouslyEndedAt).Reset() - (&slashingRateSolo).Reset() - (&createdAt).Reset() - (&resurrectSoloUsedAt).Reset() - (&balanceSolo).Reset() - (&balanceT1Pending).Reset() - (&balanceT1PendingApplied).Reset() - (&balanceT2Pending).Reset() - (&balanceT2PendingApplied).Reset() - - return nil - }, - Secret: "", - InitialUser: "", - }); err != nil { - return nil, err - } - - return res, nil -} diff --git a/bookkeeper/storage/storage_test.go b/bookkeeper/storage/storage_test.go index a362fdd8..519a6d10 100644 --- a/bookkeeper/storage/storage_test.go +++ b/bookkeeper/storage/storage_test.go @@ -113,40 +113,3 @@ func TestStorage(t *testing.T) { sort.SliceStable(h2, func(ii, jj int) bool { return h2[ii].CreatedAt.Before(*h2[jj].CreatedAt.Time) }) assert.EqualValues(t, []*BalanceHistory{}, h2) } - -func TestStorage_SelectAdjustUserInformation_NoError_On_LongValues(t *testing.T) { - cl := MustConnect(context.Background(), "self") - defer func() { - if err := recover(); err != nil { - cl.Close() - panic(err) - } - cl.Close() - }() - require.NoError(t, cl.Ping(context.Background())) - - limit := int64(1000) - offset := int64(0) - userIDs := make(map[int64]struct{}, 1_000_000) - for ix := 1_000_000; ix < 2_000_000; ix++ { - userIDs[int64(ix)] = struct{}{} - } - _, err := cl.GetAdjustUserInformation(context.Background(), userIDs, limit, offset) - require.NoError(t, err) - - userIDs = nil - userIDs = make(map[int64]struct{}, 1_000_000) - for ix := 10_000_000; ix < 11_000_000; ix++ { - userIDs[int64(ix)] = struct{}{} - } - _, err = cl.GetAdjustUserInformation(context.Background(), userIDs, limit, offset) - require.NoError(t, err) - - userIDs = nil - userIDs = make(map[int64]struct{}, 1_000_000) - for ix := 54_000_000; ix < 55_000_000; ix++ { - userIDs[int64(ix)] = struct{}{} - } - _, err = cl.GetAdjustUserInformation(context.Background(), userIDs, limit, offset) - require.NoError(t, err) -} diff --git a/miner/adoption_range_test.go b/miner/adoption_range_test.go deleted file mode 100644 index b5143d32..00000000 --- a/miner/adoption_range_test.go +++ /dev/null @@ -1,151 +0,0 @@ -// SPDX-License-Identifier: ice License 1.0 - -package miner - -import ( - "testing" - stdlibtime "time" - - "github.com/stretchr/testify/assert" - - "github.com/ice-blockchain/freezer/tokenomics" - "github.com/ice-blockchain/wintr/time" -) - -func TestGetAdoptionsRange_1AdoptionPerRange(t *testing.T) { - var adoptions []*tokenomics.Adoption[float64] - adoptions = append(adoptions, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-58 * stdlibtime.Minute)), - BaseMiningRate: 16.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-55 * stdlibtime.Minute)), - BaseMiningRate: 8.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Minute)), - BaseMiningRate: 4.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-10 * stdlibtime.Minute)), - BaseMiningRate: 2.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 1.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 0.5, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 0.25, - Milestone: 1, - TotalActiveUsers: 1, - }) - startedAt := time.New(time.Now().Add(-1 * stdlibtime.Minute)) - endedAt := time.Now() - ranges := splitByAdoptionTimeRanges(adoptions, startedAt, endedAt) - assert.Equal(t, 2., ranges[0].BaseMiningRate) - assert.Equal(t, 2., ranges[1].BaseMiningRate) -} - -func TestGetAdoptionsRange_2AdoptionsPerRange(t *testing.T) { - var adoptions []*tokenomics.Adoption[float64] - adoptions = append(adoptions, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-58 * stdlibtime.Minute)), - BaseMiningRate: 16.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-55 * stdlibtime.Minute)), - BaseMiningRate: 8.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Minute)), - BaseMiningRate: 4.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-10 * stdlibtime.Minute)), - BaseMiningRate: 2.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Second)), - BaseMiningRate: 1.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 0.5, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 0.25, - Milestone: 1, - TotalActiveUsers: 1, - }) - startedAt := time.New(time.Now().Add(-1 * stdlibtime.Minute)) - endedAt := time.Now() - ranges := splitByAdoptionTimeRanges(adoptions, startedAt, endedAt) - assert.Equal(t, 2., ranges[0].BaseMiningRate) - assert.Equal(t, 1., ranges[1].BaseMiningRate) - assert.Equal(t, 1., ranges[2].BaseMiningRate) -} - -func TestGetAdoptionsRange_AdoptionDemotionPerRange(t *testing.T) { - var adoptions []*tokenomics.Adoption[float64] - adoptions = append(adoptions, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Second)), - BaseMiningRate: 16.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-55 * stdlibtime.Minute)), - BaseMiningRate: 8.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Minute)), - BaseMiningRate: 4.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-1 * stdlibtime.Minute)), - BaseMiningRate: 2.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 1.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 0.5, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 0.25, - Milestone: 1, - TotalActiveUsers: 1, - }) - startedAt := time.New(time.Now().Add(-2 * stdlibtime.Minute)) - endedAt := time.Now() - ranges := splitByAdoptionTimeRanges(adoptions, startedAt, endedAt) - assert.Equal(t, 4., ranges[0].BaseMiningRate) - assert.Equal(t, 2., ranges[1].BaseMiningRate) - assert.Equal(t, 16., ranges[2].BaseMiningRate) - assert.Equal(t, 16., ranges[3].BaseMiningRate) -} diff --git a/miner/contract.go b/miner/contract.go index 0503dcc1..cc4fb1ff 100644 --- a/miner/contract.go +++ b/miner/contract.go @@ -4,7 +4,6 @@ package miner import ( "context" - _ "embed" "io" "sync" "sync/atomic" @@ -14,7 +13,6 @@ import ( "github.com/ice-blockchain/freezer/model" "github.com/ice-blockchain/freezer/tokenomics" messagebroker "github.com/ice-blockchain/wintr/connectors/message_broker" - storagePG "github.com/ice-blockchain/wintr/connectors/storage/v2" "github.com/ice-blockchain/wintr/connectors/storage/v3" "github.com/ice-blockchain/wintr/time" ) @@ -48,9 +46,6 @@ const ( var ( //nolint:gochecknoglobals // Singleton & global config mounted only during bootstrap. cfg config - - //go:embed .testdata/DDL.sql - eskimoDDL string ) type ( @@ -62,7 +57,6 @@ type ( model.ExtraBonusStartedAtField model.LatestDeviceField model.UserIDField - model.UsernameField UpdatedUser model.BalanceSoloPendingField model.BalanceT1PendingField @@ -112,24 +106,6 @@ type ( model.IDTMinus1Field } - backupUserUpdated struct { - model.DeserializedBackupUsersKey - model.UserIDField - model.BalanceT1Field - model.BalanceT2Field - model.SlashingRateT1Field - model.SlashingRateT2Field - model.ActiveT1ReferralsField - model.ActiveT2ReferralsField - model.FirstRecalculatedBalanceT1Field - model.FirstRecalculatedBalanceT2Field - model.FirstRecalculatedActiveT1ReferralsField - model.FirstRecalculatedActiveT2ReferralsField - model.FirstRecalculatedSlashingRateT1Field - model.FirstRecalculatedSlashingRateT2Field - model.BalancesBackupUsedAtField - } - referral struct { model.MiningSessionSoloStartedAtField model.MiningSessionSoloEndedAtField @@ -148,14 +124,12 @@ type ( miner struct { mb messagebroker.Client db storage.DB - dbPG *storagePG.DB dwhClient dwh.Client cancel context.CancelFunc telemetry *telemetry wg *sync.WaitGroup extraBonusStartDate *time.Time extraBonusIndicesDistribution map[uint16]map[uint16]uint16 - recalculationBalanceStartDate *time.Time } config struct { disableAdvancedTeam *atomic.Pointer[[]string] diff --git a/miner/miner.go b/miner/miner.go index 9c88d263..aaff2a89 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -23,7 +23,6 @@ import ( "github.com/ice-blockchain/freezer/tokenomics" appCfg "github.com/ice-blockchain/wintr/config" messagebroker "github.com/ice-blockchain/wintr/connectors/message_broker" - storagePG "github.com/ice-blockchain/wintr/connectors/storage/v2" "github.com/ice-blockchain/wintr/connectors/storage/v3" "github.com/ice-blockchain/wintr/log" "github.com/ice-blockchain/wintr/time" @@ -39,7 +38,6 @@ func MustStartMining(ctx context.Context, cancel context.CancelFunc) Client { mi := &miner{ mb: messagebroker.MustConnect(context.Background(), parentApplicationYamlKey), db: storage.MustConnect(context.Background(), parentApplicationYamlKey, int(cfg.Workers)), - dbPG: storagePG.MustConnect(ctx, eskimoDDL, applicationYamlKey), dwhClient: dwh.MustConnect(context.Background(), applicationYamlKey), wg: new(sync.WaitGroup), telemetry: new(telemetry).mustInit(cfg), @@ -49,7 +47,6 @@ func MustStartMining(ctx context.Context, cancel context.CancelFunc) Client { mi.cancel = cancel mi.extraBonusStartDate = extrabonusnotifier.MustGetExtraBonusStartDate(ctx, mi.db) mi.extraBonusIndicesDistribution = extrabonusnotifier.MustGetExtraBonusIndicesDistribution(ctx, mi.db) - mi.recalculationBalanceStartDate = mustGetRecalculationBalancesStartDate(ctx, mi.db) for workerNumber := int64(0); workerNumber < cfg.Workers; workerNumber++ { go func(wn int64) { @@ -68,7 +65,6 @@ func (m *miner) Close() error { return multierror.Append( errors.Wrap(m.mb.Close(), "failed to close mb"), errors.Wrap(m.db.Close(), "failed to close db"), - errors.Wrap(m.dbPG.Close(), "failed to close db pg"), errors.Wrap(m.dwhClient.Close(), "failed to close dwh"), ).ErrorOrNil() } @@ -114,38 +110,6 @@ func (m *miner) checkDBHealth(ctx context.Context) error { return nil } -func mustGetRecalculationBalancesStartDate(ctx context.Context, db storage.DB) (recalculationBalancesStartDate *time.Time) { - recalculationBalancesStartDateString, err := db.Get(ctx, "recalculation_balances_start_date").Result() - if err != nil && errors.Is(err, redis.Nil) { - err = nil - } - log.Panic(errors.Wrap(err, "failed to get recalculation_balances_start_date")) - if recalculationBalancesStartDateString != "" { - recalculationBalancesStartDate = new(time.Time) - log.Panic(errors.Wrapf(recalculationBalancesStartDate.UnmarshalText([]byte(recalculationBalancesStartDateString)), "failed to parse recalculation_balances_start_date `%v`", recalculationBalancesStartDateString)) //nolint:lll // . - recalculationBalancesStartDate = time.New(recalculationBalancesStartDate.UTC()) - - return - } - recalculationBalancesStartDate = time.Now() - set, sErr := db.SetNX(ctx, "recalculation_balances_start_date", recalculationBalancesStartDate, 0).Result() - log.Panic(errors.Wrap(sErr, "failed to set recalculation_balances_start_date")) - if !set { - return mustGetRecalculationBalancesStartDate(ctx, db) - } - - return recalculationBalancesStartDate -} - -func mustGetBalancesBackupMode(ctx context.Context, db storage.DB) (result bool, err error) { - balancesBackupModeString, err := db.Get(ctx, "balances_backup_mode").Result() - if err != nil && errors.Is(err, redis.Nil) { - err = nil - } - - return balancesBackupModeString == "true", err -} - func (m *miner) mine(ctx context.Context, workerNumber int64) { dwhClient := dwh.MustConnect(context.Background(), applicationYamlKey) defer func() { @@ -163,9 +127,8 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { currentAdoption = m.getAdoption(ctx, m.db, workerNumber) workers = cfg.Workers batchSize = cfg.BatchSize - metrics = new(balanceRecalculationMetrics) - userKeys, userBackupKeys, userHistoryKeys, referralKeys = make([]string, 0, batchSize), make([]string, 0, batchSize), make([]string, 0, batchSize), make([]string, 0, 2*batchSize) - userResults, backupUserResults, referralResults = make([]*user, 0, batchSize), make([]*backupUserUpdated, 0, batchSize), make([]*referral, 0, 2*batchSize) + userKeys, userHistoryKeys, referralKeys = make([]string, 0, batchSize), make([]string, 0, batchSize), make([]string, 0, 2*batchSize) + userResults, referralResults = make([]*user, 0, batchSize), make([]*referral, 0, 2*batchSize) t0Referrals, tMinus1Referrals = make(map[int64]*referral, batchSize), make(map[int64]*referral, batchSize) t1ReferralsToIncrementActiveValue, t2ReferralsToIncrementActiveValue = make(map[int64]int32, batchSize), make(map[int64]int32, batchSize) t1ReferralsThatStoppedMining, t2ReferralsThatStoppedMining = make(map[int64]uint32, batchSize), make(map[int64]uint32, batchSize) @@ -178,13 +141,8 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { referralsUpdated = make([]*referralUpdated, 0, batchSize) histories = make([]*model.User, 0, batchSize) userGlobalRanks = make([]redis.Z, 0, batchSize) - backupedUsers = make(map[int64]*backupUserUpdated, batchSize) - backupUsersUpdated = make([]*backupUserUpdated, 0, batchSize) - recalculatedTiersBalancesUsers = make(map[int64]*user, batchSize) historyColumns, historyInsertMetadata = dwh.InsertDDL(int(batchSize)) shouldSynchronizeBalanceFunc = func(batchNumberArg uint64) bool { return false } - recalculationHistory *historyData - allAdoptions []*tokenomics.Adoption[float64] ) resetVars := func(success bool) { if success && len(userKeys) == int(batchSize) && len(userResults) == 0 { @@ -195,19 +153,6 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { panic("unexpected batch number: " + fmt.Sprint(batchNumber)) } totalBatches = uint64(batchNumber - 1) - metricsExists, err := m.getBalanceRecalculationMetrics(ctx, workerNumber) - if err != nil { - log.Error(err, "can't get balance recalculation metrics for worker:", workerNumber) - } - if metricsExists == nil && err == nil { - metrics.IterationsNum = int64(totalBatches) - metrics.EndedAt = time.Now() - metrics.Worker = workerNumber - if err := m.insertBalanceRecalculationMetrics(ctx, metrics); err != nil { - log.Error(err, "can't insert balance recalculation metrics for worker:", workerNumber) - } - metrics.reset() - } if totalBatches != 0 && iteration > 2 { shouldSynchronizeBalanceFunc = m.telemetry.shouldSynchronizeBalanceFunc(uint64(workerNumber), totalBatches, iteration) } @@ -219,7 +164,7 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { if batchNumber == 0 || currentAdoption == nil { currentAdoption = m.getAdoption(ctx, m.db, workerNumber) } - userKeys, userBackupKeys, userHistoryKeys, referralKeys = userKeys[:0], userBackupKeys[:0], userHistoryKeys[:0], referralKeys[:0] + userKeys, userHistoryKeys, referralKeys = userKeys[:0], userHistoryKeys[:0], referralKeys[:0] userResults, referralResults = userResults[:0], referralResults[:0] msgs, errs = msgs[:0], errs[:0] updatedUsers = updatedUsers[:0] @@ -228,8 +173,6 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { histories = histories[:0] userGlobalRanks = userGlobalRanks[:0] referralsThatStoppedMining = referralsThatStoppedMining[:0] - allAdoptions = allAdoptions[:0] - backupUsersUpdated = backupUsersUpdated[:0] for k := range t0Referrals { delete(t0Referrals, k) } @@ -248,14 +191,7 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { for k := range t2ReferralsToIncrementActiveValue { delete(t2ReferralsToIncrementActiveValue, k) } - for k := range backupedUsers { - delete(backupedUsers, k) - } - for k := range recalculatedTiersBalancesUsers { - delete(recalculatedTiersBalancesUsers, k) - } } - metrics.StartedAt = time.Now() for ctx.Err() == nil { /****************************************************************************************************************************************************** 1. Fetching a new batch of users. @@ -278,28 +214,6 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { if len(userKeys) > 0 { go m.telemetry.collectElapsed(2, *before.Time) } - for ix := batchNumber * batchSize; ix < (batchNumber+1)*batchSize; ix++ { - userBackupKeys = append(userBackupKeys, model.SerializedBackupUsersKey((workers*ix)+workerNumber)) - } - reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline) - if err := storage.Bind[backupUserUpdated](reqCtx, m.db, userBackupKeys, &backupUserResults); err != nil { - log.Error(errors.Wrapf(err, "[miner] failed to get backuped users for batchNumber:%v,workerNumber:%v", batchNumber, workerNumber)) - reqCancel() - now = time.Now() - - continue - } - balanceBackupMode, err := mustGetBalancesBackupMode(reqCtx, m.db) - if err != nil { - log.Error(errors.Wrapf(err, "[miner] failed to get backup flag for batchNumber:%v,workerNumber:%v", batchNumber, workerNumber)) - reqCancel() - - continue - } - reqCancel() - for _, usr := range backupUserResults { - backupedUsers[usr.ID] = usr - } /****************************************************************************************************************************************************** 2. Fetching T0 & T-1 referrals of the fetched users. @@ -355,121 +269,12 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { t0Referrals[ref.ID] = ref } } - if !balanceBackupMode { - reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline) - recalculationHistory, err = m.gatherHistoryAndReferralsInformation(ctx, userResults) - if err != nil { - log.Error(errors.New("tiers diff balances error"), err) - } - reqCancel() - reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline) - allAdoptions, err = tokenomics.GetAllAdoptions[float64](ctx, m.db) - if err != nil { - log.Error(errors.New("can't get all adoptions"), err) - } - reqCancel() - } + shouldSynchronizeBalance := shouldSynchronizeBalanceFunc(uint64(batchNumber)) for _, usr := range userResults { if usr.UserID == "" { continue } - backupedUsr, backupExists := backupedUsers[usr.ID] - if balanceBackupMode { - if backupExists { - diffT1ActiveValue := backupedUsr.ActiveT1Referrals - usr.ActiveT1Referrals - diffT2ActiveValue := backupedUsr.ActiveT2Referrals - usr.ActiveT2Referrals - if diffT1ActiveValue < 0 && diffT1ActiveValue*-1 > usr.ActiveT1Referrals { - diffT1ActiveValue = -usr.ActiveT1Referrals - } - if diffT2ActiveValue < 0 && diffT2ActiveValue*-1 > usr.ActiveT2Referrals { - diffT2ActiveValue = -usr.ActiveT2Referrals - } - if false { - t1ReferralsToIncrementActiveValue[usr.ID] += diffT1ActiveValue - t2ReferralsToIncrementActiveValue[usr.ID] += diffT2ActiveValue - - usr.BalanceT1 = backupedUsr.BalanceT1 - usr.BalanceT2 = backupedUsr.BalanceT2 - - usr.SlashingRateT1 = backupedUsr.SlashingRateT1 - usr.SlashingRateT2 = backupedUsr.SlashingRateT2 - - backupedUsr.BalancesBackupUsedAt = time.Now() - backupUsersUpdated = append(backupUsersUpdated, backupedUsr) - } - } - } else { - if recalculatedUsr := m.recalculateUser(usr, allAdoptions, recalculationHistory); recalculatedUsr != nil { - diffT1ActiveValue := recalculationHistory.T1ActiveCounts[usr.UserID] - usr.ActiveT1Referrals - diffT2ActiveValue := recalculationHistory.T2ActiveCounts[usr.UserID] - usr.ActiveT2Referrals - - oldBalanceT1 := usr.BalanceT1 - oldBalanceT2 := usr.BalanceT2 - - if diffT1ActiveValue < 0 && diffT1ActiveValue*-1 > usr.ActiveT1Referrals { - diffT1ActiveValue = -usr.ActiveT1Referrals - } - if diffT2ActiveValue < 0 && diffT2ActiveValue*-1 > usr.ActiveT2Referrals { - diffT2ActiveValue = -usr.ActiveT2Referrals - } - - oldSlashingT1Rate := usr.SlashingRateT1 - oldSlashingT2Rate := usr.SlashingRateT2 - - if false { - t1ReferralsToIncrementActiveValue[usr.ID] += diffT1ActiveValue - t2ReferralsToIncrementActiveValue[usr.ID] += diffT2ActiveValue - - usr.BalanceT1 = recalculatedUsr.BalanceT1 - usr.BalanceT2 = recalculatedUsr.BalanceT2 - - usr.SlashingRateT1 = recalculatedUsr.SlashingRateT1 - usr.SlashingRateT2 = recalculatedUsr.SlashingRateT2 - } - if !backupExists { - metrics.AffectedUsers += 1 - if recalculatedUsr.BalanceT1-oldBalanceT1 >= 0 { - metrics.T1BalancePositive += recalculatedUsr.BalanceT1 - oldBalanceT1 - } else { - metrics.T1BalanceNegative += recalculatedUsr.BalanceT1 - oldBalanceT1 - } - if recalculatedUsr.BalanceT2-oldBalanceT2 >= 0 { - metrics.T2BalancePositive += recalculatedUsr.BalanceT2 - oldBalanceT2 - } else { - metrics.T2BalanceNegative += recalculatedUsr.BalanceT2 - oldBalanceT2 - } - if diffT1ActiveValue < 0 { - metrics.T1ActiveCountsNegative += int64(diffT1ActiveValue) - } else { - metrics.T1ActiveCountsPositive += int64(diffT1ActiveValue) - } - if diffT2ActiveValue < 0 { - metrics.T2ActiveCountsNegative += int64(diffT2ActiveValue) - } else { - metrics.T2ActiveCountsPositive += int64(diffT2ActiveValue) - } - - backupUsersUpdated = append(backupUsersUpdated, &backupUserUpdated{ - DeserializedBackupUsersKey: model.DeserializedBackupUsersKey{ID: usr.ID}, - UserIDField: usr.UserIDField, - BalanceT1Field: model.BalanceT1Field{BalanceT1: oldBalanceT1}, - BalanceT2Field: model.BalanceT2Field{BalanceT2: oldBalanceT2}, - SlashingRateT1Field: model.SlashingRateT1Field{SlashingRateT1: oldSlashingT1Rate}, - SlashingRateT2Field: model.SlashingRateT2Field{SlashingRateT2: oldSlashingT2Rate}, - ActiveT1ReferralsField: model.ActiveT1ReferralsField{ActiveT1Referrals: usr.ActiveT1Referrals}, - ActiveT2ReferralsField: model.ActiveT2ReferralsField{ActiveT2Referrals: usr.ActiveT2Referrals}, - FirstRecalculatedBalanceT1Field: model.FirstRecalculatedBalanceT1Field{FirstRecalculatedBalanceT1: recalculatedUsr.BalanceT1}, - FirstRecalculatedBalanceT2Field: model.FirstRecalculatedBalanceT2Field{FirstRecalculatedBalanceT2: recalculatedUsr.BalanceT2}, - FirstRecalculatedSlashingRateT1Field: model.FirstRecalculatedSlashingRateT1Field{FirstRecalculatedSlashingRateT1: recalculatedUsr.SlashingRateT1}, - FirstRecalculatedSlashingRateT2Field: model.FirstRecalculatedSlashingRateT2Field{FirstRecalculatedSlashingRateT2: recalculatedUsr.SlashingRateT2}, - FirstRecalculatedActiveT1ReferralsField: model.FirstRecalculatedActiveT1ReferralsField{FirstRecalculatedActiveT1Referrals: usr.ActiveT1Referrals + diffT1ActiveValue}, - FirstRecalculatedActiveT2ReferralsField: model.FirstRecalculatedActiveT2ReferralsField{FirstRecalculatedActiveT2Referrals: usr.ActiveT2Referrals + diffT2ActiveValue}, - }) - } - } - } - var t0Ref, tMinus1Ref *referral if usr.IDT0 > 0 { t0Ref = t0Referrals[usr.IDT0] @@ -499,10 +304,8 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { updatedUser.ExtraBonusDaysClaimNotAvailable = 0 updatedUser.ExtraBonusLastClaimAvailableAt = nil } - if balanceBackupMode || !backupExists { - if userStoppedMining := didReferralJustStopMining(now, usr, t0Ref, tMinus1Ref); userStoppedMining != nil { - referralsThatStoppedMining = append(referralsThatStoppedMining, userStoppedMining) - } + if userStoppedMining := didReferralJustStopMining(now, usr, t0Ref, tMinus1Ref); userStoppedMining != nil { + referralsThatStoppedMining = append(referralsThatStoppedMining, userStoppedMining) } if dayOffStarted := didANewDayOffJustStart(now, usr); dayOffStarted != nil { msgs = append(msgs, dayOffStartedMessage(reqCtx, dayOffStarted)) @@ -627,11 +430,12 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { } var pipeliner redis.Pipeliner - if len(t1ReferralsThatStoppedMining)+len(t2ReferralsThatStoppedMining)+len(extraBonusOnlyUpdatedUsers)+len(referralsUpdated)+len(userGlobalRanks)+len(backupUsersUpdated) > 0 { + if len(t1ReferralsThatStoppedMining)+len(t2ReferralsThatStoppedMining)+len(extraBonusOnlyUpdatedUsers)+len(referralsUpdated)+len(userGlobalRanks) > 0 { pipeliner = m.db.TxPipeline() } else { pipeliner = m.db.Pipeline() } + before = time.Now() reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline) if responses, err := pipeliner.Pipelined(reqCtx, func(pipeliner redis.Pipeliner) error { @@ -670,11 +474,6 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { return err } } - for _, value := range backupUsersUpdated { - if err := pipeliner.HSet(reqCtx, value.Key(), storage.SerializeValue(value)...).Err(); err != nil { - return err - } - } if len(userGlobalRanks) > 0 { if err := pipeliner.ZAdd(reqCtx, "top_miners", userGlobalRanks...).Err(); err != nil { return err diff --git a/miner/recalculate_balance.go b/miner/recalculate_balance.go deleted file mode 100644 index dddfe48c..00000000 --- a/miner/recalculate_balance.go +++ /dev/null @@ -1,655 +0,0 @@ -// SPDX-License-Identifier: ice License 1.0 - -package miner - -import ( - "context" - "sort" - stdlibtime "time" - - "github.com/pkg/errors" - "github.com/redis/go-redis/v9" - - "github.com/ice-blockchain/eskimo/users" - "github.com/ice-blockchain/freezer/model" - "github.com/ice-blockchain/freezer/tokenomics" - storagePG "github.com/ice-blockchain/wintr/connectors/storage/v2" - "github.com/ice-blockchain/wintr/connectors/storage/v3" - "github.com/ice-blockchain/wintr/log" - "github.com/ice-blockchain/wintr/time" -) - -const ( - maxLimit int64 = 10000 -) - -type ( - pgUser struct { - Active *users.NotExpired - ID, ReferredBy string - ReferralType string - } - pgUserCreated struct { - ID string - CreatedAt *time.Time - } - - splittedAdoptionByRange struct { - TimePoint *time.Time - BaseMiningRate float64 - } - - historyRangeTime struct { - MiningSessionSoloStartedAt *time.Time - MiningSessionSoloEndedAt *time.Time - MiningSessionSoloLastStartedAt *time.Time - MiningSessionSoloPreviouslyEndedAt *time.Time - CreatedAt *time.Time - ResurrectSoloUsedAt *time.Time - SlashingRateSolo float64 - BalanceSolo float64 - BalanceT1Pending float64 - BalanceT1PendingApplied float64 - BalanceT2Pending float64 - BalanceT2PendingApplied float64 - } - - historyData struct { - NeedToBeRecalculatedUsers map[string]struct{} - HistoryTimeRanges map[string][]*historyRangeTime - T1Referrals, T2Referrals map[string][]string - T1ActiveCounts, T2ActiveCounts map[string]int32 - } - - balanceRecalculationMetrics struct { - StartedAt *time.Time - EndedAt *time.Time - T1BalancePositive float64 - T1BalanceNegative float64 - T2BalancePositive float64 - T2BalanceNegative float64 - T1ActiveCountsPositive int64 - T1ActiveCountsNegative int64 - T2ActiveCountsPositive int64 - T2ActiveCountsNegative int64 - IterationsNum int64 - AffectedUsers int64 - Worker int64 - } -) - -func (m *miner) getUsers(ctx context.Context, users []*user) (map[string]*pgUserCreated, error) { - var ( - userIDs []string - offset int64 = 0 - result = make(map[string]*pgUserCreated, len(users)) - ) - for _, val := range users { - userIDs = append(userIDs, val.UserID) - } - for { - sql := `SELECT - id, - created_at - FROM users - WHERE id = ANY($1) - LIMIT $2 OFFSET $3` - rows, err := storagePG.Select[pgUserCreated](ctx, m.dbPG, sql, userIDs, maxLimit, offset) - if err != nil { - return nil, errors.Wrapf(err, "can't get users from pg for: %#v", userIDs) - } - if len(rows) == 0 { - break - } - offset += maxLimit - for _, row := range rows { - result[row.ID] = row - } - } - - return result, nil -} - -func (m *miner) collectTiers(ctx context.Context, needToBeRecalculatedUsers map[string]struct{}) ( - t1Referrals, t2Referrals map[string][]string, t1ActiveCounts, t2ActiveCounts map[string]int32, err error, -) { - var ( - userIDs = make([]string, 0, len(needToBeRecalculatedUsers)) - offset int64 = 0 - now = time.Now() - ) - for key := range needToBeRecalculatedUsers { - userIDs = append(userIDs, key) - } - t1ActiveCounts, t2ActiveCounts = make(map[string]int32, len(needToBeRecalculatedUsers)), make(map[string]int32, len(needToBeRecalculatedUsers)) - t1Referrals, t2Referrals = make(map[string][]string), make(map[string][]string) - for { - sql := `SELECT * FROM( - SELECT - id, - referred_by, - 'T1' AS referral_type, - (CASE - WHEN COALESCE(last_mining_ended_at, to_timestamp(1)) > $1 - THEN COALESCE(last_mining_ended_at, to_timestamp(1)) - ELSE NULL - END) AS active - FROM users - WHERE referred_by = ANY($2) - AND referred_by != id - AND username != id - UNION ALL - SELECT - t2.id AS id, - t0.id AS referred_by, - 'T2' AS referral_type, - (CASE - WHEN COALESCE(t2.last_mining_ended_at, to_timestamp(1)) > $1 - THEN COALESCE(t2.last_mining_ended_at, to_timestamp(1)) - ELSE NULL - END) AS active - FROM users t0 - JOIN users t1 - ON t1.referred_by = t0.id - JOIN users t2 - ON t2.referred_by = t1.id - WHERE t0.id = ANY($2) - AND t2.referred_by != t2.id - AND t2.username != t2.id - ) X - LIMIT $3 OFFSET $4` - rows, err := storagePG.Select[pgUser](ctx, m.dbPG, sql, now.Time, userIDs, maxLimit, offset) - if err != nil { - return nil, nil, nil, nil, errors.Wrap(err, "can't get referrals from pg for showing actual data") - } - if len(rows) == 0 { - break - } - offset += maxLimit - for _, row := range rows { - if row.ReferredBy != "bogus" && row.ReferredBy != "icenetwork" && row.ID != "bogus" && row.ID != "icenetwork" { - if row.ReferralType == "T1" { - t1Referrals[row.ReferredBy] = append(t1Referrals[row.ReferredBy], row.ID) - if row.Active != nil && *row.Active { - t1ActiveCounts[row.ReferredBy]++ - } - } else if row.ReferralType == "T2" { - t2Referrals[row.ReferredBy] = append(t2Referrals[row.ReferredBy], row.ID) - if row.Active != nil && *row.Active { - t2ActiveCounts[row.ReferredBy]++ - } - } else { - log.Panic("wrong tier type") - } - } - } - } - - return t1Referrals, t2Referrals, t1ActiveCounts, t2ActiveCounts, nil -} - -func splitByAdoptionTimeRanges(adoptions []*tokenomics.Adoption[float64], startedAt, endedAt *time.Time) []splittedAdoptionByRange { - var result []splittedAdoptionByRange - - currentMBR := adoptions[0].BaseMiningRate - lastAchievedAt := adoptions[0].AchievedAt - currentAchievedAtIdx := 0 - - for idx, adptn := range adoptions { - if adptn.AchievedAt.IsNil() { - continue - } - if adptn.AchievedAt.Before(*startedAt.Time) { - currentMBR = adptn.BaseMiningRate - } - if (adptn.AchievedAt.After(*startedAt.Time) || adptn.AchievedAt.Equal(*startedAt.Time)) && - adptn.AchievedAt.Before(*endedAt.Time) { - result = append(result, splittedAdoptionByRange{ - TimePoint: adptn.AchievedAt, - BaseMiningRate: adptn.BaseMiningRate, - }) - } - if adptn.AchievedAt.After(*lastAchievedAt.Time) { - currentAchievedAtIdx = idx - lastAchievedAt = adptn.AchievedAt - } - } - result = append(result, - splittedAdoptionByRange{ - TimePoint: startedAt, - BaseMiningRate: currentMBR, - }, - ) - if endedAt.After(*adoptions[currentAchievedAtIdx].AchievedAt.Time) { - result = append(result, - splittedAdoptionByRange{ - TimePoint: endedAt, - BaseMiningRate: adoptions[currentAchievedAtIdx].BaseMiningRate, - }) - } else { - result = append(result, - splittedAdoptionByRange{ - TimePoint: endedAt, - BaseMiningRate: currentMBR, - }) - } - sort.Slice(result, func(i, j int) bool { - return result[i].TimePoint.Before(*result[j].TimePoint.Time) - }) - - return result -} - -func calculateTimeBounds(refTimeRange, usrRange *historyRangeTime) (*time.Time, *time.Time) { - if refTimeRange.MiningSessionSoloStartedAt.After(*usrRange.MiningSessionSoloEndedAt.Time) || refTimeRange.MiningSessionSoloEndedAt.Before(*usrRange.MiningSessionSoloStartedAt.Time) || refTimeRange.SlashingRateSolo > 0 { - return nil, nil - } - var startedAt, endedAt *time.Time - if refTimeRange.MiningSessionSoloStartedAt.After(*usrRange.MiningSessionSoloStartedAt.Time) || refTimeRange.MiningSessionSoloStartedAt.Equal(*usrRange.MiningSessionSoloStartedAt.Time) { - startedAt = refTimeRange.MiningSessionSoloStartedAt - } else { - startedAt = usrRange.MiningSessionSoloStartedAt - } - if refTimeRange.MiningSessionSoloEndedAt.Before(*usrRange.MiningSessionSoloEndedAt.Time) || refTimeRange.MiningSessionSoloEndedAt.Equal(*usrRange.MiningSessionSoloEndedAt.Time) { - endedAt = refTimeRange.MiningSessionSoloEndedAt - } else { - endedAt = usrRange.MiningSessionSoloEndedAt - } - - return startedAt, endedAt -} - -func initializeEmptyUser(updatedUser, usr *user) *user { - var newUser user - newUser.ID = usr.ID - newUser.UserID = usr.UserID - newUser.IDT0 = usr.IDT0 - newUser.IDTMinus1 = usr.IDTMinus1 - newUser.BalanceLastUpdatedAt = nil - - return &newUser -} - -func getInternalIDs(ctx context.Context, db storage.DB, keys ...string) ([]string, error) { - if cmdResults, err := db.Pipelined(ctx, func(pipeliner redis.Pipeliner) error { - if err := pipeliner.MGet(ctx, keys...).Err(); err != nil { - return err - } - - return nil - }); err != nil { - return nil, err - } else { - results := make([]string, 0, len(cmdResults)) - for _, cmdResult := range cmdResults { - sliceResult := cmdResult.(*redis.SliceCmd) - for _, val := range sliceResult.Val() { - if val == nil { - continue - } - results = append(results, val.(string)) - } - } - - return results, nil - } -} - -func (m *miner) gatherHistoryAndReferralsInformation(ctx context.Context, users []*user) (history *historyData, err error) { - if len(users) == 0 { - return nil, nil - } - var ( - needToBeRecalculatedUsers = make(map[string]struct{}, len(users)) - historyTimeRanges = make(map[string][]*historyRangeTime) - ) - usrs, err := m.getUsers(ctx, users) - if err != nil { - return nil, errors.Wrapf(err, "can't get CreatedAt information for users:%#v", usrs) - } - for _, usr := range users { - if usr.UserID == "" || usr.Username == "" { - continue - } - if _, ok := usrs[usr.UserID]; ok { - if usrs[usr.UserID].CreatedAt == nil || usrs[usr.UserID].CreatedAt.After(*m.recalculationBalanceStartDate.Time) { - continue - } - } - needToBeRecalculatedUsers[usr.UserID] = struct{}{} - } - if len(needToBeRecalculatedUsers) == 0 { - return nil, nil - } - t1Referrals, t2Referrals, t1ActiveCounts, t2ActiveCounts, err := m.collectTiers(ctx, needToBeRecalculatedUsers) - if err != nil { - return nil, errors.Wrap(err, "can't get active users for users") - } - if len(t1Referrals) == 0 && len(t2Referrals) == 0 { - return nil, nil - } - userKeys := make([]string, 0, len(t1Referrals)+len(t2Referrals)+len(needToBeRecalculatedUsers)) - for _, values := range t1Referrals { - for _, val := range values { - userKeys = append(userKeys, model.SerializedUsersKey(val)) - } - } - for _, values := range t2Referrals { - for _, val := range values { - userKeys = append(userKeys, model.SerializedUsersKey(val)) - } - } - for key, _ := range needToBeRecalculatedUsers { - userKeys = append(userKeys, model.SerializedUsersKey(key)) - } - userResults, err := getInternalIDs(ctx, m.db, userKeys...) - if err != nil { - return nil, errors.Wrapf(err, "failed to get internal ids for:%v", userKeys) - } - offset := int64(0) - for { - historyInformation, err := m.dwhClient.GetAdjustUserInformation(ctx, userResults, maxLimit, offset) - if err != nil { - return nil, errors.Wrapf(err, "can't get adjust user information for ids:#%v", userResults) - } - if len(historyInformation) == 0 { - break - } - offset += maxLimit - for _, info := range historyInformation { - historyTimeRanges[info.UserID] = append(historyTimeRanges[info.UserID], &historyRangeTime{ - MiningSessionSoloPreviouslyEndedAt: info.MiningSessionSoloPreviouslyEndedAt, - MiningSessionSoloStartedAt: info.MiningSessionSoloStartedAt, - MiningSessionSoloEndedAt: info.MiningSessionSoloEndedAt, - ResurrectSoloUsedAt: info.ResurrectSoloUsedAt, - CreatedAt: info.CreatedAt, - SlashingRateSolo: info.SlashingRateSolo, - BalanceT1Pending: info.BalanceT1Pending, - BalanceT1PendingApplied: info.BalanceT1PendingApplied, - BalanceT2Pending: info.BalanceT2Pending, - BalanceT2PendingApplied: info.BalanceT2PendingApplied, - }) - } - } - if len(historyTimeRanges) == 0 { - return nil, nil - } - - return &historyData{ - NeedToBeRecalculatedUsers: needToBeRecalculatedUsers, - HistoryTimeRanges: historyTimeRanges, - T1Referrals: t1Referrals, - T2Referrals: t2Referrals, - T1ActiveCounts: t1ActiveCounts, - T2ActiveCounts: t2ActiveCounts, - }, nil -} - -func (m *miner) recalculateUser(usr *user, adoptions []*tokenomics.Adoption[float64], history *historyData) *user { - if history == nil || history.HistoryTimeRanges == nil || (history.T1Referrals == nil && history.T2Referrals == nil) || adoptions == nil { - return nil - } - if _, ok := history.NeedToBeRecalculatedUsers[usr.UserID]; !ok { - return nil - } - if _, ok := history.HistoryTimeRanges[usr.UserID]; ok { - var ( - isResurrected bool - slashingLastEndedAt *time.Time - lastMiningSessionSoloEndedAt *time.Time - previousUserStartedAt, previousUserEndedAt *time.Time - now = time.Now() - ) - clonedUser1 := *usr - updatedUser := &clonedUser1 - updatedUser.BalanceT1 = 0 - updatedUser.BalanceT2 = 0 - updatedUser.BalanceLastUpdatedAt = nil - - for _, usrRange := range history.HistoryTimeRanges[usr.UserID] { - if updatedUser == nil { - updatedUser = initializeEmptyUser(updatedUser, usr) - } - lastMiningSessionSoloEndedAt = usrRange.MiningSessionSoloEndedAt - - updatedUser.BalanceT1Pending = usrRange.BalanceT1Pending - updatedUser.BalanceT1PendingApplied = usrRange.BalanceT1PendingApplied - updatedUser.BalanceT2Pending = usrRange.BalanceT2Pending - updatedUser.BalanceT2PendingApplied = usrRange.BalanceT2PendingApplied - /****************************************************************************************************************************************************** - 1. Resurrection check & handling. - ******************************************************************************************************************************************************/ - if !usrRange.ResurrectSoloUsedAt.IsNil() && usrRange.ResurrectSoloUsedAt.Unix() > 0 && !isResurrected { - var resurrectDelta float64 - if timeSpent := usrRange.MiningSessionSoloStartedAt.Sub(*usrRange.MiningSessionSoloPreviouslyEndedAt.Time); cfg.Development { - resurrectDelta = timeSpent.Minutes() - } else { - resurrectDelta = timeSpent.Hours() - } - updatedUser.BalanceT1 += updatedUser.SlashingRateT1 * resurrectDelta - updatedUser.BalanceT2 += updatedUser.SlashingRateT2 * resurrectDelta - updatedUser.SlashingRateT1 = 0 - updatedUser.SlashingRateT2 = 0 - - isResurrected = true - } - /****************************************************************************************************************************************************** - 2. Slashing calculations. - ******************************************************************************************************************************************************/ - if usrRange.SlashingRateSolo > 0 { - if slashingLastEndedAt.IsNil() { - slashingLastEndedAt = usrRange.MiningSessionSoloEndedAt - } - updatedUser.BalanceLastUpdatedAt = slashingLastEndedAt - updatedUser.ResurrectSoloUsedAt = nil - updatedUser, _, _ = mine(0., usrRange.CreatedAt, updatedUser, nil, nil) - slashingLastEndedAt = usrRange.CreatedAt - - continue - } - if !slashingLastEndedAt.IsNil() && usrRange.MiningSessionSoloStartedAt.Sub(*slashingLastEndedAt.Time).Nanoseconds() > 0 { - updatedUser.BalanceLastUpdatedAt = slashingLastEndedAt - updatedUser.ResurrectSoloUsedAt = nil - now := usrRange.MiningSessionSoloStartedAt - updatedUser, _, _ = mine(0., now, updatedUser, nil, nil) - slashingLastEndedAt = nil - } - /****************************************************************************************************************************************************** - 3. Saving time range state for the next range for streaks case. - ******************************************************************************************************************************************************/ - if previousUserStartedAt != nil && previousUserStartedAt.Equal(*usrRange.MiningSessionSoloStartedAt.Time) && - previousUserEndedAt != nil && (usrRange.MiningSessionSoloEndedAt.After(*previousUserEndedAt.Time) || - usrRange.MiningSessionSoloEndedAt.Equal(*previousUserEndedAt.Time)) { - - previousUserStartedAt = usrRange.MiningSessionSoloStartedAt - - usrRange.MiningSessionSoloStartedAt = previousUserEndedAt - previousUserEndedAt = usrRange.MiningSessionSoloEndedAt - } else { - previousUserStartedAt = usrRange.MiningSessionSoloStartedAt - previousUserEndedAt = usrRange.MiningSessionSoloEndedAt - } - /****************************************************************************************************************************************************** - 4. T1 Balance calculation for the current user time range. - ******************************************************************************************************************************************************/ - if _, ok := history.T1Referrals[usr.UserID]; ok { - for _, refID := range history.T1Referrals[usr.UserID] { - if _, ok := history.HistoryTimeRanges[refID]; ok { - var previousT1MiningSessionStartedAt, previousT1MiningSessionEndedAt *time.Time - for _, timeRange := range history.HistoryTimeRanges[refID] { - if timeRange.SlashingRateSolo > 0 { - continue - } - if previousT1MiningSessionStartedAt != nil && previousT1MiningSessionStartedAt.Equal(*timeRange.MiningSessionSoloStartedAt.Time) && - previousT1MiningSessionEndedAt != nil && (timeRange.MiningSessionSoloEndedAt.After(*previousT1MiningSessionEndedAt.Time) || - timeRange.MiningSessionSoloEndedAt.Equal(*previousT1MiningSessionEndedAt.Time)) { - - previousT1MiningSessionStartedAt = timeRange.MiningSessionSoloStartedAt - timeRange.MiningSessionSoloStartedAt = previousT1MiningSessionEndedAt - previousT1MiningSessionEndedAt = timeRange.MiningSessionSoloEndedAt - } else { - previousT1MiningSessionStartedAt = timeRange.MiningSessionSoloStartedAt - previousT1MiningSessionEndedAt = timeRange.MiningSessionSoloEndedAt - } - startedAt, endedAt := calculateTimeBounds(timeRange, usrRange) - if startedAt == nil && endedAt == nil { - continue - } - - adoptionRanges := splitByAdoptionTimeRanges(adoptions, startedAt, endedAt) - - var previousTimePoint *time.Time - for _, adoptionRange := range adoptionRanges { - if previousTimePoint == nil { - previousTimePoint = adoptionRange.TimePoint - - continue - } - if previousTimePoint.Equal(*adoptionRange.TimePoint.Time) { - continue - } - updatedUser.ActiveT1Referrals = 1 - updatedUser.ActiveT2Referrals = 0 - updatedUser.MiningSessionSoloStartedAt = previousTimePoint - updatedUser.MiningSessionSoloEndedAt = time.New(adoptionRange.TimePoint.Add(1 * stdlibtime.Nanosecond)) - updatedUser.BalanceLastUpdatedAt = nil - updatedUser.ResurrectSoloUsedAt = nil - now := adoptionRange.TimePoint - - updatedUser, _, _ = mine(adoptionRange.BaseMiningRate, now, updatedUser, nil, nil) - - previousTimePoint = adoptionRange.TimePoint - } - } - } - } - } - /****************************************************************************************************************************************************** - 5. T2 Balance calculation for the current user time range. - ******************************************************************************************************************************************************/ - if _, ok := history.T2Referrals[usr.UserID]; ok { - for _, refID := range history.T2Referrals[usr.UserID] { - if _, ok := history.HistoryTimeRanges[refID]; ok { - var previousT2MiningSessionStartedAt, previousT2MiningSessionEndedAt *time.Time - for _, timeRange := range history.HistoryTimeRanges[refID] { - if timeRange.SlashingRateSolo > 0 { - continue - } - if previousT2MiningSessionStartedAt != nil && previousT2MiningSessionStartedAt.Equal(*timeRange.MiningSessionSoloStartedAt.Time) && - previousT2MiningSessionEndedAt != nil && (timeRange.MiningSessionSoloEndedAt.After(*previousT2MiningSessionEndedAt.Time) || - timeRange.MiningSessionSoloEndedAt.Equal(*previousT2MiningSessionEndedAt.Time)) { - - previousT2MiningSessionStartedAt = timeRange.MiningSessionSoloStartedAt - timeRange.MiningSessionSoloStartedAt = previousT2MiningSessionEndedAt - previousT2MiningSessionEndedAt = timeRange.MiningSessionSoloEndedAt - } else { - previousT2MiningSessionEndedAt = timeRange.MiningSessionSoloEndedAt - previousT2MiningSessionStartedAt = timeRange.MiningSessionSoloStartedAt - } - startedAt, endedAt := calculateTimeBounds(timeRange, usrRange) - if startedAt == nil && endedAt == nil { - continue - } - - adoptionRanges := splitByAdoptionTimeRanges(adoptions, startedAt, endedAt) - - var previousTimePoint *time.Time - for _, adoptionRange := range adoptionRanges { - if previousTimePoint == nil { - previousTimePoint = adoptionRange.TimePoint - - continue - } - if previousTimePoint.Equal(*adoptionRange.TimePoint.Time) { - continue - } - updatedUser.ActiveT1Referrals = 0 - updatedUser.ActiveT2Referrals = 1 - updatedUser.MiningSessionSoloPreviouslyEndedAt = usr.MiningSessionSoloPreviouslyEndedAt - updatedUser.MiningSessionSoloStartedAt = previousTimePoint - updatedUser.MiningSessionSoloEndedAt = time.New(adoptionRange.TimePoint.Add(1 * stdlibtime.Nanosecond)) - updatedUser.BalanceLastUpdatedAt = nil - updatedUser.ResurrectSoloUsedAt = nil - now := adoptionRange.TimePoint - - updatedUser, _, _ = mine(adoptionRange.BaseMiningRate, now, updatedUser, nil, nil) - - previousTimePoint = adoptionRange.TimePoint - } - } - } - } - } - } - if !lastMiningSessionSoloEndedAt.IsNil() { - if timeDiff := now.Sub(*lastMiningSessionSoloEndedAt.Time); cfg.Development { - if timeDiff >= 60*stdlibtime.Minute { - updatedUser = nil - } - } else { - if timeDiff >= 60*stdlibtime.Hour*24 { - updatedUser = nil - } - } - } - if updatedUser == nil { - updatedUser = initializeEmptyUser(updatedUser, usr) - } - - return updatedUser - } - - return nil -} - -func (m *miner) getBalanceRecalculationMetrics(ctx context.Context, workerNumber int64) (brm *balanceRecalculationMetrics, err error) { - sql := `SELECT * FROM balance_recalculation_metrics WHERE worker = $1` - res, err := storagePG.Get[balanceRecalculationMetrics](ctx, m.dbPG, sql, workerNumber) - if err != nil { - if err == storagePG.ErrNotFound { - return nil, nil - } - - return nil, errors.Wrapf(err, "failed to get balance recalculation metrics:%v", res) - } - - return res, nil -} - -func (m *miner) insertBalanceRecalculationMetrics(ctx context.Context, brm *balanceRecalculationMetrics) error { - sql := `INSERT INTO balance_recalculation_metrics( - worker, - started_at, - ended_at, - t1_balance_positive, - t1_balance_negative, - t2_balance_positive, - t2_balance_negative, - t1_active_counts_positive, - t1_active_counts_negative, - t2_active_counts_positive, - t2_active_counts_negative, - iterations_num, - affected_users - ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)` - _, err := storagePG.Exec(ctx, m.dbPG, sql, brm.Worker, brm.StartedAt.Time, brm.EndedAt.Time, brm.T1BalancePositive, brm.T1BalanceNegative, brm.T2BalancePositive, brm.T2BalanceNegative, - brm.T1ActiveCountsPositive, brm.T1ActiveCountsNegative, brm.T2ActiveCountsPositive, brm.T2ActiveCountsNegative, brm.IterationsNum, brm.AffectedUsers) - - return errors.Wrapf(err, "failed to insert metrics for worker:%v, params:%#v", brm.Worker, brm) -} - -func (b *balanceRecalculationMetrics) reset() { - b.EndedAt = nil - b.AffectedUsers = 0 - b.IterationsNum = 0 - b.T1BalancePositive = 0 - b.T1BalanceNegative = 0 - b.T2BalancePositive = 0 - b.T2BalanceNegative = 0 - b.T1ActiveCountsPositive = 0 - b.T1ActiveCountsNegative = 0 - b.T2ActiveCountsPositive = 0 - b.T2ActiveCountsNegative = 0 - b.StartedAt = time.Now() -} diff --git a/model/model.go b/model/model.go index 07e18999..7d0f2a8f 100644 --- a/model/model.go +++ b/model/model.go @@ -96,9 +96,6 @@ type ( ExtraBonusLastClaimAvailableAtField struct { ExtraBonusLastClaimAvailableAt *time.Time `redis:"extra_bonus_last_claim_available_at,omitempty"` } - BalancesBackupUsedAtField struct { - BalancesBackupUsedAt *time.Time `redis:"balance_backup_used_at,omitempty"` - } UserIDField struct { UserID string `redis:"user_id,omitempty"` } @@ -159,12 +156,6 @@ type ( BalanceT2Field struct { BalanceT2 float64 `redis:"balance_t2"` } - FirstRecalculatedBalanceT1Field struct { - FirstRecalculatedBalanceT1 float64 `redis:"first_recalculated_balance_t1"` - } - FirstRecalculatedBalanceT2Field struct { - FirstRecalculatedBalanceT2 float64 `redis:"first_recalculated_balance_t2"` - } BalanceForT0Field struct { BalanceForT0 float64 `redis:"balance_for_t0"` } @@ -183,12 +174,6 @@ type ( SlashingRateT2Field struct { SlashingRateT2 float64 `redis:"slashing_rate_t2"` } - FirstRecalculatedSlashingRateT1Field struct { - FirstRecalculatedSlashingRateT1 float64 `redis:"first_recalculated_slashing_rate_t1"` - } - FirstRecalculatedSlashingRateT2Field struct { - FirstRecalculatedSlashingRateT2 float64 `redis:"first_recalculated_slashing_rate_t2"` - } SlashingRateForT0Field struct { SlashingRateForT0 float64 `redis:"slashing_rate_for_t0"` } @@ -225,12 +210,6 @@ type ( ActiveT2ReferralsField struct { ActiveT2Referrals int32 `redis:"active_t2_referrals,omitempty"` } - FirstRecalculatedActiveT1ReferralsField struct { - FirstRecalculatedActiveT1Referrals int32 `redis:"first_recalculated_active_t1_referrals,omitempty"` - } - FirstRecalculatedActiveT2ReferralsField struct { - FirstRecalculatedActiveT2Referrals int32 `redis:"first_recalculated_active_t2_referrals,omitempty"` - } NewsSeenField struct { NewsSeen uint16 `redis:"news_seen"` } @@ -246,9 +225,6 @@ type ( HideRankingField struct { HideRanking bool `redis:"hide_ranking"` } - DeserializedBackupUsersKey struct { - ID int64 `redis:"-"` - } ) func (k *DeserializedUsersKey) Key() string { @@ -289,42 +265,3 @@ func SerializedUsersKey(val any) string { panic(fmt.Sprintf("%#v cannot be used as users key", val)) } } - -func (k *DeserializedBackupUsersKey) Key() string { - if k == nil || k.ID == 0 { - return "" - } - - return SerializedBackupUsersKey(k.ID) -} - -func (k *DeserializedBackupUsersKey) SetKey(val string) { - if val == "" || val == "backup:" { - return - } - if val[0] == 'b' { - val = val[7:] - } - var err error - k.ID, err = strconv.ParseInt(val, 10, 64) - log.Panic(err) -} - -func SerializedBackupUsersKey(val any) string { - switch typedVal := val.(type) { - case string: - if typedVal == "" { - return "" - } - - return "backup:" + typedVal - case int64: - if typedVal == 0 { - return "" - } - - return "backup:" + strconv.FormatInt(typedVal, 10) - default: - panic(fmt.Sprintf("%#v cannot be used as backup key", val)) - } -} diff --git a/tokenomics/adoption.go b/tokenomics/adoption.go index 5e2e47e9..d9a808bc 100644 --- a/tokenomics/adoption.go +++ b/tokenomics/adoption.go @@ -30,7 +30,7 @@ func (r *repository) GetAdoptionSummary(ctx context.Context) (as *AdoptionSummar if as.TotalActiveUsers, err = r.db.Get(ctx, r.totalActiveUsersKey(*time.Now().Time)).Uint64(); err != nil && !errors.Is(err, redis.Nil) { return nil, errors.Wrap(err, "failed to get current totalActiveUsers") } - if as.Milestones, err = GetAllAdoptions[string](ctx, r.db); err != nil { + if as.Milestones, err = getAllAdoptions[string](ctx, r.db); err != nil { return nil, errors.Wrap(err, "failed to get all adoption milestones") } @@ -213,7 +213,7 @@ func getAdoption(ctx context.Context, db storage.DB, milestone uint64) (*Adoptio return resp[0], nil } -func GetAllAdoptions[DENOM ~string | ~float64](ctx context.Context, db storage.DB) ([]*Adoption[DENOM], error) { +func getAllAdoptions[DENOM ~string | ~float64](ctx context.Context, db storage.DB) ([]*Adoption[DENOM], error) { const max = 20 // We try to get 20 just to be sure we get all of them. We're never going to have more than 20 milestones. keys := make([]string, max) for ix := uint64(1); ix <= max; ix++ { diff --git a/tokenomics/mining_sessions.go b/tokenomics/mining_sessions.go index f814457f..ef19835b 100644 --- a/tokenomics/mining_sessions.go +++ b/tokenomics/mining_sessions.go @@ -251,15 +251,6 @@ func (s *miningSessionsTableSource) Process(ctx context.Context, msg *messagebro ).ErrorOrNil() } -func mustGetBalancesBackupMode(ctx context.Context, db storage.DB) (result bool, err error) { - balancesBackupModeString, err := db.Get(ctx, "balances_backup_mode").Result() - if err != nil && errors.Is(err, redis.Nil) { - err = nil - } - - return balancesBackupModeString == "true", err -} - //nolint:funlen,revive,gocognit // . func (s *miningSessionsTableSource) incrementActiveReferralCountForT0AndTMinus1(ctx context.Context, ms *MiningSession) (err error) { if ctx.Err() != nil || !ms.LastNaturalMiningStartedAt.Equal(*ms.StartedAt.Time) { @@ -287,20 +278,6 @@ func (s *miningSessionsTableSource) incrementActiveReferralCountForT0AndTMinus1( if err != nil { return errors.Wrapf(err, "failed to getOrInitInternalID for userID:%v", *ms.UserID) } - backupUsr, err := storage.Get[struct { - model.DeserializedBackupUsersKey - model.UserIDField - }](ctx, s.db, model.SerializedBackupUsersKey(id)) - if err != nil { - return errors.Wrapf(err, "failed to get backupUser for id:%v, userID:%v", id, *ms.UserID) - } - balanceBackupMode, err := mustGetBalancesBackupMode(ctx, s.db) - if err != nil { - return errors.Wrapf(err, "failed to get backup flag for id:%v", id) - } - if !balanceBackupMode && backupUsr != nil { - return nil - } referees, err := storage.Get[struct { model.UserIDField model.DeserializedUsersKey diff --git a/tokenomics/users.go b/tokenomics/users.go index 08f2b395..ebc75461 100644 --- a/tokenomics/users.go +++ b/tokenomics/users.go @@ -172,9 +172,7 @@ func (s *usersTableSource) replaceUser(ctx context.Context, usr *users.User) err model.BlockchainAccountAddressField model.DeserializedUsersKey model.IDT0Field - model.IDTMinus1Field model.HideRankingField - model.BalanceForTMinus1Field } ) dbUser, err := storage.Get[user](ctx, s.db, model.SerializedUsersKey(internalID)) @@ -202,12 +200,12 @@ func (s *usersTableSource) replaceUser(ctx context.Context, usr *users.User) err return multierror.Append( //nolint:wrapcheck // Not Needed. errors.Wrapf(err, "failed to replace user:%#v", usr), - errors.Wrapf(s.updateReferredBy(ctx, internalID, dbUser[0].IDT0, dbUser[0].IDTMinus1, usr.ID, usr.ReferredBy, dbUser[0].BalanceForTMinus1), "failed to updateReferredBy for user:%#v", usr), + errors.Wrapf(s.updateReferredBy(ctx, internalID, dbUser[0].IDT0, usr.ID, usr.ReferredBy), "failed to updateReferredBy for user:%#v", usr), errors.Wrapf(s.updateUsernameKeywords(ctx, internalID, dbUser[0].Username, usr.Username), "failed to updateUsernameKeywords for oldUser:%#v, user:%#v", dbUser, usr), //nolint:lll // . ).ErrorOrNil() } -func (s *usersTableSource) updateReferredBy(ctx context.Context, id, oldIDT0, oldTMinus1 int64, userID, referredBy string, balanceForTMinus1 float64) error { +func (s *usersTableSource) updateReferredBy(ctx context.Context, id, oldIDT0 int64, userID, referredBy string) error { if referredBy == userID || referredBy == "" || referredBy == "bogus" || @@ -241,41 +239,6 @@ func (s *usersTableSource) updateReferredBy(ctx context.Context, id, oldIDT0, ol return errors.Wrapf(err3, "failed to get users entry for tMinus1ID:%v", t0Referral[0].IDT0) } else if len(tMinus1Referral) == 1 { newPartialState.IDTMinus1 = -tMinus1Referral[0].ID - if balanceForTMinus1 > 0.0 { - results, err4 := s.db.TxPipelined(ctx, func(pipeliner redis.Pipeliner) error { - if oldTMinus1 < 0 { - oldTMinus1 *= -1 - } - if oldIdTMinus1Key := model.SerializedUsersKey(oldTMinus1); oldIdTMinus1Key != "" { - if err = pipeliner.HIncrByFloat(ctx, oldIdTMinus1Key, "balance_t2_pending", -balanceForTMinus1).Err(); err != nil { - return err - } - } - newTMinus1 := tMinus1Referral[0].ID - if newTMinus1 < 0 { - newTMinus1 *= -1 - } - if newIdTMinus1Key := model.SerializedUsersKey(newTMinus1); newIdTMinus1Key != "" { - if err = pipeliner.HIncrByFloat(ctx, newIdTMinus1Key, "balance_t2_pending", balanceForTMinus1).Err(); err != nil { - return err - } - } - - return nil - }) - if err4 != nil { - return errors.Wrapf(err4, "failed to move t2 balance from:%v to:%v", oldTMinus1, newPartialState.IDTMinus1) - } - errs := make([]error, 0, len(results)) - for _, result := range results { - if err = result.Err(); err != nil { - errs = append(errs, errors.Wrapf(err, "failed to run `%#v`", result.FullName())) - } - } - if errs := multierror.Append(nil, errs...); errs.ErrorOrNil() != nil { - return errors.Wrapf(errs.ErrorOrNil(), "failed to move t2 balances for id:%v,id:%v", userID, id) - } - } } } }