diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs index 2c0f689b142b..2e7632b04d2c 100644 --- a/src/mito2/src/engine/listener.rs +++ b/src/mito2/src/engine/listener.rs @@ -30,7 +30,7 @@ pub trait EventListener: Send + Sync { /// Notifies the listener that the engine is stalled. fn on_write_stall(&self); - async fn on_handle_finished_begin(&self, region_id: RegionId); + async fn on_flush_begin(&self, region_id: RegionId); } pub type EventListenerRef = Arc; @@ -64,7 +64,7 @@ impl EventListener for FlushListener { fn on_write_stall(&self) {} - async fn on_handle_finished_begin(&self, _region_id: RegionId) {} + async fn on_flush_begin(&self, _region_id: RegionId) {} } /// Listener to watch stall events. @@ -96,37 +96,43 @@ impl EventListener for StallListener { self.notify.notify_one(); } - async fn on_handle_finished_begin(&self, _region_id: RegionId) {} + async fn on_flush_begin(&self, _region_id: RegionId) {} } -pub struct HandleFinishedListener { - notify: Notify, +pub struct FlushTruncateListener { + notify_flush: Notify, + notify_truncate: Notify, } -impl HandleFinishedListener { - pub fn new() -> HandleFinishedListener { - HandleFinishedListener { - notify: Notify::new(), +impl FlushTruncateListener { + pub fn new() -> FlushTruncateListener { + FlushTruncateListener { + notify_flush: Notify::new(), + notify_truncate: Notify::new(), } } - pub fn notify(&self) { - self.notify.notify_one(); + pub fn notify_flush(&self) { + self.notify_flush.notify_one(); } - pub async fn wait(&self) { - self.notify.notified().await; + pub async fn wait_truncate(&self) { + self.notify_truncate.notified().await; } } #[async_trait] -impl EventListener for HandleFinishedListener { +impl EventListener for FlushTruncateListener { fn on_flush_success(&self, _region_id: RegionId) {} fn on_write_stall(&self) {} - async fn on_handle_finished_begin(&self, region_id: RegionId) { - info!("Region {} begin handle finished flush", region_id); - self.wait().await; + async fn on_flush_begin(&self, region_id: RegionId) { + info!( + "Region {} begin do flush, notify region to truncate", + region_id + ); + self.notify_truncate.notify_one(); + self.notify_flush.notified().await; } } diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index c89e438910f7..8d1f9ad520c5 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -18,7 +18,7 @@ use std::time::Duration; use api::v1::Rows; use common_recordbatch::RecordBatches; -use common_telemetry::init_default_ut_logging; +use common_telemetry::{info, init_default_ut_logging}; use store_api::region_engine::RegionEngine; use store_api::region_request::{ RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest, @@ -27,10 +27,9 @@ use store_api::storage::RegionId; use super::ScanRequest; use crate::config::MitoConfig; -use crate::engine::listener::HandleFinishedListener; +use crate::engine::listener::FlushTruncateListener; use crate::test_util::{ - build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, MockWriteBufferManager, - TestEnv, + build_rows, put_rows, rows_schema, CreateRequestBuilder, MockWriteBufferManager, TestEnv, }; #[tokio::test] @@ -274,7 +273,7 @@ async fn test_engine_truncate_during_flush() { init_default_ut_logging(); let mut env = TestEnv::with_prefix("truncate-during-flush"); let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); - let listener = Arc::new(HandleFinishedListener::new()); + let listener = Arc::new(FlushTruncateListener::new()); let engine = env .create_engine_with( MitoConfig::default(), @@ -286,6 +285,7 @@ async fn test_engine_truncate_during_flush() { // Create the region. let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); let column_schemas = rows_schema(&request); engine @@ -308,41 +308,54 @@ async fn test_engine_truncate_during_flush() { // Flush reigon. let engine_cloned = engine.clone(); - tokio::spawn(async move { - flush_region(&engine_cloned, region_id).await; + let flush_task = tokio::spawn(async move { + info!("do flush task!!!!"); + engine_cloned + .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {})) + .await }); - // Truncate the region + // Wait truncate before flush memtable. + listener.wait_truncate().await; + + // Truncate the region. engine .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) .await .unwrap(); - listener.notify(); + // Notify region to continue flushing. + listener.notify_flush(); + + // Wait handle flushed finish. + let _err = flush_task.await.unwrap().unwrap_err(); + // Check sequences and entry id. let version_data = region.version_control.current(); - let truncated_entry_id = version_data.version.truncated_entry_id.unwrap(); + let truncated_entry_id = version_data.version.truncated_entry_id; let truncated_sequence = version_data.version.flushed_sequence; let request = ScanRequest::default(); let scanner = engine.scan(region_id, request.clone()).unwrap(); assert_eq!(0, scanner.num_files()); - assert_eq!(entry_id, truncated_entry_id); + assert_eq!(Some(entry_id), truncated_entry_id); assert_eq!(sequence, truncated_sequence); - // Put data to the region. - let rows = Rows { - schema: column_schemas, - rows: build_rows(5, 8), - }; - put_rows(&engine, region_id, rows).await; - - // Flush the region. + // Reopen the engine. + let engine = env.reopen_engine(engine, MitoConfig::default()).await; engine - .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {})) + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::default(), + }), + ) .await .unwrap(); + let region = engine.get_region(region_id).unwrap(); let current_version = region.version_control.current().version; assert_eq!(current_version.truncated_entry_id, None); } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 32db62a33adf..ea78f585da79 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -40,7 +40,7 @@ use crate::schedule::scheduler::{Job, SchedulerRef}; use crate::sst::file::{FileId, FileMeta}; use crate::sst::file_purger::FilePurgerRef; use crate::sst::parquet::WriteOptions; -use crate::worker::send_result; +use crate::worker::{send_result, WorkerListener}; /// Global write buffer (memtable) manager. /// @@ -187,6 +187,7 @@ pub(crate) struct RegionFlushTask { pub(crate) access_layer: AccessLayerRef, pub(crate) memtable_builder: MemtableBuilderRef, pub(crate) file_purger: FilePurgerRef, + pub(crate) listener: WorkerListener, } impl RegionFlushTask { @@ -222,6 +223,7 @@ impl RegionFlushTask { /// Runs the flush task. async fn do_flush(&mut self, version_data: VersionControlData) { + self.listener.on_flush_begin(self.region_id).await; let worker_request = match self.flush_memtables(&version_data.version).await { Ok(file_metas) => { let memtables_to_remove = version_data diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index f9fc2ee83727..0702a258798e 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -105,6 +105,7 @@ impl VersionControl { VersionBuilder::from_version(version) .apply_edit(edit, purger) .remove_memtables(memtables_to_remove) + .truncated_entry_id(None) .build(), ); @@ -240,7 +241,7 @@ impl VersionBuilder { ssts: version.ssts.clone(), flushed_entry_id: version.flushed_entry_id, flushed_sequence: version.flushed_sequence, - truncated_entry_id: None, + truncated_entry_id: version.truncated_entry_id, } } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 8865b18a927a..4977c0bd108f 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -554,7 +554,7 @@ impl RegionWorkerLoop { } /// Wrapper that only calls event listener in tests. -#[derive(Default)] +#[derive(Default, Clone)] pub(crate) struct WorkerListener { #[cfg(test)] listener: Option, @@ -586,10 +586,10 @@ impl WorkerListener { } } - pub(crate) async fn on_handle_finishd_begin(&self, region_id: RegionId) { + pub(crate) async fn on_flush_begin(&self, region_id: RegionId) { #[cfg(test)] if let Some(listener) = &self.listener { - listener.on_handle_finished_begin(region_id).await; + listener.on_flush_begin(region_id).await; } // Avoid compiler warning. let _ = region_id; diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 0bc83b6a364c..d26ce908fd61 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -35,7 +35,6 @@ impl RegionWorkerLoop { region_id: RegionId, mut request: FlushFinished, ) { - self.listener.on_handle_finishd_begin(region_id).await; let Some(region) = self.regions.get_region(region_id) else { // We may dropped or closed the region. request.on_failure(RegionNotFoundSnafu { region_id }.build()); @@ -73,6 +72,10 @@ impl RegionWorkerLoop { region.file_purger.clone(), ); region.update_flush_millis(); + info!( + "truncated_entry_id = {:?}", + region.version_control.current().version.truncated_entry_id + ); // Delete wal. info!( @@ -205,6 +208,7 @@ impl RegionWorkerLoop { access_layer: region.access_layer.clone(), memtable_builder: self.memtable_builder.clone(), file_purger: region.file_purger.clone(), + listener: self.listener.clone(), } } }