diff --git a/bookkeeper/storage/contract.go b/bookkeeper/storage/contract.go index f38dcf7..411b128 100644 --- a/bookkeeper/storage/contract.go +++ b/bookkeeper/storage/contract.go @@ -24,7 +24,7 @@ type ( Ping(ctx context.Context) error Insert(ctx context.Context, columns *Columns, input InsertMetadata, usrs []*model.User) error SelectBalanceHistory(ctx context.Context, id int64, createdAts []stdlibtime.Time) ([]*BalanceHistory, error) - GetAdjustUserInformation(ctx context.Context, userIDs map[int64]struct{}, limit, offset int64) ([]*AdjustUserInfo, error) + GetAdjustUserInformation(ctx context.Context, userIDs []string, limit, offset int64) ([]*AdjustUserInfo, error) } AdjustUserInfo struct { MiningSessionSoloStartedAt *time.Time @@ -33,6 +33,7 @@ type ( MiningSessionSoloPreviouslyEndedAt *time.Time CreatedAt *time.Time ResurrectSoloUsedAt *time.Time + UserID string ID int64 SlashingRateSolo float64 SlashingRateT1 float64 diff --git a/bookkeeper/storage/storage.go b/bookkeeper/storage/storage.go index eca0f42..205f42a 100644 --- a/bookkeeper/storage/storage.go +++ b/bookkeeper/storage/storage.go @@ -444,7 +444,7 @@ func (db *db) SelectBalanceHistory(ctx context.Context, id int64, createdAts []s return res, nil } -func (db *db) GetAdjustUserInformation(ctx context.Context, userIDs map[int64]struct{}, limit, offset int64) ([]*AdjustUserInfo, error) { +func (db *db) GetAdjustUserInformation(ctx context.Context, userIDs []string, limit, offset int64) ([]*AdjustUserInfo, error) { const ( maxIDCount = 25_000 ) @@ -453,8 +453,8 @@ func (db *db) GetAdjustUserInformation(ctx context.Context, userIDs map[int64]st counter = 0 ) var userIDArray []string - for userID, _ := range userIDs { - userIDArray = append(userIDArray, fmt.Sprint(userID)) + for _, id := range userIDs { + userIDArray = append(userIDArray, id) counter++ if counter >= maxIDCount { // Hack not to have 'Max query size exceeded' error. result, err := db.getAdjustUserInformation(ctx, userIDArray, limit, offset) @@ -462,7 +462,7 @@ func (db *db) GetAdjustUserInformation(ctx context.Context, userIDs map[int64]st return nil, errors.Wrapf(err, "can't get adjust user information for userIDs:%#v", userIDArray) } res = append(res, result...) - userIDArray = nil + userIDArray = userIDArray[:0] counter = 0 continue @@ -482,6 +482,7 @@ func (db *db) GetAdjustUserInformation(ctx context.Context, userIDs map[int64]st func (db *db) getAdjustUserInformation(ctx context.Context, userIDArray []string, limit, offset int64) ([]*AdjustUserInfo, error) { var ( id = make(proto.ColInt64, 0, len(userIDArray)) + userID = &proto.ColStr{Buf: make([]byte, 0, 40*len(userIDArray)), Pos: make([]proto.Position, 0, len(userIDArray))} miningSessionSoloStartedAt = proto.ColDateTime64{Data: make([]proto.DateTime64, 0, len(userIDArray)), Location: stdlibtime.UTC} miningSessionSoloEndedAt = proto.ColDateTime64{Data: make([]proto.DateTime64, 0, len(userIDArray)), Location: stdlibtime.UTC} miningSessionSoloPreviouslyEndedAt = proto.ColDateTime64{Data: make([]proto.DateTime64, 0, len(userIDArray)), Location: stdlibtime.UTC} @@ -496,7 +497,8 @@ func (db *db) getAdjustUserInformation(ctx context.Context, userIDArray []string res = make([]*AdjustUserInfo, 0, len(userIDArray)) ) if err := db.pools[atomic.AddUint64(&db.currentIndex, 1)%uint64(len(db.pools))].Do(ctx, ch.Query{ - Body: fmt.Sprintf(`SELECT id, + Body: fmt.Sprintf(`SELECT id, + user_id, mining_session_solo_started_at, mining_session_solo_ended_at, mining_session_solo_previously_ended_at, @@ -513,8 +515,9 @@ func (db *db) getAdjustUserInformation(ctx context.Context, userIDArray []string ORDER BY created_at ASC LIMIT %[3]v, %[4]v `, tableName, strings.Join(userIDArray, ","), offset, limit), - Result: append(make(proto.Results, 0, 12), + Result: append(make(proto.Results, 0, 13), proto.ResultColumn{Name: "id", Data: &id}, + proto.ResultColumn{Name: "user_id", Data: userID}, proto.ResultColumn{Name: "mining_session_solo_started_at", Data: &miningSessionSoloStartedAt}, proto.ResultColumn{Name: "mining_session_solo_ended_at", Data: &miningSessionSoloEndedAt}, proto.ResultColumn{Name: "mining_session_solo_previously_ended_at", Data: &miningSessionSoloPreviouslyEndedAt}, @@ -531,6 +534,7 @@ func (db *db) getAdjustUserInformation(ctx context.Context, userIDArray []string for ix := 0; ix < block.Rows; ix++ { res = append(res, &AdjustUserInfo{ ID: (&id).Row(ix), + UserID: userID.Row(ix), MiningSessionSoloStartedAt: time.New((&miningSessionSoloStartedAt).Row(ix)), MiningSessionSoloEndedAt: time.New((&miningSessionSoloEndedAt).Row(ix)), MiningSessionSoloPreviouslyEndedAt: time.New((&miningSessionSoloPreviouslyEndedAt).Row(ix)), @@ -545,6 +549,7 @@ func (db *db) getAdjustUserInformation(ctx context.Context, userIDArray []string }) } (&id).Reset() + userID.Reset() (&miningSessionSoloStartedAt).Reset() (&miningSessionSoloEndedAt).Reset() (&miningSessionSoloPreviouslyEndedAt).Reset() diff --git a/go.mod b/go.mod index 29809a2..c813002 100644 --- a/go.mod +++ b/go.mod @@ -118,7 +118,7 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/quic-go/qpack v0.4.0 // indirect github.com/quic-go/qtls-go1-20 v0.4.1 // indirect - github.com/quic-go/quic-go v0.39.3 // indirect + github.com/quic-go/quic-go v0.40.0 // indirect github.com/refraction-networking/utls v1.5.4 // indirect github.com/rs/zerolog v1.31.0 // indirect github.com/sagikazarmark/locafero v0.3.0 // indirect diff --git a/go.sum b/go.sum index 4c02971..21dcae8 100644 --- a/go.sum +++ b/go.sum @@ -403,8 +403,8 @@ github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo= github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A= github.com/quic-go/qtls-go1-20 v0.4.1 h1:D33340mCNDAIKBqXuAvexTNMUByrYmFYVfKfDN5nfFs= github.com/quic-go/qtls-go1-20 v0.4.1/go.mod h1:X9Nh97ZL80Z+bX/gUXMbipO6OxdiDi58b/fMC9mAL+k= -github.com/quic-go/quic-go v0.39.3 h1:o3YB6t2SR+HU/pgwF29kJ6g4jJIJEwEZ8CKia1h1TKg= -github.com/quic-go/quic-go v0.39.3/go.mod h1:T09QsDQWjLiQ74ZmacDfqZmhY/NLnw5BC40MANNNZ1Q= +github.com/quic-go/quic-go v0.40.0 h1:GYd1iznlKm7dpHD7pOVpUvItgMPo/jrMgDWZhMCecqw= +github.com/quic-go/quic-go v0.40.0/go.mod h1:PeN7kuVJ4xZbxSv/4OX6S1USOX8MJvydwpTx31vx60c= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o= diff --git a/miner/miner.go b/miner/miner.go index cab9fb1..03e6d0a 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -215,6 +215,7 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { userGlobalRanks = userGlobalRanks[:0] referralsThatStoppedMining = referralsThatStoppedMining[:0] allAdoptions = allAdoptions[:0] + backupUsersUpdated = backupUsersUpdated[:0] for k := range t0Referrals { delete(t0Referrals, k) } @@ -378,15 +379,13 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) { usr.SlashingRateT1 = backupedUsr.SlashingRateT1 usr.SlashingRateT2 = backupedUsr.SlashingRateT2 - backupUsersUpdated = append(backupUsersUpdated, &backupUserUpdated{ - DeserializedBackupUsersKey: model.DeserializedBackupUsersKey{ID: usr.ID}, - BalancesBackupUsedAtField: model.BalancesBackupUsedAtField{BalancesBackupUsedAt: time.Now()}, - }) + backupedUsr.BalancesBackupUsedAt = time.Now() + backupUsersUpdated = append(backupUsersUpdated, backupedUsr) } } else { if recalculatedUsr := m.recalculateUser(usr, allAdoptions, recalculationHistory); recalculatedUsr != nil { - diffT1ActiveValue := recalculationHistory.T1ActiveCounts[usr.ID] - usr.ActiveT1Referrals - diffT2ActiveValue := recalculationHistory.T2ActiveCounts[usr.ID] - usr.ActiveT2Referrals + diffT1ActiveValue := recalculationHistory.T1ActiveCounts[usr.UserID] - usr.ActiveT1Referrals + diffT2ActiveValue := recalculationHistory.T2ActiveCounts[usr.UserID] - usr.ActiveT2Referrals if diffT1ActiveValue < 0 && diffT1ActiveValue*-1 > usr.ActiveT1Referrals { diffT1ActiveValue = -usr.ActiveT1Referrals diff --git a/miner/recalculate_balance.go b/miner/recalculate_balance.go index 6c49934..24e13b8 100644 --- a/miner/recalculate_balance.go +++ b/miner/recalculate_balance.go @@ -8,10 +8,13 @@ import ( stdlibtime "time" "github.com/pkg/errors" + "github.com/redis/go-redis/v9" "github.com/ice-blockchain/eskimo/users" + "github.com/ice-blockchain/freezer/model" "github.com/ice-blockchain/freezer/tokenomics" storagePG "github.com/ice-blockchain/wintr/connectors/storage/v2" + "github.com/ice-blockchain/wintr/connectors/storage/v3" "github.com/ice-blockchain/wintr/log" "github.com/ice-blockchain/wintr/time" ) @@ -20,19 +23,14 @@ const ( maxLimit int64 = 10000 ) -var ( - errWrongInternalID = errors.New("can't get internal id") -) - type ( - UserID string pgUser struct { Active *users.NotExpired - ID, ReferredBy UserID + ID, ReferredBy string ReferralType string } pgUserCreated struct { - ID UserID + ID string CreatedAt *time.Time } @@ -57,17 +55,18 @@ type ( } historyData struct { - HistoryTimeRanges map[int64][]*historyRangeTime - T1Referrals, T2Referrals map[int64][]int64 - T1ActiveCounts, T2ActiveCounts map[int64]int32 + NeedToBeRecalculatedUsers map[string]struct{} + HistoryTimeRanges map[string][]*historyRangeTime + T1Referrals, T2Referrals map[string][]string + T1ActiveCounts, T2ActiveCounts map[string]int32 } ) -func (m *miner) getUsers(ctx context.Context, users []*user) (map[int64]*pgUserCreated, error) { +func (m *miner) getUsers(ctx context.Context, users []*user) (map[string]*pgUserCreated, error) { var ( userIDs []string offset int64 = 0 - result = make(map[int64]*pgUserCreated, len(users)) + result = make(map[string]*pgUserCreated, len(users)) ) for _, val := range users { userIDs = append(userIDs, val.UserID) @@ -88,32 +87,26 @@ func (m *miner) getUsers(ctx context.Context, users []*user) (map[int64]*pgUserC } offset += maxLimit for _, row := range rows { - id, err := tokenomics.GetInternalID(ctx, m.db, string(row.ID)) - if err != nil { - log.Error(errWrongInternalID, row.ID) - - continue - } - result[id] = row + result[row.ID] = row } } return result, nil } -func (m *miner) collectTiers(ctx context.Context, users []*user) ( - t1Referrals, t2Referrals map[int64][]int64, t1ActiveCounts, t2ActiveCounts map[int64]int32, err error, +func (m *miner) collectTiers(ctx context.Context, needToBeRecalculatedUsers map[string]struct{}) ( + t1Referrals, t2Referrals map[string][]string, t1ActiveCounts, t2ActiveCounts map[string]int32, err error, ) { var ( - referredByIDs []string - offset int64 = 0 - now = time.Now() + userIDs = make([]string, 0, len(needToBeRecalculatedUsers)) + offset int64 = 0 + now = time.Now() ) - t1ActiveCounts, t2ActiveCounts = make(map[int64]int32, len(users)), make(map[int64]int32, len(users)) - t1Referrals, t2Referrals = make(map[int64][]int64), make(map[int64][]int64) - for _, val := range users { - referredByIDs = append(referredByIDs, val.UserID) + for key := range needToBeRecalculatedUsers { + userIDs = append(userIDs, key) } + t1ActiveCounts, t2ActiveCounts = make(map[string]int32, len(needToBeRecalculatedUsers)), make(map[string]int32, len(needToBeRecalculatedUsers)) + t1Referrals, t2Referrals = make(map[string][]string), make(map[string][]string) for { sql := `SELECT * FROM( SELECT @@ -149,7 +142,7 @@ func (m *miner) collectTiers(ctx context.Context, users []*user) ( AND t2.username != t2.id ) X LIMIT $3 OFFSET $4` - rows, err := storagePG.Select[pgUser](ctx, m.dbPG, sql, now.Time, referredByIDs, maxLimit, offset) + rows, err := storagePG.Select[pgUser](ctx, m.dbPG, sql, now.Time, userIDs, maxLimit, offset) if err != nil { return nil, nil, nil, nil, errors.Wrap(err, "can't get referrals from pg for showing actual data") } @@ -159,27 +152,15 @@ func (m *miner) collectTiers(ctx context.Context, users []*user) ( offset += maxLimit for _, row := range rows { if row.ReferredBy != "bogus" && row.ReferredBy != "icenetwork" && row.ID != "bogus" && row.ID != "icenetwork" { - referredByID, err := tokenomics.GetInternalID(ctx, m.db, string(row.ReferredBy)) - if err != nil { - log.Error(errWrongInternalID, referredByID) - - continue - } - id, err := tokenomics.GetInternalID(ctx, m.db, string(row.ID)) - if err != nil { - log.Error(errWrongInternalID, row.ID) - - continue - } if row.ReferralType == "T1" { - t1Referrals[referredByID] = append(t1Referrals[referredByID], id) + t1Referrals[row.ReferredBy] = append(t1Referrals[row.ReferredBy], row.ID) if row.Active != nil && *row.Active { - t1ActiveCounts[referredByID]++ + t1ActiveCounts[row.ReferredBy]++ } } else if row.ReferralType == "T2" { - t2Referrals[referredByID] = append(t2Referrals[referredByID], id) + t2Referrals[row.ReferredBy] = append(t2Referrals[row.ReferredBy], row.ID) if row.Active != nil && *row.Active { - t2ActiveCounts[referredByID]++ + t2ActiveCounts[row.ReferredBy]++ } } else { log.Panic("wrong tier type") @@ -273,12 +254,38 @@ func initializeEmptyUser(updatedUser, usr *user) *user { return &newUser } +func getInternalIDs(ctx context.Context, db storage.DB, keys ...string) ([]string, error) { + if cmdResults, err := db.Pipelined(ctx, func(pipeliner redis.Pipeliner) error { + if err := pipeliner.MGet(ctx, keys...).Err(); err != nil { + return err + } + + return nil + }); err != nil { + return nil, err + } else { + results := make([]string, 0, len(cmdResults)) + for _, cmdResult := range cmdResults { + sliceResult := cmdResult.(*redis.SliceCmd) + for _, val := range sliceResult.Val() { + if val == nil { + continue + } + results = append(results, val.(string)) + } + } + + return results, nil + } +} + func (m *miner) gatherHistoryAndReferralsInformation(ctx context.Context, users []*user) (history *historyData, err error) { + if len(users) == 0 { + return nil, nil + } var ( - needToBeRecalculatedUsers []*user - actualBalancesT1 = make(map[int64]float64) - actualBalancesT2 = make(map[int64]float64) - historyTimeRanges = make(map[int64][]*historyRangeTime) + needToBeRecalculatedUsers = make(map[string]struct{}, len(users)) + historyTimeRanges = make(map[string][]*historyRangeTime) ) usrs, err := m.getUsers(ctx, users) if err != nil { @@ -288,16 +295,14 @@ func (m *miner) gatherHistoryAndReferralsInformation(ctx context.Context, users if usr.UserID == "" || usr.Username == "" { continue } - if _, ok := usrs[usr.ID]; ok { - if usrs[usr.ID].CreatedAt == nil || usrs[usr.ID].CreatedAt.After(*m.recalculationBalanceStartDate.Time) { + if _, ok := usrs[usr.UserID]; ok { + if usrs[usr.UserID].CreatedAt == nil || usrs[usr.UserID].CreatedAt.After(*m.recalculationBalanceStartDate.Time) { continue } } - needToBeRecalculatedUsers = append(needToBeRecalculatedUsers, usr) - actualBalancesT1[usr.ID] = usr.BalanceT1 - actualBalancesT2[usr.ID] = usr.BalanceT2 + needToBeRecalculatedUsers[usr.UserID] = struct{}{} } - if len(users) == 0 { + if len(needToBeRecalculatedUsers) == 0 { return nil, nil } t1Referrals, t2Referrals, t1ActiveCounts, t2ActiveCounts, err := m.collectTiers(ctx, needToBeRecalculatedUsers) @@ -307,35 +312,36 @@ func (m *miner) gatherHistoryAndReferralsInformation(ctx context.Context, users if len(t1Referrals) == 0 && len(t2Referrals) == 0 { return nil, nil } - usrIDs := make(map[int64]struct{}, len(t1Referrals)+len(t2Referrals)+len(needToBeRecalculatedUsers)) + userKeys := make([]string, 0, len(t1Referrals)+len(t2Referrals)+len(needToBeRecalculatedUsers)) for _, values := range t1Referrals { for _, val := range values { - usrIDs[val] = struct{}{} + userKeys = append(userKeys, model.SerializedUsersKey(val)) } } for _, values := range t2Referrals { for _, val := range values { - usrIDs[val] = struct{}{} + userKeys = append(userKeys, model.SerializedUsersKey(val)) } } - for _, usr := range needToBeRecalculatedUsers { - usrIDs[usr.ID] = struct{}{} + for key, _ := range needToBeRecalculatedUsers { + userKeys = append(userKeys, model.SerializedUsersKey(key)) } - if len(usrIDs) == 0 { - return nil, nil + userResults, err := getInternalIDs(ctx, m.db, userKeys...) + if err != nil { + return nil, errors.Wrapf(err, "failed to get internal ids for:%v", userKeys) } offset := int64(0) for { - historyInformation, err := m.dwhClient.GetAdjustUserInformation(ctx, usrIDs, maxLimit, offset) + historyInformation, err := m.dwhClient.GetAdjustUserInformation(ctx, userResults, maxLimit, offset) if err != nil { - return nil, errors.Wrapf(err, "can't get adjust user information for ids:#%v", usrIDs) + return nil, errors.Wrapf(err, "can't get adjust user information for ids:#%v", userResults) } if len(historyInformation) == 0 { break } offset += maxLimit for _, info := range historyInformation { - historyTimeRanges[info.ID] = append(historyTimeRanges[info.ID], &historyRangeTime{ + historyTimeRanges[info.UserID] = append(historyTimeRanges[info.UserID], &historyRangeTime{ MiningSessionSoloPreviouslyEndedAt: info.MiningSessionSoloPreviouslyEndedAt, MiningSessionSoloStartedAt: info.MiningSessionSoloStartedAt, MiningSessionSoloEndedAt: info.MiningSessionSoloEndedAt, @@ -354,11 +360,12 @@ func (m *miner) gatherHistoryAndReferralsInformation(ctx context.Context, users } return &historyData{ - HistoryTimeRanges: historyTimeRanges, - T1Referrals: t1Referrals, - T2Referrals: t2Referrals, - T1ActiveCounts: t1ActiveCounts, - T2ActiveCounts: t2ActiveCounts, + NeedToBeRecalculatedUsers: needToBeRecalculatedUsers, + HistoryTimeRanges: historyTimeRanges, + T1Referrals: t1Referrals, + T2Referrals: t2Referrals, + T1ActiveCounts: t1ActiveCounts, + T2ActiveCounts: t2ActiveCounts, }, nil } @@ -366,7 +373,10 @@ func (m *miner) recalculateUser(usr *user, adoptions []*tokenomics.Adoption[floa if history == nil || history.HistoryTimeRanges == nil || (history.T1Referrals == nil && history.T2Referrals == nil) || adoptions == nil { return nil } - if _, ok := history.HistoryTimeRanges[usr.ID]; ok { + if _, ok := history.NeedToBeRecalculatedUsers[usr.UserID]; !ok { + return nil + } + if _, ok := history.HistoryTimeRanges[usr.UserID]; ok { var ( isResurrected bool slashingLastEndedAt *time.Time @@ -380,7 +390,7 @@ func (m *miner) recalculateUser(usr *user, adoptions []*tokenomics.Adoption[floa updatedUser.BalanceT2 = 0 updatedUser.BalanceLastUpdatedAt = nil - for _, usrRange := range history.HistoryTimeRanges[usr.ID] { + for _, usrRange := range history.HistoryTimeRanges[usr.UserID] { if updatedUser == nil { updatedUser = initializeEmptyUser(updatedUser, usr) } @@ -446,8 +456,8 @@ func (m *miner) recalculateUser(usr *user, adoptions []*tokenomics.Adoption[floa /****************************************************************************************************************************************************** 4. T1 Balance calculation for the current user time range. ******************************************************************************************************************************************************/ - if _, ok := history.T1Referrals[usr.ID]; ok { - for _, refID := range history.T1Referrals[usr.ID] { + if _, ok := history.T1Referrals[usr.UserID]; ok { + for _, refID := range history.T1Referrals[usr.UserID] { if _, ok := history.HistoryTimeRanges[refID]; ok { var previousT1MiningSessionStartedAt, previousT1MiningSessionEndedAt *time.Time for _, timeRange := range history.HistoryTimeRanges[refID] { @@ -501,8 +511,8 @@ func (m *miner) recalculateUser(usr *user, adoptions []*tokenomics.Adoption[floa /****************************************************************************************************************************************************** 5. T2 Balance calculation for the current user time range. ******************************************************************************************************************************************************/ - if _, ok := history.T2Referrals[usr.ID]; ok { - for _, refID := range history.T2Referrals[usr.ID] { + if _, ok := history.T2Referrals[usr.UserID]; ok { + for _, refID := range history.T2Referrals[usr.UserID] { if _, ok := history.HistoryTimeRanges[refID]; ok { var previousT2MiningSessionStartedAt, previousT2MiningSessionEndedAt *time.Time for _, timeRange := range history.HistoryTimeRanges[refID] {