

Revert "feat(storage): reset to the latest committed epoch in recovery (
Browse files Browse the repository at this point in the history
#14923)"

This reverts commit b2bda85.
TennyZhuang committed Feb 20, 2024
1 parent 2592880 commit d7396b2
Showing 27 changed files with 235 additions and 458 deletions.
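In short, the revert restores the pre-#14923 contracts: recovery() takes the previous epoch from its caller again instead of reading the latest committed epoch from the Hummock manager, and force-stopping actors no longer carries an epoch. A minimal sketch of the two restored signatures, reconstructed from the recovery.rs hunks below (TracedEpoch, PausedReason, InflightActorInfo and MetaResult are taken from the diff as-is, not re-verified against the current codebase):

    // src/meta/src/barrier/recovery.rs, as restored by this revert
    pub async fn recovery(&mut self, prev_epoch: TracedEpoch, paused_reason: Option<PausedReason>) {
        // prev_epoch is supplied by the caller rather than derived from
        // hummock_manager.latest_snapshot().committed_epoch
    }

    async fn reset_compute_nodes(&self, info: &InflightActorInfo) -> MetaResult<()> {
        // the epoch is no longer forwarded to compute nodes in ForceStopActorsRequest
        Ok(())
    }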
proto/stream_service.proto: 1 change (0 additions & 1 deletion)
@@ -46,7 +46,6 @@ message DropActorsResponse {

message ForceStopActorsRequest {
string request_id = 1;
-   uint64 prev_epoch = 2;
}

message ForceStopActorsResponse {
src/compute/src/rpc/service/stream_service.rs: 2 changes (1 addition & 1 deletion)
@@ -116,7 +116,7 @@ impl StreamService for StreamServiceImpl {
request: Request<ForceStopActorsRequest>,
) -> std::result::Result<Response<ForceStopActorsResponse>, Status> {
let req = request.into_inner();
- self.mgr.reset(req.prev_epoch).await;
+ self.mgr.reset().await;
Ok(Response::new(ForceStopActorsResponse {
request_id: req.request_id,
status: None,
src/meta/src/barrier/mod.rs: 6 changes (4 additions & 2 deletions)
@@ -527,7 +527,9 @@ impl GlobalBarrierManager {
let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
let paused_reason = paused.then_some(PausedReason::Manual);

- self.recovery(paused_reason).instrument(span).await;
+ self.recovery(prev_epoch, paused_reason)
+     .instrument(span)
+     .await;
}

self.context.set_status(BarrierManagerStatus::Running);
@@ -768,7 +770,7 @@ impl GlobalBarrierManager {

// No need to clean dirty tables for barrier recovery,
// The foreground stream job should cleanup their own tables.
- self.recovery(None).instrument(span).await;
+ self.recovery(prev_epoch, None).instrument(span).await;
self.context.set_status(BarrierManagerStatus::Running);
} else {
panic!("failed to execute barrier: {}", err.as_report());
src/meta/src/barrier/recovery.rs: 44 changes (23 additions & 21 deletions)
@@ -329,14 +329,7 @@ impl GlobalBarrierManager {
/// the cluster or `risectl` command. Used for debugging purpose.
///
/// Returns the new state of the barrier manager after recovery.
- pub async fn recovery(&mut self, paused_reason: Option<PausedReason>) {
-     let prev_epoch = TracedEpoch::new(
-         self.context
-             .hummock_manager
-             .latest_snapshot()
-             .committed_epoch
-             .into(),
-     );
+ pub async fn recovery(&mut self, prev_epoch: TracedEpoch, paused_reason: Option<PausedReason>) {
// Mark blocked and abort buffered schedules, they might be dirty already.
self.scheduled_barriers
.abort_and_mark_blocked("cluster is under recovering");
@@ -406,11 +399,9 @@ impl GlobalBarrierManager {
};

// Reset all compute nodes, stop and drop existing actors.
- self.reset_compute_nodes(&info, prev_epoch.value().0)
-     .await
-     .inspect_err(|err| {
-         warn!(error = %err.as_report(), "reset compute nodes failed");
-     })?;
+ self.reset_compute_nodes(&info).await.inspect_err(|err| {
+     warn!(error = %err.as_report(), "reset compute nodes failed");
+ })?;

if self.pre_apply_drop_cancel().await? {
info = self
Expand Down Expand Up @@ -456,6 +447,21 @@ impl GlobalBarrierManager {
tracing::Span::current(), // recovery span
));

+ #[cfg(not(all(test, feature = "failpoints")))]
+ {
+     use risingwave_common::util::epoch::INVALID_EPOCH;
+
+     let mce = self
+         .context
+         .hummock_manager
+         .get_current_max_committed_epoch()
+         .await;
+
+     if mce != INVALID_EPOCH {
+         command_ctx.wait_epoch_commit(mce).await?;
+     }
+ };
+
let res = match self
.context
.inject_barrier(command_ctx.clone(), None, None)
@@ -1049,18 +1055,14 @@ impl GlobalBarrierManager {
}

/// Reset all compute nodes by calling `force_stop_actors`.
- async fn reset_compute_nodes(
-     &self,
-     info: &InflightActorInfo,
-     prev_epoch: u64,
- ) -> MetaResult<()> {
-     debug!(prev_epoch, worker = ?info.node_map.keys().collect_vec(), "force stop actors");
+ async fn reset_compute_nodes(&self, info: &InflightActorInfo) -> MetaResult<()> {
+     debug!(worker = ?info.node_map.keys().collect_vec(), "force stop actors");
self.context
.stream_rpc_manager
-     .force_stop_actors(info.node_map.values(), prev_epoch)
+     .force_stop_actors(info.node_map.values())
.await?;

debug!(prev_epoch, "all compute nodes have been reset.");
debug!("all compute nodes have been reset.");

Ok(())
}
src/meta/src/barrier/rpc.rs: 2 changes (0 additions & 2 deletions)
@@ -405,13 +405,11 @@ impl StreamRpcManager {
pub async fn force_stop_actors(
&self,
nodes: impl Iterator<Item = &WorkerNode>,
- prev_epoch: u64,
) -> MetaResult<()> {
self.broadcast(nodes, |client| async move {
client
.force_stop_actors(ForceStopActorsRequest {
request_id: Self::new_request_id(),
- prev_epoch,
})
.await
})
src/storage/hummock_test/src/bin/replay/replay_impl.rs: 8 changes (6 additions & 2 deletions)
@@ -186,8 +186,12 @@ impl ReplayStateStore for GlobalReplayImpl {
Ok(())
}

- async fn clear_shared_buffer(&self, prev_epoch: u64) {
-     self.store.clear_shared_buffer(prev_epoch).await
+ async fn clear_shared_buffer(&self) -> Result<()> {
+     self.store
+         .clear_shared_buffer()
+         .await
+         .map_err(|_| TraceError::ClearSharedBufferFailed)?;
+     Ok(())
}
}
pub(crate) struct LocalReplayImpl(LocalHummockStorage);
src/storage/hummock_test/src/state_store_tests.rs: 2 changes (1 addition & 1 deletion)
@@ -1330,7 +1330,7 @@ async fn test_gc_watermark_and_clear_shared_buffer() {

drop(local_hummock_storage);

- hummock_storage.clear_shared_buffer(epoch1).await;
+ hummock_storage.clear_shared_buffer().await.unwrap();

assert_eq!(
hummock_storage
src/storage/hummock_test/src/test_utils.rs: 8 changes (4 additions & 4 deletions)
@@ -34,7 +34,7 @@ use risingwave_storage::filter_key_extractor::{
RpcFilterKeyExtractorManager,
};
use risingwave_storage::hummock::backup_reader::BackupReader;
- use risingwave_storage::hummock::event_handler::HummockVersionUpdate;
+ use risingwave_storage::hummock::event_handler::{HummockEvent, HummockVersionUpdate};
use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store;
use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
use risingwave_storage::hummock::observer_manager::HummockObserverNode;
@@ -53,8 +53,8 @@ pub async fn prepare_first_valid_version(
worker_node: WorkerNode,
) -> (
PinnedVersion,
- UnboundedSender<HummockVersionUpdate>,
- UnboundedReceiver<HummockVersionUpdate>,
+ UnboundedSender<HummockEvent>,
+ UnboundedReceiver<HummockEvent>,
) {
let (tx, mut rx) = unbounded_channel();
let notification_client =
@@ -73,7 +73,7 @@
.await;
observer_manager.start().await;
let hummock_version = match rx.recv().await {
- Some(HummockVersionUpdate::PinnedVersion(version)) => version,
+ Some(HummockEvent::VersionUpdate(HummockVersionUpdate::PinnedVersion(version))) => version,
_ => unreachable!("should be full version"),
};

src/storage/hummock_trace/src/collector.rs: 7 changes (2 additions & 5 deletions)
@@ -214,11 +214,8 @@ impl TraceSpan {
Self::new_global_op(Operation::SealCurrentEpoch { epoch, opts }, storage_type)
}

- pub fn new_clear_shared_buffer_span(prev_epoch: u64) -> MayTraceSpan {
-     Self::new_global_op(
-         Operation::ClearSharedBuffer(prev_epoch),
-         StorageType::Global,
-     )
+ pub fn new_clear_shared_buffer_span() -> MayTraceSpan {
+     Self::new_global_op(Operation::ClearSharedBuffer, StorageType::Global)
}

pub fn new_validate_read_epoch_span(epoch: HummockReadEpoch) -> MayTraceSpan {
src/storage/hummock_trace/src/record.rs: 3 changes (2 additions & 1 deletion)
@@ -170,7 +170,7 @@ pub enum Operation {
TryWaitEpoch(TracedHummockReadEpoch),

/// clear shared buffer
- ClearSharedBuffer(u64),
+ ClearSharedBuffer,

/// Seal current epoch
SealCurrentEpoch {
@@ -299,6 +299,7 @@ pub enum OperationResult {
Sync(TraceResult<usize>),
NotifyHummock(TraceResult<()>),
TryWaitEpoch(TraceResult<()>),
+ ClearSharedBuffer(TraceResult<()>),
ValidateReadEpoch(TraceResult<()>),
LocalStorageEpoch(TraceResult<u64>),
LocalStorageIsDirty(TraceResult<bool>),
src/storage/hummock_trace/src/replay/mod.rs: 4 changes (2 additions & 2 deletions)
@@ -120,7 +120,7 @@ pub trait ReplayStateStore {
async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result<u64>;
async fn new_local(&self, opts: TracedNewLocalOptions) -> Box<dyn LocalReplay>;
async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()>;
- async fn clear_shared_buffer(&self, prev_epoch: u64);
+ async fn clear_shared_buffer(&self) -> Result<()>;
fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> Result<()>;
}

@@ -152,7 +152,7 @@ mock! {
) -> Result<u64>;
async fn new_local(&self, opts: TracedNewLocalOptions) -> Box<dyn LocalReplay>;
async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()>;
- async fn clear_shared_buffer(&self, prev_epoch: u64);
+ async fn clear_shared_buffer(&self) -> Result<()>;
fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> Result<()>;
}
impl GlobalReplay for GlobalReplayInterface{}
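Because the trait method above is now fallible, any replay-side caller has to handle the Result instead of calling fire-and-forget. A small hypothetical sketch under that assumption (the wrapper function is illustrative and not part of this commit; only the clear_shared_buffer signature comes from the trait shown above):

    // Hypothetical helper around the revised ReplayStateStore trait.
    async fn reset_before_replay(store: &impl ReplayStateStore) -> Result<()> {
        // clear_shared_buffer() now returns Result<()>, so failures propagate
        // to the caller instead of being silently dropped.
        store.clear_shared_buffer().await?;
        Ok(())
    }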
src/storage/hummock_trace/src/replay/worker.rs: 17 changes (15 additions & 2 deletions)
@@ -327,9 +327,22 @@ impl ReplayWorker {
);
}
}
- Operation::ClearSharedBuffer(prev_epoch) => {
+ Operation::ClearSharedBuffer => {
assert_eq!(storage_type, StorageType::Global);
-     replay.clear_shared_buffer(prev_epoch).await;
+     let res = res_rx.recv().await.expect("recv result failed");
+     if let OperationResult::ClearSharedBuffer(expected) = res {
+         let actual = replay.clear_shared_buffer().await;
+         assert_eq!(
+             TraceResult::from(actual),
+             expected,
+             "clear_shared_buffer wrong"
+         );
+     } else {
+         panic!(
+             "wrong clear_shared_buffer result, expect epoch result, but got {:?}",
+             res
+         );
+     }
}
Operation::SealCurrentEpoch { epoch, opts } => {
assert_ne!(storage_type, StorageType::Global);
(Diff truncated: the remaining 15 of the 27 changed files are not shown.)

