From dd9b329e7f158c6b6a577cc75155162df147869f Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 1 Feb 2024 01:28:16 +0800 Subject: [PATCH 1/6] pass prev epoch in recovery --- proto/stream_service.proto | 1 + src/compute/src/rpc/service/stream_service.rs | 2 +- src/meta/src/barrier/recovery.rs | 25 ++++----- .../src/bin/replay/replay_impl.rs | 8 +-- .../hummock_test/src/state_store_tests.rs | 2 +- src/storage/hummock_trace/src/collector.rs | 7 ++- src/storage/hummock_trace/src/record.rs | 3 +- src/storage/hummock_trace/src/replay/mod.rs | 4 +- .../hummock_trace/src/replay/worker.rs | 17 +------ .../event_handler/hummock_event_handler.rs | 26 ++++++---- src/storage/src/hummock/event_handler/mod.rs | 4 +- .../src/hummock/event_handler/refiller.rs | 38 +++++--------- .../src/hummock/event_handler/uploader.rs | 51 ++++++++----------- .../src/hummock/store/hummock_storage.rs | 6 +-- src/storage/src/memory.rs | 2 +- src/storage/src/monitor/monitored_store.rs | 5 +- src/storage/src/monitor/traced_store.rs | 10 ++-- src/storage/src/panic_store.rs | 2 +- src/storage/src/store.rs | 2 +- src/storage/src/store_impl.rs | 14 ++--- .../common/log_store_impl/kv_log_store/mod.rs | 6 +-- src/stream/src/task/barrier_manager.rs | 14 +++-- src/stream/src/task/stream_manager.rs | 11 ++-- 23 files changed, 114 insertions(+), 146 deletions(-) diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 462f5ff0256a6..e8c5d94a20ac3 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -46,6 +46,7 @@ message DropActorsResponse { message ForceStopActorsRequest { string request_id = 1; + uint64 prev_epoch = 2; } message ForceStopActorsResponse { diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index def9a534586bb..d496e20d51eb5 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -116,7 +116,7 @@ impl StreamService for StreamServiceImpl { request: Request, ) -> std::result::Result, Status> { let req = request.into_inner(); - self.mgr.reset().await; + self.mgr.reset(req.prev_epoch).await; Ok(Response::new(ForceStopActorsResponse { request_id: req.request_id, status: None, diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 1958ab03a166d..ac99b21b664a9 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -391,9 +391,11 @@ impl GlobalBarrierManagerContext { }; // Reset all compute nodes, stop and drop existing actors. - self.reset_compute_nodes(&info).await.inspect_err(|err| { - warn!(error = %err.as_report(), "reset compute nodes failed"); - })?; + self.reset_compute_nodes(&info, prev_epoch.value().0) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "reset compute nodes failed"); + })?; if self.pre_apply_drop_cancel(scheduled_barriers).await? { info = self.resolve_actor_info().await; @@ -432,16 +434,6 @@ impl GlobalBarrierManagerContext { tracing::Span::current(), // recovery span )); - #[cfg(not(all(test, feature = "failpoints")))] - { - use risingwave_common::util::epoch::INVALID_EPOCH; - - let mce = self.hummock_manager.get_current_max_committed_epoch().await; - - if mce != INVALID_EPOCH { - command_ctx.wait_epoch_commit(mce).await?; - } - }; let await_barrier_complete = self.inject_barrier(command_ctx.clone()).await; let res = match await_barrier_complete.await.result { Ok(response) => { @@ -952,13 +944,18 @@ impl GlobalBarrierManagerContext { } /// Reset all compute nodes by calling `force_stop_actors`. - async fn reset_compute_nodes(&self, info: &InflightActorInfo) -> MetaResult<()> { + async fn reset_compute_nodes( + &self, + info: &InflightActorInfo, + prev_epoch: u64, + ) -> MetaResult<()> { let futures = info.node_map.values().map(|worker_node| async move { let client = self.env.stream_client_pool().get(worker_node).await?; debug!(worker = ?worker_node.id, "force stop actors"); client .force_stop_actors(ForceStopActorsRequest { request_id: Uuid::new_v4().to_string(), + prev_epoch, }) .await }); diff --git a/src/storage/hummock_test/src/bin/replay/replay_impl.rs b/src/storage/hummock_test/src/bin/replay/replay_impl.rs index ef9b671da9b49..c3dedd6dbae46 100644 --- a/src/storage/hummock_test/src/bin/replay/replay_impl.rs +++ b/src/storage/hummock_test/src/bin/replay/replay_impl.rs @@ -186,12 +186,8 @@ impl ReplayStateStore for GlobalReplayImpl { Ok(()) } - async fn clear_shared_buffer(&self) -> Result<()> { - self.store - .clear_shared_buffer() - .await - .map_err(|_| TraceError::ClearSharedBufferFailed)?; - Ok(()) + async fn clear_shared_buffer(&self, prev_epoch: u64) { + self.store.clear_shared_buffer(prev_epoch).await } } pub(crate) struct LocalReplayImpl(LocalHummockStorage); diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 8553c255b5375..12e821bf39516 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -1328,7 +1328,7 @@ async fn test_gc_watermark_and_clear_shared_buffer() { min_object_id_epoch2, ); - hummock_storage.clear_shared_buffer().await.unwrap(); + hummock_storage.clear_shared_buffer(epoch1).await; let read_version = local_hummock_storage.read_version(); diff --git a/src/storage/hummock_trace/src/collector.rs b/src/storage/hummock_trace/src/collector.rs index 264d2445ca76d..b1a269a4620ee 100644 --- a/src/storage/hummock_trace/src/collector.rs +++ b/src/storage/hummock_trace/src/collector.rs @@ -214,8 +214,11 @@ impl TraceSpan { Self::new_global_op(Operation::SealCurrentEpoch { epoch, opts }, storage_type) } - pub fn new_clear_shared_buffer_span() -> MayTraceSpan { - Self::new_global_op(Operation::ClearSharedBuffer, StorageType::Global) + pub fn new_clear_shared_buffer_span(prev_epoch: u64) -> MayTraceSpan { + Self::new_global_op( + Operation::ClearSharedBuffer(prev_epoch), + StorageType::Global, + ) } pub fn new_validate_read_epoch_span(epoch: HummockReadEpoch) -> MayTraceSpan { diff --git a/src/storage/hummock_trace/src/record.rs b/src/storage/hummock_trace/src/record.rs index 7bd0a86d0e222..f8e4ceed65449 100644 --- a/src/storage/hummock_trace/src/record.rs +++ b/src/storage/hummock_trace/src/record.rs @@ -170,7 +170,7 @@ pub enum Operation { TryWaitEpoch(TracedHummockReadEpoch), /// clear shared buffer - ClearSharedBuffer, + ClearSharedBuffer(u64), /// Seal current epoch SealCurrentEpoch { @@ -299,7 +299,6 @@ pub enum OperationResult { Sync(TraceResult), NotifyHummock(TraceResult<()>), TryWaitEpoch(TraceResult<()>), - ClearSharedBuffer(TraceResult<()>), ValidateReadEpoch(TraceResult<()>), LocalStorageEpoch(TraceResult), LocalStorageIsDirty(TraceResult), diff --git a/src/storage/hummock_trace/src/replay/mod.rs b/src/storage/hummock_trace/src/replay/mod.rs index 99aa1c0c37144..046ab67b18607 100644 --- a/src/storage/hummock_trace/src/replay/mod.rs +++ b/src/storage/hummock_trace/src/replay/mod.rs @@ -120,7 +120,7 @@ pub trait ReplayStateStore { async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result; async fn new_local(&self, opts: TracedNewLocalOptions) -> Box; async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; - async fn clear_shared_buffer(&self) -> Result<()>; + async fn clear_shared_buffer(&self, prev_epoch: u64); fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; } @@ -152,7 +152,7 @@ mock! { ) -> Result; async fn new_local(&self, opts: TracedNewLocalOptions) -> Box; async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; - async fn clear_shared_buffer(&self) -> Result<()>; + async fn clear_shared_buffer(&self, prev_epoch: u64); fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; } impl GlobalReplay for GlobalReplayInterface{} diff --git a/src/storage/hummock_trace/src/replay/worker.rs b/src/storage/hummock_trace/src/replay/worker.rs index 7bc821a26c5b9..4d37708420d48 100644 --- a/src/storage/hummock_trace/src/replay/worker.rs +++ b/src/storage/hummock_trace/src/replay/worker.rs @@ -327,22 +327,9 @@ impl ReplayWorker { ); } } - Operation::ClearSharedBuffer => { + Operation::ClearSharedBuffer(prev_epoch) => { assert_eq!(storage_type, StorageType::Global); - let res = res_rx.recv().await.expect("recv result failed"); - if let OperationResult::ClearSharedBuffer(expected) = res { - let actual = replay.clear_shared_buffer().await; - assert_eq!( - TraceResult::from(actual), - expected, - "clear_shared_buffer wrong" - ); - } else { - panic!( - "wrong clear_shared_buffer result, expect epoch result, but got {:?}", - res - ); - } + replay.clear_shared_buffer(prev_epoch).await; } Operation::SealCurrentEpoch { epoch, opts } => { assert_ne!(storage_type, StorageType::Global); diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index cccbb8da242e3..d45f09345a7a2 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -394,7 +394,7 @@ impl HummockEventHandler { } } - fn handle_clear(&mut self, notifier: oneshot::Sender<()>) { + async fn handle_clear(&mut self, notifier: oneshot::Sender<()>, prev_epoch: u64) { info!( "handle clear event. max_committed_epoch: {}, max_synced_epoch: {}, max_sealed_epoch: {}", self.uploader.max_committed_epoch(), @@ -524,8 +524,17 @@ impl HummockEventHandler { } event = pin!(self.hummock_event_rx.recv()) => { let Some(event) = event else { break }; - if self.handle_hummock_event(event) { - break; + match event { + HummockEvent::Clear(notifier, prev_epoch) => { + self.handle_clear(notifier, prev_epoch).await + }, + HummockEvent::Shutdown => { + info!("buffer tracker shutdown"); + return; + }, + event => { + self.handle_hummock_event(event); + } } } } @@ -565,7 +574,7 @@ impl HummockEventHandler { } /// Gracefully shutdown if returns `true`. - fn handle_hummock_event(&mut self, event: HummockEvent) -> bool { + fn handle_hummock_event(&mut self, event: HummockEvent) { match event { HummockEvent::BufferMayFlush => { self.uploader.may_flush(); @@ -576,14 +585,12 @@ impl HummockEventHandler { } => { self.handle_await_sync_epoch(new_sync_epoch, sync_result_sender); } - HummockEvent::Clear(notifier) => { - self.handle_clear(notifier); + HummockEvent::Clear(_, _) => { + unreachable!("clear is handled in separated async context") } HummockEvent::Shutdown => { - info!("buffer tracker shutdown"); - return true; + unreachable!("shutdown is handled specially") } - HummockEvent::VersionUpdate(version_payload) => { self.handle_version_update(version_payload); } @@ -702,7 +709,6 @@ impl HummockEventHandler { } } } - false } fn generate_instance_id(&mut self) -> LocalInstanceId { diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index ffce8c622fbd6..9446effa4d074 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -61,7 +61,7 @@ pub enum HummockEvent { }, /// Clear shared buffer and reset all states - Clear(oneshot::Sender<()>), + Clear(oneshot::Sender<()>, u64), Shutdown, @@ -109,7 +109,7 @@ impl HummockEvent { sync_result_sender: _, } => format!("AwaitSyncEpoch epoch {} ", new_sync_epoch), - HummockEvent::Clear(_) => "Clear".to_string(), + HummockEvent::Clear(_, prev_epoch) => format!("Clear {:?}", prev_epoch), HummockEvent::Shutdown => "Shutdown".to_string(), diff --git a/src/storage/src/hummock/event_handler/refiller.rs b/src/storage/src/hummock/event_handler/refiller.rs index 3be242d6b94ec..321eb31028673 100644 --- a/src/storage/src/hummock/event_handler/refiller.rs +++ b/src/storage/src/hummock/event_handler/refiller.rs @@ -13,10 +13,10 @@ // limitations under the License. use std::collections::{HashMap, HashSet, VecDeque}; -use std::ops::{Deref, DerefMut, Range}; -use std::pin::Pin; +use std::future::poll_fn; +use std::ops::{Deref, Range}; use std::sync::{Arc, LazyLock}; -use std::task::{ready, Context, Poll}; +use std::task::{ready, Poll}; use std::time::{Duration, Instant}; use foyer::common::code::Key; @@ -268,29 +268,19 @@ impl CacheRefiller { pub fn last_new_pinned_version(&self) -> Option<&PinnedVersion> { self.queue.back().map(|item| &item.event.new_pinned_version) } - - pub fn next_event(&mut self) -> NextCacheRefillerEvent<'_> { - NextCacheRefillerEvent { refiller: self } - } } -pub struct NextCacheRefillerEvent<'a> { - refiller: &'a mut CacheRefiller, -} - -impl<'a> Future for NextCacheRefillerEvent<'a> { - type Output = CacheRefillerEvent; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let refiller = &mut self.deref_mut().refiller; - - if let Some(item) = refiller.queue.front_mut() { - ready!(item.handle.poll_unpin(cx)).unwrap(); - let item = refiller.queue.pop_front().unwrap(); - GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.sub(1); - return Poll::Ready(item.event); - } - Poll::Pending +impl CacheRefiller { + pub fn next_event(&mut self) -> impl Future + '_ { + poll_fn(|cx| { + if let Some(item) = self.queue.front_mut() { + ready!(item.handle.poll_unpin(cx)).unwrap(); + let item = self.queue.pop_front().unwrap(); + GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.sub(1); + return Poll::Ready(item.event); + } + Poll::Pending + }) } } diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 1aeda293c84be..2d6f4bb59fffa 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -15,9 +15,8 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, VecDeque}; use std::fmt::{Debug, Display, Formatter}; -use std::future::Future; +use std::future::{poll_fn, Future}; use std::mem::swap; -use std::ops::DerefMut; use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; @@ -1021,10 +1020,6 @@ impl HummockUploader { // TODO: call `abort` on the uploading task join handle } - - pub(crate) fn next_event(&mut self) -> NextUploaderEvent<'_> { - NextUploaderEvent { uploader: self } - } } impl HummockUploader { @@ -1116,10 +1111,6 @@ impl HummockUploader { } } -pub(crate) struct NextUploaderEvent<'a> { - uploader: &'a mut HummockUploader, -} - pub(crate) enum UploaderEvent { // staging sstable info of newer data comes first SyncFinish(HummockEpoch, Vec), @@ -1127,30 +1118,28 @@ pub(crate) enum UploaderEvent { ImmMerged(MergeImmTaskOutput), } -impl<'a> Future for NextUploaderEvent<'a> { - type Output = UploaderEvent; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let uploader = &mut self.deref_mut().uploader; - - if let Some((epoch, newly_uploaded_sstables)) = ready!(uploader.poll_syncing_task(cx)) { - return Poll::Ready(UploaderEvent::SyncFinish(epoch, newly_uploaded_sstables)); - } +impl HummockUploader { + pub(crate) fn next_event(&mut self) -> impl Future + '_ { + poll_fn(|cx| { + if let Some((epoch, newly_uploaded_sstables)) = ready!(self.poll_syncing_task(cx)) { + return Poll::Ready(UploaderEvent::SyncFinish(epoch, newly_uploaded_sstables)); + } - if let Some(sstable_info) = ready!(uploader.poll_sealed_spill_task(cx)) { - return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); - } + if let Some(sstable_info) = ready!(self.poll_sealed_spill_task(cx)) { + return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); + } - if let Some(sstable_info) = ready!(uploader.poll_unsealed_spill_task(cx)) { - return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); - } + if let Some(sstable_info) = ready!(self.poll_unsealed_spill_task(cx)) { + return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); + } - if let Some(merge_output) = ready!(uploader.poll_sealed_merge_imm_task(cx)) { - // add the merged imm into sealed data - uploader.update_sealed_data(&merge_output.merged_imm); - return Poll::Ready(UploaderEvent::ImmMerged(merge_output)); - } - Poll::Pending + if let Some(merge_output) = ready!(self.poll_sealed_merge_imm_task(cx)) { + // add the merged imm into sealed data + self.update_sealed_data(&merge_output.merged_imm); + return Poll::Ready(UploaderEvent::ImmMerged(merge_output)); + } + Poll::Pending + }) } } diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 831411c11017a..5ff0d0542c7bb 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -467,10 +467,10 @@ impl StateStore for HummockStorage { StoreLocalStatistic::flush_all(); } - async fn clear_shared_buffer(&self) -> StorageResult<()> { + async fn clear_shared_buffer(&self, prev_epoch: u64) { let (tx, rx) = oneshot::channel(); self.hummock_event_sender - .send(HummockEvent::Clear(tx)) + .send(HummockEvent::Clear(tx, prev_epoch)) .expect("should send success"); rx.await.expect("should wait success"); @@ -478,8 +478,6 @@ impl StateStore for HummockStorage { self.min_current_epoch .store(HummockEpoch::MAX, MemOrdering::SeqCst); self.seal_epoch.store(epoch, MemOrdering::SeqCst); - - Ok(()) } fn new_local(&self, option: NewLocalOptions) -> impl Future + Send + '_ { diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index 1e77b5f5652bd..007260fef7350 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -635,7 +635,7 @@ impl StateStore for RangeKvStateStore { fn seal_epoch(&self, _epoch: u64, _is_checkpoint: bool) {} #[allow(clippy::unused_async)] - async fn clear_shared_buffer(&self) -> StorageResult<()> { + async fn clear_shared_buffer(&self, _prev_epoch: u64) { unimplemented!("recovery not supported") } diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 6e3b9b3db0fc0..1bea7f6742c68 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -331,11 +331,10 @@ impl StateStore for MonitoredStateStore { panic!("the state store is already monitored") } - fn clear_shared_buffer(&self) -> impl Future> + Send + '_ { + fn clear_shared_buffer(&self, prev_epoch: u64) -> impl Future + Send + '_ { self.inner - .clear_shared_buffer() + .clear_shared_buffer(prev_epoch) .verbose_instrument_await("store_clear_shared_buffer") - .inspect_err(|e| error!(error = %e.as_report(), "Failed in clear_shared_buffer")) } async fn new_local(&self, option: NewLocalOptions) -> Self::Local { diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index 95a5c835407d6..8cf96a231ead0 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -250,13 +250,9 @@ impl StateStore for TracedStateStore { self.inner.seal_epoch(epoch, is_checkpoint); } - async fn clear_shared_buffer(&self) -> StorageResult<()> { - let span = TraceSpan::new_clear_shared_buffer_span(); - let res = self.inner.clear_shared_buffer().await; - span.may_send_result(OperationResult::ClearSharedBuffer( - res.as_ref().map(|o| *o).into(), - )); - res + async fn clear_shared_buffer(&self, prev_epoch: u64) { + let _span = TraceSpan::new_clear_shared_buffer_span(prev_epoch); + self.inner.clear_shared_buffer(prev_epoch).await; } async fn new_local(&self, options: NewLocalOptions) -> Self::Local { diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index 08b0663b4e220..5299cac9fe085 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -154,7 +154,7 @@ impl StateStore for PanicStateStore { } #[allow(clippy::unused_async)] - async fn clear_shared_buffer(&self) -> StorageResult<()> { + async fn clear_shared_buffer(&self, _prev_epoch: u64) { panic!("should not clear shared buffer from the panic state store!"); } diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 0daca6aa5305d..7c8353dc0f30f 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -191,7 +191,7 @@ pub trait StateStore: StateStoreRead + StaticSendSync + Clone { /// Clears contents in shared buffer. /// This method should only be called when dropping all actors in the local compute node. - fn clear_shared_buffer(&self) -> impl Future> + Send + '_; + fn clear_shared_buffer(&self, prev_epoch: u64) -> impl Future + Send + '_; fn new_local(&self, option: NewLocalOptions) -> impl Future + Send + '_; diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 4502236c0a20a..f1316fe7e20c8 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -482,8 +482,8 @@ pub mod verify { self.actual.seal_epoch(epoch, is_checkpoint) } - fn clear_shared_buffer(&self) -> impl Future> + Send + '_ { - self.actual.clear_shared_buffer() + fn clear_shared_buffer(&self, prev_epoch: u64) -> impl Future + Send + '_ { + self.actual.clear_shared_buffer(prev_epoch) } async fn new_local(&self, option: NewLocalOptions) -> Self::Local { @@ -927,7 +927,7 @@ pub mod boxed_state_store { fn seal_epoch(&self, epoch: u64, is_checkpoint: bool); - async fn clear_shared_buffer(&self) -> StorageResult<()>; + async fn clear_shared_buffer(&self, prev_epoch: u64); async fn new_local(&self, option: NewLocalOptions) -> BoxDynamicDispatchedLocalStateStore; @@ -948,8 +948,8 @@ pub mod boxed_state_store { self.seal_epoch(epoch, is_checkpoint); } - async fn clear_shared_buffer(&self) -> StorageResult<()> { - self.clear_shared_buffer().await + async fn clear_shared_buffer(&self, prev_epoch: u64) { + self.clear_shared_buffer(prev_epoch).await } async fn new_local(&self, option: NewLocalOptions) -> BoxDynamicDispatchedLocalStateStore { @@ -1018,8 +1018,8 @@ pub mod boxed_state_store { self.deref().sync(epoch) } - fn clear_shared_buffer(&self) -> impl Future> + Send + '_ { - self.deref().clear_shared_buffer() + fn clear_shared_buffer(&self, prev_epoch: u64) -> impl Future + Send + '_ { + self.deref().clear_shared_buffer(prev_epoch) } fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) { diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index d3fa9ddf3582c..771acdb3263ab 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -581,7 +581,7 @@ mod tests { .unwrap(); // Recovery - test_env.storage.clear_shared_buffer().await.unwrap(); + test_env.storage.clear_shared_buffer(epoch2).await; // Rebuild log reader and writer in recovery let factory = KvLogStoreFactory::new( @@ -778,7 +778,7 @@ mod tests { .unwrap(); // Recovery - test_env.storage.clear_shared_buffer().await.unwrap(); + test_env.storage.clear_shared_buffer(epoch2).await; // Rebuild log reader and writer in recovery let factory = KvLogStoreFactory::new( @@ -1002,7 +1002,7 @@ mod tests { .unwrap(); // Recovery - test_env.storage.clear_shared_buffer().await.unwrap(); + test_env.storage.clear_shared_buffer(epoch2).await; let vnodes = build_bitmap(0..VirtualNode::COUNT); let factory = KvLogStoreFactory::new( diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index b838314729ad3..385825a23e0d2 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -72,7 +72,10 @@ pub(super) enum LocalBarrierEvent { actor_ids_to_collect: HashSet, result_sender: oneshot::Sender>, }, - Reset(oneshot::Sender<()>), + Reset { + prev_epoch: u64, + result_sender: oneshot::Sender<()>, + }, ReportActorCollected { actor_id: ActorId, barrier: Barrier, @@ -223,9 +226,10 @@ impl LocalBarrierWorker { event = event_rx.recv() => { if let Some(event) = event { match event { - LocalBarrierEvent::Reset(finish_sender) => { - self.reset().await; - let _ = finish_sender.send(()); + LocalBarrierEvent::Reset { + result_sender, prev_epoch} => { + self.reset(prev_epoch).await; + let _ = result_sender.send(()); } event => { self.handle_event(event); @@ -268,7 +272,7 @@ impl LocalBarrierWorker { warn!(err=?e, "fail to send inject barrier result"); }); } - LocalBarrierEvent::Reset(_) => { + LocalBarrierEvent::Reset { .. } => { unreachable!("Reset event should be handled separately in async context") } ReportActorCollected { actor_id, barrier } => self.collect(actor_id, &barrier), diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 5a2bde99da491..11eb9a44290cf 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -231,9 +231,12 @@ impl LocalStreamManager { } /// Force stop all actors on this worker, and then drop their resources. - pub async fn reset(&self) { + pub async fn reset(&self, prev_epoch: u64) { self.local_barrier_manager - .send_and_await(LocalBarrierEvent::Reset) + .send_and_await(|result_sender| LocalBarrierEvent::Reset { + result_sender, + prev_epoch, + }) .await .expect("should receive reset") } @@ -268,7 +271,7 @@ impl LocalBarrierWorker { } /// Force stop all actors on this worker, and then drop their resources. - pub(super) async fn reset(&mut self) { + pub(super) async fn reset(&mut self, prev_epoch: u64) { let actor_handles = self.actor_manager_state.drain_actor_handles(); for (actor_id, handle) in &actor_handles { tracing::debug!("force stopping actor {}", actor_id); @@ -295,7 +298,7 @@ impl LocalBarrierWorker { m.lock().clear(); } dispatch_state_store!(&self.actor_manager.env.state_store(), store, { - store.clear_shared_buffer().await.unwrap(); + store.clear_shared_buffer(prev_epoch).await; }); self.reset_state(); self.actor_manager.env.dml_manager_ref().clear(); From d852cf8b193209b4f201864f60cae5e059be4047 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 1 Feb 2024 01:57:14 +0800 Subject: [PATCH 2/6] send version update in a separate channel --- src/storage/hummock_test/src/test_utils.rs | 8 ++-- .../event_handler/hummock_event_handler.rs | 44 ++++++++++++------- src/storage/src/hummock/event_handler/mod.rs | 6 --- src/storage/src/hummock/observer_manager.rs | 26 +++++------ .../src/hummock/store/hummock_storage.rs | 22 +++++----- 5 files changed, 55 insertions(+), 51 deletions(-) diff --git a/src/storage/hummock_test/src/test_utils.rs b/src/storage/hummock_test/src/test_utils.rs index 28cd9273bc254..ee8d8bc393da9 100644 --- a/src/storage/hummock_test/src/test_utils.rs +++ b/src/storage/hummock_test/src/test_utils.rs @@ -34,7 +34,7 @@ use risingwave_storage::filter_key_extractor::{ RpcFilterKeyExtractorManager, }; use risingwave_storage::hummock::backup_reader::BackupReader; -use risingwave_storage::hummock::event_handler::{HummockEvent, HummockVersionUpdate}; +use risingwave_storage::hummock::event_handler::HummockVersionUpdate; use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store; use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; use risingwave_storage::hummock::observer_manager::HummockObserverNode; @@ -53,8 +53,8 @@ pub async fn prepare_first_valid_version( worker_node: WorkerNode, ) -> ( PinnedVersion, - UnboundedSender, - UnboundedReceiver, + UnboundedSender, + UnboundedReceiver, ) { let (tx, mut rx) = unbounded_channel(); let notification_client = @@ -73,7 +73,7 @@ pub async fn prepare_first_valid_version( .await; observer_manager.start().await; let hummock_version = match rx.recv().await { - Some(HummockEvent::VersionUpdate(HummockVersionUpdate::PinnedVersion(version))) => version, + Some(HummockVersionUpdate::PinnedVersion(version)) => version, _ => unreachable!("should be full version"), }; diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index d45f09345a7a2..753c031db042c 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -24,7 +24,8 @@ use prometheus::core::{AtomicU64, GenericGauge}; use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; use thiserror_ext::AsReport; use tokio::spawn; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot; use tracing::{debug, error, info, trace, warn}; use super::refiller::{CacheRefillConfig, CacheRefiller}; @@ -113,8 +114,9 @@ impl BufferTracker { } pub struct HummockEventHandler { - hummock_event_tx: mpsc::UnboundedSender, - hummock_event_rx: mpsc::UnboundedReceiver, + hummock_event_tx: UnboundedSender, + hummock_event_rx: UnboundedReceiver, + version_update_rx: UnboundedReceiver, pending_sync_requests: BTreeMap>>, read_version_mapping: Arc, @@ -158,8 +160,7 @@ async fn flush_imms( impl HummockEventHandler { pub fn new( - hummock_event_tx: mpsc::UnboundedSender, - hummock_event_rx: mpsc::UnboundedReceiver, + version_update_rx: UnboundedReceiver, pinned_version: PinnedVersion, compactor_context: CompactorContext, filter_key_extractor_manager: FilterKeyExtractorManager, @@ -169,8 +170,7 @@ impl HummockEventHandler { let upload_compactor_context = compactor_context.clone(); let cloned_sstable_object_id_manager = sstable_object_id_manager.clone(); Self::new_inner( - hummock_event_tx, - hummock_event_rx, + version_update_rx, pinned_version, Some(sstable_object_id_manager), compactor_context.sstable_store.clone(), @@ -190,8 +190,7 @@ impl HummockEventHandler { } fn new_inner( - hummock_event_tx: mpsc::UnboundedSender, - hummock_event_rx: mpsc::UnboundedReceiver, + version_update_rx: UnboundedReceiver, pinned_version: PinnedVersion, sstable_object_id_manager: Option>, sstable_store: SstableStoreRef, @@ -200,6 +199,7 @@ impl HummockEventHandler { spawn_upload_task: SpawnUploadTask, spawn_merging_task: SpawnMergingTask, ) -> Self { + let (hummock_event_tx, hummock_event_rx) = unbounded_channel(); let (version_update_notifier_tx, _) = tokio::sync::watch::channel(pinned_version.max_committed_epoch()); let version_update_notifier_tx = Arc::new(version_update_notifier_tx); @@ -226,6 +226,7 @@ impl HummockEventHandler { Self { hummock_event_tx, hummock_event_rx, + version_update_rx, pending_sync_requests: Default::default(), version_update_notifier_tx, pinned_version: Arc::new(ArcSwap::from_pointee(pinned_version)), @@ -250,6 +251,10 @@ impl HummockEventHandler { self.read_version_mapping.clone() } + pub fn event_sender(&self) -> UnboundedSender { + self.hummock_event_tx.clone() + } + pub fn buffer_tracker(&self) -> &BufferTracker { self.uploader.buffer_tracker() } @@ -529,7 +534,7 @@ impl HummockEventHandler { self.handle_clear(notifier, prev_epoch).await }, HummockEvent::Shutdown => { - info!("buffer tracker shutdown"); + info!("event handler shutdown"); return; }, event => { @@ -537,6 +542,13 @@ impl HummockEventHandler { } } } + version_update = pin!(self.version_update_rx.recv()) => { + let Some(version_update) = version_update else { + warn!("version update stream ends. event handle shutdown"); + return; + }; + self.handle_version_update(version_update); + } } } } @@ -591,10 +603,6 @@ impl HummockEventHandler { HummockEvent::Shutdown => { unreachable!("shutdown is handled specially") } - HummockEvent::VersionUpdate(version_payload) => { - self.handle_version_update(version_payload); - } - HummockEvent::ImmToUploader(imm) => { self.uploader.add_imm(imm); self.uploader.may_flush(); @@ -785,7 +793,6 @@ mod tests { #[tokio::test] async fn test_event_handler() { - let (tx, rx) = unbounded_channel(); let table_id = TableId::new(123); let epoch0 = 233; let pinned_version = PinnedVersion::new( @@ -800,11 +807,12 @@ mod tests { let mut storage_opts = default_opts_for_test(); storage_opts.imm_merge_threshold = 5; + let (_version_update_tx, version_update_rx) = unbounded_channel(); + let (spawn_upload_task_tx, mut spawn_upload_task_rx) = unbounded_channel(); let (spawn_merging_task_tx, mut spawn_merging_task_rx) = unbounded_channel(); let event_handler = HummockEventHandler::new_inner( - tx.clone(), - rx, + version_update_rx, pinned_version, None, mock_sstable_store(), @@ -843,6 +851,8 @@ mod tests { }), ); + let tx = event_handler.event_sender(); + let _join_handle = spawn(event_handler.start_hummock_event_handler_worker()); let (read_version_tx, read_version_rx) = oneshot::channel(); diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index 9446effa4d074..538e495b31da3 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -65,8 +65,6 @@ pub enum HummockEvent { Shutdown, - VersionUpdate(HummockVersionUpdate), - ImmToUploader(ImmutableMemtable), SealEpoch { @@ -113,10 +111,6 @@ impl HummockEvent { HummockEvent::Shutdown => "Shutdown".to_string(), - HummockEvent::VersionUpdate(version_update_payload) => { - format!("VersionUpdate {:?}", version_update_payload) - } - HummockEvent::ImmToUploader(imm) => { format!("ImmToUploader {:?}", imm) } diff --git a/src/storage/src/hummock/observer_manager.rs b/src/storage/src/hummock/observer_manager.rs index e96d575ce599b..4e10d9a523950 100644 --- a/src/storage/src/hummock/observer_manager.rs +++ b/src/storage/src/hummock/observer_manager.rs @@ -26,14 +26,14 @@ use tokio::sync::mpsc::UnboundedSender; use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManagerRef}; use crate::hummock::backup_reader::BackupReaderRef; -use crate::hummock::event_handler::{HummockEvent, HummockVersionUpdate}; +use crate::hummock::event_handler::HummockVersionUpdate; use crate::hummock::write_limiter::WriteLimiterRef; pub struct HummockObserverNode { filter_key_extractor_manager: FilterKeyExtractorManagerRef, backup_reader: BackupReaderRef, write_limiter: WriteLimiterRef, - version_update_sender: UnboundedSender, + version_update_sender: UnboundedSender, version: u64, } @@ -71,14 +71,12 @@ impl ObserverState for HummockObserverNode { Info::HummockVersionDeltas(hummock_version_deltas) => { let _ = self .version_update_sender - .send(HummockEvent::VersionUpdate( - HummockVersionUpdate::VersionDeltas( - hummock_version_deltas - .version_deltas - .iter() - .map(HummockVersionDelta::from_rpc_protobuf) - .collect(), - ), + .send(HummockVersionUpdate::VersionDeltas( + hummock_version_deltas + .version_deltas + .iter() + .map(HummockVersionDelta::from_rpc_protobuf) + .collect(), )) .inspect_err(|e| { tracing::error!(event = ?e.0, "unable to send version delta"); @@ -123,12 +121,12 @@ impl ObserverState for HummockObserverNode { ); let _ = self .version_update_sender - .send(HummockEvent::VersionUpdate( - HummockVersionUpdate::PinnedVersion(HummockVersion::from_rpc_protobuf( + .send(HummockVersionUpdate::PinnedVersion( + HummockVersion::from_rpc_protobuf( &snapshot .hummock_version .expect("should get hummock version"), - )), + ), )) .inspect_err(|e| { tracing::error!(event = ?e.0, "unable to send full version"); @@ -142,7 +140,7 @@ impl HummockObserverNode { pub fn new( filter_key_extractor_manager: FilterKeyExtractorManagerRef, backup_reader: BackupReaderRef, - version_update_sender: UnboundedSender, + version_update_sender: UnboundedSender, write_limiter: WriteLimiterRef, ) -> Self { Self { diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 5ff0d0542c7bb..e9f403f662282 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -79,6 +79,8 @@ impl Drop for HummockStorageShutdownGuard { #[derive(Clone)] pub struct HummockStorage { hummock_event_sender: UnboundedSender, + // only used in test for setting hummock version in uploader + version_update_sender: UnboundedSender, context: CompactorContext, @@ -151,22 +153,22 @@ impl HummockStorage { .await .map_err(HummockError::read_backup_error)?; let write_limiter = Arc::new(WriteLimiter::default()); - let (event_tx, mut event_rx) = unbounded_channel(); + let (version_update_tx, mut version_update_rx) = unbounded_channel(); let observer_manager = ObserverManager::new( notification_client, HummockObserverNode::new( filter_key_extractor_manager.clone(), backup_reader.clone(), - event_tx.clone(), + version_update_tx.clone(), write_limiter.clone(), ), ) .await; observer_manager.start().await; - let hummock_version = match event_rx.recv().await { - Some(HummockEvent::VersionUpdate(HummockVersionUpdate::PinnedVersion(version))) => version, + let hummock_version = match version_update_rx.recv().await { + Some(HummockVersionUpdate::PinnedVersion(version)) => version, _ => unreachable!("the hummock observer manager is the first one to take the event tx. Should be full hummock version") }; @@ -189,8 +191,7 @@ impl HummockStorage { let seal_epoch = Arc::new(AtomicU64::new(pinned_version.max_committed_epoch())); let min_current_epoch = Arc::new(AtomicU64::new(pinned_version.max_committed_epoch())); let hummock_event_handler = HummockEventHandler::new( - event_tx.clone(), - event_rx, + version_update_rx, pinned_version, compactor_context.clone(), filter_key_extractor_manager.clone(), @@ -198,6 +199,8 @@ impl HummockStorage { state_store_metrics.clone(), ); + let event_tx = hummock_event_handler.event_sender(); + let instance = Self { context: compactor_context, filter_key_extractor_manager: filter_key_extractor_manager.clone(), @@ -206,6 +209,7 @@ impl HummockStorage { version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(), seal_epoch, hummock_event_sender: event_tx.clone(), + version_update_sender: version_update_tx, pinned_version: hummock_event_handler.pinned_version(), hummock_version_reader: HummockVersionReader::new( sstable_store, @@ -528,10 +532,8 @@ impl HummockStorage { pub async fn update_version_and_wait(&self, version: HummockVersion) { use tokio::task::yield_now; let version_id = version.id; - self.hummock_event_sender - .send(HummockEvent::VersionUpdate( - HummockVersionUpdate::PinnedVersion(version), - )) + self.version_update_sender + .send(HummockVersionUpdate::PinnedVersion(version)) .unwrap(); loop { if self.pinned_version.load().id() >= version_id { From 1a76c8e6f747cdb7a97ba1ce2208e99e441b3b4b Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 1 Feb 2024 02:21:15 +0800 Subject: [PATCH 3/6] wait for mce comes up in recovery --- .../event_handler/hummock_event_handler.rs | 43 +++++++++++++++++++ .../src/hummock/event_handler/refiller.rs | 11 +++++ 2 files changed, 54 insertions(+) diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 753c031db042c..a03bc97798d50 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -408,6 +408,49 @@ impl HummockEventHandler { ); self.uploader.clear(); + if let Some(CacheRefillerEvent { + pinned_version, + new_pinned_version, + }) = self.refiller.clear() + { + self.apply_version_update(pinned_version, new_pinned_version); + } + + let mce = self.uploader.max_committed_epoch(); + + if mce < prev_epoch { + while self + .refiller + .last_new_pinned_version() + .map(|version| version.max_committed_epoch()) + .unwrap_or(mce) + < prev_epoch + { + let version_update = self + .version_update_rx + .recv() + .await + .expect("should not be empty"); + self.handle_version_update(version_update); + } + let CacheRefillerEvent { + pinned_version, + new_pinned_version, + } = self + .refiller + .clear() + .expect("must have some version update to raise the mce"); + self.apply_version_update(pinned_version, new_pinned_version); + } + + assert!(self.uploader.max_committed_epoch() >= prev_epoch); + if self.uploader.max_committed_epoch() > prev_epoch { + warn!( + mce = self.uploader.max_committed_epoch(), + prev_epoch, "mce higher than clear prev_epoch" + ); + } + for (epoch, result_sender) in self.pending_sync_requests.extract_if(|_, _| true) { send_sync_result( result_sender, diff --git a/src/storage/src/hummock/event_handler/refiller.rs b/src/storage/src/hummock/event_handler/refiller.rs index 321eb31028673..a1a4de33dd2dc 100644 --- a/src/storage/src/hummock/event_handler/refiller.rs +++ b/src/storage/src/hummock/event_handler/refiller.rs @@ -268,6 +268,17 @@ impl CacheRefiller { pub fn last_new_pinned_version(&self) -> Option<&PinnedVersion> { self.queue.back().map(|item| &item.event.new_pinned_version) } + + pub fn clear(&mut self) -> Option { + let Some(last_item) = self.queue.pop_back() else { + return None; + }; + let mut event = last_item.event; + while let Some(item) = self.queue.pop_back() { + event.pinned_version = item.event.pinned_version; + } + Some(event) + } } impl CacheRefiller { From 184bb9a90b046fb225cb6dee9f5b1ff358cd06b3 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 1 Feb 2024 16:00:31 +0800 Subject: [PATCH 4/6] refine --- .../event_handler/hummock_event_handler.rs | 104 ++++++++++++------ .../src/hummock/event_handler/refiller.rs | 4 +- .../src/hummock/event_handler/uploader.rs | 4 + 3 files changed, 74 insertions(+), 38 deletions(-) diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index a03bc97798d50..2b890dfe748af 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -21,6 +21,7 @@ use arc_swap::ArcSwap; use await_tree::InstrumentAwait; use parking_lot::RwLock; use prometheus::core::{AtomicU64, GenericGauge}; +use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo; use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; use thiserror_ext::AsReport; use tokio::spawn; @@ -401,46 +402,61 @@ impl HummockEventHandler { async fn handle_clear(&mut self, notifier: oneshot::Sender<()>, prev_epoch: u64) { info!( - "handle clear event. max_committed_epoch: {}, max_synced_epoch: {}, max_sealed_epoch: {}", - self.uploader.max_committed_epoch(), - self.uploader.max_synced_epoch(), - self.uploader.max_sealed_epoch(), + prev_epoch, + max_committed_epoch = self.uploader.max_committed_epoch(), + max_synced_epoch = self.uploader.max_synced_epoch(), + max_sealed_epoch = self.uploader.max_sealed_epoch(), + "handle clear event" ); self.uploader.clear(); - if let Some(CacheRefillerEvent { - pinned_version, - new_pinned_version, - }) = self.refiller.clear() - { - self.apply_version_update(pinned_version, new_pinned_version); - } + let current_version = self.uploader.hummock_version(); - let mce = self.uploader.max_committed_epoch(); + if current_version.max_committed_epoch() < prev_epoch { + let mut latest_version = if let Some(CacheRefillerEvent { + pinned_version, + new_pinned_version, + }) = self.refiller.clear() + { + assert_eq!( + current_version.id(), + pinned_version.id(), + "refiller earliest version {:?} not equal to current version {:?}", + pinned_version.version(), + current_version.version() + ); - if mce < prev_epoch { - while self - .refiller - .last_new_pinned_version() - .map(|version| version.max_committed_epoch()) - .unwrap_or(mce) - < prev_epoch + info!( + prev_epoch, + current_mce = current_version.max_committed_epoch(), + refiller_mce = new_pinned_version.max_committed_epoch(), + "refiller is clear in recovery" + ); + + Some(new_pinned_version) + } else { + None + }; + + while let latest_version_ref = latest_version.as_ref().unwrap_or(current_version) + && latest_version_ref.max_committed_epoch() < prev_epoch { let version_update = self .version_update_rx .recv() .await .expect("should not be empty"); - self.handle_version_update(version_update); + latest_version = Some(Self::resolve_version_update_info( + latest_version_ref.clone(), + version_update, + None, + )); } - let CacheRefillerEvent { - pinned_version, - new_pinned_version, - } = self - .refiller - .clear() - .expect("must have some version update to raise the mce"); - self.apply_version_update(pinned_version, new_pinned_version); + + self.apply_version_update( + current_version.clone(), + latest_version.expect("must have some version update to raise the mce"), + ); } assert!(self.uploader.max_committed_epoch() >= prev_epoch); @@ -476,6 +492,8 @@ impl HummockEventHandler { let _ = notifier.send(()).inspect_err(|e| { error!("failed to notify completion of clear event: {:?}", e); }); + + info!(prev_epoch, "clear finished"); } fn handle_version_update(&mut self, version_payload: HummockVersionUpdate) { @@ -483,17 +501,34 @@ impl HummockEventHandler { .refiller .last_new_pinned_version() .cloned() - .map(Arc::new) - .unwrap_or_else(|| self.pinned_version.load().clone()); + .unwrap_or_else(|| self.uploader.hummock_version().clone()); let mut sst_delta_infos = vec![]; + let new_pinned_version = Self::resolve_version_update_info( + pinned_version.clone(), + version_payload, + Some(&mut sst_delta_infos), + ); + + self.refiller + .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version); + } + + fn resolve_version_update_info( + pinned_version: PinnedVersion, + version_payload: HummockVersionUpdate, + mut sst_delta_infos: Option<&mut Vec>, + ) -> PinnedVersion { let newly_pinned_version = match version_payload { HummockVersionUpdate::VersionDeltas(version_deltas) => { let mut version_to_apply = pinned_version.version().clone(); for version_delta in &version_deltas { assert_eq!(version_to_apply.id, version_delta.prev_id); if version_to_apply.max_committed_epoch == version_delta.max_committed_epoch { - sst_delta_infos = version_to_apply.build_sst_delta_infos(version_delta); + if let Some(sst_delta_infos) = &mut sst_delta_infos { + **sst_delta_infos = + version_to_apply.build_sst_delta_infos(version_delta); + } } version_to_apply.apply_version_delta(version_delta); } @@ -505,15 +540,12 @@ impl HummockEventHandler { validate_table_key_range(&newly_pinned_version); - let new_pinned_version = pinned_version.new_pin_version(newly_pinned_version); - - self.refiller - .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version); + pinned_version.new_pin_version(newly_pinned_version) } fn apply_version_update( &mut self, - pinned_version: Arc, + pinned_version: PinnedVersion, new_pinned_version: PinnedVersion, ) { self.pinned_version diff --git a/src/storage/src/hummock/event_handler/refiller.rs b/src/storage/src/hummock/event_handler/refiller.rs index a1a4de33dd2dc..8366d958c3fd7 100644 --- a/src/storage/src/hummock/event_handler/refiller.rs +++ b/src/storage/src/hummock/event_handler/refiller.rs @@ -248,7 +248,7 @@ impl CacheRefiller { pub fn start_cache_refill( &mut self, deltas: Vec, - pinned_version: Arc, + pinned_version: PinnedVersion, new_pinned_version: PinnedVersion, ) { let task = CacheRefillTask { @@ -296,7 +296,7 @@ impl CacheRefiller { } pub struct CacheRefillerEvent { - pub pinned_version: Arc, + pub pinned_version: PinnedVersion, pub new_pinned_version: PinnedVersion, } diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 2d6f4bb59fffa..7ed88da3ce9e7 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -763,6 +763,10 @@ impl HummockUploader { self.context.pinned_version.max_committed_epoch() } + pub(crate) fn hummock_version(&self) -> &PinnedVersion { + &self.context.pinned_version + } + pub(crate) fn get_synced_data(&self, epoch: HummockEpoch) -> Option<&SyncedDataState> { assert!(self.max_committed_epoch() < epoch && epoch <= self.max_synced_epoch); self.synced_data.get(&epoch) From 3e5bd8510b1545342577665a98145986c6579296 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 1 Feb 2024 17:56:23 +0800 Subject: [PATCH 5/6] add ut --- .../event_handler/hummock_event_handler.rs | 138 +++++++++++++++++- .../src/hummock/event_handler/refiller.rs | 47 ++++-- .../src/hummock/store/hummock_storage.rs | 7 +- 3 files changed, 173 insertions(+), 19 deletions(-) diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 2b890dfe748af..53c5442def416 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -34,7 +34,7 @@ use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType}; use crate::filter_key_extractor::FilterKeyExtractorManager; use crate::hummock::compactor::{compact, CompactorContext}; use crate::hummock::conflict_detector::ConflictDetector; -use crate::hummock::event_handler::refiller::CacheRefillerEvent; +use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask}; use crate::hummock::event_handler::uploader::{ default_spawn_merging_task, HummockUploader, SpawnMergingTask, SpawnUploadTask, SyncedData, UploadTaskInfo, UploadTaskPayload, UploaderEvent, @@ -187,6 +187,7 @@ impl HummockEventHandler { )) }), default_spawn_merging_task(compactor_context.compaction_executor.clone()), + CacheRefiller::default_spawn_refill_task(), ) } @@ -199,6 +200,7 @@ impl HummockEventHandler { storage_opts: &StorageOpts, spawn_upload_task: SpawnUploadTask, spawn_merging_task: SpawnMergingTask, + spawn_refill_task: SpawnRefillTask, ) -> Self { let (hummock_event_tx, hummock_event_rx) = unbounded_channel(); let (version_update_notifier_tx, _) = @@ -222,6 +224,7 @@ impl HummockEventHandler { let refiller = CacheRefiller::new( CacheRefillConfig::from_storage_opts(storage_opts), sstable_store, + spawn_refill_task, ); Self { @@ -836,7 +839,7 @@ fn to_sync_result(result: &HummockResult) -> HummockResult, CacheRefillContext, PinnedVersion, PinnedVersion) -> JoinHandle<()> + + Send + + Sync + + 'static, +>; + /// A cache refiller for hummock data. -pub struct CacheRefiller { +pub(crate) struct CacheRefiller { /// order: old => new queue: VecDeque, context: CacheRefillContext, + + spawn_refill_task: SpawnRefillTask, } impl CacheRefiller { - pub fn new(config: CacheRefillConfig, sstable_store: SstableStoreRef) -> Self { + pub(crate) fn new( + config: CacheRefillConfig, + sstable_store: SstableStoreRef, + spawn_refill_task: SpawnRefillTask, + ) -> Self { let config = Arc::new(config); let concurrency = Arc::new(Semaphore::new(config.concurrency)); Self { @@ -242,34 +256,43 @@ impl CacheRefiller { concurrency, sstable_store, }, + spawn_refill_task, } } - pub fn start_cache_refill( + pub(crate) fn default_spawn_refill_task() -> SpawnRefillTask { + Arc::new(|deltas, context, _, _| { + let task = CacheRefillTask { deltas, context }; + tokio::spawn(task.run()) + }) + } + + pub(crate) fn start_cache_refill( &mut self, deltas: Vec, pinned_version: PinnedVersion, new_pinned_version: PinnedVersion, ) { - let task = CacheRefillTask { + let handle = (self.spawn_refill_task)( deltas, - context: self.context.clone(), - }; + self.context.clone(), + pinned_version.clone(), + new_pinned_version.clone(), + ); let event = CacheRefillerEvent { pinned_version, new_pinned_version, }; - let handle = tokio::spawn(task.run()); let item = Item { handle, event }; self.queue.push_back(item); GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.add(1); } - pub fn last_new_pinned_version(&self) -> Option<&PinnedVersion> { + pub(crate) fn last_new_pinned_version(&self) -> Option<&PinnedVersion> { self.queue.back().map(|item| &item.event.new_pinned_version) } - pub fn clear(&mut self) -> Option { + pub(crate) fn clear(&mut self) -> Option { let Some(last_item) = self.queue.pop_back() else { return None; }; @@ -282,7 +305,7 @@ impl CacheRefiller { } impl CacheRefiller { - pub fn next_event(&mut self) -> impl Future + '_ { + pub(crate) fn next_event(&mut self) -> impl Future + '_ { poll_fn(|cx| { if let Some(item) = self.queue.front_mut() { ready!(item.handle.poll_unpin(cx)).unwrap(); @@ -301,13 +324,13 @@ pub struct CacheRefillerEvent { } #[derive(Clone)] -struct CacheRefillContext { +pub(crate) struct CacheRefillContext { config: Arc, concurrency: Arc, sstable_store: SstableStoreRef, } -pub struct CacheRefillTask { +struct CacheRefillTask { deltas: Vec, context: CacheRefillContext, } diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index e9f403f662282..800b7c43bb380 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -80,7 +80,7 @@ impl Drop for HummockStorageShutdownGuard { pub struct HummockStorage { hummock_event_sender: UnboundedSender, // only used in test for setting hummock version in uploader - version_update_sender: UnboundedSender, + _version_update_sender: UnboundedSender, context: CompactorContext, @@ -209,7 +209,7 @@ impl HummockStorage { version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(), seal_epoch, hummock_event_sender: event_tx.clone(), - version_update_sender: version_update_tx, + _version_update_sender: version_update_tx, pinned_version: hummock_event_handler.pinned_version(), hummock_version_reader: HummockVersionReader::new( sstable_store, @@ -529,10 +529,11 @@ impl HummockStorage { } /// Used in the compaction test tool + #[cfg(any(test, feature = "test"))] pub async fn update_version_and_wait(&self, version: HummockVersion) { use tokio::task::yield_now; let version_id = version.id; - self.version_update_sender + self._version_update_sender .send(HummockVersionUpdate::PinnedVersion(version)) .unwrap(); loop { From 99f543b57a69898ad2240c2c722e2e913f99211e Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 19 Feb 2024 14:13:18 +0800 Subject: [PATCH 6/6] resolve comment --- src/meta/src/barrier/mod.rs | 6 ++---- src/meta/src/barrier/recovery.rs | 9 ++++++++- src/storage/src/hummock/event_handler/refiller.rs | 6 ++++++ 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index b13dc19ef61c6..47bef49c66574 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -527,9 +527,7 @@ impl GlobalBarrierManager { let paused = self.take_pause_on_bootstrap().await.unwrap_or(false); let paused_reason = paused.then_some(PausedReason::Manual); - self.recovery(prev_epoch, paused_reason) - .instrument(span) - .await; + self.recovery(paused_reason).instrument(span).await; } self.context.set_status(BarrierManagerStatus::Running); @@ -770,7 +768,7 @@ impl GlobalBarrierManager { // No need to clean dirty tables for barrier recovery, // The foreground stream job should cleanup their own tables. - self.recovery(prev_epoch, None).instrument(span).await; + self.recovery(None).instrument(span).await; self.context.set_status(BarrierManagerStatus::Running); } else { panic!("failed to execute barrier: {}", err.as_report()); diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 145366a872cac..1ced92ce61c3a 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -329,7 +329,14 @@ impl GlobalBarrierManager { /// the cluster or `risectl` command. Used for debugging purpose. /// /// Returns the new state of the barrier manager after recovery. - pub async fn recovery(&mut self, prev_epoch: TracedEpoch, paused_reason: Option) { + pub async fn recovery(&mut self, paused_reason: Option) { + let prev_epoch = TracedEpoch::new( + self.context + .hummock_manager + .latest_snapshot() + .committed_epoch + .into(), + ); // Mark blocked and abort buffered schedules, they might be dirty already. self.scheduled_barriers .abort_and_mark_blocked("cluster is under recovering"); diff --git a/src/storage/src/hummock/event_handler/refiller.rs b/src/storage/src/hummock/event_handler/refiller.rs index d489d719df62a..1c592fac85a2d 100644 --- a/src/storage/src/hummock/event_handler/refiller.rs +++ b/src/storage/src/hummock/event_handler/refiller.rs @@ -292,12 +292,18 @@ impl CacheRefiller { self.queue.back().map(|item| &item.event.new_pinned_version) } + /// Clear the queue for cache refill and return an event that merges all pending cache refill events + /// into a single event that takes the earliest and latest version. pub(crate) fn clear(&mut self) -> Option { let Some(last_item) = self.queue.pop_back() else { return None; }; let mut event = last_item.event; while let Some(item) = self.queue.pop_back() { + assert_eq!( + event.pinned_version.id(), + item.event.new_pinned_version.id() + ); event.pinned_version = item.event.pinned_version; } Some(event)