diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index bf74480153a0..16537f1ae8d0 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -439,8 +439,11 @@ impl CompactionTask for TwcsCompactionTask { let notify = match self.handle_compaction().await { Ok((added, deleted)) => { info!( - "Compacted SST files, input: {:?}, output: {:?}, window: {:?}", - deleted, added, self.compaction_time_window + "Compacted SST files, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}", + deleted, + added, + self.compaction_time_window, + self.waiters.len(), ); BackgroundNotify::CompactionFinished(CompactionFinished { diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index afe118810589..85a51edb9aa9 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -401,6 +401,11 @@ impl MitoEngine { }), }) } + + /// Returns the purge scheduler. + pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef { + self.inner.workers.purge_scheduler() + } } #[cfg(test)] diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index 71b859cced3f..a5277bf2a496 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::ops::Range; +use std::sync::Arc; use api::v1::{ColumnSchema, Rows}; use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; @@ -23,8 +24,10 @@ use store_api::region_request::{ RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest, RegionRequest, }; use store_api::storage::{RegionId, ScanRequest}; +use tokio::sync::Notify; use crate::config::MitoConfig; +use crate::engine::listener::CompactionListener; use crate::engine::MitoEngine; use crate::test_util::{ build_rows_for_key, column_metadata_to_column_schema, put_rows, CreateRequestBuilder, TestEnv, @@ -145,3 +148,73 @@ async fn test_compaction_region() { let vec = collect_stream_ts(stream).await; assert_eq!((0..25).map(|v| v * 1000).collect::>(), vec); } + +// For issue https://github.com/GreptimeTeam/greptimedb/issues/3633 +#[tokio::test] +async fn test_readonly_during_compaction() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::new(); + let listener = Arc::new(CompactionListener::default()); + let engine = env + .create_engine_with( + MitoConfig { + // Ensure there is only one background worker for purge task. + max_background_jobs: 1, + ..Default::default() + }, + None, + Some(listener.clone()), + ) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new() + .insert_option("compaction.type", "twcs") + .insert_option("compaction.twcs.max_active_window_files", "1") + .build(); + + let column_schemas = request + .column_metadatas + .iter() + .map(column_metadata_to_column_schema) + .collect::>(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + // Flush 2 SSTs for compaction. + put_and_flush(&engine, region_id, &column_schemas, 0..10).await; + put_and_flush(&engine, region_id, &column_schemas, 10..20).await; + + // Waits until the engine receives compaction finished request. + listener.wait_handle_finished().await; + + // Sets the region to read only mode. + engine.set_writable(region_id, false).unwrap(); + // Wakes up the listener. + listener.wake(); + + let notify = Arc::new(Notify::new()); + // We already sets max background jobs to 1, so we can submit a task to the + // purge scheduler to ensure all purge tasks are finished. + let job_notify = notify.clone(); + engine + .purge_scheduler() + .schedule(Box::pin(async move { + job_notify.notify_one(); + })) + .unwrap(); + notify.notified().await; + + let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + assert_eq!( + 2, + scanner.num_files(), + "unexpected files: {:?}", + scanner.file_ids() + ); + let stream = scanner.scan().await.unwrap(); + + let vec = collect_stream_ts(stream).await; + assert_eq!((0..20).map(|v| v * 1000).collect::>(), vec); +} diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs index 0629b0e92dfe..95ef82f27711 100644 --- a/src/mito2/src/engine/listener.rs +++ b/src/mito2/src/engine/listener.rs @@ -26,13 +26,17 @@ use tokio::sync::Notify; #[async_trait] pub trait EventListener: Send + Sync { /// Notifies the listener that a region is flushed successfully. - fn on_flush_success(&self, region_id: RegionId); + fn on_flush_success(&self, region_id: RegionId) { + let _ = region_id; + } /// Notifies the listener that the engine is stalled. - fn on_write_stall(&self); + fn on_write_stall(&self) {} /// Notifies the listener that the region starts to do flush. - async fn on_flush_begin(&self, region_id: RegionId); + async fn on_flush_begin(&self, region_id: RegionId) { + let _ = region_id; + } /// Notifies the listener that the later drop task starts running. /// Returns the gc interval if we want to override the default one. @@ -46,6 +50,12 @@ pub trait EventListener: Send + Sync { let _ = region_id; let _ = removed; } + + /// Notifies the listener that the region is going to handle the compaction + /// finished request. + async fn on_handle_compaction_finished(&self, region_id: RegionId) { + let _ = region_id; + } } pub type EventListenerRef = Arc; @@ -70,10 +80,6 @@ impl EventListener for FlushListener { self.notify.notify_one() } - - fn on_write_stall(&self) {} - - async fn on_flush_begin(&self, _region_id: RegionId) {} } /// Listener to watch stall events. @@ -98,8 +104,6 @@ impl EventListener for StallListener { self.notify.notify_one(); } - - async fn on_flush_begin(&self, _region_id: RegionId) {} } /// Listener to watch begin flush events. @@ -130,10 +134,6 @@ impl FlushTruncateListener { #[async_trait] impl EventListener for FlushTruncateListener { - fn on_flush_success(&self, _region_id: RegionId) {} - - fn on_write_stall(&self) {} - /// Calling this function will block the thread! /// Notify the listener to perform a truncate region and block the flush region job. async fn on_flush_begin(&self, region_id: RegionId) { @@ -169,12 +169,6 @@ impl DropListener { #[async_trait] impl EventListener for DropListener { - fn on_flush_success(&self, _region_id: RegionId) {} - - fn on_write_stall(&self) {} - - async fn on_flush_begin(&self, _region_id: RegionId) {} - fn on_later_drop_begin(&self, _region_id: RegionId) -> Option { Some(self.gc_duration) } @@ -185,3 +179,34 @@ impl EventListener for DropListener { self.notify.notify_one(); } } + +/// Listener on handling compaction requests. +#[derive(Default)] +pub struct CompactionListener { + handle_finished_notify: Notify, + blocker: Notify, +} + +impl CompactionListener { + /// Waits for handling compaction finished request. + pub async fn wait_handle_finished(&self) { + self.handle_finished_notify.notified().await; + } + + /// Wakes up the listener. + pub fn wake(&self) { + self.blocker.notify_one(); + } +} + +#[async_trait] +impl EventListener for CompactionListener { + async fn on_handle_compaction_finished(&self, region_id: RegionId) { + info!("Handle compaction finished request, region {region_id}"); + + self.handle_finished_notify.notify_one(); + + // Blocks current task. + self.blocker.notified().await; + } +} diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 4f0f84222a19..5efa9dae8919 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -157,7 +157,7 @@ impl TestEnv { .unwrap() } - /// Creates a new engine with specific config and manager/listener under this env. + /// Creates a new engine with specific config and manager/listener/purge_scheduler under this env. pub async fn create_engine_with( &mut self, config: MitoConfig, diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index c3fe64e83cd8..851928ec14c3 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -292,6 +292,11 @@ impl WorkerGroup { cache_manager, }) } + + /// Returns the purge scheduler. + pub(crate) fn purge_scheduler(&self) -> &SchedulerRef { + &self.purge_scheduler + } } fn region_id_to_index(id: RegionId, num_workers: usize) -> usize { @@ -819,6 +824,15 @@ impl WorkerListener { let _ = region_id; let _ = removed; } + + pub(crate) async fn on_handle_compaction_finished(&self, region_id: RegionId) { + #[cfg(any(test, feature = "test"))] + if let Some(listener) = &self.listener { + listener.on_handle_compaction_finished(region_id).await; + } + // Avoid compiler warning. + let _ = region_id; + } } #[cfg(test)] diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index c00eaa865417..5f797f894f59 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_telemetry::{error, info}; +use common_telemetry::{error, info, warn}; use store_api::logstore::LogStore; use store_api::storage::RegionId; @@ -55,7 +55,13 @@ impl RegionWorkerLoop { region_id: RegionId, mut request: CompactionFinished, ) { + self.listener.on_handle_compaction_finished(region_id).await; + let Some(region) = self.regions.writable_region_or(region_id, &mut request) else { + warn!( + "Unable to finish the compaction task for a read only region {}", + region_id + ); return; }; diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index a4b0ebccd023..2f3e184bf748 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -191,6 +191,10 @@ impl RegionWorkerLoop { mut request: FlushFinished, ) { let Some(region) = self.regions.writable_region_or(region_id, &mut request) else { + warn!( + "Unable to finish the flush task for a read only region {}", + region_id + ); return; };