From d7396b26f9860a187b7204768b47fc1145ffd359 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Tue, 20 Feb 2024 15:54:38 +0800 Subject: [PATCH] Revert "feat(storage): reset to the latest committed epoch in recovery (#14923)" This reverts commit b2bda85f245a6de915a0f7c1e5fc553427016bd2. --- proto/stream_service.proto | 1 - src/compute/src/rpc/service/stream_service.rs | 2 +- src/meta/src/barrier/mod.rs | 6 +- src/meta/src/barrier/recovery.rs | 44 +-- src/meta/src/barrier/rpc.rs | 2 - .../src/bin/replay/replay_impl.rs | 8 +- .../hummock_test/src/state_store_tests.rs | 2 +- src/storage/hummock_test/src/test_utils.rs | 8 +- src/storage/hummock_trace/src/collector.rs | 7 +- src/storage/hummock_trace/src/record.rs | 3 +- src/storage/hummock_trace/src/replay/mod.rs | 4 +- .../hummock_trace/src/replay/worker.rs | 17 +- .../event_handler/hummock_event_handler.rs | 305 +++--------------- src/storage/src/hummock/event_handler/mod.rs | 10 +- .../src/hummock/event_handler/refiller.rs | 98 ++---- .../src/hummock/event_handler/uploader.rs | 55 ++-- src/storage/src/hummock/observer_manager.rs | 26 +- .../src/hummock/store/hummock_storage.rs | 29 +- src/storage/src/memory.rs | 2 +- src/storage/src/monitor/monitored_store.rs | 5 +- src/storage/src/monitor/traced_store.rs | 10 +- src/storage/src/panic_store.rs | 2 +- src/storage/src/store.rs | 2 +- src/storage/src/store_impl.rs | 14 +- .../common/log_store_impl/kv_log_store/mod.rs | 6 +- src/stream/src/task/barrier_manager.rs | 14 +- src/stream/src/task/stream_manager.rs | 11 +- 27 files changed, 235 insertions(+), 458 deletions(-) diff --git a/proto/stream_service.proto b/proto/stream_service.proto index e8c5d94a20ac3..462f5ff0256a6 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -46,7 +46,6 @@ message DropActorsResponse { message ForceStopActorsRequest { string request_id = 1; - uint64 prev_epoch = 2; } message ForceStopActorsResponse { diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index d496e20d51eb5..def9a534586bb 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -116,7 +116,7 @@ impl StreamService for StreamServiceImpl { request: Request, ) -> std::result::Result, Status> { let req = request.into_inner(); - self.mgr.reset(req.prev_epoch).await; + self.mgr.reset().await; Ok(Response::new(ForceStopActorsResponse { request_id: req.request_id, status: None, diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 47bef49c66574..b13dc19ef61c6 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -527,7 +527,9 @@ impl GlobalBarrierManager { let paused = self.take_pause_on_bootstrap().await.unwrap_or(false); let paused_reason = paused.then_some(PausedReason::Manual); - self.recovery(paused_reason).instrument(span).await; + self.recovery(prev_epoch, paused_reason) + .instrument(span) + .await; } self.context.set_status(BarrierManagerStatus::Running); @@ -768,7 +770,7 @@ impl GlobalBarrierManager { // No need to clean dirty tables for barrier recovery, // The foreground stream job should cleanup their own tables. 
- self.recovery(None).instrument(span).await; + self.recovery(prev_epoch, None).instrument(span).await; self.context.set_status(BarrierManagerStatus::Running); } else { panic!("failed to execute barrier: {}", err.as_report()); diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 1ced92ce61c3a..3f7f3911e7302 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -329,14 +329,7 @@ impl GlobalBarrierManager { /// the cluster or `risectl` command. Used for debugging purpose. /// /// Returns the new state of the barrier manager after recovery. - pub async fn recovery(&mut self, paused_reason: Option) { - let prev_epoch = TracedEpoch::new( - self.context - .hummock_manager - .latest_snapshot() - .committed_epoch - .into(), - ); + pub async fn recovery(&mut self, prev_epoch: TracedEpoch, paused_reason: Option) { // Mark blocked and abort buffered schedules, they might be dirty already. self.scheduled_barriers .abort_and_mark_blocked("cluster is under recovering"); @@ -406,11 +399,9 @@ impl GlobalBarrierManager { }; // Reset all compute nodes, stop and drop existing actors. - self.reset_compute_nodes(&info, prev_epoch.value().0) - .await - .inspect_err(|err| { - warn!(error = %err.as_report(), "reset compute nodes failed"); - })?; + self.reset_compute_nodes(&info).await.inspect_err(|err| { + warn!(error = %err.as_report(), "reset compute nodes failed"); + })?; if self.pre_apply_drop_cancel().await? { info = self @@ -456,6 +447,21 @@ impl GlobalBarrierManager { tracing::Span::current(), // recovery span )); + #[cfg(not(all(test, feature = "failpoints")))] + { + use risingwave_common::util::epoch::INVALID_EPOCH; + + let mce = self + .context + .hummock_manager + .get_current_max_committed_epoch() + .await; + + if mce != INVALID_EPOCH { + command_ctx.wait_epoch_commit(mce).await?; + } + }; + let res = match self .context .inject_barrier(command_ctx.clone(), None, None) @@ -1049,18 +1055,14 @@ impl GlobalBarrierManager { } /// Reset all compute nodes by calling `force_stop_actors`. 
- async fn reset_compute_nodes( - &self, - info: &InflightActorInfo, - prev_epoch: u64, - ) -> MetaResult<()> { - debug!(prev_epoch, worker = ?info.node_map.keys().collect_vec(), "force stop actors"); + async fn reset_compute_nodes(&self, info: &InflightActorInfo) -> MetaResult<()> { + debug!(worker = ?info.node_map.keys().collect_vec(), "force stop actors"); self.context .stream_rpc_manager - .force_stop_actors(info.node_map.values(), prev_epoch) + .force_stop_actors(info.node_map.values()) .await?; - debug!(prev_epoch, "all compute nodes have been reset."); + debug!("all compute nodes have been reset."); Ok(()) } diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 8690435b4ebc0..877f935f25207 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -405,13 +405,11 @@ impl StreamRpcManager { pub async fn force_stop_actors( &self, nodes: impl Iterator, - prev_epoch: u64, ) -> MetaResult<()> { self.broadcast(nodes, |client| async move { client .force_stop_actors(ForceStopActorsRequest { request_id: Self::new_request_id(), - prev_epoch, }) .await }) diff --git a/src/storage/hummock_test/src/bin/replay/replay_impl.rs b/src/storage/hummock_test/src/bin/replay/replay_impl.rs index c3dedd6dbae46..ef9b671da9b49 100644 --- a/src/storage/hummock_test/src/bin/replay/replay_impl.rs +++ b/src/storage/hummock_test/src/bin/replay/replay_impl.rs @@ -186,8 +186,12 @@ impl ReplayStateStore for GlobalReplayImpl { Ok(()) } - async fn clear_shared_buffer(&self, prev_epoch: u64) { - self.store.clear_shared_buffer(prev_epoch).await + async fn clear_shared_buffer(&self) -> Result<()> { + self.store + .clear_shared_buffer() + .await + .map_err(|_| TraceError::ClearSharedBufferFailed)?; + Ok(()) } } pub(crate) struct LocalReplayImpl(LocalHummockStorage); diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index d7f7531b8b65b..e52983dc787a5 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -1330,7 +1330,7 @@ async fn test_gc_watermark_and_clear_shared_buffer() { drop(local_hummock_storage); - hummock_storage.clear_shared_buffer(epoch1).await; + hummock_storage.clear_shared_buffer().await.unwrap(); assert_eq!( hummock_storage diff --git a/src/storage/hummock_test/src/test_utils.rs b/src/storage/hummock_test/src/test_utils.rs index f5d1a10a18839..554f7bd8b8be1 100644 --- a/src/storage/hummock_test/src/test_utils.rs +++ b/src/storage/hummock_test/src/test_utils.rs @@ -34,7 +34,7 @@ use risingwave_storage::filter_key_extractor::{ RpcFilterKeyExtractorManager, }; use risingwave_storage::hummock::backup_reader::BackupReader; -use risingwave_storage::hummock::event_handler::HummockVersionUpdate; +use risingwave_storage::hummock::event_handler::{HummockEvent, HummockVersionUpdate}; use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store; use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; use risingwave_storage::hummock::observer_manager::HummockObserverNode; @@ -53,8 +53,8 @@ pub async fn prepare_first_valid_version( worker_node: WorkerNode, ) -> ( PinnedVersion, - UnboundedSender, - UnboundedReceiver, + UnboundedSender, + UnboundedReceiver, ) { let (tx, mut rx) = unbounded_channel(); let notification_client = @@ -73,7 +73,7 @@ pub async fn prepare_first_valid_version( .await; observer_manager.start().await; let hummock_version = match rx.recv().await { - 
Some(HummockVersionUpdate::PinnedVersion(version)) => version, + Some(HummockEvent::VersionUpdate(HummockVersionUpdate::PinnedVersion(version))) => version, _ => unreachable!("should be full version"), }; diff --git a/src/storage/hummock_trace/src/collector.rs b/src/storage/hummock_trace/src/collector.rs index b1a269a4620ee..264d2445ca76d 100644 --- a/src/storage/hummock_trace/src/collector.rs +++ b/src/storage/hummock_trace/src/collector.rs @@ -214,11 +214,8 @@ impl TraceSpan { Self::new_global_op(Operation::SealCurrentEpoch { epoch, opts }, storage_type) } - pub fn new_clear_shared_buffer_span(prev_epoch: u64) -> MayTraceSpan { - Self::new_global_op( - Operation::ClearSharedBuffer(prev_epoch), - StorageType::Global, - ) + pub fn new_clear_shared_buffer_span() -> MayTraceSpan { + Self::new_global_op(Operation::ClearSharedBuffer, StorageType::Global) } pub fn new_validate_read_epoch_span(epoch: HummockReadEpoch) -> MayTraceSpan { diff --git a/src/storage/hummock_trace/src/record.rs b/src/storage/hummock_trace/src/record.rs index f8e4ceed65449..7bd0a86d0e222 100644 --- a/src/storage/hummock_trace/src/record.rs +++ b/src/storage/hummock_trace/src/record.rs @@ -170,7 +170,7 @@ pub enum Operation { TryWaitEpoch(TracedHummockReadEpoch), /// clear shared buffer - ClearSharedBuffer(u64), + ClearSharedBuffer, /// Seal current epoch SealCurrentEpoch { @@ -299,6 +299,7 @@ pub enum OperationResult { Sync(TraceResult), NotifyHummock(TraceResult<()>), TryWaitEpoch(TraceResult<()>), + ClearSharedBuffer(TraceResult<()>), ValidateReadEpoch(TraceResult<()>), LocalStorageEpoch(TraceResult), LocalStorageIsDirty(TraceResult), diff --git a/src/storage/hummock_trace/src/replay/mod.rs b/src/storage/hummock_trace/src/replay/mod.rs index 046ab67b18607..99aa1c0c37144 100644 --- a/src/storage/hummock_trace/src/replay/mod.rs +++ b/src/storage/hummock_trace/src/replay/mod.rs @@ -120,7 +120,7 @@ pub trait ReplayStateStore { async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result; async fn new_local(&self, opts: TracedNewLocalOptions) -> Box; async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; - async fn clear_shared_buffer(&self, prev_epoch: u64); + async fn clear_shared_buffer(&self) -> Result<()>; fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; } @@ -152,7 +152,7 @@ mock! 
{ ) -> Result; async fn new_local(&self, opts: TracedNewLocalOptions) -> Box; async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; - async fn clear_shared_buffer(&self, prev_epoch: u64); + async fn clear_shared_buffer(&self) -> Result<()>; fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; } impl GlobalReplay for GlobalReplayInterface{} diff --git a/src/storage/hummock_trace/src/replay/worker.rs b/src/storage/hummock_trace/src/replay/worker.rs index 4d37708420d48..7bc821a26c5b9 100644 --- a/src/storage/hummock_trace/src/replay/worker.rs +++ b/src/storage/hummock_trace/src/replay/worker.rs @@ -327,9 +327,22 @@ impl ReplayWorker { ); } } - Operation::ClearSharedBuffer(prev_epoch) => { + Operation::ClearSharedBuffer => { assert_eq!(storage_type, StorageType::Global); - replay.clear_shared_buffer(prev_epoch).await; + let res = res_rx.recv().await.expect("recv result failed"); + if let OperationResult::ClearSharedBuffer(expected) = res { + let actual = replay.clear_shared_buffer().await; + assert_eq!( + TraceResult::from(actual), + expected, + "clear_shared_buffer wrong" + ); + } else { + panic!( + "wrong clear_shared_buffer result, expect epoch result, but got {:?}", + res + ); + } } Operation::SealCurrentEpoch { epoch, opts } => { assert_ne!(storage_type, StorageType::Global); diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index f1209775e154a..615bb293a4116 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -22,12 +22,10 @@ use await_tree::InstrumentAwait; use itertools::Itertools; use parking_lot::RwLock; use prometheus::core::{AtomicU64, GenericGauge}; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo; use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; use thiserror_ext::AsReport; use tokio::spawn; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::sync::oneshot; +use tokio::sync::{mpsc, oneshot}; use tracing::{debug, error, info, trace, warn}; use super::refiller::{CacheRefillConfig, CacheRefiller}; @@ -35,7 +33,7 @@ use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType}; use crate::filter_key_extractor::FilterKeyExtractorManager; use crate::hummock::compactor::{compact, CompactorContext}; use crate::hummock::conflict_detector::ConflictDetector; -use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask}; +use crate::hummock::event_handler::refiller::CacheRefillerEvent; use crate::hummock::event_handler::uploader::{ default_spawn_merging_task, HummockUploader, SpawnMergingTask, SpawnUploadTask, SyncedData, UploadTaskInfo, UploadTaskPayload, UploaderEvent, @@ -119,9 +117,8 @@ impl BufferTracker { } pub struct HummockEventHandler { - hummock_event_tx: UnboundedSender, - hummock_event_rx: UnboundedReceiver, - version_update_rx: UnboundedReceiver, + hummock_event_tx: mpsc::UnboundedSender, + hummock_event_rx: mpsc::UnboundedReceiver, pending_sync_requests: BTreeMap>>, read_version_mapping: Arc>, /// A copy of `read_version_mapping` but owned by event handler @@ -167,7 +164,8 @@ async fn flush_imms( impl HummockEventHandler { pub fn new( - version_update_rx: UnboundedReceiver, + hummock_event_tx: mpsc::UnboundedSender, + hummock_event_rx: mpsc::UnboundedReceiver, pinned_version: PinnedVersion, compactor_context: CompactorContext, 
filter_key_extractor_manager: FilterKeyExtractorManager, @@ -177,7 +175,8 @@ impl HummockEventHandler { let upload_compactor_context = compactor_context.clone(); let cloned_sstable_object_id_manager = sstable_object_id_manager.clone(); Self::new_inner( - version_update_rx, + hummock_event_tx, + hummock_event_rx, pinned_version, Some(sstable_object_id_manager), compactor_context.sstable_store.clone(), @@ -193,12 +192,12 @@ impl HummockEventHandler { )) }), default_spawn_merging_task(compactor_context.compaction_executor.clone()), - CacheRefiller::default_spawn_refill_task(), ) } fn new_inner( - version_update_rx: UnboundedReceiver, + hummock_event_tx: mpsc::UnboundedSender, + hummock_event_rx: mpsc::UnboundedReceiver, pinned_version: PinnedVersion, sstable_object_id_manager: Option>, sstable_store: SstableStoreRef, @@ -206,9 +205,7 @@ impl HummockEventHandler { storage_opts: &StorageOpts, spawn_upload_task: SpawnUploadTask, spawn_merging_task: SpawnMergingTask, - spawn_refill_task: SpawnRefillTask, ) -> Self { - let (hummock_event_tx, hummock_event_rx) = unbounded_channel(); let (version_update_notifier_tx, _) = tokio::sync::watch::channel(pinned_version.max_committed_epoch()); let version_update_notifier_tx = Arc::new(version_update_notifier_tx); @@ -230,13 +227,11 @@ impl HummockEventHandler { let refiller = CacheRefiller::new( CacheRefillConfig::from_storage_opts(storage_opts), sstable_store, - spawn_refill_task, ); Self { hummock_event_tx, hummock_event_rx, - version_update_rx, pending_sync_requests: Default::default(), version_update_notifier_tx, pinned_version: Arc::new(ArcSwap::from_pointee(pinned_version)), @@ -262,10 +257,6 @@ impl HummockEventHandler { ReadOnlyRwLockRef::new(self.read_version_mapping.clone()) } - pub fn event_sender(&self) -> UnboundedSender { - self.hummock_event_tx.clone() - } - pub fn buffer_tracker(&self) -> &BufferTracker { self.uploader.buffer_tracker() } @@ -404,74 +395,16 @@ impl HummockEventHandler { } } - async fn handle_clear(&mut self, notifier: oneshot::Sender<()>, prev_epoch: u64) { + fn handle_clear(&mut self, notifier: oneshot::Sender<()>) { info!( - prev_epoch, - max_committed_epoch = self.uploader.max_committed_epoch(), - max_synced_epoch = self.uploader.max_synced_epoch(), - max_sealed_epoch = self.uploader.max_sealed_epoch(), - "handle clear event" + "handle clear event. 
max_committed_epoch: {}, max_synced_epoch: {}, max_sealed_epoch: {}", + self.uploader.max_committed_epoch(), + self.uploader.max_synced_epoch(), + self.uploader.max_sealed_epoch(), ); self.uploader.clear(); - let current_version = self.uploader.hummock_version(); - - if current_version.max_committed_epoch() < prev_epoch { - let mut latest_version = if let Some(CacheRefillerEvent { - pinned_version, - new_pinned_version, - }) = self.refiller.clear() - { - assert_eq!( - current_version.id(), - pinned_version.id(), - "refiller earliest version {:?} not equal to current version {:?}", - pinned_version.version(), - current_version.version() - ); - - info!( - prev_epoch, - current_mce = current_version.max_committed_epoch(), - refiller_mce = new_pinned_version.max_committed_epoch(), - "refiller is clear in recovery" - ); - - Some(new_pinned_version) - } else { - None - }; - - while let latest_version_ref = latest_version.as_ref().unwrap_or(current_version) - && latest_version_ref.max_committed_epoch() < prev_epoch - { - let version_update = self - .version_update_rx - .recv() - .await - .expect("should not be empty"); - latest_version = Some(Self::resolve_version_update_info( - latest_version_ref.clone(), - version_update, - None, - )); - } - - self.apply_version_update( - current_version.clone(), - latest_version.expect("must have some version update to raise the mce"), - ); - } - - assert!(self.uploader.max_committed_epoch() >= prev_epoch); - if self.uploader.max_committed_epoch() > prev_epoch { - warn!( - mce = self.uploader.max_committed_epoch(), - prev_epoch, "mce higher than clear prev_epoch" - ); - } - for (epoch, result_sender) in self.pending_sync_requests.extract_if(|_, _| true) { send_sync_result( result_sender, @@ -500,8 +433,6 @@ impl HummockEventHandler { let _ = notifier.send(()).inspect_err(|e| { error!("failed to notify completion of clear event: {:?}", e); }); - - info!(prev_epoch, "clear finished"); } fn handle_version_update(&mut self, version_payload: HummockVersionUpdate) { @@ -509,34 +440,17 @@ impl HummockEventHandler { .refiller .last_new_pinned_version() .cloned() - .unwrap_or_else(|| self.uploader.hummock_version().clone()); + .map(Arc::new) + .unwrap_or_else(|| self.pinned_version.load().clone()); let mut sst_delta_infos = vec![]; - let new_pinned_version = Self::resolve_version_update_info( - pinned_version.clone(), - version_payload, - Some(&mut sst_delta_infos), - ); - - self.refiller - .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version); - } - - fn resolve_version_update_info( - pinned_version: PinnedVersion, - version_payload: HummockVersionUpdate, - mut sst_delta_infos: Option<&mut Vec>, - ) -> PinnedVersion { let newly_pinned_version = match version_payload { HummockVersionUpdate::VersionDeltas(version_deltas) => { let mut version_to_apply = pinned_version.version().clone(); for version_delta in &version_deltas { assert_eq!(version_to_apply.id, version_delta.prev_id); if version_to_apply.max_committed_epoch == version_delta.max_committed_epoch { - if let Some(sst_delta_infos) = &mut sst_delta_infos { - **sst_delta_infos = - version_to_apply.build_sst_delta_infos(version_delta); - } + sst_delta_infos = version_to_apply.build_sst_delta_infos(version_delta); } version_to_apply.apply_version_delta(version_delta); } @@ -548,12 +462,15 @@ impl HummockEventHandler { validate_table_key_range(&newly_pinned_version); - pinned_version.new_pin_version(newly_pinned_version) + let new_pinned_version = pinned_version.new_pin_version(newly_pinned_version); 
+ + self.refiller + .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version); } fn apply_version_update( &mut self, - pinned_version: PinnedVersion, + pinned_version: Arc, new_pinned_version: PinnedVersion, ) { self.pinned_version @@ -612,26 +529,10 @@ impl HummockEventHandler { } event = pin!(self.hummock_event_rx.recv()) => { let Some(event) = event else { break }; - match event { - HummockEvent::Clear(notifier, prev_epoch) => { - self.handle_clear(notifier, prev_epoch).await - }, - HummockEvent::Shutdown => { - info!("event handler shutdown"); - return; - }, - event => { - self.handle_hummock_event(event); - } + if self.handle_hummock_event(event) { + break; } } - version_update = pin!(self.version_update_rx.recv()) => { - let Some(version_update) = version_update else { - warn!("version update stream ends. event handle shutdown"); - return; - }; - self.handle_version_update(version_update); - } } } } @@ -669,7 +570,7 @@ impl HummockEventHandler { } /// Gracefully shutdown if returns `true`. - fn handle_hummock_event(&mut self, event: HummockEvent) { + fn handle_hummock_event(&mut self, event: HummockEvent) -> bool { match event { HummockEvent::BufferMayFlush => { self.uploader.may_flush(); @@ -680,12 +581,18 @@ impl HummockEventHandler { } => { self.handle_await_sync_epoch(new_sync_epoch, sync_result_sender); } - HummockEvent::Clear(_, _) => { - unreachable!("clear is handled in separated async context") + HummockEvent::Clear(notifier) => { + self.handle_clear(notifier); } HummockEvent::Shutdown => { - unreachable!("shutdown is handled specially") + info!("buffer tracker shutdown"); + return true; + } + + HummockEvent::VersionUpdate(version_payload) => { + self.handle_version_update(version_payload); } + HummockEvent::ImmToUploader(imm) => { assert!( self.local_read_version_mapping @@ -823,6 +730,7 @@ impl HummockEventHandler { } } } + false } fn generate_instance_id(&mut self) -> LocalInstanceId { @@ -867,7 +775,7 @@ fn to_sync_result(result: &HummockResult) -> HummockResult, u64), + Clear(oneshot::Sender<()>), Shutdown, + VersionUpdate(HummockVersionUpdate), + ImmToUploader(ImmutableMemtable), SealEpoch { @@ -106,10 +108,14 @@ impl HummockEvent { sync_result_sender: _, } => format!("AwaitSyncEpoch epoch {} ", new_sync_epoch), - HummockEvent::Clear(_, prev_epoch) => format!("Clear {:?}", prev_epoch), + HummockEvent::Clear(_) => "Clear".to_string(), HummockEvent::Shutdown => "Shutdown".to_string(), + HummockEvent::VersionUpdate(version_update_payload) => { + format!("VersionUpdate {:?}", version_update_payload) + } + HummockEvent::ImmToUploader(imm) => { format!("ImmToUploader {:?}", imm) } diff --git a/src/storage/src/hummock/event_handler/refiller.rs b/src/storage/src/hummock/event_handler/refiller.rs index 1c592fac85a2d..3be242d6b94ec 100644 --- a/src/storage/src/hummock/event_handler/refiller.rs +++ b/src/storage/src/hummock/event_handler/refiller.rs @@ -13,10 +13,10 @@ // limitations under the License. 
use std::collections::{HashMap, HashSet, VecDeque}; -use std::future::poll_fn; -use std::ops::{Deref, Range}; +use std::ops::{Deref, DerefMut, Range}; +use std::pin::Pin; use std::sync::{Arc, LazyLock}; -use std::task::{ready, Poll}; +use std::task::{ready, Context, Poll}; use std::time::{Duration, Instant}; use foyer::common::code::Key; @@ -223,30 +223,16 @@ struct Item { event: CacheRefillerEvent, } -pub(crate) type SpawnRefillTask = Arc< - // first current version, second new version - dyn Fn(Vec, CacheRefillContext, PinnedVersion, PinnedVersion) -> JoinHandle<()> - + Send - + Sync - + 'static, ->; - /// A cache refiller for hummock data. -pub(crate) struct CacheRefiller { +pub struct CacheRefiller { /// order: old => new queue: VecDeque, context: CacheRefillContext, - - spawn_refill_task: SpawnRefillTask, } impl CacheRefiller { - pub(crate) fn new( - config: CacheRefillConfig, - sstable_store: SstableStoreRef, - spawn_refill_task: SpawnRefillTask, - ) -> Self { + pub fn new(config: CacheRefillConfig, sstable_store: SstableStoreRef) -> Self { let config = Arc::new(config); let concurrency = Arc::new(Semaphore::new(config.concurrency)); Self { @@ -256,87 +242,71 @@ impl CacheRefiller { concurrency, sstable_store, }, - spawn_refill_task, } } - pub(crate) fn default_spawn_refill_task() -> SpawnRefillTask { - Arc::new(|deltas, context, _, _| { - let task = CacheRefillTask { deltas, context }; - tokio::spawn(task.run()) - }) - } - - pub(crate) fn start_cache_refill( + pub fn start_cache_refill( &mut self, deltas: Vec, - pinned_version: PinnedVersion, + pinned_version: Arc, new_pinned_version: PinnedVersion, ) { - let handle = (self.spawn_refill_task)( + let task = CacheRefillTask { deltas, - self.context.clone(), - pinned_version.clone(), - new_pinned_version.clone(), - ); + context: self.context.clone(), + }; let event = CacheRefillerEvent { pinned_version, new_pinned_version, }; + let handle = tokio::spawn(task.run()); let item = Item { handle, event }; self.queue.push_back(item); GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.add(1); } - pub(crate) fn last_new_pinned_version(&self) -> Option<&PinnedVersion> { + pub fn last_new_pinned_version(&self) -> Option<&PinnedVersion> { self.queue.back().map(|item| &item.event.new_pinned_version) } - /// Clear the queue for cache refill and return an event that merges all pending cache refill events - /// into a single event that takes the earliest and latest version. 
- pub(crate) fn clear(&mut self) -> Option { - let Some(last_item) = self.queue.pop_back() else { - return None; - }; - let mut event = last_item.event; - while let Some(item) = self.queue.pop_back() { - assert_eq!( - event.pinned_version.id(), - item.event.new_pinned_version.id() - ); - event.pinned_version = item.event.pinned_version; - } - Some(event) + pub fn next_event(&mut self) -> NextCacheRefillerEvent<'_> { + NextCacheRefillerEvent { refiller: self } } } -impl CacheRefiller { - pub(crate) fn next_event(&mut self) -> impl Future + '_ { - poll_fn(|cx| { - if let Some(item) = self.queue.front_mut() { - ready!(item.handle.poll_unpin(cx)).unwrap(); - let item = self.queue.pop_front().unwrap(); - GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.sub(1); - return Poll::Ready(item.event); - } - Poll::Pending - }) +pub struct NextCacheRefillerEvent<'a> { + refiller: &'a mut CacheRefiller, +} + +impl<'a> Future for NextCacheRefillerEvent<'a> { + type Output = CacheRefillerEvent; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let refiller = &mut self.deref_mut().refiller; + + if let Some(item) = refiller.queue.front_mut() { + ready!(item.handle.poll_unpin(cx)).unwrap(); + let item = refiller.queue.pop_front().unwrap(); + GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.sub(1); + return Poll::Ready(item.event); + } + Poll::Pending } } pub struct CacheRefillerEvent { - pub pinned_version: PinnedVersion, + pub pinned_version: Arc, pub new_pinned_version: PinnedVersion, } #[derive(Clone)] -pub(crate) struct CacheRefillContext { +struct CacheRefillContext { config: Arc, concurrency: Arc, sstable_store: SstableStoreRef, } -struct CacheRefillTask { +pub struct CacheRefillTask { deltas: Vec, context: CacheRefillContext, } diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 7ffa3956c2349..b8ebc858054a1 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -15,8 +15,9 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, VecDeque}; use std::fmt::{Debug, Display, Formatter}; -use std::future::{poll_fn, Future}; +use std::future::Future; use std::mem::swap; +use std::ops::DerefMut; use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; @@ -779,10 +780,6 @@ impl HummockUploader { self.context.pinned_version.max_committed_epoch() } - pub(crate) fn hummock_version(&self) -> &PinnedVersion { - &self.context.pinned_version - } - pub(crate) fn get_synced_data(&self, epoch: HummockEpoch) -> Option<&SyncedDataState> { assert!(self.max_committed_epoch() < epoch && epoch <= self.max_synced_epoch); self.synced_data.get(&epoch) @@ -1040,6 +1037,10 @@ impl HummockUploader { // TODO: call `abort` on the uploading task join handle } + + pub(crate) fn next_event(&mut self) -> NextUploaderEvent<'_> { + NextUploaderEvent { uploader: self } + } } impl HummockUploader { @@ -1131,6 +1132,10 @@ impl HummockUploader { } } +pub(crate) struct NextUploaderEvent<'a> { + uploader: &'a mut HummockUploader, +} + pub(crate) enum UploaderEvent { // staging sstable info of newer data comes first SyncFinish(HummockEpoch, Vec), @@ -1138,28 +1143,30 @@ pub(crate) enum UploaderEvent { ImmMerged(MergeImmTaskOutput), } -impl HummockUploader { - pub(crate) fn next_event(&mut self) -> impl Future + '_ { - poll_fn(|cx| { - if let Some((epoch, newly_uploaded_sstables)) = ready!(self.poll_syncing_task(cx)) { - return 
Poll::Ready(UploaderEvent::SyncFinish(epoch, newly_uploaded_sstables)); - } +impl<'a> Future for NextUploaderEvent<'a> { + type Output = UploaderEvent; - if let Some(sstable_info) = ready!(self.poll_sealed_spill_task(cx)) { - return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); - } + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let uploader = &mut self.deref_mut().uploader; - if let Some(sstable_info) = ready!(self.poll_unsealed_spill_task(cx)) { - return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); - } + if let Some((epoch, newly_uploaded_sstables)) = ready!(uploader.poll_syncing_task(cx)) { + return Poll::Ready(UploaderEvent::SyncFinish(epoch, newly_uploaded_sstables)); + } - if let Some(merge_output) = ready!(self.poll_sealed_merge_imm_task(cx)) { - // add the merged imm into sealed data - self.update_sealed_data(&merge_output.merged_imm); - return Poll::Ready(UploaderEvent::ImmMerged(merge_output)); - } - Poll::Pending - }) + if let Some(sstable_info) = ready!(uploader.poll_sealed_spill_task(cx)) { + return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); + } + + if let Some(sstable_info) = ready!(uploader.poll_unsealed_spill_task(cx)) { + return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); + } + + if let Some(merge_output) = ready!(uploader.poll_sealed_merge_imm_task(cx)) { + // add the merged imm into sealed data + uploader.update_sealed_data(&merge_output.merged_imm); + return Poll::Ready(UploaderEvent::ImmMerged(merge_output)); + } + Poll::Pending } } diff --git a/src/storage/src/hummock/observer_manager.rs b/src/storage/src/hummock/observer_manager.rs index 4e10d9a523950..e96d575ce599b 100644 --- a/src/storage/src/hummock/observer_manager.rs +++ b/src/storage/src/hummock/observer_manager.rs @@ -26,14 +26,14 @@ use tokio::sync::mpsc::UnboundedSender; use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManagerRef}; use crate::hummock::backup_reader::BackupReaderRef; -use crate::hummock::event_handler::HummockVersionUpdate; +use crate::hummock::event_handler::{HummockEvent, HummockVersionUpdate}; use crate::hummock::write_limiter::WriteLimiterRef; pub struct HummockObserverNode { filter_key_extractor_manager: FilterKeyExtractorManagerRef, backup_reader: BackupReaderRef, write_limiter: WriteLimiterRef, - version_update_sender: UnboundedSender, + version_update_sender: UnboundedSender, version: u64, } @@ -71,12 +71,14 @@ impl ObserverState for HummockObserverNode { Info::HummockVersionDeltas(hummock_version_deltas) => { let _ = self .version_update_sender - .send(HummockVersionUpdate::VersionDeltas( - hummock_version_deltas - .version_deltas - .iter() - .map(HummockVersionDelta::from_rpc_protobuf) - .collect(), + .send(HummockEvent::VersionUpdate( + HummockVersionUpdate::VersionDeltas( + hummock_version_deltas + .version_deltas + .iter() + .map(HummockVersionDelta::from_rpc_protobuf) + .collect(), + ), )) .inspect_err(|e| { tracing::error!(event = ?e.0, "unable to send version delta"); @@ -121,12 +123,12 @@ impl ObserverState for HummockObserverNode { ); let _ = self .version_update_sender - .send(HummockVersionUpdate::PinnedVersion( - HummockVersion::from_rpc_protobuf( + .send(HummockEvent::VersionUpdate( + HummockVersionUpdate::PinnedVersion(HummockVersion::from_rpc_protobuf( &snapshot .hummock_version .expect("should get hummock version"), - ), + )), )) .inspect_err(|e| { tracing::error!(event = ?e.0, "unable to send full version"); @@ -140,7 +142,7 @@ impl HummockObserverNode { pub fn new( 
filter_key_extractor_manager: FilterKeyExtractorManagerRef, backup_reader: BackupReaderRef, - version_update_sender: UnboundedSender, + version_update_sender: UnboundedSender, write_limiter: WriteLimiterRef, ) -> Self { Self { diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index d2064ffd09dee..a5322a0d7765b 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -79,8 +79,6 @@ impl Drop for HummockStorageShutdownGuard { #[derive(Clone)] pub struct HummockStorage { hummock_event_sender: UnboundedSender, - // only used in test for setting hummock version in uploader - _version_update_sender: UnboundedSender, context: CompactorContext, @@ -153,22 +151,22 @@ impl HummockStorage { .await .map_err(HummockError::read_backup_error)?; let write_limiter = Arc::new(WriteLimiter::default()); - let (version_update_tx, mut version_update_rx) = unbounded_channel(); + let (event_tx, mut event_rx) = unbounded_channel(); let observer_manager = ObserverManager::new( notification_client, HummockObserverNode::new( filter_key_extractor_manager.clone(), backup_reader.clone(), - version_update_tx.clone(), + event_tx.clone(), write_limiter.clone(), ), ) .await; observer_manager.start().await; - let hummock_version = match version_update_rx.recv().await { - Some(HummockVersionUpdate::PinnedVersion(version)) => version, + let hummock_version = match event_rx.recv().await { + Some(HummockEvent::VersionUpdate(HummockVersionUpdate::PinnedVersion(version))) => version, _ => unreachable!("the hummock observer manager is the first one to take the event tx. Should be full hummock version") }; @@ -191,7 +189,8 @@ impl HummockStorage { let seal_epoch = Arc::new(AtomicU64::new(pinned_version.max_committed_epoch())); let min_current_epoch = Arc::new(AtomicU64::new(pinned_version.max_committed_epoch())); let hummock_event_handler = HummockEventHandler::new( - version_update_rx, + event_tx.clone(), + event_rx, pinned_version, compactor_context.clone(), filter_key_extractor_manager.clone(), @@ -199,8 +198,6 @@ impl HummockStorage { state_store_metrics.clone(), ); - let event_tx = hummock_event_handler.event_sender(); - let instance = Self { context: compactor_context, filter_key_extractor_manager: filter_key_extractor_manager.clone(), @@ -209,7 +206,6 @@ impl HummockStorage { version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(), seal_epoch, hummock_event_sender: event_tx.clone(), - _version_update_sender: version_update_tx, pinned_version: hummock_event_handler.pinned_version(), hummock_version_reader: HummockVersionReader::new( sstable_store, @@ -471,10 +467,10 @@ impl StateStore for HummockStorage { StoreLocalStatistic::flush_all(); } - async fn clear_shared_buffer(&self, prev_epoch: u64) { + async fn clear_shared_buffer(&self) -> StorageResult<()> { let (tx, rx) = oneshot::channel(); self.hummock_event_sender - .send(HummockEvent::Clear(tx, prev_epoch)) + .send(HummockEvent::Clear(tx)) .expect("should send success"); rx.await.expect("should wait success"); @@ -482,6 +478,8 @@ impl StateStore for HummockStorage { self.min_current_epoch .store(HummockEpoch::MAX, MemOrdering::SeqCst); self.seal_epoch.store(epoch, MemOrdering::SeqCst); + + Ok(()) } fn new_local(&self, option: NewLocalOptions) -> impl Future + Send + '_ { @@ -529,12 +527,13 @@ impl HummockStorage { } /// Used in the compaction test tool - #[cfg(any(test, feature = "test"))] pub async fn 
update_version_and_wait(&self, version: HummockVersion) { use tokio::task::yield_now; let version_id = version.id; - self._version_update_sender - .send(HummockVersionUpdate::PinnedVersion(version)) + self.hummock_event_sender + .send(HummockEvent::VersionUpdate( + HummockVersionUpdate::PinnedVersion(version), + )) .unwrap(); loop { if self.pinned_version.load().id() >= version_id { diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index 007260fef7350..1e77b5f5652bd 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -635,7 +635,7 @@ impl StateStore for RangeKvStateStore { fn seal_epoch(&self, _epoch: u64, _is_checkpoint: bool) {} #[allow(clippy::unused_async)] - async fn clear_shared_buffer(&self, _prev_epoch: u64) { + async fn clear_shared_buffer(&self) -> StorageResult<()> { unimplemented!("recovery not supported") } diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 1bea7f6742c68..6e3b9b3db0fc0 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -331,10 +331,11 @@ impl StateStore for MonitoredStateStore { panic!("the state store is already monitored") } - fn clear_shared_buffer(&self, prev_epoch: u64) -> impl Future + Send + '_ { + fn clear_shared_buffer(&self) -> impl Future> + Send + '_ { self.inner - .clear_shared_buffer(prev_epoch) + .clear_shared_buffer() .verbose_instrument_await("store_clear_shared_buffer") + .inspect_err(|e| error!(error = %e.as_report(), "Failed in clear_shared_buffer")) } async fn new_local(&self, option: NewLocalOptions) -> Self::Local { diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index 8cf96a231ead0..95a5c835407d6 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -250,9 +250,13 @@ impl StateStore for TracedStateStore { self.inner.seal_epoch(epoch, is_checkpoint); } - async fn clear_shared_buffer(&self, prev_epoch: u64) { - let _span = TraceSpan::new_clear_shared_buffer_span(prev_epoch); - self.inner.clear_shared_buffer(prev_epoch).await; + async fn clear_shared_buffer(&self) -> StorageResult<()> { + let span = TraceSpan::new_clear_shared_buffer_span(); + let res = self.inner.clear_shared_buffer().await; + span.may_send_result(OperationResult::ClearSharedBuffer( + res.as_ref().map(|o| *o).into(), + )); + res } async fn new_local(&self, options: NewLocalOptions) -> Self::Local { diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index 5299cac9fe085..08b0663b4e220 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -154,7 +154,7 @@ impl StateStore for PanicStateStore { } #[allow(clippy::unused_async)] - async fn clear_shared_buffer(&self, _prev_epoch: u64) { + async fn clear_shared_buffer(&self) -> StorageResult<()> { panic!("should not clear shared buffer from the panic state store!"); } diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 7c8353dc0f30f..0daca6aa5305d 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -191,7 +191,7 @@ pub trait StateStore: StateStoreRead + StaticSendSync + Clone { /// Clears contents in shared buffer. /// This method should only be called when dropping all actors in the local compute node. 
- fn clear_shared_buffer(&self, prev_epoch: u64) -> impl Future + Send + '_; + fn clear_shared_buffer(&self) -> impl Future> + Send + '_; fn new_local(&self, option: NewLocalOptions) -> impl Future + Send + '_; diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index f1316fe7e20c8..4502236c0a20a 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -482,8 +482,8 @@ pub mod verify { self.actual.seal_epoch(epoch, is_checkpoint) } - fn clear_shared_buffer(&self, prev_epoch: u64) -> impl Future + Send + '_ { - self.actual.clear_shared_buffer(prev_epoch) + fn clear_shared_buffer(&self) -> impl Future> + Send + '_ { + self.actual.clear_shared_buffer() } async fn new_local(&self, option: NewLocalOptions) -> Self::Local { @@ -927,7 +927,7 @@ pub mod boxed_state_store { fn seal_epoch(&self, epoch: u64, is_checkpoint: bool); - async fn clear_shared_buffer(&self, prev_epoch: u64); + async fn clear_shared_buffer(&self) -> StorageResult<()>; async fn new_local(&self, option: NewLocalOptions) -> BoxDynamicDispatchedLocalStateStore; @@ -948,8 +948,8 @@ pub mod boxed_state_store { self.seal_epoch(epoch, is_checkpoint); } - async fn clear_shared_buffer(&self, prev_epoch: u64) { - self.clear_shared_buffer(prev_epoch).await + async fn clear_shared_buffer(&self) -> StorageResult<()> { + self.clear_shared_buffer().await } async fn new_local(&self, option: NewLocalOptions) -> BoxDynamicDispatchedLocalStateStore { @@ -1018,8 +1018,8 @@ pub mod boxed_state_store { self.deref().sync(epoch) } - fn clear_shared_buffer(&self, prev_epoch: u64) -> impl Future + Send + '_ { - self.deref().clear_shared_buffer(prev_epoch) + fn clear_shared_buffer(&self) -> impl Future> + Send + '_ { + self.deref().clear_shared_buffer() } fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) { diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 9b77a9a7bf096..1cbd65fec151f 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -640,7 +640,7 @@ mod tests { drop(writer); // Recovery - test_env.storage.clear_shared_buffer(epoch2).await; + test_env.storage.clear_shared_buffer().await.unwrap(); // Rebuild log reader and writer in recovery let factory = KvLogStoreFactory::new( @@ -845,7 +845,7 @@ mod tests { drop(writer); // Recovery - test_env.storage.clear_shared_buffer(epoch2).await; + test_env.storage.clear_shared_buffer().await.unwrap(); // Rebuild log reader and writer in recovery let factory = KvLogStoreFactory::new( @@ -1072,7 +1072,7 @@ mod tests { drop(writer2); // Recovery - test_env.storage.clear_shared_buffer(epoch2).await; + test_env.storage.clear_shared_buffer().await.unwrap(); let vnodes = build_bitmap(0..VirtualNode::COUNT); let factory = KvLogStoreFactory::new( diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 385825a23e0d2..b838314729ad3 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -72,10 +72,7 @@ pub(super) enum LocalBarrierEvent { actor_ids_to_collect: HashSet, result_sender: oneshot::Sender>, }, - Reset { - prev_epoch: u64, - result_sender: oneshot::Sender<()>, - }, + Reset(oneshot::Sender<()>), ReportActorCollected { actor_id: ActorId, barrier: Barrier, @@ -226,10 +223,9 @@ impl LocalBarrierWorker { event = event_rx.recv() => { if let Some(event) = event { match event { - LocalBarrierEvent::Reset { - 
result_sender, prev_epoch} => { - self.reset(prev_epoch).await; - let _ = result_sender.send(()); + LocalBarrierEvent::Reset(finish_sender) => { + self.reset().await; + let _ = finish_sender.send(()); } event => { self.handle_event(event); @@ -272,7 +268,7 @@ impl LocalBarrierWorker { warn!(err=?e, "fail to send inject barrier result"); }); } - LocalBarrierEvent::Reset { .. } => { + LocalBarrierEvent::Reset(_) => { unreachable!("Reset event should be handled separately in async context") } ReportActorCollected { actor_id, barrier } => self.collect(actor_id, &barrier), diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 11eb9a44290cf..5a2bde99da491 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -231,12 +231,9 @@ impl LocalStreamManager { } /// Force stop all actors on this worker, and then drop their resources. - pub async fn reset(&self, prev_epoch: u64) { + pub async fn reset(&self) { self.local_barrier_manager - .send_and_await(|result_sender| LocalBarrierEvent::Reset { - result_sender, - prev_epoch, - }) + .send_and_await(LocalBarrierEvent::Reset) .await .expect("should receive reset") } @@ -271,7 +268,7 @@ impl LocalBarrierWorker { } /// Force stop all actors on this worker, and then drop their resources. - pub(super) async fn reset(&mut self, prev_epoch: u64) { + pub(super) async fn reset(&mut self) { let actor_handles = self.actor_manager_state.drain_actor_handles(); for (actor_id, handle) in &actor_handles { tracing::debug!("force stopping actor {}", actor_id); @@ -298,7 +295,7 @@ impl LocalBarrierWorker { m.lock().clear(); } dispatch_state_store!(&self.actor_manager.env.state_store(), store, { - store.clear_shared_buffer(prev_epoch).await; + store.clear_shared_buffer().await.unwrap(); }); self.reset_state(); self.actor_manager.env.dml_manager_ref().clear();
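
Note: the short sketch below is illustrative only and is not part of the patch. It shows how a caller drives the post-revert `clear_shared_buffer` API — no `prev_epoch` argument, returning a `StorageResult<()>` that the caller must handle — mirroring the `LocalBarrierWorker::reset` change above. The `reset_state_store` helper name and the `risingwave_storage::error::StorageResult` import path are assumptions, not code from this patch.

use risingwave_storage::error::StorageResult;
use risingwave_storage::store::StateStore;

/// Hypothetical helper: after this revert, clearing the shared buffer during a
/// local reset no longer passes the previous epoch; the store decides
/// internally what to clear, and the caller only propagates the result.
async fn reset_state_store<S: StateStore>(store: &S) -> StorageResult<()> {
    store.clear_shared_buffer().await
}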