Skip to content

Commit

Permalink
feat: implement truncate region for mito2 (GreptimeTeam#2335)
Browse files Browse the repository at this point in the history
* feat: implement truncate region for mito2.

* chore: add license header and fix typos

* Update src/mito2/src/worker/handle_truncate.rs

Co-authored-by: Yingwen <[email protected]>

* cr

* chore: consider the flush task being executed before truncating the region.

* test

* feat: check flush and compaction tasks

* chore: remove useless changes

* Update src/mito2/src/manifest/action.rs

Co-authored-by: Yingwen <[email protected]>

* Update src/mito2/src/worker/handle_flush.rs

Co-authored-by: Yingwen <[email protected]>

* chore: CR, consider sequence number

* test: use EventListener to test the flush task during truncate

* fix: fix listener error

* Update src/mito2/src/engine/truncate_test.rs

Co-authored-by: Yingwen <[email protected]>

* chore: cr

* fix: remove set None

* Update src/mito2/src/region/version.rs

Co-authored-by: dennis zhuang <[email protected]>

* Update src/mito2/src/worker/handle_flush.rs

Co-authored-by: dennis zhuang <[email protected]>

* Update src/mito2/src/worker/handle_truncate.rs

Co-authored-by: dennis zhuang <[email protected]>

* doc: add some doc for FlushTruncateListener and RegionTruncate

---------

Co-authored-by: Yingwen <[email protected]>
Co-authored-by: dennis zhuang <[email protected]>
  • Loading branch information
3 people authored and paomian committed Oct 19, 2023
1 parent 1fe3e36 commit 35bdd28
Show file tree
Hide file tree
Showing 16 changed files with 623 additions and 14 deletions.
3 changes: 2 additions & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ impl RegionServerInner {
| RegionRequest::Delete(_)
| RegionRequest::Alter(_)
| RegionRequest::Flush(_)
| RegionRequest::Compact(_) => RegionChange::None,
| RegionRequest::Compact(_)
| RegionRequest::Truncate(_) => RegionChange::None,
};

let engine = match &region_change {
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ pub(crate) mod listener;
mod open_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod truncate_test;

use std::sync::Arc;

Expand Down
62 changes: 62 additions & 0 deletions src/mito2/src/engine/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,22 @@
use std::sync::Arc;

use async_trait::async_trait;
use common_telemetry::info;
use store_api::storage::RegionId;
use tokio::sync::Notify;

/// Mito engine background event listener.
#[async_trait]
pub trait EventListener: Send + Sync {
/// Notifies the listener that a region is flushed successfully.
fn on_flush_success(&self, region_id: RegionId);

/// Notifies the listener that the engine is stalled.
fn on_write_stall(&self);

/// Notifies the listener that the region starts to do flush.
async fn on_flush_begin(&self, region_id: RegionId);
}

pub type EventListenerRef = Arc<dyn EventListener>;
Expand All @@ -50,6 +55,7 @@ impl FlushListener {
}
}

#[async_trait]
impl EventListener for FlushListener {
fn on_flush_success(&self, region_id: RegionId) {
info!("Region {} flush successfully", region_id);
Expand All @@ -58,6 +64,8 @@ impl EventListener for FlushListener {
}

fn on_write_stall(&self) {}

async fn on_flush_begin(&self, _region_id: RegionId) {}
}

/// Listener to watch stall events.
Expand All @@ -79,6 +87,7 @@ impl StallListener {
}
}

#[async_trait]
impl EventListener for StallListener {
fn on_flush_success(&self, _region_id: RegionId) {}

Expand All @@ -87,4 +96,57 @@ impl EventListener for StallListener {

self.notify.notify_one();
}

async fn on_flush_begin(&self, _region_id: RegionId) {}
}

/// Listener to watch begin flush events.
///
/// Crate a background thread to execute flush region, and the main thread calls `wait_truncate()`
/// to block and wait for `on_flush_region()`.
/// When the background thread calls `on_flush_begin()`, the main thread is notified to truncate
/// region, and background thread thread blocks and waits for `notify_flush()` to continue flushing.
pub struct FlushTruncateListener {
/// Notify flush operation.
notify_flush: Notify,
/// Notify truncate operation.
notify_truncate: Notify,
}

impl FlushTruncateListener {
/// Creates a new listener.
pub fn new() -> FlushTruncateListener {
FlushTruncateListener {
notify_flush: Notify::new(),
notify_truncate: Notify::new(),
}
}

/// Notify flush region to proceed.
pub fn notify_flush(&self) {
self.notify_flush.notify_one();
}

/// Wait for a truncate event.
pub async fn wait_truncate(&self) {
self.notify_truncate.notified().await;
}
}

#[async_trait]
impl EventListener for FlushTruncateListener {
fn on_flush_success(&self, _region_id: RegionId) {}

fn on_write_stall(&self) {}

/// Calling this function will block the thread!
/// Notify the listener to perform a truncate region and block the flush region job.
async fn on_flush_begin(&self, region_id: RegionId) {
info!(
"Region {} begin do flush, notify region to truncate",
region_id
);
self.notify_truncate.notify_one();
self.notify_flush.notified().await;
}
}
Loading

0 comments on commit 35bdd28

Please sign in to comment.