Skip to content

Commit

Permalink
[raft] add clock to driver and storemap (#6829)
Browse files: browse the repository at this point in the history
  • Loading branch information
luluz66 authored Jun 14, 2024
1 parent 500e510 commit 48545ef
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 16 deletions.
1 change: 1 addition & 0 deletions enterprise/server/raft/driver/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//server/interfaces",
"//server/util/alert",
"//server/util/log",
"@com_github_jonboulle_clockwork//:clockwork",
],
)

Expand Down
19 changes: 11 additions & 8 deletions enterprise/server/raft/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
"github.com/buildbuddy-io/buildbuddy/server/util/alert"
"github.com/buildbuddy-io/buildbuddy/server/util/log"
"github.com/jonboulle/clockwork"
)

var (
Expand Down Expand Up @@ -217,15 +218,17 @@ type Queue struct {

mu sync.Mutex //protects started and pq
started bool
clock clockwork.Clock
}

func NewQueue(store IStore, gossipManager interfaces.GossipService) *Queue {
storeMap := storemap.New(gossipManager)
func NewQueue(store IStore, gossipManager interfaces.GossipService, clock clockwork.Clock) *Queue {
storeMap := storemap.New(gossipManager, clock)
return &Queue{
storeMap: storeMap,
pq: &priorityQueue{},
store: store,
maxSize: 100,
clock: clock,
}
}

Expand Down Expand Up @@ -312,7 +315,7 @@ func (rq *Queue) MaybeAdd(ctx context.Context, replica IReplica) {
shardID: replica.ShardID(),
replicaID: replica.ReplicaID(),
priority: priority,
insertTime: time.Now(),
insertTime: rq.clock.Now(),
}
rq.push(item)
log.Infof("queued replica rangeID=%d", item.rangeID)
Expand All @@ -337,13 +340,13 @@ func (rq *Queue) remove(item *pqItem) {
}

func (rq *Queue) Start(ctx context.Context) {
queueDelay := time.NewTicker(queueWaitDuration)
queueDelay := rq.clock.NewTicker(queueWaitDuration)
defer queueDelay.Stop()
for {
select {
case <-ctx.Done():
return
case <-queueDelay.C:
case <-queueDelay.Chan():
}
if rq.Len() == 0 {
continue
Expand Down Expand Up @@ -510,7 +513,7 @@ func (rq *Queue) findRemovableReplicas(rd *rfpb.RangeDescriptor, brandNewReplica
func (rq *Queue) findReplicaForRemoval(rd *rfpb.RangeDescriptor, localRepl IReplica) *rfpb.ReplicaDescriptor {
lastReplIDAdded := rd.LastAddedReplicaId
if lastReplIDAdded != nil && rd.LastReplicaAddedAtUsec != nil {
if time.Since(time.UnixMicro(rd.GetLastReplicaAddedAtUsec())) > *newReplicaGracePeriod {
if rq.clock.Since(time.UnixMicro(rd.GetLastReplicaAddedAtUsec())) > *newReplicaGracePeriod {
lastReplIDAdded = nil
}
}
Expand Down Expand Up @@ -584,7 +587,7 @@ func (rq *Queue) applyChange(ctx context.Context, change *change) error {
if change.addOp != nil {
rsp, err := rq.store.AddReplica(ctx, change.addOp)
if err != nil {
log.Errorf("AddReplica err: %s", err)
log.Errorf("AddReplica %+v err: %s", change.addOp, err)
return err
}
log.Infof("AddReplicaRequest finished: %+v", change.addOp)
Expand All @@ -596,7 +599,7 @@ func (rq *Queue) applyChange(ctx context.Context, change *change) error {
}
_, err := rq.store.RemoveReplica(ctx, change.removeOp)
if err != nil {
log.Errorf("RemoveReplica err: %s", err)
log.Errorf("RemoveReplica %+v err: %s", change.removeOp, err)
return err
}
log.Infof("RemoveReplicaRequest finished: %+v", change.removeOp)
Expand Down
2 changes: 1 addition & 1 deletion enterprise/server/raft/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func NewWithArgs(env environment.Env, rootDir string, nodeHost *dragonboat.NodeH
usages, err := usagetracker.New(s, gossipManager, s.NodeDescriptor(), partitions, s.AddEventListener())

if *enableDriver {
s.driverQueue = driver.NewQueue(s, gossipManager)
s.driverQueue = driver.NewQueue(s, gossipManager, clock)
}
s.deleteSessionWorker = newDeleteSessionsWorker(clock, s)

Expand Down
1 change: 1 addition & 0 deletions enterprise/server/raft/storemap/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//server/util/proto",
"//server/util/status",
"@com_github_hashicorp_serf//serf",
"@com_github_jonboulle_clockwork//:clockwork",
],
)

Expand Down
17 changes: 10 additions & 7 deletions enterprise/server/raft/storemap/storemap.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/util/proto"
"github.com/buildbuddy-io/buildbuddy/server/util/status"
"github.com/hashicorp/serf/serf"
"github.com/jonboulle/clockwork"

rfpb "github.com/buildbuddy-io/buildbuddy/proto/raft"
)
Expand Down Expand Up @@ -43,14 +44,16 @@ type StoreMap struct {
storeDetails map[string]*StoreDetail

startTime time.Time
clock clockwork.Clock
}

func New(gossipManager interfaces.GossipService) *StoreMap {
func New(gossipManager interfaces.GossipService, clock clockwork.Clock) *StoreMap {
sm := &StoreMap{
mu: &sync.RWMutex{},
startTime: time.Now(),
storeDetails: make(map[string]*StoreDetail),
gossipManager: gossipManager,
clock: clock,
}
gossipManager.AddListener(sm)
return sm
Expand All @@ -68,7 +71,7 @@ func (sm *StoreMap) DivideByStatus(repls []*rfpb.ReplicaDescriptor) *ReplicasByS
res := &ReplicasByStatus{}
for _, repl := range repls {
detail := sm.getDetail(repl.GetNhid())
status := detail.status()
status := detail.status(sm.clock)
switch status {
case storeStatusAvailable:
res.LiveReplicas = append(res.LiveReplicas, repl)
Expand All @@ -93,8 +96,8 @@ func (sm *StoreMap) getDetail(nhid string) *StoreDetail {

}

func (sd *StoreDetail) status() storeStatus {
if !sd.lastUnavailableAt.IsZero() && time.Since(sd.lastUnavailableAt) > *deadStoreTimeout {
func (sd *StoreDetail) status(clock clockwork.Clock) storeStatus {
if !sd.lastUnavailableAt.IsZero() && clock.Since(sd.lastUnavailableAt) > *deadStoreTimeout {
return storeStatusDead
}

Expand All @@ -121,7 +124,7 @@ func (sm *StoreMap) updateStoreDetail(nhid string, usage *rfpb.StoreUsage, nodeS
if usage != nil {
detail.usage = usage
}
now := time.Now()
now := sm.clock.Now()
if nodeStatus != serf.StatusAlive {
detail.lastUnavailableAt = now
} else {
Expand Down Expand Up @@ -198,7 +201,7 @@ func (sm *StoreMap) GetStoresWithStats() *StoresWithStats {

alive := make([]*rfpb.StoreUsage, 0, len(sm.storeDetails))
for _, sd := range sm.storeDetails {
status := sd.status()
status := sd.status(sm.clock)
if status == storeStatusAvailable {
alive = append(alive, sd.usage)
}
Expand All @@ -216,7 +219,7 @@ func (sm *StoreMap) GetStoresWithStatsFromIDs(nhids []string) *StoresWithStats {
if !ok {
continue
}
status := sd.status()
status := sd.status(sm.clock)
if status == storeStatusAvailable || status == storeStatusSuspect {
alive = append(alive, sd.usage)
}
Expand Down

0 comments on commit 48545ef

Please sign in to comment.