Skip to content

Commit

Permalink
[raft] Remove data, allow remove data when only range_id is available (
Browse files Browse the repository at this point in the history
…#8191)

Also, include the error when failing to get the range descriptor
  • Loading branch information
luluz66 authored Jan 15, 2025
1 parent 7020e60 commit f8d6fdf
Showing 1 changed file with 29 additions and 15 deletions.
44 changes: 29 additions & 15 deletions enterprise/server/raft/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1117,12 +1117,36 @@ func (s *Store) removeAndStopReplica(ctx context.Context, rd *rfpb.RangeDescript
return nil
}

func (s *Store) syncRemoveData(ctx context.Context, rangeID, replicaID uint64) error {
err := client.RunNodehostFn(ctx, func(ctx context.Context) error {
err := s.nodeHost.SyncRemoveData(ctx, rangeID, replicaID)
// If the shard is not stopped, we want to retry SyncRemoveData call.
if err == dragonboat.ErrShardNotStopped {
err = dragonboat.ErrTimeout
}
return err
})
if err != nil {
return status.InternalErrorf("failed to remove data of c%dn%d from raft: %s", rangeID, replicaID, err)
}
return nil
}

// RemoveData tries to remove all data associated with the specified node (shard, replica). It waits for the node (shard, replica) to be fully offloaded or the context is cancelled. This method should only be used after the node is deleted from its Raft cluster.
func (s *Store) RemoveData(ctx context.Context, req *rfpb.RemoveDataRequest) (*rfpb.RemoveDataResponse, error) {
if req.GetRangeId() != 0 {
if err := s.syncRemoveData(ctx, req.GetRangeId(), req.GetReplicaId()); err != nil {
return nil, err
}
return &rfpb.RemoveDataResponse{}, nil
}
rd := req.GetRange()
if rd == nil {
return nil, status.InvalidArgumentErrorf("need to specify either range_id or range")
}
remoteRD, err := s.Sender().LookupRangeDescriptor(ctx, rd.GetStart(), true /*skip Cache */)
if err != nil {
return nil, status.InternalErrorf("failed to look up range descriptor")
return nil, status.InternalErrorf("failed to look up range descriptor: %s", err)
}

if rd.GetRangeId() != remoteRD.GetRangeId() {
Expand All @@ -1144,21 +1168,11 @@ func (s *Store) RemoveData(ctx context.Context, req *rfpb.RemoveDataRequest) (*r
// This should not happen because we don't allow a range to be split while there are removals in progress.
if !bytes.Equal(remoteRD.GetStart(), rd.GetStart()) || !bytes.Equal(remoteRD.GetEnd(), rd.GetEnd()) {
err := status.InternalErrorf("range descriptor's range changed from [%q, %q) (gen: %d) to [%q, %q) (gen: %d) while there are replicas in process of removal", rd.GetStart(), rd.GetEnd(), rd.GetGeneration(), remoteRD.GetStart(), remoteRD.GetEnd(), remoteRD.GetGeneration())
s.log.Errorf("%s", err)
return nil, err
}
}

err = client.RunNodehostFn(ctx, func(ctx context.Context) error {
err := s.nodeHost.SyncRemoveData(ctx, rd.GetRangeId(), req.GetReplicaId())
// If the shard is not stopped, we want to retry SyncRemoveData call.
if err == dragonboat.ErrShardNotStopped {
err = dragonboat.ErrTimeout
}
return err
})
if err != nil {
return nil, status.InternalErrorf("failed to remove data of c%dn%d from raft: %s", rd.GetRangeId(), req.GetReplicaId(), err)
if err := s.syncRemoveData(ctx, req.GetRangeId(), req.GetReplicaId()); err != nil {
return nil, err
}

if shouldDeleteRange {
Expand All @@ -1167,7 +1181,7 @@ func (s *Store) RemoveData(ctx context.Context, req *rfpb.RemoveDataRequest) (*r
return nil, err
}
defer db.Close()
if err := db.DeleteRange(rd.GetStart(), rd.GetEnd(), pebble.NoSync); err != nil {
if err = db.DeleteRange(remoteRD.GetStart(), remoteRD.GetEnd(), pebble.NoSync); err != nil {
return nil, status.InternalErrorf("failed to delete data of c%dn%d from pebble: %s", req.GetRange().GetRangeId(), req.GetReplicaId(), err)
}

Expand All @@ -1178,7 +1192,7 @@ func (s *Store) RemoveData(ctx context.Context, req *rfpb.RemoveDataRequest) (*r
}
}

//update range descriptor: remove the replica descriptor from the removed list.
// update range descriptor: remove the replica descriptor from the removed list.
if markedForRemoval {
rd, err = s.removeReplicaFromRangeDescriptor(ctx, remoteRD.GetRangeId(), req.GetReplicaId(), remoteRD)
if err != nil {
Expand Down

0 comments on commit f8d6fdf

Please sign in to comment.