diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs index 2e7632b04d2c..2c1a9fba81af 100644 --- a/src/mito2/src/engine/listener.rs +++ b/src/mito2/src/engine/listener.rs @@ -30,6 +30,7 @@ pub trait EventListener: Send + Sync { /// Notifies the listener that the engine is stalled. fn on_write_stall(&self); + /// Notifies the listener that the region starts to do flush. async fn on_flush_begin(&self, region_id: RegionId); } @@ -99,12 +100,21 @@ impl EventListener for StallListener { async fn on_flush_begin(&self, _region_id: RegionId) {} } +/// Listener to watch begin flush events. +/// +/// Crate a background thread to execute flush region, and the main thread calls `wait_truncate()` +/// to block and wait for `on_flush_region()`. +/// When the background thread calls `on_flush_begin()`, the main thread is notified to truncate +/// region, and background thread thread blocks and waits for `notify_flush()` to continue flusing. pub struct FlushTruncateListener { + /// Notify flush operation. notify_flush: Notify, + /// Notify truncate operation. notify_truncate: Notify, } impl FlushTruncateListener { + /// Creates a new listener. pub fn new() -> FlushTruncateListener { FlushTruncateListener { notify_flush: Notify::new(), @@ -112,10 +122,12 @@ impl FlushTruncateListener { } } + /// Notify flush region to proceed. pub fn notify_flush(&self) { self.notify_flush.notify_one(); } + /// Wait for a truncate event. pub async fn wait_truncate(&self) { self.notify_truncate.notified().await; } @@ -127,6 +139,8 @@ impl EventListener for FlushTruncateListener { fn on_write_stall(&self) {} + /// Calling this function will block the thread! + /// Notify the listener to perform a truncate region and block the flush region job. async fn on_flush_begin(&self, region_id: RegionId) { info!( "Region {} begin do flush, notify region to truncate", diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 68899c788771..5c3446d32bf5 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -59,10 +59,13 @@ pub struct RegionRemove { pub region_id: RegionId, } +/// Last data truncted in the region. #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct RegionTruncate { pub region_id: RegionId, + /// Last WAL entry id of truncated data. pub truncated_entry_id: EntryId, + // Last sequence number of truncated data. pub truncated_sequence: SequenceNumber, }