From b2bda85f245a6de915a0f7c1e5fc553427016bd2 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Mon, 19 Feb 2024 14:48:28 +0800 Subject: [PATCH] feat(storage): reset to the latest committed epoch in recovery (#14923) --- proto/stream_service.proto | 1 + src/compute/src/rpc/service/stream_service.rs | 2 +- src/meta/src/barrier/mod.rs | 6 +- src/meta/src/barrier/recovery.rs | 44 ++- src/meta/src/barrier/rpc.rs | 2 + .../src/bin/replay/replay_impl.rs | 8 +- .../hummock_test/src/state_store_tests.rs | 2 +- src/storage/hummock_test/src/test_utils.rs | 8 +- src/storage/hummock_trace/src/collector.rs | 7 +- src/storage/hummock_trace/src/record.rs | 3 +- src/storage/hummock_trace/src/replay/mod.rs | 4 +- .../hummock_trace/src/replay/worker.rs | 17 +- .../event_handler/hummock_event_handler.rs | 305 +++++++++++++++--- src/storage/src/hummock/event_handler/mod.rs | 10 +- .../src/hummock/event_handler/refiller.rs | 98 ++++-- .../src/hummock/event_handler/uploader.rs | 55 ++-- src/storage/src/hummock/observer_manager.rs | 26 +- .../src/hummock/store/hummock_storage.rs | 29 +- src/storage/src/memory.rs | 2 +- src/storage/src/monitor/monitored_store.rs | 5 +- src/storage/src/monitor/traced_store.rs | 10 +- src/storage/src/panic_store.rs | 2 +- src/storage/src/store.rs | 2 +- src/storage/src/store_impl.rs | 14 +- .../common/log_store_impl/kv_log_store/mod.rs | 6 +- src/stream/src/task/barrier_manager.rs | 14 +- src/stream/src/task/stream_manager.rs | 11 +- 27 files changed, 458 insertions(+), 235 deletions(-) diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 462f5ff0256a..e8c5d94a20ac 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -46,6 +46,7 @@ message DropActorsResponse { message ForceStopActorsRequest { string request_id = 1; + uint64 prev_epoch = 2; } message ForceStopActorsResponse { diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index def9a534586b..d496e20d51eb 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -116,7 +116,7 @@ impl StreamService for StreamServiceImpl { request: Request, ) -> std::result::Result, Status> { let req = request.into_inner(); - self.mgr.reset().await; + self.mgr.reset(req.prev_epoch).await; Ok(Response::new(ForceStopActorsResponse { request_id: req.request_id, status: None, diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index b13dc19ef61c..47bef49c6657 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -527,9 +527,7 @@ impl GlobalBarrierManager { let paused = self.take_pause_on_bootstrap().await.unwrap_or(false); let paused_reason = paused.then_some(PausedReason::Manual); - self.recovery(prev_epoch, paused_reason) - .instrument(span) - .await; + self.recovery(paused_reason).instrument(span).await; } self.context.set_status(BarrierManagerStatus::Running); @@ -770,7 +768,7 @@ impl GlobalBarrierManager { // No need to clean dirty tables for barrier recovery, // The foreground stream job should cleanup their own tables. 
- self.recovery(prev_epoch, None).instrument(span).await; + self.recovery(None).instrument(span).await; self.context.set_status(BarrierManagerStatus::Running); } else { panic!("failed to execute barrier: {}", err.as_report()); diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 3f7f3911e730..1ced92ce61c3 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -329,7 +329,14 @@ impl GlobalBarrierManager { /// the cluster or `risectl` command. Used for debugging purpose. /// /// Returns the new state of the barrier manager after recovery. - pub async fn recovery(&mut self, prev_epoch: TracedEpoch, paused_reason: Option) { + pub async fn recovery(&mut self, paused_reason: Option) { + let prev_epoch = TracedEpoch::new( + self.context + .hummock_manager + .latest_snapshot() + .committed_epoch + .into(), + ); // Mark blocked and abort buffered schedules, they might be dirty already. self.scheduled_barriers .abort_and_mark_blocked("cluster is under recovering"); @@ -399,9 +406,11 @@ impl GlobalBarrierManager { }; // Reset all compute nodes, stop and drop existing actors. - self.reset_compute_nodes(&info).await.inspect_err(|err| { - warn!(error = %err.as_report(), "reset compute nodes failed"); - })?; + self.reset_compute_nodes(&info, prev_epoch.value().0) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "reset compute nodes failed"); + })?; if self.pre_apply_drop_cancel().await? { info = self @@ -447,21 +456,6 @@ impl GlobalBarrierManager { tracing::Span::current(), // recovery span )); - #[cfg(not(all(test, feature = "failpoints")))] - { - use risingwave_common::util::epoch::INVALID_EPOCH; - - let mce = self - .context - .hummock_manager - .get_current_max_committed_epoch() - .await; - - if mce != INVALID_EPOCH { - command_ctx.wait_epoch_commit(mce).await?; - } - }; - let res = match self .context .inject_barrier(command_ctx.clone(), None, None) @@ -1055,14 +1049,18 @@ impl GlobalBarrierManager { } /// Reset all compute nodes by calling `force_stop_actors`. 
- async fn reset_compute_nodes(&self, info: &InflightActorInfo) -> MetaResult<()> { - debug!(worker = ?info.node_map.keys().collect_vec(), "force stop actors"); + async fn reset_compute_nodes( + &self, + info: &InflightActorInfo, + prev_epoch: u64, + ) -> MetaResult<()> { + debug!(prev_epoch, worker = ?info.node_map.keys().collect_vec(), "force stop actors"); self.context .stream_rpc_manager - .force_stop_actors(info.node_map.values()) + .force_stop_actors(info.node_map.values(), prev_epoch) .await?; - debug!("all compute nodes have been reset."); + debug!(prev_epoch, "all compute nodes have been reset."); Ok(()) } diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 877f935f2520..8690435b4ebc 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -405,11 +405,13 @@ impl StreamRpcManager { pub async fn force_stop_actors( &self, nodes: impl Iterator, + prev_epoch: u64, ) -> MetaResult<()> { self.broadcast(nodes, |client| async move { client .force_stop_actors(ForceStopActorsRequest { request_id: Self::new_request_id(), + prev_epoch, }) .await }) diff --git a/src/storage/hummock_test/src/bin/replay/replay_impl.rs b/src/storage/hummock_test/src/bin/replay/replay_impl.rs index ef9b671da9b4..c3dedd6dbae4 100644 --- a/src/storage/hummock_test/src/bin/replay/replay_impl.rs +++ b/src/storage/hummock_test/src/bin/replay/replay_impl.rs @@ -186,12 +186,8 @@ impl ReplayStateStore for GlobalReplayImpl { Ok(()) } - async fn clear_shared_buffer(&self) -> Result<()> { - self.store - .clear_shared_buffer() - .await - .map_err(|_| TraceError::ClearSharedBufferFailed)?; - Ok(()) + async fn clear_shared_buffer(&self, prev_epoch: u64) { + self.store.clear_shared_buffer(prev_epoch).await } } pub(crate) struct LocalReplayImpl(LocalHummockStorage); diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index e52983dc787a..d7f7531b8b65 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -1330,7 +1330,7 @@ async fn test_gc_watermark_and_clear_shared_buffer() { drop(local_hummock_storage); - hummock_storage.clear_shared_buffer().await.unwrap(); + hummock_storage.clear_shared_buffer(epoch1).await; assert_eq!( hummock_storage diff --git a/src/storage/hummock_test/src/test_utils.rs b/src/storage/hummock_test/src/test_utils.rs index 554f7bd8b8be..f5d1a10a1883 100644 --- a/src/storage/hummock_test/src/test_utils.rs +++ b/src/storage/hummock_test/src/test_utils.rs @@ -34,7 +34,7 @@ use risingwave_storage::filter_key_extractor::{ RpcFilterKeyExtractorManager, }; use risingwave_storage::hummock::backup_reader::BackupReader; -use risingwave_storage::hummock::event_handler::{HummockEvent, HummockVersionUpdate}; +use risingwave_storage::hummock::event_handler::HummockVersionUpdate; use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store; use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; use risingwave_storage::hummock::observer_manager::HummockObserverNode; @@ -53,8 +53,8 @@ pub async fn prepare_first_valid_version( worker_node: WorkerNode, ) -> ( PinnedVersion, - UnboundedSender, - UnboundedReceiver, + UnboundedSender, + UnboundedReceiver, ) { let (tx, mut rx) = unbounded_channel(); let notification_client = @@ -73,7 +73,7 @@ pub async fn prepare_first_valid_version( .await; observer_manager.start().await; let hummock_version = match rx.recv().await { - 
Some(HummockEvent::VersionUpdate(HummockVersionUpdate::PinnedVersion(version))) => version, + Some(HummockVersionUpdate::PinnedVersion(version)) => version, _ => unreachable!("should be full version"), }; diff --git a/src/storage/hummock_trace/src/collector.rs b/src/storage/hummock_trace/src/collector.rs index 264d2445ca76..b1a269a4620e 100644 --- a/src/storage/hummock_trace/src/collector.rs +++ b/src/storage/hummock_trace/src/collector.rs @@ -214,8 +214,11 @@ impl TraceSpan { Self::new_global_op(Operation::SealCurrentEpoch { epoch, opts }, storage_type) } - pub fn new_clear_shared_buffer_span() -> MayTraceSpan { - Self::new_global_op(Operation::ClearSharedBuffer, StorageType::Global) + pub fn new_clear_shared_buffer_span(prev_epoch: u64) -> MayTraceSpan { + Self::new_global_op( + Operation::ClearSharedBuffer(prev_epoch), + StorageType::Global, + ) } pub fn new_validate_read_epoch_span(epoch: HummockReadEpoch) -> MayTraceSpan { diff --git a/src/storage/hummock_trace/src/record.rs b/src/storage/hummock_trace/src/record.rs index 7bd0a86d0e22..f8e4ceed6544 100644 --- a/src/storage/hummock_trace/src/record.rs +++ b/src/storage/hummock_trace/src/record.rs @@ -170,7 +170,7 @@ pub enum Operation { TryWaitEpoch(TracedHummockReadEpoch), /// clear shared buffer - ClearSharedBuffer, + ClearSharedBuffer(u64), /// Seal current epoch SealCurrentEpoch { @@ -299,7 +299,6 @@ pub enum OperationResult { Sync(TraceResult), NotifyHummock(TraceResult<()>), TryWaitEpoch(TraceResult<()>), - ClearSharedBuffer(TraceResult<()>), ValidateReadEpoch(TraceResult<()>), LocalStorageEpoch(TraceResult), LocalStorageIsDirty(TraceResult), diff --git a/src/storage/hummock_trace/src/replay/mod.rs b/src/storage/hummock_trace/src/replay/mod.rs index 99aa1c0c3714..046ab67b1860 100644 --- a/src/storage/hummock_trace/src/replay/mod.rs +++ b/src/storage/hummock_trace/src/replay/mod.rs @@ -120,7 +120,7 @@ pub trait ReplayStateStore { async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result; async fn new_local(&self, opts: TracedNewLocalOptions) -> Box; async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; - async fn clear_shared_buffer(&self) -> Result<()>; + async fn clear_shared_buffer(&self, prev_epoch: u64); fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; } @@ -152,7 +152,7 @@ mock! 
{ ) -> Result; async fn new_local(&self, opts: TracedNewLocalOptions) -> Box; async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; - async fn clear_shared_buffer(&self) -> Result<()>; + async fn clear_shared_buffer(&self, prev_epoch: u64); fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; } impl GlobalReplay for GlobalReplayInterface{} diff --git a/src/storage/hummock_trace/src/replay/worker.rs b/src/storage/hummock_trace/src/replay/worker.rs index 7bc821a26c5b..4d37708420d4 100644 --- a/src/storage/hummock_trace/src/replay/worker.rs +++ b/src/storage/hummock_trace/src/replay/worker.rs @@ -327,22 +327,9 @@ impl ReplayWorker { ); } } - Operation::ClearSharedBuffer => { + Operation::ClearSharedBuffer(prev_epoch) => { assert_eq!(storage_type, StorageType::Global); - let res = res_rx.recv().await.expect("recv result failed"); - if let OperationResult::ClearSharedBuffer(expected) = res { - let actual = replay.clear_shared_buffer().await; - assert_eq!( - TraceResult::from(actual), - expected, - "clear_shared_buffer wrong" - ); - } else { - panic!( - "wrong clear_shared_buffer result, expect epoch result, but got {:?}", - res - ); - } + replay.clear_shared_buffer(prev_epoch).await; } Operation::SealCurrentEpoch { epoch, opts } => { assert_ne!(storage_type, StorageType::Global); diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 615bb293a411..f1209775e154 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -22,10 +22,12 @@ use await_tree::InstrumentAwait; use itertools::Itertools; use parking_lot::RwLock; use prometheus::core::{AtomicU64, GenericGauge}; +use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo; use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; use thiserror_ext::AsReport; use tokio::spawn; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot; use tracing::{debug, error, info, trace, warn}; use super::refiller::{CacheRefillConfig, CacheRefiller}; @@ -33,7 +35,7 @@ use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType}; use crate::filter_key_extractor::FilterKeyExtractorManager; use crate::hummock::compactor::{compact, CompactorContext}; use crate::hummock::conflict_detector::ConflictDetector; -use crate::hummock::event_handler::refiller::CacheRefillerEvent; +use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask}; use crate::hummock::event_handler::uploader::{ default_spawn_merging_task, HummockUploader, SpawnMergingTask, SpawnUploadTask, SyncedData, UploadTaskInfo, UploadTaskPayload, UploaderEvent, @@ -117,8 +119,9 @@ impl BufferTracker { } pub struct HummockEventHandler { - hummock_event_tx: mpsc::UnboundedSender, - hummock_event_rx: mpsc::UnboundedReceiver, + hummock_event_tx: UnboundedSender, + hummock_event_rx: UnboundedReceiver, + version_update_rx: UnboundedReceiver, pending_sync_requests: BTreeMap>>, read_version_mapping: Arc>, /// A copy of `read_version_mapping` but owned by event handler @@ -164,8 +167,7 @@ async fn flush_imms( impl HummockEventHandler { pub fn new( - hummock_event_tx: mpsc::UnboundedSender, - hummock_event_rx: mpsc::UnboundedReceiver, + version_update_rx: UnboundedReceiver, pinned_version: PinnedVersion, compactor_context: CompactorContext, 
filter_key_extractor_manager: FilterKeyExtractorManager, @@ -175,8 +177,7 @@ impl HummockEventHandler { let upload_compactor_context = compactor_context.clone(); let cloned_sstable_object_id_manager = sstable_object_id_manager.clone(); Self::new_inner( - hummock_event_tx, - hummock_event_rx, + version_update_rx, pinned_version, Some(sstable_object_id_manager), compactor_context.sstable_store.clone(), @@ -192,12 +193,12 @@ impl HummockEventHandler { )) }), default_spawn_merging_task(compactor_context.compaction_executor.clone()), + CacheRefiller::default_spawn_refill_task(), ) } fn new_inner( - hummock_event_tx: mpsc::UnboundedSender, - hummock_event_rx: mpsc::UnboundedReceiver, + version_update_rx: UnboundedReceiver, pinned_version: PinnedVersion, sstable_object_id_manager: Option>, sstable_store: SstableStoreRef, @@ -205,7 +206,9 @@ impl HummockEventHandler { storage_opts: &StorageOpts, spawn_upload_task: SpawnUploadTask, spawn_merging_task: SpawnMergingTask, + spawn_refill_task: SpawnRefillTask, ) -> Self { + let (hummock_event_tx, hummock_event_rx) = unbounded_channel(); let (version_update_notifier_tx, _) = tokio::sync::watch::channel(pinned_version.max_committed_epoch()); let version_update_notifier_tx = Arc::new(version_update_notifier_tx); @@ -227,11 +230,13 @@ impl HummockEventHandler { let refiller = CacheRefiller::new( CacheRefillConfig::from_storage_opts(storage_opts), sstable_store, + spawn_refill_task, ); Self { hummock_event_tx, hummock_event_rx, + version_update_rx, pending_sync_requests: Default::default(), version_update_notifier_tx, pinned_version: Arc::new(ArcSwap::from_pointee(pinned_version)), @@ -257,6 +262,10 @@ impl HummockEventHandler { ReadOnlyRwLockRef::new(self.read_version_mapping.clone()) } + pub fn event_sender(&self) -> UnboundedSender { + self.hummock_event_tx.clone() + } + pub fn buffer_tracker(&self) -> &BufferTracker { self.uploader.buffer_tracker() } @@ -395,16 +404,74 @@ impl HummockEventHandler { } } - fn handle_clear(&mut self, notifier: oneshot::Sender<()>) { + async fn handle_clear(&mut self, notifier: oneshot::Sender<()>, prev_epoch: u64) { info!( - "handle clear event. 
max_committed_epoch: {}, max_synced_epoch: {}, max_sealed_epoch: {}", - self.uploader.max_committed_epoch(), - self.uploader.max_synced_epoch(), - self.uploader.max_sealed_epoch(), + prev_epoch, + max_committed_epoch = self.uploader.max_committed_epoch(), + max_synced_epoch = self.uploader.max_synced_epoch(), + max_sealed_epoch = self.uploader.max_sealed_epoch(), + "handle clear event" ); self.uploader.clear(); + let current_version = self.uploader.hummock_version(); + + if current_version.max_committed_epoch() < prev_epoch { + let mut latest_version = if let Some(CacheRefillerEvent { + pinned_version, + new_pinned_version, + }) = self.refiller.clear() + { + assert_eq!( + current_version.id(), + pinned_version.id(), + "refiller earliest version {:?} not equal to current version {:?}", + pinned_version.version(), + current_version.version() + ); + + info!( + prev_epoch, + current_mce = current_version.max_committed_epoch(), + refiller_mce = new_pinned_version.max_committed_epoch(), + "refiller is clear in recovery" + ); + + Some(new_pinned_version) + } else { + None + }; + + while let latest_version_ref = latest_version.as_ref().unwrap_or(current_version) + && latest_version_ref.max_committed_epoch() < prev_epoch + { + let version_update = self + .version_update_rx + .recv() + .await + .expect("should not be empty"); + latest_version = Some(Self::resolve_version_update_info( + latest_version_ref.clone(), + version_update, + None, + )); + } + + self.apply_version_update( + current_version.clone(), + latest_version.expect("must have some version update to raise the mce"), + ); + } + + assert!(self.uploader.max_committed_epoch() >= prev_epoch); + if self.uploader.max_committed_epoch() > prev_epoch { + warn!( + mce = self.uploader.max_committed_epoch(), + prev_epoch, "mce higher than clear prev_epoch" + ); + } + for (epoch, result_sender) in self.pending_sync_requests.extract_if(|_, _| true) { send_sync_result( result_sender, @@ -433,6 +500,8 @@ impl HummockEventHandler { let _ = notifier.send(()).inspect_err(|e| { error!("failed to notify completion of clear event: {:?}", e); }); + + info!(prev_epoch, "clear finished"); } fn handle_version_update(&mut self, version_payload: HummockVersionUpdate) { @@ -440,17 +509,34 @@ impl HummockEventHandler { .refiller .last_new_pinned_version() .cloned() - .map(Arc::new) - .unwrap_or_else(|| self.pinned_version.load().clone()); + .unwrap_or_else(|| self.uploader.hummock_version().clone()); let mut sst_delta_infos = vec![]; + let new_pinned_version = Self::resolve_version_update_info( + pinned_version.clone(), + version_payload, + Some(&mut sst_delta_infos), + ); + + self.refiller + .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version); + } + + fn resolve_version_update_info( + pinned_version: PinnedVersion, + version_payload: HummockVersionUpdate, + mut sst_delta_infos: Option<&mut Vec>, + ) -> PinnedVersion { let newly_pinned_version = match version_payload { HummockVersionUpdate::VersionDeltas(version_deltas) => { let mut version_to_apply = pinned_version.version().clone(); for version_delta in &version_deltas { assert_eq!(version_to_apply.id, version_delta.prev_id); if version_to_apply.max_committed_epoch == version_delta.max_committed_epoch { - sst_delta_infos = version_to_apply.build_sst_delta_infos(version_delta); + if let Some(sst_delta_infos) = &mut sst_delta_infos { + **sst_delta_infos = + version_to_apply.build_sst_delta_infos(version_delta); + } } version_to_apply.apply_version_delta(version_delta); } @@ -462,15 +548,12 @@ 
impl HummockEventHandler { validate_table_key_range(&newly_pinned_version); - let new_pinned_version = pinned_version.new_pin_version(newly_pinned_version); - - self.refiller - .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version); + pinned_version.new_pin_version(newly_pinned_version) } fn apply_version_update( &mut self, - pinned_version: Arc, + pinned_version: PinnedVersion, new_pinned_version: PinnedVersion, ) { self.pinned_version @@ -529,10 +612,26 @@ impl HummockEventHandler { } event = pin!(self.hummock_event_rx.recv()) => { let Some(event) = event else { break }; - if self.handle_hummock_event(event) { - break; + match event { + HummockEvent::Clear(notifier, prev_epoch) => { + self.handle_clear(notifier, prev_epoch).await + }, + HummockEvent::Shutdown => { + info!("event handler shutdown"); + return; + }, + event => { + self.handle_hummock_event(event); + } } } + version_update = pin!(self.version_update_rx.recv()) => { + let Some(version_update) = version_update else { + warn!("version update stream ends. event handle shutdown"); + return; + }; + self.handle_version_update(version_update); + } } } } @@ -570,7 +669,7 @@ impl HummockEventHandler { } /// Gracefully shutdown if returns `true`. - fn handle_hummock_event(&mut self, event: HummockEvent) -> bool { + fn handle_hummock_event(&mut self, event: HummockEvent) { match event { HummockEvent::BufferMayFlush => { self.uploader.may_flush(); @@ -581,18 +680,12 @@ impl HummockEventHandler { } => { self.handle_await_sync_epoch(new_sync_epoch, sync_result_sender); } - HummockEvent::Clear(notifier) => { - self.handle_clear(notifier); + HummockEvent::Clear(_, _) => { + unreachable!("clear is handled in separated async context") } HummockEvent::Shutdown => { - info!("buffer tracker shutdown"); - return true; - } - - HummockEvent::VersionUpdate(version_payload) => { - self.handle_version_update(version_payload); + unreachable!("shutdown is handled specially") } - HummockEvent::ImmToUploader(imm) => { assert!( self.local_read_version_mapping @@ -730,7 +823,6 @@ impl HummockEventHandler { } } } - false } fn generate_instance_id(&mut self) -> LocalInstanceId { @@ -775,7 +867,7 @@ fn to_sync_result(result: &HummockResult) -> HummockResult), + Clear(oneshot::Sender<()>, u64), Shutdown, - VersionUpdate(HummockVersionUpdate), - ImmToUploader(ImmutableMemtable), SealEpoch { @@ -108,14 +106,10 @@ impl HummockEvent { sync_result_sender: _, } => format!("AwaitSyncEpoch epoch {} ", new_sync_epoch), - HummockEvent::Clear(_) => "Clear".to_string(), + HummockEvent::Clear(_, prev_epoch) => format!("Clear {:?}", prev_epoch), HummockEvent::Shutdown => "Shutdown".to_string(), - HummockEvent::VersionUpdate(version_update_payload) => { - format!("VersionUpdate {:?}", version_update_payload) - } - HummockEvent::ImmToUploader(imm) => { format!("ImmToUploader {:?}", imm) } diff --git a/src/storage/src/hummock/event_handler/refiller.rs b/src/storage/src/hummock/event_handler/refiller.rs index 3be242d6b94e..1c592fac85a2 100644 --- a/src/storage/src/hummock/event_handler/refiller.rs +++ b/src/storage/src/hummock/event_handler/refiller.rs @@ -13,10 +13,10 @@ // limitations under the License. 
use std::collections::{HashMap, HashSet, VecDeque}; -use std::ops::{Deref, DerefMut, Range}; -use std::pin::Pin; +use std::future::poll_fn; +use std::ops::{Deref, Range}; use std::sync::{Arc, LazyLock}; -use std::task::{ready, Context, Poll}; +use std::task::{ready, Poll}; use std::time::{Duration, Instant}; use foyer::common::code::Key; @@ -223,16 +223,30 @@ struct Item { event: CacheRefillerEvent, } +pub(crate) type SpawnRefillTask = Arc< + // first current version, second new version + dyn Fn(Vec, CacheRefillContext, PinnedVersion, PinnedVersion) -> JoinHandle<()> + + Send + + Sync + + 'static, +>; + /// A cache refiller for hummock data. -pub struct CacheRefiller { +pub(crate) struct CacheRefiller { /// order: old => new queue: VecDeque, context: CacheRefillContext, + + spawn_refill_task: SpawnRefillTask, } impl CacheRefiller { - pub fn new(config: CacheRefillConfig, sstable_store: SstableStoreRef) -> Self { + pub(crate) fn new( + config: CacheRefillConfig, + sstable_store: SstableStoreRef, + spawn_refill_task: SpawnRefillTask, + ) -> Self { let config = Arc::new(config); let concurrency = Arc::new(Semaphore::new(config.concurrency)); Self { @@ -242,71 +256,87 @@ impl CacheRefiller { concurrency, sstable_store, }, + spawn_refill_task, } } - pub fn start_cache_refill( + pub(crate) fn default_spawn_refill_task() -> SpawnRefillTask { + Arc::new(|deltas, context, _, _| { + let task = CacheRefillTask { deltas, context }; + tokio::spawn(task.run()) + }) + } + + pub(crate) fn start_cache_refill( &mut self, deltas: Vec, - pinned_version: Arc, + pinned_version: PinnedVersion, new_pinned_version: PinnedVersion, ) { - let task = CacheRefillTask { + let handle = (self.spawn_refill_task)( deltas, - context: self.context.clone(), - }; + self.context.clone(), + pinned_version.clone(), + new_pinned_version.clone(), + ); let event = CacheRefillerEvent { pinned_version, new_pinned_version, }; - let handle = tokio::spawn(task.run()); let item = Item { handle, event }; self.queue.push_back(item); GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.add(1); } - pub fn last_new_pinned_version(&self) -> Option<&PinnedVersion> { + pub(crate) fn last_new_pinned_version(&self) -> Option<&PinnedVersion> { self.queue.back().map(|item| &item.event.new_pinned_version) } - pub fn next_event(&mut self) -> NextCacheRefillerEvent<'_> { - NextCacheRefillerEvent { refiller: self } + /// Clear the queue for cache refill and return an event that merges all pending cache refill events + /// into a single event that takes the earliest and latest version. 
+ pub(crate) fn clear(&mut self) -> Option { + let Some(last_item) = self.queue.pop_back() else { + return None; + }; + let mut event = last_item.event; + while let Some(item) = self.queue.pop_back() { + assert_eq!( + event.pinned_version.id(), + item.event.new_pinned_version.id() + ); + event.pinned_version = item.event.pinned_version; + } + Some(event) } } -pub struct NextCacheRefillerEvent<'a> { - refiller: &'a mut CacheRefiller, -} - -impl<'a> Future for NextCacheRefillerEvent<'a> { - type Output = CacheRefillerEvent; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let refiller = &mut self.deref_mut().refiller; - - if let Some(item) = refiller.queue.front_mut() { - ready!(item.handle.poll_unpin(cx)).unwrap(); - let item = refiller.queue.pop_front().unwrap(); - GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.sub(1); - return Poll::Ready(item.event); - } - Poll::Pending +impl CacheRefiller { + pub(crate) fn next_event(&mut self) -> impl Future + '_ { + poll_fn(|cx| { + if let Some(item) = self.queue.front_mut() { + ready!(item.handle.poll_unpin(cx)).unwrap(); + let item = self.queue.pop_front().unwrap(); + GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.sub(1); + return Poll::Ready(item.event); + } + Poll::Pending + }) } } pub struct CacheRefillerEvent { - pub pinned_version: Arc, + pub pinned_version: PinnedVersion, pub new_pinned_version: PinnedVersion, } #[derive(Clone)] -struct CacheRefillContext { +pub(crate) struct CacheRefillContext { config: Arc, concurrency: Arc, sstable_store: SstableStoreRef, } -pub struct CacheRefillTask { +struct CacheRefillTask { deltas: Vec, context: CacheRefillContext, } diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index b8ebc858054a..7ffa3956c234 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -15,9 +15,8 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, VecDeque}; use std::fmt::{Debug, Display, Formatter}; -use std::future::Future; +use std::future::{poll_fn, Future}; use std::mem::swap; -use std::ops::DerefMut; use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; @@ -780,6 +779,10 @@ impl HummockUploader { self.context.pinned_version.max_committed_epoch() } + pub(crate) fn hummock_version(&self) -> &PinnedVersion { + &self.context.pinned_version + } + pub(crate) fn get_synced_data(&self, epoch: HummockEpoch) -> Option<&SyncedDataState> { assert!(self.max_committed_epoch() < epoch && epoch <= self.max_synced_epoch); self.synced_data.get(&epoch) @@ -1037,10 +1040,6 @@ impl HummockUploader { // TODO: call `abort` on the uploading task join handle } - - pub(crate) fn next_event(&mut self) -> NextUploaderEvent<'_> { - NextUploaderEvent { uploader: self } - } } impl HummockUploader { @@ -1132,10 +1131,6 @@ impl HummockUploader { } } -pub(crate) struct NextUploaderEvent<'a> { - uploader: &'a mut HummockUploader, -} - pub(crate) enum UploaderEvent { // staging sstable info of newer data comes first SyncFinish(HummockEpoch, Vec), @@ -1143,30 +1138,28 @@ pub(crate) enum UploaderEvent { ImmMerged(MergeImmTaskOutput), } -impl<'a> Future for NextUploaderEvent<'a> { - type Output = UploaderEvent; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let uploader = &mut self.deref_mut().uploader; - - if let Some((epoch, newly_uploaded_sstables)) = ready!(uploader.poll_syncing_task(cx)) { - return 
Poll::Ready(UploaderEvent::SyncFinish(epoch, newly_uploaded_sstables)); - } +impl HummockUploader { + pub(crate) fn next_event(&mut self) -> impl Future + '_ { + poll_fn(|cx| { + if let Some((epoch, newly_uploaded_sstables)) = ready!(self.poll_syncing_task(cx)) { + return Poll::Ready(UploaderEvent::SyncFinish(epoch, newly_uploaded_sstables)); + } - if let Some(sstable_info) = ready!(uploader.poll_sealed_spill_task(cx)) { - return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); - } + if let Some(sstable_info) = ready!(self.poll_sealed_spill_task(cx)) { + return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); + } - if let Some(sstable_info) = ready!(uploader.poll_unsealed_spill_task(cx)) { - return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); - } + if let Some(sstable_info) = ready!(self.poll_unsealed_spill_task(cx)) { + return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); + } - if let Some(merge_output) = ready!(uploader.poll_sealed_merge_imm_task(cx)) { - // add the merged imm into sealed data - uploader.update_sealed_data(&merge_output.merged_imm); - return Poll::Ready(UploaderEvent::ImmMerged(merge_output)); - } - Poll::Pending + if let Some(merge_output) = ready!(self.poll_sealed_merge_imm_task(cx)) { + // add the merged imm into sealed data + self.update_sealed_data(&merge_output.merged_imm); + return Poll::Ready(UploaderEvent::ImmMerged(merge_output)); + } + Poll::Pending + }) } } diff --git a/src/storage/src/hummock/observer_manager.rs b/src/storage/src/hummock/observer_manager.rs index e96d575ce599..4e10d9a52395 100644 --- a/src/storage/src/hummock/observer_manager.rs +++ b/src/storage/src/hummock/observer_manager.rs @@ -26,14 +26,14 @@ use tokio::sync::mpsc::UnboundedSender; use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManagerRef}; use crate::hummock::backup_reader::BackupReaderRef; -use crate::hummock::event_handler::{HummockEvent, HummockVersionUpdate}; +use crate::hummock::event_handler::HummockVersionUpdate; use crate::hummock::write_limiter::WriteLimiterRef; pub struct HummockObserverNode { filter_key_extractor_manager: FilterKeyExtractorManagerRef, backup_reader: BackupReaderRef, write_limiter: WriteLimiterRef, - version_update_sender: UnboundedSender, + version_update_sender: UnboundedSender, version: u64, } @@ -71,14 +71,12 @@ impl ObserverState for HummockObserverNode { Info::HummockVersionDeltas(hummock_version_deltas) => { let _ = self .version_update_sender - .send(HummockEvent::VersionUpdate( - HummockVersionUpdate::VersionDeltas( - hummock_version_deltas - .version_deltas - .iter() - .map(HummockVersionDelta::from_rpc_protobuf) - .collect(), - ), + .send(HummockVersionUpdate::VersionDeltas( + hummock_version_deltas + .version_deltas + .iter() + .map(HummockVersionDelta::from_rpc_protobuf) + .collect(), )) .inspect_err(|e| { tracing::error!(event = ?e.0, "unable to send version delta"); @@ -123,12 +121,12 @@ impl ObserverState for HummockObserverNode { ); let _ = self .version_update_sender - .send(HummockEvent::VersionUpdate( - HummockVersionUpdate::PinnedVersion(HummockVersion::from_rpc_protobuf( + .send(HummockVersionUpdate::PinnedVersion( + HummockVersion::from_rpc_protobuf( &snapshot .hummock_version .expect("should get hummock version"), - )), + ), )) .inspect_err(|e| { tracing::error!(event = ?e.0, "unable to send full version"); @@ -142,7 +140,7 @@ impl HummockObserverNode { pub fn new( filter_key_extractor_manager: FilterKeyExtractorManagerRef, backup_reader: BackupReaderRef, - 
version_update_sender: UnboundedSender, + version_update_sender: UnboundedSender, write_limiter: WriteLimiterRef, ) -> Self { Self { diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index a5322a0d7765..d2064ffd09de 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -79,6 +79,8 @@ impl Drop for HummockStorageShutdownGuard { #[derive(Clone)] pub struct HummockStorage { hummock_event_sender: UnboundedSender, + // only used in test for setting hummock version in uploader + _version_update_sender: UnboundedSender, context: CompactorContext, @@ -151,22 +153,22 @@ impl HummockStorage { .await .map_err(HummockError::read_backup_error)?; let write_limiter = Arc::new(WriteLimiter::default()); - let (event_tx, mut event_rx) = unbounded_channel(); + let (version_update_tx, mut version_update_rx) = unbounded_channel(); let observer_manager = ObserverManager::new( notification_client, HummockObserverNode::new( filter_key_extractor_manager.clone(), backup_reader.clone(), - event_tx.clone(), + version_update_tx.clone(), write_limiter.clone(), ), ) .await; observer_manager.start().await; - let hummock_version = match event_rx.recv().await { - Some(HummockEvent::VersionUpdate(HummockVersionUpdate::PinnedVersion(version))) => version, + let hummock_version = match version_update_rx.recv().await { + Some(HummockVersionUpdate::PinnedVersion(version)) => version, _ => unreachable!("the hummock observer manager is the first one to take the event tx. Should be full hummock version") }; @@ -189,8 +191,7 @@ impl HummockStorage { let seal_epoch = Arc::new(AtomicU64::new(pinned_version.max_committed_epoch())); let min_current_epoch = Arc::new(AtomicU64::new(pinned_version.max_committed_epoch())); let hummock_event_handler = HummockEventHandler::new( - event_tx.clone(), - event_rx, + version_update_rx, pinned_version, compactor_context.clone(), filter_key_extractor_manager.clone(), @@ -198,6 +199,8 @@ impl HummockStorage { state_store_metrics.clone(), ); + let event_tx = hummock_event_handler.event_sender(); + let instance = Self { context: compactor_context, filter_key_extractor_manager: filter_key_extractor_manager.clone(), @@ -206,6 +209,7 @@ impl HummockStorage { version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(), seal_epoch, hummock_event_sender: event_tx.clone(), + _version_update_sender: version_update_tx, pinned_version: hummock_event_handler.pinned_version(), hummock_version_reader: HummockVersionReader::new( sstable_store, @@ -467,10 +471,10 @@ impl StateStore for HummockStorage { StoreLocalStatistic::flush_all(); } - async fn clear_shared_buffer(&self) -> StorageResult<()> { + async fn clear_shared_buffer(&self, prev_epoch: u64) { let (tx, rx) = oneshot::channel(); self.hummock_event_sender - .send(HummockEvent::Clear(tx)) + .send(HummockEvent::Clear(tx, prev_epoch)) .expect("should send success"); rx.await.expect("should wait success"); @@ -478,8 +482,6 @@ impl StateStore for HummockStorage { self.min_current_epoch .store(HummockEpoch::MAX, MemOrdering::SeqCst); self.seal_epoch.store(epoch, MemOrdering::SeqCst); - - Ok(()) } fn new_local(&self, option: NewLocalOptions) -> impl Future + Send + '_ { @@ -527,13 +529,12 @@ impl HummockStorage { } /// Used in the compaction test tool + #[cfg(any(test, feature = "test"))] pub async fn update_version_and_wait(&self, version: HummockVersion) { use tokio::task::yield_now; let version_id = version.id; - 
self.hummock_event_sender - .send(HummockEvent::VersionUpdate( - HummockVersionUpdate::PinnedVersion(version), - )) + self._version_update_sender + .send(HummockVersionUpdate::PinnedVersion(version)) .unwrap(); loop { if self.pinned_version.load().id() >= version_id { diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index 1e77b5f5652b..007260fef735 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -635,7 +635,7 @@ impl StateStore for RangeKvStateStore { fn seal_epoch(&self, _epoch: u64, _is_checkpoint: bool) {} #[allow(clippy::unused_async)] - async fn clear_shared_buffer(&self) -> StorageResult<()> { + async fn clear_shared_buffer(&self, _prev_epoch: u64) { unimplemented!("recovery not supported") } diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 6e3b9b3db0fc..1bea7f6742c6 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -331,11 +331,10 @@ impl StateStore for MonitoredStateStore { panic!("the state store is already monitored") } - fn clear_shared_buffer(&self) -> impl Future> + Send + '_ { + fn clear_shared_buffer(&self, prev_epoch: u64) -> impl Future + Send + '_ { self.inner - .clear_shared_buffer() + .clear_shared_buffer(prev_epoch) .verbose_instrument_await("store_clear_shared_buffer") - .inspect_err(|e| error!(error = %e.as_report(), "Failed in clear_shared_buffer")) } async fn new_local(&self, option: NewLocalOptions) -> Self::Local { diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index 95a5c835407d..8cf96a231ead 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -250,13 +250,9 @@ impl StateStore for TracedStateStore { self.inner.seal_epoch(epoch, is_checkpoint); } - async fn clear_shared_buffer(&self) -> StorageResult<()> { - let span = TraceSpan::new_clear_shared_buffer_span(); - let res = self.inner.clear_shared_buffer().await; - span.may_send_result(OperationResult::ClearSharedBuffer( - res.as_ref().map(|o| *o).into(), - )); - res + async fn clear_shared_buffer(&self, prev_epoch: u64) { + let _span = TraceSpan::new_clear_shared_buffer_span(prev_epoch); + self.inner.clear_shared_buffer(prev_epoch).await; } async fn new_local(&self, options: NewLocalOptions) -> Self::Local { diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index 08b0663b4e22..5299cac9fe08 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -154,7 +154,7 @@ impl StateStore for PanicStateStore { } #[allow(clippy::unused_async)] - async fn clear_shared_buffer(&self) -> StorageResult<()> { + async fn clear_shared_buffer(&self, _prev_epoch: u64) { panic!("should not clear shared buffer from the panic state store!"); } diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 0daca6aa5305..7c8353dc0f30 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -191,7 +191,7 @@ pub trait StateStore: StateStoreRead + StaticSendSync + Clone { /// Clears contents in shared buffer. /// This method should only be called when dropping all actors in the local compute node. 
- fn clear_shared_buffer(&self) -> impl Future> + Send + '_; + fn clear_shared_buffer(&self, prev_epoch: u64) -> impl Future + Send + '_; fn new_local(&self, option: NewLocalOptions) -> impl Future + Send + '_; diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 4502236c0a20..f1316fe7e20c 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -482,8 +482,8 @@ pub mod verify { self.actual.seal_epoch(epoch, is_checkpoint) } - fn clear_shared_buffer(&self) -> impl Future> + Send + '_ { - self.actual.clear_shared_buffer() + fn clear_shared_buffer(&self, prev_epoch: u64) -> impl Future + Send + '_ { + self.actual.clear_shared_buffer(prev_epoch) } async fn new_local(&self, option: NewLocalOptions) -> Self::Local { @@ -927,7 +927,7 @@ pub mod boxed_state_store { fn seal_epoch(&self, epoch: u64, is_checkpoint: bool); - async fn clear_shared_buffer(&self) -> StorageResult<()>; + async fn clear_shared_buffer(&self, prev_epoch: u64); async fn new_local(&self, option: NewLocalOptions) -> BoxDynamicDispatchedLocalStateStore; @@ -948,8 +948,8 @@ pub mod boxed_state_store { self.seal_epoch(epoch, is_checkpoint); } - async fn clear_shared_buffer(&self) -> StorageResult<()> { - self.clear_shared_buffer().await + async fn clear_shared_buffer(&self, prev_epoch: u64) { + self.clear_shared_buffer(prev_epoch).await } async fn new_local(&self, option: NewLocalOptions) -> BoxDynamicDispatchedLocalStateStore { @@ -1018,8 +1018,8 @@ pub mod boxed_state_store { self.deref().sync(epoch) } - fn clear_shared_buffer(&self) -> impl Future> + Send + '_ { - self.deref().clear_shared_buffer() + fn clear_shared_buffer(&self, prev_epoch: u64) -> impl Future + Send + '_ { + self.deref().clear_shared_buffer(prev_epoch) } fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) { diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 1cbd65fec151..9b77a9a7bf09 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -640,7 +640,7 @@ mod tests { drop(writer); // Recovery - test_env.storage.clear_shared_buffer().await.unwrap(); + test_env.storage.clear_shared_buffer(epoch2).await; // Rebuild log reader and writer in recovery let factory = KvLogStoreFactory::new( @@ -845,7 +845,7 @@ mod tests { drop(writer); // Recovery - test_env.storage.clear_shared_buffer().await.unwrap(); + test_env.storage.clear_shared_buffer(epoch2).await; // Rebuild log reader and writer in recovery let factory = KvLogStoreFactory::new( @@ -1072,7 +1072,7 @@ mod tests { drop(writer2); // Recovery - test_env.storage.clear_shared_buffer().await.unwrap(); + test_env.storage.clear_shared_buffer(epoch2).await; let vnodes = build_bitmap(0..VirtualNode::COUNT); let factory = KvLogStoreFactory::new( diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index b838314729ad..385825a23e0d 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -72,7 +72,10 @@ pub(super) enum LocalBarrierEvent { actor_ids_to_collect: HashSet, result_sender: oneshot::Sender>, }, - Reset(oneshot::Sender<()>), + Reset { + prev_epoch: u64, + result_sender: oneshot::Sender<()>, + }, ReportActorCollected { actor_id: ActorId, barrier: Barrier, @@ -223,9 +226,10 @@ impl LocalBarrierWorker { event = event_rx.recv() => { if let Some(event) = event { match event { - LocalBarrierEvent::Reset(finish_sender) => 
{ - self.reset().await; - let _ = finish_sender.send(()); + LocalBarrierEvent::Reset { + result_sender, prev_epoch} => { + self.reset(prev_epoch).await; + let _ = result_sender.send(()); } event => { self.handle_event(event); @@ -268,7 +272,7 @@ impl LocalBarrierWorker { warn!(err=?e, "fail to send inject barrier result"); }); } - LocalBarrierEvent::Reset(_) => { + LocalBarrierEvent::Reset { .. } => { unreachable!("Reset event should be handled separately in async context") } ReportActorCollected { actor_id, barrier } => self.collect(actor_id, &barrier), diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 5a2bde99da49..11eb9a44290c 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -231,9 +231,12 @@ impl LocalStreamManager { } /// Force stop all actors on this worker, and then drop their resources. - pub async fn reset(&self) { + pub async fn reset(&self, prev_epoch: u64) { self.local_barrier_manager - .send_and_await(LocalBarrierEvent::Reset) + .send_and_await(|result_sender| LocalBarrierEvent::Reset { + result_sender, + prev_epoch, + }) .await .expect("should receive reset") } @@ -268,7 +271,7 @@ impl LocalBarrierWorker { } /// Force stop all actors on this worker, and then drop their resources. - pub(super) async fn reset(&mut self) { + pub(super) async fn reset(&mut self, prev_epoch: u64) { let actor_handles = self.actor_manager_state.drain_actor_handles(); for (actor_id, handle) in &actor_handles { tracing::debug!("force stopping actor {}", actor_id); @@ -295,7 +298,7 @@ impl LocalBarrierWorker { m.lock().clear(); } dispatch_state_store!(&self.actor_manager.env.state_store(), store, { - store.clear_shared_buffer().await.unwrap(); + store.clear_shared_buffer(prev_epoch).await; }); self.reset_state(); self.actor_manager.env.dml_manager_ref().clear();
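
Notes on the recovery flow introduced by this patch: the meta node now passes the latest committed epoch (`prev_epoch`) down through `ForceStopActorsRequest`, the local barrier worker, and `clear_shared_buffer`, and the hummock event handler's `handle_clear` blocks until the locally pinned version's `max_committed_epoch` has caught up with that epoch before acknowledging the reset. The following is a minimal, self-contained sketch of that wait loop, not the patch's own code: the helper `wait_committed_epoch` and the plain `u64` epoch channel are simplifications for illustration, whereas the real implementation drains `HummockVersionUpdate`s from `version_update_rx` and resolves them into a `PinnedVersion` via `resolve_version_update_info`.

use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

/// Block until the locally known committed epoch catches up with `prev_epoch`,
/// consuming committed-epoch updates pushed by the observer manager.
/// (Hypothetical helper for illustration only.)
async fn wait_committed_epoch(
    mut current_committed_epoch: u64,
    version_update_rx: &mut UnboundedReceiver<u64>,
    prev_epoch: u64,
) -> u64 {
    while current_committed_epoch < prev_epoch {
        // The version update stream must stay alive during recovery; otherwise the
        // compute node can never reach the epoch the meta node asked it to reset to.
        let new_committed_epoch = version_update_rx
            .recv()
            .await
            .expect("version update stream should not end during recovery");
        current_committed_epoch = new_committed_epoch.max(current_committed_epoch);
    }
    current_committed_epoch
}

#[tokio::main]
async fn main() {
    let (tx, mut rx): (UnboundedSender<u64>, UnboundedReceiver<u64>) = unbounded_channel();
    // Meta asks the compute node to reset to epoch 100, but only epoch 90 is known locally,
    // so the handler waits for further version updates before finishing the clear.
    tx.send(95).unwrap();
    tx.send(100).unwrap();
    let mce = wait_committed_epoch(90, &mut rx, 100).await;
    assert!(mce >= 100);
    println!("reset finished at committed epoch {mce}");
}

This also explains why the patch moves version updates onto a dedicated `version_update_rx` channel owned by the event handler: during a clear, ordinary `HummockEvent`s are not processed, but version updates still have to be consumed so the committed epoch can advance past `prev_epoch`.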