Skip to content

Commit

Permalink
Backup feature to have possibility to restore balances to values befo…
Browse files Browse the repository at this point in the history
…re recalculation was started.
  • Loading branch information
ice-myles committed Oct 20, 2023
1 parent 1253d06 commit 11d3790
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 36 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/goccy/go-json v0.10.2
github.com/hashicorp/go-multierror v1.1.1
github.com/ice-blockchain/eskimo v1.170.0
github.com/ice-blockchain/eskimo v1.171.0
github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb
github.com/ice-blockchain/wintr v1.125.0
github.com/pkg/errors v0.9.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ice-blockchain/eskimo v1.170.0 h1:FgJb6kTbn8/4noTqZUAZVnaSHYmid5omSmgs5oTmXmY=
github.com/ice-blockchain/eskimo v1.170.0/go.mod h1:wnROoQhxpMKt+rI1Zd/ADGQxpRLBcJztFgY7xAS21Jo=
github.com/ice-blockchain/eskimo v1.171.0 h1:ACAlyemaU9BKngNs7h7UZr1VI60BA3QZgjUm4K1BTxM=
github.com/ice-blockchain/eskimo v1.171.0/go.mod h1:wnROoQhxpMKt+rI1Zd/ADGQxpRLBcJztFgY7xAS21Jo=
github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb h1:8TnFP3mc7O+tc44kv2e0/TpZKnEVUaKH+UstwfBwRkk=
github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb/go.mod h1:ZsQU7i3mxhgBBu43Oev7WPFbIjP4TniN/b1UPNGbrq8=
github.com/ice-blockchain/wintr v1.125.0 h1:pk/SVyztstUF19+JDCufJRMXJeNpchVA4O26xp47l3Y=
Expand Down
3 changes: 2 additions & 1 deletion miner/adoption_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"testing"
stdlibtime "time"

"github.com/stretchr/testify/assert"

"github.com/ice-blockchain/freezer/tokenomics"
"github.com/ice-blockchain/wintr/time"
"github.com/stretchr/testify/assert"
)

func TestGetAdoptionsRange_1AdoptionPerRange(t *testing.T) {
Expand Down
10 changes: 8 additions & 2 deletions miner/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,15 @@ type (
model.IDT0Field
model.IDTMinus1Field
}
userBalanceTiersUpdated struct {
model.DeserializedUsersKey

backupUserUpdated struct {
model.DeserializedBackupUsersKey
model.BalanceT1Field
model.BalanceT2Field
model.SlashingRateT1Field
model.SlashingRateT2Field
model.ActiveT1ReferralsField
model.ActiveT2ReferralsField
}

referral struct {
Expand Down Expand Up @@ -139,6 +144,7 @@ type (
extraBonusStartDate *time.Time
extraBonusIndicesDistribution map[uint16]map[uint16]uint16
recalculationBalanceStartDate *time.Time
balanceBackupStartDate *time.Time
}
config struct {
disableAdvancedTeam *atomic.Pointer[[]string]
Expand Down
134 changes: 108 additions & 26 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func MustStartMining(ctx context.Context, cancel context.CancelFunc) Client {
mi.cancel = cancel
mi.extraBonusStartDate = extrabonusnotifier.MustGetExtraBonusStartDate(ctx, mi.db)
mi.extraBonusIndicesDistribution = extrabonusnotifier.MustGetExtraBonusIndicesDistribution(ctx, mi.db)
mi.recalculationBalanceStartDate = MustGetRecalculationBalancesStartDate(ctx, mi.db)
mi.recalculationBalanceStartDate = mustGetRecalculationBalancesStartDate(ctx, mi.db)
mi.balanceBackupStartDate = mustGetBalancesBackupStartDate(ctx, mi.db)

for workerNumber := int64(0); workerNumber < cfg.Workers; workerNumber++ {
go func(wn int64) {
Expand Down Expand Up @@ -114,7 +115,7 @@ func (m *miner) checkDBHealth(ctx context.Context) error {
return nil
}

func MustGetRecalculationBalancesStartDate(ctx context.Context, db storage.DB) (recalculationBalancesStartDate *time.Time) {
func mustGetRecalculationBalancesStartDate(ctx context.Context, db storage.DB) (recalculationBalancesStartDate *time.Time) {
recalculationBalancesStartDateString, err := db.Get(ctx, "recalculation_balances_start_date").Result()
if err != nil && errors.Is(err, redis.Nil) {
err = nil
Expand All @@ -131,12 +132,28 @@ func MustGetRecalculationBalancesStartDate(ctx context.Context, db storage.DB) (
set, sErr := db.SetNX(ctx, "recalculation_balances_start_date", recalculationBalancesStartDate, 0).Result()
log.Panic(errors.Wrap(sErr, "failed to set recalculation_balances_start_date"))
if !set {
return MustGetRecalculationBalancesStartDate(ctx, db)
return mustGetRecalculationBalancesStartDate(ctx, db)
}

return recalculationBalancesStartDate
}

func mustGetBalancesBackupStartDate(ctx context.Context, db storage.DB) (balancesBackupStartDate *time.Time) {
balancesBackupStartDateString, err := db.Get(ctx, "balances_backup_start_date").Result()
if err != nil && errors.Is(err, redis.Nil) {
err = nil
}
log.Panic(errors.Wrap(err, "failed to get balances_backup_start_date"))
if balancesBackupStartDateString != "" {
balancesBackupStartDate = new(time.Time)
log.Panic(errors.Wrapf(balancesBackupStartDate.UnmarshalText([]byte(balancesBackupStartDateString)), "failed to parse balances_backup_start_date `%v`", balancesBackupStartDateString)) //nolint:lll // .

return
}

return balancesBackupStartDate
}

func (m *miner) mine(ctx context.Context, workerNumber int64) {
dwhClient := dwh.MustConnect(context.Background(), applicationYamlKey)
defer func() {
Expand All @@ -154,8 +171,8 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
currentAdoption = m.getAdoption(ctx, m.db, workerNumber)
workers = cfg.Workers
batchSize = cfg.BatchSize
userKeys, userHistoryKeys, referralKeys = make([]string, 0, batchSize), make([]string, 0, batchSize), make([]string, 0, 2*batchSize)
userResults, referralResults = make([]*user, 0, batchSize), make([]*referral, 0, 2*batchSize)
userKeys, userBackupKeys, userHistoryKeys, referralKeys = make([]string, 0, batchSize), make([]string, 0, batchSize), make([]string, 0, batchSize), make([]string, 0, 2*batchSize)
userResults, backupUserResults, referralResults = make([]*user, 0, batchSize), make([]*backupUserUpdated, 0, batchSize), make([]*referral, 0, 2*batchSize)
t0Referrals, tMinus1Referrals = make(map[int64]*referral, batchSize), make(map[int64]*referral, batchSize)
t1ReferralsToIncrementActiveValue, t2ReferralsToIncrementActiveValue = make(map[int64]int32, batchSize), make(map[int64]int32, batchSize)
t1ReferralsThatStoppedMining, t2ReferralsThatStoppedMining = make(map[int64]uint32, batchSize), make(map[int64]uint32, batchSize)
Expand All @@ -168,6 +185,9 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
referralsUpdated = make([]*referralUpdated, 0, batchSize)
histories = make([]*model.User, 0, batchSize)
userGlobalRanks = make([]redis.Z, 0, batchSize)
backupedUsers = make(map[int64]*backupUserUpdated, batchSize)
backupUsersUpdated = make([]*backupUserUpdated, 0, batchSize)
recalculatedTiersBalancesUsers = make(map[int64]*user, batchSize)
historyColumns, historyInsertMetadata = dwh.InsertDDL(int(batchSize))
shouldSynchronizeBalanceFunc = func(batchNumberArg uint64) bool { return false }
)
Expand All @@ -191,7 +211,7 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
if batchNumber == 0 || currentAdoption == nil {
currentAdoption = m.getAdoption(ctx, m.db, workerNumber)
}
userKeys, userHistoryKeys, referralKeys = userKeys[:0], userHistoryKeys[:0], referralKeys[:0]
userKeys, userBackupKeys, userHistoryKeys, referralKeys = userKeys[:0], userBackupKeys[:0], userHistoryKeys[:0], referralKeys[:0]
userResults, referralResults = userResults[:0], referralResults[:0]
msgs, errs = msgs[:0], errs[:0]
updatedUsers = updatedUsers[:0]
Expand All @@ -218,6 +238,12 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
for k := range t2ReferralsToIncrementActiveValue {
delete(t2ReferralsToIncrementActiveValue, k)
}
for k := range backupedUsers {
delete(backupedUsers, k)
}
for k := range recalculatedTiersBalancesUsers {
delete(recalculatedTiersBalancesUsers, k)
}
}
for ctx.Err() == nil {
/******************************************************************************************************************************************************
Expand All @@ -241,6 +267,21 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
if len(userKeys) > 0 {
go m.telemetry.collectElapsed(2, *before.Time)
}
for ix := batchNumber * batchSize; ix < (batchNumber+1)*batchSize; ix++ {
userBackupKeys = append(userBackupKeys, model.SerializedBackupUsersKey((workers*ix)+workerNumber))
}
reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline)
if err := storage.Bind[backupUserUpdated](reqCtx, m.db, userBackupKeys, &backupUserResults); err != nil {
log.Error(errors.Wrapf(err, "[miner] failed to get backuped users for batchNumber:%v,workerNumber:%v", batchNumber, workerNumber))
reqCancel()
now = time.Now()

continue
}
reqCancel()
for _, usr := range backupUserResults {
backupedUsers[usr.ID] = usr
}

/******************************************************************************************************************************************************
2. Fetching T0 & T-1 referrals of the fetched users.
Expand Down Expand Up @@ -296,33 +337,70 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
t0Referrals[ref.ID] = ref
}
}
recalculatedTiersBalancesUsers, err := m.showTiersDiffBalances(ctx, userResults, tMinus1Referrals, t0Referrals)
if err != nil {
log.Error(errors.New("tiers diff balances error"), err)
if m.balanceBackupStartDate.IsNil() {
var err error
recalculatedTiersBalancesUsers, err = m.recalculateTiersBalances(ctx, userResults, tMinus1Referrals, t0Referrals)
if err != nil {
log.Error(errors.New("tiers diff balances error"), err)
}
}
shouldSynchronizeBalance := shouldSynchronizeBalanceFunc(uint64(batchNumber))
for _, usr := range userResults {
if usr.UserID == "" {
continue
}
if recalculatedUsr, ok := recalculatedTiersBalancesUsers[usr.ID]; ok {
diffT1ActiveValue := recalculatedUsr.ActiveT1Referrals - usr.ActiveT1Referrals
diffT2ActiveValue := recalculatedUsr.ActiveT2Referrals - usr.ActiveT2Referrals
if diffT1ActiveValue < 0 && diffT1ActiveValue*-1 > usr.ActiveT1Referrals {
diffT1ActiveValue = -usr.ActiveT1Referrals
}
if diffT2ActiveValue < 0 && diffT2ActiveValue*-1 > usr.ActiveT2Referrals {
diffT2ActiveValue = -usr.ActiveT2Referrals
}
t1ReferralsToIncrementActiveValue[usr.ID] += diffT1ActiveValue
t2ReferralsToIncrementActiveValue[usr.ID] += diffT2ActiveValue
if !m.balanceBackupStartDate.IsNil() {
if backupedUsr, ok := backupedUsers[usr.ID]; ok {
diffT1ActiveValue := backupedUsr.ActiveT1Referrals - usr.ActiveT1Referrals
diffT2ActiveValue := backupedUsr.ActiveT2Referrals - usr.ActiveT2Referrals
if diffT1ActiveValue < 0 && diffT1ActiveValue*-1 > usr.ActiveT1Referrals {
diffT1ActiveValue = -usr.ActiveT1Referrals
}
if diffT2ActiveValue < 0 && diffT2ActiveValue*-1 > usr.ActiveT2Referrals {
diffT2ActiveValue = -usr.ActiveT2Referrals
}
t1ReferralsToIncrementActiveValue[usr.ID] += diffT1ActiveValue
t2ReferralsToIncrementActiveValue[usr.ID] += diffT2ActiveValue

usr.BalanceT1 = recalculatedUsr.BalanceT1
usr.BalanceT2 = recalculatedUsr.BalanceT2
usr.BalanceT1 = backupedUsr.BalanceT1
usr.BalanceT2 = backupedUsr.BalanceT2

usr.SlashingRateT1 = recalculatedUsr.SlashingRateT1
usr.SlashingRateT2 = recalculatedUsr.SlashingRateT2
usr.SlashingRateT1 = backupedUsr.SlashingRateT1
usr.SlashingRateT2 = backupedUsr.SlashingRateT2
}
} else {
if recalculatedUsr, ok := recalculatedTiersBalancesUsers[usr.ID]; ok {
diffT1ActiveValue := recalculatedUsr.ActiveT1Referrals - usr.ActiveT1Referrals
diffT2ActiveValue := recalculatedUsr.ActiveT2Referrals - usr.ActiveT2Referrals
if diffT1ActiveValue < 0 && diffT1ActiveValue*-1 > usr.ActiveT1Referrals {
diffT1ActiveValue = -usr.ActiveT1Referrals
}
if diffT2ActiveValue < 0 && diffT2ActiveValue*-1 > usr.ActiveT2Referrals {
diffT2ActiveValue = -usr.ActiveT2Referrals
}
t1ReferralsToIncrementActiveValue[usr.ID] += diffT1ActiveValue
t2ReferralsToIncrementActiveValue[usr.ID] += diffT2ActiveValue

usr.BalanceT1 = recalculatedUsr.BalanceT1
usr.BalanceT2 = recalculatedUsr.BalanceT2

usr.SlashingRateT1 = recalculatedUsr.SlashingRateT1
usr.SlashingRateT2 = recalculatedUsr.SlashingRateT2

if _, ok := backupedUsers[usr.ID]; !ok {
backupUsersUpdated = append(backupUsersUpdated, &backupUserUpdated{
DeserializedBackupUsersKey: model.DeserializedBackupUsersKey{ID: usr.ID},
BalanceT1Field: usr.BalanceT1Field,
BalanceT2Field: usr.BalanceT2Field,
SlashingRateT1Field: usr.SlashingRateT1Field,
SlashingRateT2Field: usr.SlashingRateT2Field,
ActiveT1ReferralsField: model.ActiveT1ReferralsField{ActiveT1Referrals: usr.ActiveT1Referrals + diffT1ActiveValue},
ActiveT2ReferralsField: model.ActiveT2ReferralsField{ActiveT2Referrals: usr.ActiveT2Referrals + diffT2ActiveValue},
})
}
}
}

var t0Ref, tMinus1Ref *referral
if usr.IDT0 > 0 {
t0Ref = t0Referrals[usr.IDT0]
Expand Down Expand Up @@ -478,12 +556,11 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
}

var pipeliner redis.Pipeliner
if len(t1ReferralsThatStoppedMining)+len(t2ReferralsThatStoppedMining)+len(extraBonusOnlyUpdatedUsers)+len(referralsUpdated)+len(userGlobalRanks) > 0 {
if len(t1ReferralsThatStoppedMining)+len(t2ReferralsThatStoppedMining)+len(extraBonusOnlyUpdatedUsers)+len(referralsUpdated)+len(userGlobalRanks)+len(backupUsersUpdated) > 0 {
pipeliner = m.db.TxPipeline()
} else {
pipeliner = m.db.Pipeline()
}

before = time.Now()
reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline)
if responses, err := pipeliner.Pipelined(reqCtx, func(pipeliner redis.Pipeliner) error {
Expand Down Expand Up @@ -522,6 +599,11 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
return err
}
}
for _, value := range backupUsersUpdated {
if err := pipeliner.HSet(reqCtx, value.Key(), storage.SerializeValue(value)...).Err(); err != nil {
return err
}
}
if len(userGlobalRanks) > 0 {
if err := pipeliner.ZAdd(reqCtx, "top_miners", userGlobalRanks...).Err(); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion miner/recalculate_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func initializeEmptyUser(updatedUser, usr *user) *user {
return &newUser
}

func (m *miner) showTiersDiffBalances(ctx context.Context, users []*user, tMinus1Referrals map[int64]*referral, t0Referrals map[int64]*referral) (map[int64]*user, error) {
func (m *miner) recalculateTiersBalances(ctx context.Context, users []*user, tMinus1Referrals map[int64]*referral, t0Referrals map[int64]*referral) (map[int64]*user, error) {
var (
needToBeRecalculatedUsers []*user
actualBalancesT1 = make(map[int64]float64)
Expand Down
45 changes: 42 additions & 3 deletions model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,6 @@ type (
ExtraBonusLastClaimAvailableAtField struct {
ExtraBonusLastClaimAvailableAt *time.Time `redis:"extra_bonus_last_claim_available_at,omitempty"`
}
RecalculatedTiersBalancesAtField struct {
RecalculatedTiersBalancesAt *time.Time `redis:"recalculated_tiers_balances_at,omitempty"`
}
UserIDField struct {
UserID string `redis:"user_id,omitempty"`
}
Expand Down Expand Up @@ -228,6 +225,9 @@ type (
HideRankingField struct {
HideRanking bool `redis:"hide_ranking"`
}
DeserializedBackupUsersKey struct {
ID int64 `redis:"-"`
}
)

func (k *DeserializedUsersKey) Key() string {
Expand Down Expand Up @@ -268,3 +268,42 @@ func SerializedUsersKey(val any) string {
panic(fmt.Sprintf("%#v cannot be used as users key", val))
}
}

func (k *DeserializedBackupUsersKey) Key() string {
if k == nil || k.ID == 0 {
return ""
}

return SerializedBackupUsersKey(k.ID)
}

func (k *DeserializedBackupUsersKey) SetKey(val string) {
if val == "" || val == "backup:" {
return
}
if val[0] == 'b' {
val = val[7:]
}
var err error
k.ID, err = strconv.ParseInt(val, 10, 64)
log.Panic(err)
}

func SerializedBackupUsersKey(val any) string {
switch typedVal := val.(type) {
case string:
if typedVal == "" {
return ""
}

return "backup:" + typedVal
case int64:
if typedVal == 0 {
return ""
}

return "backup:" + strconv.FormatInt(typedVal, 10)
default:
panic(fmt.Sprintf("%#v cannot be used as backup key", val))
}
}

0 comments on commit 11d3790

Please sign in to comment.