From a6255c5a569fb3eb0fe267ee974956fb716da8ea Mon Sep 17 00:00:00 2001
From: Lulu Zhang
Date: Wed, 11 Dec 2024 11:25:49 -0800
Subject: [PATCH] [raft] move remove data out of removeReplica (#8043)

Also fix RemoveNodeFromCluster test. Sometimes the test fails with
shardNotFound when we call removeReplica on the store where c2n4 resides.

Separate RemoveData from RemoveReplica so that we can try removeReplica on
different replicas (in follow-up PRs; see the sketch after the diff below).

https://github.com/buildbuddy-io/buildbuddy-internal/issues/4220
---
 enterprise/server/raft/driver/BUILD        |  1 +
 enterprise/server/raft/driver/driver.go    | 31 +++++++++++++++++++--
 enterprise/server/raft/store/store.go      | 32 ++++++----------------
 enterprise/server/raft/store/store_test.go | 15 +++++++++-
 4 files changed, 51 insertions(+), 28 deletions(-)

diff --git a/enterprise/server/raft/driver/BUILD b/enterprise/server/raft/driver/BUILD
index d33d07bffac..1b1969033f0 100644
--- a/enterprise/server/raft/driver/BUILD
+++ b/enterprise/server/raft/driver/BUILD
@@ -6,6 +6,7 @@ go_library(
     importpath = "github.com/buildbuddy-io/buildbuddy/enterprise/server/raft/driver",
     visibility = ["//visibility:public"],
     deps = [
+        "//enterprise/server/raft/client",
         "//enterprise/server/raft/config",
         "//enterprise/server/raft/constants",
         "//enterprise/server/raft/header",
diff --git a/enterprise/server/raft/driver/driver.go b/enterprise/server/raft/driver/driver.go
index 8a168b78440..b9cf8996e93 100644
--- a/enterprise/server/raft/driver/driver.go
+++ b/enterprise/server/raft/driver/driver.go
@@ -10,6 +10,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/buildbuddy-io/buildbuddy/enterprise/server/raft/client"
 	"github.com/buildbuddy-io/buildbuddy/enterprise/server/raft/config"
 	"github.com/buildbuddy-io/buildbuddy/enterprise/server/raft/constants"
 	"github.com/buildbuddy-io/buildbuddy/enterprise/server/raft/header"
@@ -300,15 +301,16 @@ type Queue struct {
 	pq        *priorityQueue
 	pqItemMap map[uint64]*pqItem
 
-	clock clockwork.Clock
-	log   log.Logger
+	clock     clockwork.Clock
+	log       log.Logger
+	apiClient *client.APIClient
 
 	eg       *errgroup.Group
 	egCtx    context.Context
 	egCancel context.CancelFunc
 }
 
-func NewQueue(store IStore, gossipManager interfaces.GossipService, nhlog log.Logger, clock clockwork.Clock) *Queue {
+func NewQueue(store IStore, gossipManager interfaces.GossipService, nhlog log.Logger, apiClient *client.APIClient, clock clockwork.Clock) *Queue {
 	storeMap := storemap.New(gossipManager, clock)
 	ctx, cancelFunc := context.WithCancel(context.Background())
 	eg, gctx := errgroup.WithContext(ctx)
@@ -320,6 +322,7 @@ func NewQueue(store IStore, gossipManager interfaces.GossipService, nhlog log.Lo
 		maxSize:   100,
 		clock:     clock,
 		log:       nhlog,
+		apiClient: apiClient,
 
 		eg:    eg,
 		egCtx: gctx,
@@ -1043,6 +1046,28 @@ func (rq *Queue) applyChange(ctx context.Context, change *change) error {
 			return err
 		}
 		rq.log.Infof("RemoveReplicaRequest finished: %+v", change.removeOp)
+
+		replicaDesc := &rfpb.ReplicaDescriptor{RangeId: change.removeOp.GetRange().GetRangeId(), ReplicaId: change.removeOp.GetReplicaId()}
+		// Remove the data from the now-stopped node. This is best-effort
+		// only: the replica can be removed while its node is dead, and in
+		// that case we won't be able to connect to the node.
+		c, err := rq.apiClient.GetForReplica(ctx, replicaDesc)
+		if err != nil {
+			rq.log.Warningf("RemoveReplica unable to remove data on c%dn%d, err getting api client: %s", replicaDesc.GetRangeId(), replicaDesc.GetReplicaId(), err)
+			return nil
+		}
+		_, err = c.RemoveData(ctx, &rfpb.RemoveDataRequest{
+			RangeId:   replicaDesc.GetRangeId(),
+			ReplicaId: replicaDesc.GetReplicaId(),
+			Start:     change.removeOp.GetRange().GetStart(),
+			End:       change.removeOp.GetRange().GetEnd(),
+		})
+		if err != nil {
+			rq.log.Warningf("RemoveReplica unable to remove data err: %s", err)
+			return nil
+		}
+
+		rq.log.Infof("Removed shard: c%dn%d", replicaDesc.GetRangeId(), replicaDesc.GetReplicaId())
 	}
 	if change.transferLeadershipOp != nil {
 		_, err := rq.store.TransferLeadership(ctx, change.transferLeadershipOp)
diff --git a/enterprise/server/raft/store/store.go b/enterprise/server/raft/store/store.go
index afd461f28bd..979d9a1b4f4 100644
--- a/enterprise/server/raft/store/store.go
+++ b/enterprise/server/raft/store/store.go
@@ -267,7 +267,7 @@ func NewWithArgs(env environment.Env, rootDir string, nodeHost *dragonboat.NodeH
 	usages, err := usagetracker.New(s.sender, s.leaser, gossipManager, s.NodeDescriptor(), partitions, clock)
 
 	if *enableDriver {
-		s.driverQueue = driver.NewQueue(s, gossipManager, nhLog, clock)
+		s.driverQueue = driver.NewQueue(s, gossipManager, nhLog, apiClient, clock)
 	}
 	s.deleteSessionWorker = newDeleteSessionsWorker(clock, s)
 
@@ -2283,12 +2283,15 @@ func (s *Store) RemoveReplica(ctx context.Context, req *rfpb.RemoveReplicaReques
 	var replicaDesc *rfpb.ReplicaDescriptor
 	for _, replica := range req.GetRange().GetReplicas() {
 		if replica.GetReplicaId() == req.GetReplicaId() {
+			if replica.GetNhid() == s.NHID() {
+				return nil, status.InvalidArgumentErrorf("c%dn%d is on the node %s: cannot remove", req.GetRange().GetRangeId(), req.GetReplicaId(), s.NHID())
+			}
 			replicaDesc = replica
 			break
 		}
 	}
 	if replicaDesc == nil {
-		return nil, status.FailedPreconditionErrorf("No node with id %d found in range: %+v", req.GetReplicaId(), req.GetRange())
+		return nil, status.FailedPreconditionErrorf("No replica with replica_id %d found in range: %+v", req.GetReplicaId(), req.GetRange())
 	}
 
 	// First, update the range descriptor information to reflect the
@@ -2298,35 +2301,16 @@ func (s *Store) RemoveReplica(ctx context.Context, req *rfpb.RemoveReplicaReques
 		return nil, err
 	}
 
-	if err = s.syncRequestDeleteReplica(ctx, replicaDesc.GetRangeId(), replicaDesc.GetReplicaId()); err != nil {
-		return nil, err
+	if err = s.syncRequestDeleteReplica(ctx, req.GetRange().GetRangeId(), req.GetReplicaId()); err != nil {
+		return nil, status.InternalErrorf("nodehost.SyncRequestDeleteReplica failed for c%dn%d: %s", req.GetRange().GetRangeId(), req.GetReplicaId(), err)
 	}
 
 	rsp := &rfpb.RemoveReplicaResponse{
 		Range: rd,
 	}
 
-	// Remove the data from the now stopped node. This is best-effort only,
-	// because we can remove the replica when the node is dead; and in this case,
-	// we won't be able to connect to the node.
-	c, err := s.apiClient.GetForReplica(ctx, replicaDesc)
-	if err != nil {
-		s.log.Warningf("RemoveReplica unable to remove data on c%dn%d, err getting api client: %s", replicaDesc.GetRangeId(), replicaDesc.GetReplicaId(), err)
-		return rsp, nil
-	}
-	_, err = c.RemoveData(ctx, &rfpb.RemoveDataRequest{
-		RangeId:   replicaDesc.GetRangeId(),
-		ReplicaId: replicaDesc.GetReplicaId(),
-		Start:     rd.GetStart(),
-		End:       rd.GetEnd(),
-	})
-	if err != nil {
-		s.log.Warningf("RemoveReplica unable to remove data err: %s", err)
-		return rsp, nil
-	}
-
-	s.log.Infof("Removed shard: c%dn%d", replicaDesc.GetRangeId(), replicaDesc.GetReplicaId())
 	return rsp, nil
+
 }
 
 func (s *Store) reserveReplicaIDs(ctx context.Context, n int) ([]uint64, error) {
diff --git a/enterprise/server/raft/store/store_test.go b/enterprise/server/raft/store/store_test.go
index 8cd12865667..d6b7f66f05c 100644
--- a/enterprise/server/raft/store/store_test.go
+++ b/enterprise/server/raft/store/store_test.go
@@ -267,6 +267,10 @@ func TestAddNodeToCluster(t *testing.T) {
 }
 
 func TestRemoveNodeFromCluster(t *testing.T) {
+	// Disable txn cleanup and the zombie scan, because advancing the fake
+	// clock can prematurely trigger txn cleanup and zombie cleanup.
+	flags.Set(t, "cache.raft.enable_txn_cleanup", false)
+	flags.Set(t, "cache.raft.zombie_node_scan_interval", 0)
 	sf := testutil.NewStoreFactory(t)
 	s1 := sf.NewStore(t)
 	s2 := sf.NewStore(t)
@@ -277,10 +281,19 @@ func TestRemoveNodeFromCluster(t *testing.T) {
 
 	s := testutil.GetStoreWithRangeLease(t, ctx, stores, 2)
 
+	// RemoveReplica can't remove the replica on its own machine.
 	rd := s.GetRange(2)
+	replicaIdToRemove := uint64(0)
+	for _, repl := range rd.GetReplicas() {
+		if repl.GetNhid() != s.NHID() {
+			replicaIdToRemove = repl.GetReplicaId()
+			break
+		}
+	}
+	log.Infof("remove replica c%dn%d", rd.GetRangeId(), replicaIdToRemove)
 	_, err := s.RemoveReplica(ctx, &rfpb.RemoveReplicaRequest{
 		Range:     rd,
-		ReplicaId: 4,
+		ReplicaId: replicaIdToRemove,
 	})
 	require.NoError(t, err)
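---

Reviewer note: with data removal now decoupled from the membership change, a
follow-up caller can pick a replica that is not on the local node (as the
updated test does), remove it via RemoveReplica, and then make the same
best-effort RemoveData call that applyChange performs above. The sketch below
is illustrative only and not part of this patch: the package name, the helper
removeReplicaAndData, and the rfpb import alias are assumptions, while
Store.RemoveReplica, APIClient.GetForReplica, and RemoveData are the methods
this patch touches.

	// Package example is a hypothetical follow-up sketch, not part of this patch.
	package example

	import (
		"context"

		"github.com/buildbuddy-io/buildbuddy/enterprise/server/raft/client"
		"github.com/buildbuddy-io/buildbuddy/enterprise/server/raft/store"
		"github.com/buildbuddy-io/buildbuddy/server/util/log"
		"github.com/buildbuddy-io/buildbuddy/server/util/status"

		// Alias assumed to match the rfpb import used elsewhere in this repo.
		rfpb "github.com/buildbuddy-io/buildbuddy/proto/raft"
	)

	// removeReplicaAndData removes the first replica of rd that does not live
	// on this store's node, then best-effort deletes that replica's data,
	// mirroring the error handling in the driver's applyChange.
	func removeReplicaAndData(ctx context.Context, s *store.Store, apiClient *client.APIClient, rd *rfpb.RangeDescriptor) error {
		for _, repl := range rd.GetReplicas() {
			if repl.GetNhid() == s.NHID() {
				// RemoveReplica now rejects removing the replica on its own
				// node, so skip the local replica.
				continue
			}
			if _, err := s.RemoveReplica(ctx, &rfpb.RemoveReplicaRequest{
				Range:     rd,
				ReplicaId: repl.GetReplicaId(),
			}); err != nil {
				return err
			}
			// Data removal is best-effort: if the node is dead we can't
			// connect to it, so failures are logged and ignored.
			c, err := apiClient.GetForReplica(ctx, repl)
			if err != nil {
				log.Warningf("unable to get api client for c%dn%d: %s", rd.GetRangeId(), repl.GetReplicaId(), err)
				return nil
			}
			if _, err := c.RemoveData(ctx, &rfpb.RemoveDataRequest{
				RangeId:   rd.GetRangeId(),
				ReplicaId: repl.GetReplicaId(),
				Start:     rd.GetStart(),
				End:       rd.GetEnd(),
			}); err != nil {
				log.Warningf("unable to remove data for c%dn%d: %s", rd.GetRangeId(), repl.GetReplicaId(), err)
			}
			return nil
		}
		return status.FailedPreconditionErrorf("no non-local replica found in range %d", rd.GetRangeId())
	}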