diff --git a/bookkeeper/storage/contract.go b/bookkeeper/storage/contract.go index 9b26e87..f38dcf7 100644 --- a/bookkeeper/storage/contract.go +++ b/bookkeeper/storage/contract.go @@ -24,7 +24,7 @@ type ( Ping(ctx context.Context) error Insert(ctx context.Context, columns *Columns, input InsertMetadata, usrs []*model.User) error SelectBalanceHistory(ctx context.Context, id int64, createdAts []stdlibtime.Time) ([]*BalanceHistory, error) - GetAdjustUserInformation(ctx context.Context, userIDs map[int64]struct{}) ([]*AdjustUserInfo, error) + GetAdjustUserInformation(ctx context.Context, userIDs map[int64]struct{}, limit, offset int64) ([]*AdjustUserInfo, error) } AdjustUserInfo struct { MiningSessionSoloStartedAt *time.Time diff --git a/bookkeeper/storage/storage.go b/bookkeeper/storage/storage.go index bd38b38..f56c056 100644 --- a/bookkeeper/storage/storage.go +++ b/bookkeeper/storage/storage.go @@ -443,7 +443,7 @@ func (db *db) SelectBalanceHistory(ctx context.Context, id int64, createdAts []s return res, nil } -func (db *db) GetAdjustUserInformation(ctx context.Context, userIDs map[int64]struct{}) ([]*AdjustUserInfo, error) { +func (db *db) GetAdjustUserInformation(ctx context.Context, userIDs map[int64]struct{}, limit, offset int64) ([]*AdjustUserInfo, error) { var ( id = make(proto.ColInt64, 0, len(userIDs)) miningSessionSoloStartedAt = proto.ColDateTime64{Data: make([]proto.DateTime64, 0, len(userIDs)), Location: stdlibtime.UTC} @@ -463,6 +463,10 @@ func (db *db) GetAdjustUserInformation(ctx context.Context, userIDs map[int64]st for key, _ := range userIDs { userIDArray = append(userIDArray, fmt.Sprint(key)) } + var offsetStr string + if offset > 0 { + offsetStr = fmt.Sprintf(", %v", offset) + } if err := db.pools[atomic.AddUint64(&db.currentIndex, 1)%uint64(len(db.pools))].Do(ctx, ch.Query{ Body: fmt.Sprintf(`SELECT id, mining_session_solo_started_at, @@ -479,7 +483,8 @@ func (db *db) GetAdjustUserInformation(ctx context.Context, userIDs map[int64]st FROM %[1]v WHERE id IN [%[2]v] ORDER BY created_at ASC - `, tableName, strings.Join(userIDArray, ",")), + LIMIT %[3]v %[4]v + `, tableName, strings.Join(userIDArray, ","), limit, offsetStr), Result: append(make(proto.Results, 0, 12), proto.ResultColumn{Name: "id", Data: &id}, proto.ResultColumn{Name: "mining_session_solo_started_at", Data: &miningSessionSoloStartedAt}, diff --git a/miner/DDL.sql b/miner/DDL.sql deleted file mode 100644 index 08da3ce..0000000 --- a/miner/DDL.sql +++ /dev/null @@ -1,32 +0,0 @@ --- SPDX-License-Identifier: ice License 1.0 -CREATE TABLE IF NOT EXISTS users ( - created_at timestamp NOT NULL, - updated_at timestamp NOT NULL, - last_mining_started_at timestamp, - last_mining_ended_at timestamp, - last_ping_cooldown_ended_at timestamp, - hash_code bigint not null generated always as identity, - kyc_step_passed smallint NOT NULL DEFAULT 0, - kyc_step_blocked smallint NOT NULL DEFAULT 0, - random_referred_by BOOLEAN NOT NULL DEFAULT FALSE, - client_data text, - hidden_profile_elements text[], - phone_number text NOT NULL UNIQUE, - email text NOT NULL UNIQUE, - first_name text, - last_name text, - country text NOT NULL, - city text NOT NULL, - id text primary key, - username text NOT NULL UNIQUE, - profile_picture_name text NOT NULL, - referred_by text NOT NULL REFERENCES users(id), - phone_number_hash text NOT NULL UNIQUE, - agenda_contact_user_ids text[], - kyc_steps_last_updated_at timestamp[], - kyc_steps_created_at timestamp[], - mining_blockchain_account_address text NOT NULL UNIQUE, - blockchain_account_address text NOT NULL UNIQUE, - language text NOT NULL DEFAULT 'en', - lookup tsvector NOT NULL) - WITH (FILLFACTOR = 70); diff --git a/miner/adoption_range_test.go b/miner/adoption_range_test.go new file mode 100644 index 0000000..0e1e53c --- /dev/null +++ b/miner/adoption_range_test.go @@ -0,0 +1,150 @@ +// SPDX-License-Identifier: ice License 1.0 + +package miner + +import ( + "testing" + stdlibtime "time" + + "github.com/ice-blockchain/freezer/tokenomics" + "github.com/ice-blockchain/wintr/time" + "github.com/stretchr/testify/assert" +) + +func TestGetAdoptionsRange_1AdoptionPerRange(t *testing.T) { + var adoptions []*tokenomics.Adoption[float64] + adoptions = append(adoptions, &tokenomics.Adoption[float64]{ + AchievedAt: time.New(time.Now().Add(-58 * stdlibtime.Minute)), + BaseMiningRate: 16.0, + Milestone: 1, + TotalActiveUsers: 1, + }, &tokenomics.Adoption[float64]{ + AchievedAt: time.New(time.Now().Add(-55 * stdlibtime.Minute)), + BaseMiningRate: 8.0, + Milestone: 1, + TotalActiveUsers: 1, + }, &tokenomics.Adoption[float64]{ + AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Minute)), + BaseMiningRate: 4.0, + Milestone: 1, + TotalActiveUsers: 1, + }, &tokenomics.Adoption[float64]{ + AchievedAt: time.New(time.Now().Add(-10 * stdlibtime.Minute)), + BaseMiningRate: 2.0, + Milestone: 1, + TotalActiveUsers: 1, + }, &tokenomics.Adoption[float64]{ + AchievedAt: nil, + BaseMiningRate: 1.0, + Milestone: 1, + TotalActiveUsers: 1, + }, &tokenomics.Adoption[float64]{ + AchievedAt: nil, + BaseMiningRate: 0.5, + Milestone: 1, + TotalActiveUsers: 1, + }, &tokenomics.Adoption[float64]{ + AchievedAt: nil, + BaseMiningRate: 0.25, + Milestone: 1, + TotalActiveUsers: 1, + }) + startedAt := time.New(time.Now().Add(-1 * stdlibtime.Minute)) + endedAt := time.Now() + ranges := splitByAdoptionTimeRanges(adoptions, startedAt, endedAt) + assert.Equal(t, 2., ranges[0].BaseMiningRate) + assert.Equal(t, 2., ranges[1].BaseMiningRate) +} + +func TestGetAdoptionsRange_2AdoptionsPerRange(t *testing.T) { + var adoptions []*tokenomics.Adoption[float64] + adoptions = append(adoptions, &tokenomics.Adoption[float64]{ + AchievedAt: time.New(time.Now().Add(-58 * stdlibtime.Minute)), + BaseMiningRate: 16.0, + Milestone: 1, + TotalActiveUsers: 1, + }, &tokenomics.Adoption[float64]{ + AchievedAt: time.New(time.Now().Add(-55 * stdlibtime.Minute)), + BaseMiningRate: 8.0, + Milestone: 1, + TotalActiveUsers: 1, + }, &tokenomics.Adoption[float64]{ + AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Minute)), + BaseMiningRate: 4.0, + Milestone: 1, + TotalActiveUsers: 1, + }, &tokenomics.Adoption[float64]{ + AchievedAt: time.New(time.Now().Add(-10 * stdlibtime.Minute)), + BaseMiningRate: 2.0, + Milestone: 1, + TotalActiveUsers: 1, + }, &tokenomics.Adoption[float64]{ + AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Second)), + BaseMiningRate: 1.0, + Milestone: 1, + TotalActiveUsers: 1, + }, &tokenomics.Adoption[float64]{ + AchievedAt: nil, + BaseMiningRate: 0.5, + Milestone: 1, + TotalActiveUsers: 1, + }, &tokenomics.Adoption[float64]{ + AchievedAt: nil, + BaseMiningRate: 0.25, + Milestone: 1, + TotalActiveUsers: 1, + }) + startedAt := time.New(time.Now().Add(-1 * stdlibtime.Minute)) + endedAt := time.Now() + ranges := splitByAdoptionTimeRanges(adoptions, startedAt, endedAt) + assert.Equal(t, 2., ranges[0].BaseMiningRate) + assert.Equal(t, 1., ranges[1].BaseMiningRate) + assert.Equal(t, 1., ranges[2].BaseMiningRate) +} + +func TestGetAdoptionsRange_AdoptionDemotionPerRange(t *testing.T) { + var adoptions []*tokenomics.Adoption[float64] + adoptions = append(adoptions, &tokenomics.Adoption[float64]{ + AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Second)), + BaseMiningRate: 16.0, + Milestone: 1, + TotalActiveUsers: 1, + }, &tokenomics.Adoption[float64]{ + AchievedAt: time.New(time.Now().Add(-55 * stdlibtime.Minute)), + BaseMiningRate: 8.0, + Milestone: 1, + TotalActiveUsers: 1, + }, &tokenomics.Adoption[float64]{ + AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Minute)), + BaseMiningRate: 4.0, + Milestone: 1, + TotalActiveUsers: 1, + }, &tokenomics.Adoption[float64]{ + AchievedAt: time.New(time.Now().Add(-1 * stdlibtime.Minute)), + BaseMiningRate: 2.0, + Milestone: 1, + TotalActiveUsers: 1, + }, &tokenomics.Adoption[float64]{ + AchievedAt: nil, + BaseMiningRate: 1.0, + Milestone: 1, + TotalActiveUsers: 1, + }, &tokenomics.Adoption[float64]{ + AchievedAt: nil, + BaseMiningRate: 0.5, + Milestone: 1, + TotalActiveUsers: 1, + }, &tokenomics.Adoption[float64]{ + AchievedAt: nil, + BaseMiningRate: 0.25, + Milestone: 1, + TotalActiveUsers: 1, + }) + startedAt := time.New(time.Now().Add(-2 * stdlibtime.Minute)) + endedAt := time.Now() + ranges := splitByAdoptionTimeRanges(adoptions, startedAt, endedAt) + assert.Equal(t, 4., ranges[0].BaseMiningRate) + assert.Equal(t, 2., ranges[1].BaseMiningRate) + assert.Equal(t, 16., ranges[2].BaseMiningRate) + assert.Equal(t, 16., ranges[3].BaseMiningRate) +} diff --git a/miner/contract.go b/miner/contract.go index 9ee4c00..2b9c8f0 100644 --- a/miner/contract.go +++ b/miner/contract.go @@ -46,9 +46,6 @@ const ( // . var ( - //go:embed DDL.sql - ddl string - //nolint:gochecknoglobals // Singleton & global config mounted only during bootstrap. cfg config ) @@ -116,11 +113,6 @@ type ( model.BalanceT2Field } - recalculatedUser struct { - model.DeserializedRecalculatedUsersKey - model.RecalculatedTiersBalancesAtField - } - referral struct { model.MiningSessionSoloStartedAtField model.MiningSessionSoloEndedAtField @@ -146,6 +138,7 @@ type ( wg *sync.WaitGroup extraBonusStartDate *time.Time extraBonusIndicesDistribution map[uint16]map[uint16]uint16 + recalculationBalanceStartDate *time.Time } config struct { disableAdvancedTeam *atomic.Pointer[[]string] diff --git a/miner/metrics_test.go b/miner/metrics_test.go index e5edd8f..96dacc2 100644 --- a/miner/metrics_test.go +++ b/miner/metrics_test.go @@ -8,9 +8,6 @@ import ( stdlibtime "time" "github.com/stretchr/testify/assert" - - "github.com/ice-blockchain/freezer/tokenomics" - "github.com/ice-blockchain/wintr/time" ) type ( @@ -178,144 +175,3 @@ func slowTelemetry(workers int64) *telemetry { return tel } - -func TestGetAdoptionsRange_1AdoptionPerRange(t *testing.T) { - var adoptions []*tokenomics.Adoption[float64] - adoptions = append(adoptions, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-58 * stdlibtime.Minute)), - BaseMiningRate: 16.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-55 * stdlibtime.Minute)), - BaseMiningRate: 8.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Minute)), - BaseMiningRate: 4.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-10 * stdlibtime.Minute)), - BaseMiningRate: 2.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 1.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 0.5, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 0.25, - Milestone: 1, - TotalActiveUsers: 1, - }) - startedAt := time.New(time.Now().Add(-1 * stdlibtime.Minute)) - endedAt := time.Now() - ranges := splitByAdoptionTimeRanges(adoptions, startedAt, endedAt) - for _, rng := range ranges { - fmt.Printf("rng.point: %v, rng.bmr: %v\n", rng.TimePoint, rng.BaseMiningRate) - } - assert.Equal(t, 2., ranges[0].BaseMiningRate) - assert.Equal(t, 2., ranges[1].BaseMiningRate) -} - -func TestGetAdoptionsRange_2AdoptionsPerRange(t *testing.T) { - var adoptions []*tokenomics.Adoption[float64] - adoptions = append(adoptions, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-58 * stdlibtime.Minute)), - BaseMiningRate: 16.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-55 * stdlibtime.Minute)), - BaseMiningRate: 8.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Minute)), - BaseMiningRate: 4.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-10 * stdlibtime.Minute)), - BaseMiningRate: 2.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Second)), - BaseMiningRate: 1.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 0.5, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 0.25, - Milestone: 1, - TotalActiveUsers: 1, - }) - startedAt := time.New(time.Now().Add(-1 * stdlibtime.Minute)) - endedAt := time.Now() - ranges := splitByAdoptionTimeRanges(adoptions, startedAt, endedAt) - assert.Equal(t, 2., ranges[0].BaseMiningRate) - assert.Equal(t, 1., ranges[1].BaseMiningRate) - assert.Equal(t, 1., ranges[2].BaseMiningRate) -} - -func TestGetAdoptionsRange_AdoptionDemotionPerRange(t *testing.T) { - var adoptions []*tokenomics.Adoption[float64] - adoptions = append(adoptions, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Second)), - BaseMiningRate: 16.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-55 * stdlibtime.Minute)), - BaseMiningRate: 8.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-30 * stdlibtime.Minute)), - BaseMiningRate: 4.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: time.New(time.Now().Add(-1 * stdlibtime.Minute)), - BaseMiningRate: 2.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 1.0, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 0.5, - Milestone: 1, - TotalActiveUsers: 1, - }, &tokenomics.Adoption[float64]{ - AchievedAt: nil, - BaseMiningRate: 0.25, - Milestone: 1, - TotalActiveUsers: 1, - }) - startedAt := time.New(time.Now().Add(-2 * stdlibtime.Minute)) - endedAt := time.Now() - ranges := splitByAdoptionTimeRanges(adoptions, startedAt, endedAt) - assert.Equal(t, 4., ranges[0].BaseMiningRate) - assert.Equal(t, 2., ranges[1].BaseMiningRate) - assert.Equal(t, 16., ranges[2].BaseMiningRate) - assert.Equal(t, 16., ranges[3].BaseMiningRate) -} diff --git a/miner/miner.go b/miner/miner.go index 687bccd..dd254bb 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -39,7 +39,7 @@ func MustStartMining(ctx context.Context, cancel context.CancelFunc) Client { mi := &miner{ mb: messagebroker.MustConnect(context.Background(), parentApplicationYamlKey), db: storage.MustConnect(context.Background(), parentApplicationYamlKey, int(cfg.Workers)), - dbPG: storagePG.MustConnect(ctx, ddl, applicationYamlKey), + dbPG: storagePG.MustConnect(ctx, "", applicationYamlKey), dwhClient: dwh.MustConnect(context.Background(), applicationYamlKey), wg: new(sync.WaitGroup), telemetry: new(telemetry).mustInit(cfg), @@ -49,6 +49,7 @@ func MustStartMining(ctx context.Context, cancel context.CancelFunc) Client { mi.cancel = cancel mi.extraBonusStartDate = extrabonusnotifier.MustGetExtraBonusStartDate(ctx, mi.db) mi.extraBonusIndicesDistribution = extrabonusnotifier.MustGetExtraBonusIndicesDistribution(ctx, mi.db) + mi.recalculationBalanceStartDate = MustGetRecalculationBalancesStartDate(ctx, mi.db) for workerNumber := int64(0); workerNumber < cfg.Workers; workerNumber++ { go func(wn int64) { @@ -113,6 +114,29 @@ func (m *miner) checkDBHealth(ctx context.Context) error { return nil } +func MustGetRecalculationBalancesStartDate(ctx context.Context, db storage.DB) (recalculationBalancesStartDate *time.Time) { + recalculationBalancesStartDateString, err := db.Get(ctx, "recalculation_balances_start_date").Result() + if err != nil && errors.Is(err, redis.Nil) { + err = nil + } + log.Panic(errors.Wrap(err, "failed to get recalculation_balances_start_date")) + if recalculationBalancesStartDateString != "" { + recalculationBalancesStartDate = new(time.Time) + log.Panic(errors.Wrapf(recalculationBalancesStartDate.UnmarshalText([]byte(recalculationBalancesStartDateString)), "failed to parse recalculation_balances_start_date `%v`", recalculationBalancesStartDateString)) //nolint:lll // . + recalculationBalancesStartDate = time.New(recalculationBalancesStartDate.UTC()) + + return + } + recalculationBalancesStartDate = time.Now() + set, sErr := db.SetNX(ctx, "recalculation_balances_start_date", recalculationBalancesStartDate, 0).Result() + log.Panic(errors.Wrap(sErr, "failed to set recalculation_balances_start_date")) + if !set { + return MustGetRecalculationBalancesStartDate(ctx, db) + } + + return recalculationBalancesStartDate +} + func (m *miner) mine(ctx context.Context, workerNumber int64) { dwhClient := dwh.MustConnect(context.Background(), applicationYamlKey) defer func() { @@ -130,7 +154,7 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { currentAdoption = m.getAdoption(ctx, m.db, workerNumber) workers = cfg.Workers batchSize = cfg.BatchSize - userKeys, recalculatedUserKeys, userHistoryKeys, referralKeys = make([]string, 0, batchSize), make([]string, 0, batchSize), make([]string, 0, batchSize), make([]string, 0, 2*batchSize) + userKeys, userHistoryKeys, referralKeys = make([]string, 0, batchSize), make([]string, 0, batchSize), make([]string, 0, 2*batchSize) userResults, referralResults = make([]*user, 0, batchSize), make([]*referral, 0, 2*batchSize) t0Referrals, tMinus1Referrals = make(map[int64]*referral, batchSize), make(map[int64]*referral, batchSize) t1ReferralsToIncrementActiveValue, t2ReferralsToIncrementActiveValue = make(map[int64]int32, batchSize), make(map[int64]int32, batchSize) @@ -140,11 +164,8 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { msgs = make([]*messagebroker.Message, 0, 3*batchSize) errs = make([]error, 0, 3*batchSize) updatedUsers = make([]*UpdatedUser, 0, batchSize) - userRecalculatedResults = make([]*recalculatedUser, 0, batchSize) - recalculatedUsersUpdated = make([]*recalculatedUser, 0, batchSize) extraBonusOnlyUpdatedUsers = make([]*extrabonusnotifier.UpdatedUser, 0, batchSize) referralsUpdated = make([]*referralUpdated, 0, batchSize) - usersBalanceTiersUpdated = make([]*userBalanceTiersUpdated, 0, batchSize) histories = make([]*model.User, 0, batchSize) userGlobalRanks = make([]redis.Z, 0, batchSize) historyColumns, historyInsertMetadata = dwh.InsertDDL(int(batchSize)) @@ -170,13 +191,10 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { if batchNumber == 0 || currentAdoption == nil { currentAdoption = m.getAdoption(ctx, m.db, workerNumber) } - userKeys, recalculatedUserKeys, userHistoryKeys, referralKeys = userKeys[:0], recalculatedUserKeys[:0], userHistoryKeys[:0], referralKeys[:0] + userKeys, userHistoryKeys, referralKeys = userKeys[:0], userHistoryKeys[:0], referralKeys[:0] userResults, referralResults = userResults[:0], referralResults[:0] - userRecalculatedResults = make([]*recalculatedUser, 0, batchSize) msgs, errs = msgs[:0], errs[:0] updatedUsers = updatedUsers[:0] - recalculatedUsersUpdated = recalculatedUsersUpdated[:0] - usersBalanceTiersUpdated = usersBalanceTiersUpdated[:0] extraBonusOnlyUpdatedUsers = extraBonusOnlyUpdatedUsers[:0] referralsUpdated = referralsUpdated[:0] histories = histories[:0] @@ -223,20 +241,6 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { if len(userKeys) > 0 { go m.telemetry.collectElapsed(2, *before.Time) } - if len(recalculatedUserKeys) == 0 { - for ix := batchNumber * batchSize; ix < (batchNumber+1)*batchSize; ix++ { - recalculatedUserKeys = append(recalculatedUserKeys, model.SerializedRecalculatedUsersKey((workers*ix)+workerNumber)) - } - } - reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline) - if err := storage.Bind[recalculatedUser](reqCtx, m.db, recalculatedUserKeys, &userRecalculatedResults); err != nil { - log.Panic(errors.Wrapf(err, "[miner] failed to get users for batchNumber:%v,workerNumber:%v", batchNumber, workerNumber)) - reqCancel() - now = time.Now() - - continue - } - reqCancel() /****************************************************************************************************************************************************** 2. Fetching T0 & T-1 referrals of the fetched users. @@ -292,7 +296,7 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { t0Referrals[ref.ID] = ref } } - recalculatedTiersBalancesUsers, err := m.showTiersDiffBalances(ctx, userResults, userRecalculatedResults, tMinus1Referrals, t0Referrals) + recalculatedTiersBalancesUsers, err := m.showTiersDiffBalances(ctx, userResults, tMinus1Referrals, t0Referrals) if err != nil { log.Error(errors.New("tiers diff balances error"), err) } @@ -301,35 +305,23 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { if usr.UserID == "" { continue } - for _, recalculatedUsr := range recalculatedTiersBalancesUsers { - if usr.ID == recalculatedUsr.ID { - // Uncomment to adjust t1/t2 balances and t1/t2 active counts. - /* - diffT1ActiveValue := recalculatedUsr.ActiveT1Referrals - usr.ActiveT1Referrals - diffT2ActiveValue := recalculatedUsr.ActiveT2Referrals - usr.ActiveT2Referrals - if diffT1ActiveValue < 0 && diffT1ActiveValue*-1 > usr.ActiveT1Referrals { - diffT1ActiveValue = -usr.ActiveT1Referrals - } - if diffT2ActiveValue < 0 && diffT2ActiveValue*-1 > usr.ActiveT2Referrals { - diffT2ActiveValue = -usr.ActiveT2Referrals - } - t1ReferralsToIncrementActiveValue[usr.ID] += diffT1ActiveValue - t2ReferralsToIncrementActiveValue[usr.ID] += diffT2ActiveValue - - usr.BalanceT1 = recalculatedUsr.BalanceT1 - usr.BalanceT2 = recalculatedUsr.BalanceT2 - - usr.SlashingRateT1 = recalculatedUsr.SlashingRateT1 - usr.SlashingRateT2 = recalculatedUsr.SlashingRateT2 - */ + if recalculatedUsr, ok := recalculatedTiersBalancesUsers[usr.ID]; ok { + diffT1ActiveValue := recalculatedUsr.ActiveT1Referrals - usr.ActiveT1Referrals + diffT2ActiveValue := recalculatedUsr.ActiveT2Referrals - usr.ActiveT2Referrals + if diffT1ActiveValue < 0 && diffT1ActiveValue*-1 > usr.ActiveT1Referrals { + diffT1ActiveValue = -usr.ActiveT1Referrals + } + if diffT2ActiveValue < 0 && diffT2ActiveValue*-1 > usr.ActiveT2Referrals { + diffT2ActiveValue = -usr.ActiveT2Referrals + } + t1ReferralsToIncrementActiveValue[usr.ID] += diffT1ActiveValue + t2ReferralsToIncrementActiveValue[usr.ID] += diffT2ActiveValue - recalculatedUsersUpdated = append(recalculatedUsersUpdated, &recalculatedUser{ - DeserializedRecalculatedUsersKey: model.DeserializedRecalculatedUsersKey{ID: usr.ID}, - RecalculatedTiersBalancesAtField: model.RecalculatedTiersBalancesAtField{RecalculatedTiersBalancesAt: time.Now()}, - }) + usr.BalanceT1 = recalculatedUsr.BalanceT1 + usr.BalanceT2 = recalculatedUsr.BalanceT2 - break - } + usr.SlashingRateT1 = recalculatedUsr.SlashingRateT1 + usr.SlashingRateT2 = recalculatedUsr.SlashingRateT2 } var t0Ref, tMinus1Ref *referral if usr.IDT0 > 0 { @@ -486,7 +478,7 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { } var pipeliner redis.Pipeliner - if len(t1ReferralsThatStoppedMining)+len(t2ReferralsThatStoppedMining)+len(extraBonusOnlyUpdatedUsers)+len(referralsUpdated)+len(userGlobalRanks)+len(recalculatedUsersUpdated)+len(usersBalanceTiersUpdated) > 0 { + if len(t1ReferralsThatStoppedMining)+len(t2ReferralsThatStoppedMining)+len(extraBonusOnlyUpdatedUsers)+len(referralsUpdated)+len(userGlobalRanks) > 0 { pipeliner = m.db.TxPipeline() } else { pipeliner = m.db.Pipeline() @@ -530,11 +522,6 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { return err } } - for _, value := range recalculatedUsersUpdated { - if err := pipeliner.HSet(reqCtx, value.Key(), storage.SerializeValue(value)...).Err(); err != nil { - return err - } - } if len(userGlobalRanks) > 0 { if err := pipeliner.ZAdd(reqCtx, "top_miners", userGlobalRanks...).Err(); err != nil { return err diff --git a/miner/recalculate_balance.go b/miner/recalculate_balance.go index 3c9df0f..3212e33 100644 --- a/miner/recalculate_balance.go +++ b/miner/recalculate_balance.go @@ -4,7 +4,6 @@ package miner import ( "context" - "fmt" "sort" stdlibtime "time" @@ -18,7 +17,7 @@ import ( ) const ( - maxLimit = 10000 + maxLimit int64 = 10000 ) var ( @@ -32,6 +31,10 @@ type ( ID, ReferredBy UserID ReferralType string } + pgUserCreated struct { + ID UserID + CreatedAt *time.Time + } splittedAdoptionByRange struct { TimePoint *time.Time @@ -54,13 +57,51 @@ type ( } ) +func (m *miner) getUsers(ctx context.Context, users []*user) (map[int64]*pgUserCreated, error) { + var ( + userIDs []string + offset int64 = 0 + result = make(map[int64]*pgUserCreated, len(users)) + ) + for _, val := range users { + userIDs = append(userIDs, val.UserID) + } + for { + sql := `SELECT + id, + created_at + FROM users + WHERE id = ANY($1) + LIMIT $2 OFFSET $3` + rows, err := storagePG.Select[pgUserCreated](ctx, m.dbPG, sql, userIDs, maxLimit, offset) + if err != nil { + return nil, errors.Wrap(err, "can't get users from pg") + } + if len(rows) == 0 { + break + } + offset += maxLimit + for _, row := range rows { + id, err := tokenomics.GetInternalID(ctx, m.db, string(row.ID)) + if err != nil { + log.Error(errWrongInternalID, row.ID) + + continue + } + result[id] = row + } + } + + return result, nil +} + func (m *miner) collectTiers(ctx context.Context, users []*user) (map[int64][]int64, map[int64][]int64, map[int64]uint64, map[int64]uint64, error) { var ( referredByIDs []string - now = time.Now() - t1ActiveCounts, t2ActiveCounts = make(map[int64]uint64), make(map[int64]uint64) - t1Referrals, t2Referrals = make(map[int64][]int64), make(map[int64][]int64) - offset = 0 + offset int64 = 0 + now = time.Now() + t1ActiveCounts, t2ActiveCounts = make(map[int64]uint64), make(map[int64]uint64) + t1Referrals, t2Referrals = make(map[int64][]int64), make(map[int64][]int64) ) for _, val := range users { referredByIDs = append(referredByIDs, val.UserID) @@ -102,7 +143,7 @@ func (m *miner) collectTiers(ctx context.Context, users []*user) (map[int64][]in LIMIT $3 OFFSET $4` rows, err := storagePG.Select[pgUser](ctx, m.dbPG, sql, now.Time, referredByIDs, maxLimit, offset) if err != nil { - return nil, nil, nil, nil, errors.Wrap(err, "can't get users from pg for showing actual data") + return nil, nil, nil, nil, errors.Wrap(err, "can't get referrals from pg for showing actual data") } if len(rows) == 0 { break @@ -224,27 +265,29 @@ func initializeEmptyUser(updatedUser, usr *user) *user { return &newUser } -func (m *miner) showTiersDiffBalances(ctx context.Context, users []*user, userRecalculated []*recalculatedUser, tMinus1Referrals map[int64]*referral, t0Referrals map[int64]*referral) ([]*user, error) { +func (m *miner) showTiersDiffBalances(ctx context.Context, users []*user, tMinus1Referrals map[int64]*referral, t0Referrals map[int64]*referral) (map[int64]*user, error) { var ( needToBeRecalculatedUsers []*user actualBalancesT1 = make(map[int64]float64) actualBalancesT2 = make(map[int64]float64) ) -outer: + usrs, err := m.getUsers(ctx, users) + if err != nil { + return nil, errors.Wrapf(err, "can't get CreatedAt information for users:%#v", usrs) + } for _, usr := range users { if usr.UserID == "" { continue } - for _, recalculatedUser := range userRecalculated { - if recalculatedUser.ID == usr.ID && !recalculatedUser.RecalculatedTiersBalancesAt.IsNil() { - continue outer + if _, ok := usrs[usr.ID]; ok { + if usrs[usr.ID].CreatedAt == nil || usrs[usr.ID].CreatedAt.After(*m.recalculationBalanceStartDate.Time) { + continue } } needToBeRecalculatedUsers = append(needToBeRecalculatedUsers, usr) actualBalancesT1[usr.ID] = usr.BalanceT1 actualBalancesT2[usr.ID] = usr.BalanceT2 } - t1Referrals, t2Referrals, t1ActiveCounts, t2ActiveCounts, err := m.collectTiers(ctx, needToBeRecalculatedUsers) if err != nil { return nil, errors.Wrap(err, "can't get active users for users") @@ -262,7 +305,7 @@ outer: now = time.Now() historyTimeRanges = make(map[int64][]*historyRangeTime) usrIDs = make(map[int64]struct{}, len(t1Referrals)+len(t2Referrals)+len(needToBeRecalculatedUsers)) - updatedUsers []*user + updatedUsers = make(map[int64]*user, len(users)) ) for _, values := range t1Referrals { for _, val := range values { @@ -282,27 +325,34 @@ outer: return nil, nil } - historyInformation, err := m.dwhClient.GetAdjustUserInformation(ctx, usrIDs) - if err != nil { - return nil, errors.Wrapf(err, "can't get adjust user information for ids:#%v", usrIDs) - } adoptions, err := tokenomics.GetAllAdoptions[float64](ctx, m.db) if err != nil { return nil, errors.Wrapf(err, "can't get adoptions for users:%#v", needToBeRecalculatedUsers) } - for _, info := range historyInformation { - historyTimeRanges[info.ID] = append(historyTimeRanges[info.ID], &historyRangeTime{ - MiningSessionSoloPreviouslyEndedAt: info.MiningSessionSoloPreviouslyEndedAt, - MiningSessionSoloStartedAt: info.MiningSessionSoloStartedAt, - MiningSessionSoloEndedAt: info.MiningSessionSoloEndedAt, - ResurrectSoloUsedAt: info.ResurrectSoloUsedAt, - CreatedAt: info.CreatedAt, - SlashingRateSolo: info.SlashingRateSolo, - BalanceT1Pending: info.BalanceT1Pending, - BalanceT1PendingApplied: info.BalanceT1PendingApplied, - BalanceT2Pending: info.BalanceT2Pending, - BalanceT2PendingApplied: info.BalanceT2PendingApplied, - }) + offset := int64(0) + for { + historyInformation, err := m.dwhClient.GetAdjustUserInformation(ctx, usrIDs, maxLimit, offset) + if err != nil { + return nil, errors.Wrapf(err, "can't get adjust user information for ids:#%v", usrIDs) + } + if len(historyInformation) == 0 { + break + } + offset += maxLimit + for _, info := range historyInformation { + historyTimeRanges[info.ID] = append(historyTimeRanges[info.ID], &historyRangeTime{ + MiningSessionSoloPreviouslyEndedAt: info.MiningSessionSoloPreviouslyEndedAt, + MiningSessionSoloStartedAt: info.MiningSessionSoloStartedAt, + MiningSessionSoloEndedAt: info.MiningSessionSoloEndedAt, + ResurrectSoloUsedAt: info.ResurrectSoloUsedAt, + CreatedAt: info.CreatedAt, + SlashingRateSolo: info.SlashingRateSolo, + BalanceT1Pending: info.BalanceT1Pending, + BalanceT1PendingApplied: info.BalanceT1PendingApplied, + BalanceT2Pending: info.BalanceT2Pending, + BalanceT2PendingApplied: info.BalanceT2PendingApplied, + }) + } } if len(historyTimeRanges) == 0 { log.Debug("empty history time ranges") @@ -515,25 +565,8 @@ outer: } updatedUser.ActiveT1Referrals = int32(t1ActiveCounts[usr.ID]) updatedUser.ActiveT2Referrals = int32(t2ActiveCounts[usr.ID]) - updatedUsers = append(updatedUsers, updatedUser) + updatedUsers[updatedUser.ID] = updatedUser } - showDiff(updatedUsers, actualBalancesT1, actualBalancesT2) return updatedUsers, nil } - -func showDiff(recalculatedUsers []*user, actualBalancesT1, actualBalancesT2 map[int64]float64) { - log.Info("id,diffBalanceT1,diffBalanceT2,activeT1Count,activeT2Count,recalculatedBalanceT1,recalculatedBalanceT2,actualBalanceT1,actualBalanceT2,SlashingRateT1,SlashingRateT2") - for _, usr := range recalculatedUsers { - balanceT1Diff := usr.BalanceT1 - balanceT2Diff := usr.BalanceT2 - if _, ok := actualBalancesT1[usr.ID]; ok { - balanceT1Diff -= actualBalancesT1[usr.ID] - } - if _, ok := actualBalancesT2[usr.ID]; ok { - balanceT1Diff -= actualBalancesT2[usr.ID] - } - info := fmt.Sprintf("%v,%v,%v,%v,%v,%v,%v,%v,%v,%v,%v", usr.ID, balanceT1Diff, balanceT2Diff, usr.ActiveT1Referrals, usr.ActiveT2Referrals, usr.BalanceT1, usr.BalanceT2, actualBalancesT1[usr.ID], actualBalancesT2[usr.ID], usr.SlashingRateT1, usr.SlashingRateT2) - log.Info(info) - } -} diff --git a/model/model.go b/model/model.go index 2fe4bea..b394d5c 100644 --- a/model/model.go +++ b/model/model.go @@ -195,9 +195,6 @@ type ( DeserializedUsersKey struct { ID int64 `redis:"-"` } - DeserializedRecalculatedUsersKey struct { - ID int64 `redis:"-"` - } IDT0Field struct { IDT0 int64 `redis:"id_t0,omitempty"` } @@ -253,26 +250,6 @@ func (k *DeserializedUsersKey) SetKey(val string) { log.Panic(err) } -func (k *DeserializedRecalculatedUsersKey) Key() string { - if k == nil || k.ID == 0 { - return "" - } - - return SerializedRecalculatedUsersKey(k.ID) -} - -func (k *DeserializedRecalculatedUsersKey) SetKey(val string) { - if val == "" || val == "recalculated:" { - return - } - if val[0] == 'r' { - val = val[13:] - } - var err error - k.ID, err = strconv.ParseInt(val, 10, 64) - log.Panic(err) -} - func SerializedUsersKey(val any) string { switch typedVal := val.(type) { case string: @@ -291,22 +268,3 @@ func SerializedUsersKey(val any) string { panic(fmt.Sprintf("%#v cannot be used as users key", val)) } } - -func SerializedRecalculatedUsersKey(val any) string { - switch typedVal := val.(type) { - case string: - if typedVal == "" { - return "" - } - - return "recalculated:" + typedVal - case int64: - if typedVal == 0 { - return "" - } - - return "recalculated:" + strconv.FormatInt(typedVal, 10) - default: - panic(fmt.Sprintf("%#v cannot be used as recalculated key", val)) - } -} diff --git a/tokenomics/users.go b/tokenomics/users.go index bf1e7de..ebc7546 100644 --- a/tokenomics/users.go +++ b/tokenomics/users.go @@ -43,9 +43,6 @@ func (s *usersTableSource) Process(ctx context.Context, msg *messagebroker.Messa if err := s.replaceUser(ctx, usr.User); err != nil { return errors.Wrapf(err, "failed to replace user:%#v", usr.User) } - if err := s.initializeRecalculated(ctx, usr.User); err != nil { - return errors.Wrapf(err, "failed to initialize recalculated user:%#v", usr.User) - } return nil } @@ -161,25 +158,6 @@ func (s *usersTableSource) deleteUser(ctx context.Context, usr *users.User) erro return errors.Wrapf(multierror.Append(nil, errs...).ErrorOrNil(), "failed to delete userID:%v,id:%v", usr.ID, id) } -func (s *usersTableSource) initializeRecalculated(ctx context.Context, usr *users.User) error { - internalID, err := GetOrInitInternalID(ctx, s.db, usr.ID) - if err != nil { - return errors.Wrapf(err, "failed to getOrInitInternalID for user:%#v", usr) - } - type ( - user struct { - model.DeserializedRecalculatedUsersKey - model.RecalculatedTiersBalancesAtField - } - ) - recalculated := &user{ - DeserializedRecalculatedUsersKey: model.DeserializedRecalculatedUsersKey{ID: internalID}, - RecalculatedTiersBalancesAtField: model.RecalculatedTiersBalancesAtField{RecalculatedTiersBalancesAt: time.Now()}, - } - - return errors.Wrapf(storage.Set(ctx, s.db, recalculated), "failed to initialize recalculated:%#v", recalculated) -} - func (s *usersTableSource) replaceUser(ctx context.Context, usr *users.User) error { //nolint:funlen // . internalID, err := GetOrInitInternalID(ctx, s.db, usr.ID) if err != nil {