diff --git a/enterprise/server/raft/store/store_test.go b/enterprise/server/raft/store/store_test.go
index b04c7a7bdc8..a642c7c5fca 100644
--- a/enterprise/server/raft/store/store_test.go
+++ b/enterprise/server/raft/store/store_test.go
@@ -437,6 +437,15 @@ func waitForReplicaToCatchUp(t testing.TB, ctx context.Context, r *replica.Repli
 	}
 }
 
+func includeReplicaWithNHID(rd *rfpb.RangeDescriptor, nhid string) bool {
+	for _, r := range rd.GetReplicas() {
+		if r.GetNhid() == nhid {
+			return true
+		}
+	}
+	return false
+}
+
 func getReplica(t testing.TB, s *testutil.TestingStore, rangeID uint64) *replica.Replica {
 	for {
 		res, err := s.GetReplica(rangeID)
@@ -952,9 +961,7 @@ func TestDownReplicate(t *testing.T) {
 
 	// Advance the clock to trigger scan replicas
 	clock.Advance(61 * time.Second)
-	i := 0
 	for {
-		i++
 		clock.Advance(3 * time.Second)
 		time.Sleep(100 * time.Millisecond)
 
@@ -969,3 +976,73 @@ func TestDownReplicate(t *testing.T) {
 		}
 	}
 }
+
+func TestReplaceDeadReplica(t *testing.T) {
+	flags.Set(t, "cache.raft.max_range_size_bytes", 0) // disable auto splitting
+	// Disable txn cleanup and the zombie scan, because advancing the fake
+	// clock can prematurely trigger txn cleanup and zombie cleanup.
+	flags.Set(t, "cache.raft.enable_txn_cleanup", false)
+	flags.Set(t, "cache.raft.zombie_node_scan_interval", 0)
+
+	clock := clockwork.NewFakeClock()
+	sf := testutil.NewStoreFactoryWithClock(t, clock)
+	s1 := sf.NewStore(t)
+	s2 := sf.NewStore(t)
+	s3 := sf.NewStore(t)
+	ctx := context.Background()
+
+	// Start shards for s1, s2, s3.
+	stores := []*testutil.TestingStore{s1, s2, s3}
+	sf.StartShard(t, ctx, stores...)
+
+	{ // Verify that there are 3 replicas for range 2, and also write 10 records.
+		s := getStoreWithRangeLease(t, ctx, stores, 2)
+		writeNRecords(ctx, t, s, 10)
+		replicas := getMembership(t, s, ctx, 2)
+		require.Equal(t, 3, len(replicas))
+		rd := s.GetRange(2)
+		require.Equal(t, 3, len(rd.GetReplicas()))
+	}
+
+	s := getStoreWithRangeLease(t, ctx, stores, 2)
+	r, err := s.GetReplica(2)
+	require.NoError(t, err)
+	desiredAppliedIndex, err := r.LastAppliedIndex()
+	require.NoError(t, err)
+
+	s4 := sf.NewStore(t)
+	// Stop store 3.
+	s3.Stop()
+
+	nhid3 := s3.NodeHost().ID()
+	nhid4 := s4.NodeHost().ID()
+
+	// Advance the clock past the cache.raft.dead_store_timeout so s3 is considered dead.
+	clock.Advance(5*time.Minute + 1*time.Second)
+	for {
+		// Advance the clock to trigger the replica scan.
+		clock.Advance(61 * time.Second)
+		// Wait some time to let the driver queue execute.
+		time.Sleep(100 * time.Millisecond)
+		list, err := s4.ListReplicas(ctx, &rfpb.ListReplicasRequest{})
+		require.NoError(t, err)
+		if len(list.GetReplicas()) < 2 {
+			// s4 should eventually hold replicas for both ranges; keep waiting.
+			continue
+		}
+
+		if !includeReplicaWithNHID(s1.GetRange(1), nhid3) &&
+			!includeReplicaWithNHID(s1.GetRange(2), nhid3) &&
+			!includeReplicaWithNHID(s2.GetRange(1), nhid3) &&
+			!includeReplicaWithNHID(s2.GetRange(2), nhid3) {
+			// nhid4 should be added to range 1 and range 2, and nhid3 removed.
+			require.True(t, includeReplicaWithNHID(s1.GetRange(1), nhid4))
+			require.True(t, includeReplicaWithNHID(s1.GetRange(2), nhid4))
+			require.True(t, includeReplicaWithNHID(s2.GetRange(1), nhid4))
+			require.True(t, includeReplicaWithNHID(s2.GetRange(2), nhid4))
+			break
+		}
+	}
+	r2 := getReplica(t, s4, 2)
+	waitForReplicaToCatchUp(t, ctx, r2, desiredAppliedIndex)
+}
diff --git a/enterprise/server/raft/testutil/testutil.go b/enterprise/server/raft/testutil/testutil.go
index f75f9f106a4..4bd6c0e2811 100644
--- a/enterprise/server/raft/testutil/testutil.go
+++ b/enterprise/server/raft/testutil/testutil.go
@@ -39,15 +39,6 @@ func localAddr(t *testing.T) string {
 	return fmt.Sprintf("127.0.0.1:%d", testport.FindFree(t))
 }
 
-func newGossipManager(t testing.TB, nodeAddr string, seeds []string) *gossip.GossipManager {
-	node, err := gossip.New("name-"+nodeAddr, nodeAddr, seeds)
-	require.NoError(t, err)
-	t.Cleanup(func() {
-		node.Shutdown()
-	})
-	return node
-}
-
 type StoreFactory struct {
 	rootDir     string
 	fileDir     string
@@ -85,10 +76,12 @@ func (sf *StoreFactory) Registry() registry.NodeRegistry {
 
 func (sf *StoreFactory) NewStore(t *testing.T) *TestingStore {
 	nodeAddr := localAddr(t)
-	gm := newGossipManager(t, nodeAddr, sf.gossipAddrs)
+	gm, err := gossip.New("name-"+nodeAddr, nodeAddr, sf.gossipAddrs)
+	require.NoError(t, err)
 	sf.gossipAddrs = append(sf.gossipAddrs, nodeAddr)
 
 	ts := &TestingStore{
+		gm:          gm,
 		RaftAddress: localAddr(t),
 		GRPCAddress: localAddr(t),
 		RootDir:     filepath.Join(sf.rootDir, fmt.Sprintf("store-%d", len(sf.gossipAddrs))),
@@ -139,10 +132,7 @@ func (sf *StoreFactory) NewStore(t *testing.T) *TestingStore {
 	ts.Store = store
 
 	t.Cleanup(func() {
-		ctx := context.Background()
-		ctx, cancelFn := context.WithTimeout(ctx, 3*time.Second)
-		defer cancelFn()
-		store.Stop(ctx)
+		ts.Stop()
 	})
 	return ts
 }
@@ -160,9 +150,11 @@ type TestingStore struct {
 
 	db pebble.IPebbleDB
 
+	gm          *gossip.GossipManager
 	RootDir     string
 	RaftAddress string
 	GRPCAddress string
+	closed      bool
 }
 
 func (ts *TestingStore) DB() pebble.IPebbleDB {
@@ -174,6 +166,19 @@ func (ts *TestingStore) NewReplica(shardID, replicaID uint64) *replica.Replica {
 	return sm.(*replica.Replica)
 }
 
+func (ts *TestingStore) Stop() {
+	if ts.closed {
+		return
+	}
+	ctx := context.Background()
+	ctx, cancelFn := context.WithTimeout(ctx, 3*time.Second)
+	defer cancelFn()
+	ts.Store.Stop(ctx)
+	ts.gm.Leave()
+	ts.gm.Shutdown()
+	ts.closed = true
+}
+
 func (sf *StoreFactory) StartShard(t *testing.T, ctx context.Context, stores ...*TestingStore) {
 	require.Greater(t, len(stores), 0)
 	err := bringup.SendStartShardRequests(ctx, client.NewSessionWithClock(sf.clock), stores[0].NodeHost(), stores[0].APIClient(), MakeNodeGRPCAddressesMap(stores...))