Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Balances recalculation optimizations #38

Merged
merged 1 commit into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bookkeeper/storage/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type (
Ping(ctx context.Context) error
Insert(ctx context.Context, columns *Columns, input InsertMetadata, usrs []*model.User) error
SelectBalanceHistory(ctx context.Context, id int64, createdAts []stdlibtime.Time) ([]*BalanceHistory, error)
GetAdjustUserInformation(ctx context.Context, userIDs map[int64]struct{}, limit, offset int64) ([]*AdjustUserInfo, error)
GetAdjustUserInformation(ctx context.Context, userIDs []string, limit, offset int64) ([]*AdjustUserInfo, error)
}
AdjustUserInfo struct {
MiningSessionSoloStartedAt *time.Time
Expand All @@ -33,6 +33,7 @@ type (
MiningSessionSoloPreviouslyEndedAt *time.Time
CreatedAt *time.Time
ResurrectSoloUsedAt *time.Time
UserID string
ID int64
SlashingRateSolo float64
SlashingRateT1 float64
Expand Down
17 changes: 11 additions & 6 deletions bookkeeper/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func (db *db) SelectBalanceHistory(ctx context.Context, id int64, createdAts []s
return res, nil
}

func (db *db) GetAdjustUserInformation(ctx context.Context, userIDs map[int64]struct{}, limit, offset int64) ([]*AdjustUserInfo, error) {
func (db *db) GetAdjustUserInformation(ctx context.Context, userIDs []string, limit, offset int64) ([]*AdjustUserInfo, error) {
const (
maxIDCount = 25_000
)
Expand All @@ -453,16 +453,16 @@ func (db *db) GetAdjustUserInformation(ctx context.Context, userIDs map[int64]st
counter = 0
)
var userIDArray []string
for userID, _ := range userIDs {
userIDArray = append(userIDArray, fmt.Sprint(userID))
for _, id := range userIDs {
userIDArray = append(userIDArray, id)
counter++
if counter >= maxIDCount { // Hack not to have 'Max query size exceeded' error.
result, err := db.getAdjustUserInformation(ctx, userIDArray, limit, offset)
if err != nil {
return nil, errors.Wrapf(err, "can't get adjust user information for userIDs:%#v", userIDArray)
}
res = append(res, result...)
userIDArray = nil
userIDArray = userIDArray[:0]
counter = 0

continue
Expand All @@ -482,6 +482,7 @@ func (db *db) GetAdjustUserInformation(ctx context.Context, userIDs map[int64]st
func (db *db) getAdjustUserInformation(ctx context.Context, userIDArray []string, limit, offset int64) ([]*AdjustUserInfo, error) {
var (
id = make(proto.ColInt64, 0, len(userIDArray))
userID = &proto.ColStr{Buf: make([]byte, 0, 40*len(userIDArray)), Pos: make([]proto.Position, 0, len(userIDArray))}
miningSessionSoloStartedAt = proto.ColDateTime64{Data: make([]proto.DateTime64, 0, len(userIDArray)), Location: stdlibtime.UTC}
miningSessionSoloEndedAt = proto.ColDateTime64{Data: make([]proto.DateTime64, 0, len(userIDArray)), Location: stdlibtime.UTC}
miningSessionSoloPreviouslyEndedAt = proto.ColDateTime64{Data: make([]proto.DateTime64, 0, len(userIDArray)), Location: stdlibtime.UTC}
Expand All @@ -496,7 +497,8 @@ func (db *db) getAdjustUserInformation(ctx context.Context, userIDArray []string
res = make([]*AdjustUserInfo, 0, len(userIDArray))
)
if err := db.pools[atomic.AddUint64(&db.currentIndex, 1)%uint64(len(db.pools))].Do(ctx, ch.Query{
Body: fmt.Sprintf(`SELECT id,
Body: fmt.Sprintf(`SELECT id,
user_id,
mining_session_solo_started_at,
mining_session_solo_ended_at,
mining_session_solo_previously_ended_at,
Expand All @@ -513,8 +515,9 @@ func (db *db) getAdjustUserInformation(ctx context.Context, userIDArray []string
ORDER BY created_at ASC
LIMIT %[3]v, %[4]v
`, tableName, strings.Join(userIDArray, ","), offset, limit),
Result: append(make(proto.Results, 0, 12),
Result: append(make(proto.Results, 0, 13),
proto.ResultColumn{Name: "id", Data: &id},
proto.ResultColumn{Name: "user_id", Data: userID},
proto.ResultColumn{Name: "mining_session_solo_started_at", Data: &miningSessionSoloStartedAt},
proto.ResultColumn{Name: "mining_session_solo_ended_at", Data: &miningSessionSoloEndedAt},
proto.ResultColumn{Name: "mining_session_solo_previously_ended_at", Data: &miningSessionSoloPreviouslyEndedAt},
Expand All @@ -531,6 +534,7 @@ func (db *db) getAdjustUserInformation(ctx context.Context, userIDArray []string
for ix := 0; ix < block.Rows; ix++ {
res = append(res, &AdjustUserInfo{
ID: (&id).Row(ix),
UserID: userID.Row(ix),
MiningSessionSoloStartedAt: time.New((&miningSessionSoloStartedAt).Row(ix)),
MiningSessionSoloEndedAt: time.New((&miningSessionSoloEndedAt).Row(ix)),
MiningSessionSoloPreviouslyEndedAt: time.New((&miningSessionSoloPreviouslyEndedAt).Row(ix)),
Expand All @@ -545,6 +549,7 @@ func (db *db) getAdjustUserInformation(ctx context.Context, userIDArray []string
})
}
(&id).Reset()
userID.Reset()
(&miningSessionSoloStartedAt).Reset()
(&miningSessionSoloEndedAt).Reset()
(&miningSessionSoloPreviouslyEndedAt).Reset()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/qtls-go1-20 v0.4.1 // indirect
github.com/quic-go/quic-go v0.39.3 // indirect
github.com/quic-go/quic-go v0.40.0 // indirect
github.com/refraction-networking/utls v1.5.4 // indirect
github.com/rs/zerolog v1.31.0 // indirect
github.com/sagikazarmark/locafero v0.3.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,8 @@ github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
github.com/quic-go/qtls-go1-20 v0.4.1 h1:D33340mCNDAIKBqXuAvexTNMUByrYmFYVfKfDN5nfFs=
github.com/quic-go/qtls-go1-20 v0.4.1/go.mod h1:X9Nh97ZL80Z+bX/gUXMbipO6OxdiDi58b/fMC9mAL+k=
github.com/quic-go/quic-go v0.39.3 h1:o3YB6t2SR+HU/pgwF29kJ6g4jJIJEwEZ8CKia1h1TKg=
github.com/quic-go/quic-go v0.39.3/go.mod h1:T09QsDQWjLiQ74ZmacDfqZmhY/NLnw5BC40MANNNZ1Q=
github.com/quic-go/quic-go v0.40.0 h1:GYd1iznlKm7dpHD7pOVpUvItgMPo/jrMgDWZhMCecqw=
github.com/quic-go/quic-go v0.40.0/go.mod h1:PeN7kuVJ4xZbxSv/4OX6S1USOX8MJvydwpTx31vx60c=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o=
Expand Down
11 changes: 5 additions & 6 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
userGlobalRanks = userGlobalRanks[:0]
referralsThatStoppedMining = referralsThatStoppedMining[:0]
allAdoptions = allAdoptions[:0]
backupUsersUpdated = backupUsersUpdated[:0]
for k := range t0Referrals {
delete(t0Referrals, k)
}
Expand Down Expand Up @@ -378,15 +379,13 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
usr.SlashingRateT1 = backupedUsr.SlashingRateT1
usr.SlashingRateT2 = backupedUsr.SlashingRateT2

backupUsersUpdated = append(backupUsersUpdated, &backupUserUpdated{
DeserializedBackupUsersKey: model.DeserializedBackupUsersKey{ID: usr.ID},
BalancesBackupUsedAtField: model.BalancesBackupUsedAtField{BalancesBackupUsedAt: time.Now()},
})
backupedUsr.BalancesBackupUsedAt = time.Now()
backupUsersUpdated = append(backupUsersUpdated, backupedUsr)
}
} else {
if recalculatedUsr := m.recalculateUser(usr, allAdoptions, recalculationHistory); recalculatedUsr != nil {
diffT1ActiveValue := recalculationHistory.T1ActiveCounts[usr.ID] - usr.ActiveT1Referrals
diffT2ActiveValue := recalculationHistory.T2ActiveCounts[usr.ID] - usr.ActiveT2Referrals
diffT1ActiveValue := recalculationHistory.T1ActiveCounts[usr.UserID] - usr.ActiveT1Referrals
diffT2ActiveValue := recalculationHistory.T2ActiveCounts[usr.UserID] - usr.ActiveT2Referrals

if diffT1ActiveValue < 0 && diffT1ActiveValue*-1 > usr.ActiveT1Referrals {
diffT1ActiveValue = -usr.ActiveT1Referrals
Expand Down
Loading
Loading