Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement truncate region for mito2 #2335

Merged
merged 20 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ impl RegionServerInner {
| RegionRequest::Delete(_)
| RegionRequest::Alter(_)
| RegionRequest::Flush(_)
| RegionRequest::Compact(_) => RegionChange::None,
| RegionRequest::Compact(_)
| RegionRequest::Truncate(_) => RegionChange::None,
DevilExileSu marked this conversation as resolved.
Show resolved Hide resolved
};

let engine = match &region_change {
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ pub(crate) mod listener;
mod open_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod truncate_test;

use std::sync::Arc;

Expand Down
62 changes: 62 additions & 0 deletions src/mito2/src/engine/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,22 @@

use std::sync::Arc;

use async_trait::async_trait;
use common_telemetry::info;
use store_api::storage::RegionId;
use tokio::sync::Notify;

/// Mito engine background event listener.
#[async_trait]
pub trait EventListener: Send + Sync {
/// Notifies the listener that a region is flushed successfully.
fn on_flush_success(&self, region_id: RegionId);

/// Notifies the listener that the engine is stalled.
fn on_write_stall(&self);

/// Notifies the listener that the region starts to do flush.
async fn on_flush_begin(&self, region_id: RegionId);
DevilExileSu marked this conversation as resolved.
Show resolved Hide resolved
}

pub type EventListenerRef = Arc<dyn EventListener>;
Expand All @@ -50,6 +55,7 @@ impl FlushListener {
}
}

#[async_trait]
impl EventListener for FlushListener {
fn on_flush_success(&self, region_id: RegionId) {
info!("Region {} flush successfully", region_id);
Expand All @@ -58,6 +64,8 @@ impl EventListener for FlushListener {
}

fn on_write_stall(&self) {}

async fn on_flush_begin(&self, _region_id: RegionId) {}
}

/// Listener to watch stall events.
Expand All @@ -79,6 +87,7 @@ impl StallListener {
}
}

#[async_trait]
impl EventListener for StallListener {
fn on_flush_success(&self, _region_id: RegionId) {}

Expand All @@ -87,4 +96,57 @@ impl EventListener for StallListener {

self.notify.notify_one();
}

async fn on_flush_begin(&self, _region_id: RegionId) {}
}

/// Listener to watch begin flush events.
///
/// Crate a background thread to execute flush region, and the main thread calls `wait_truncate()`
/// to block and wait for `on_flush_region()`.
/// When the background thread calls `on_flush_begin()`, the main thread is notified to truncate
/// region, and background thread thread blocks and waits for `notify_flush()` to continue flushing.
pub struct FlushTruncateListener {
DevilExileSu marked this conversation as resolved.
Show resolved Hide resolved
/// Notify flush operation.
notify_flush: Notify,
/// Notify truncate operation.
notify_truncate: Notify,
}

impl FlushTruncateListener {
/// Creates a new listener.
pub fn new() -> FlushTruncateListener {
FlushTruncateListener {
notify_flush: Notify::new(),
notify_truncate: Notify::new(),
}
}

/// Notify flush region to proceed.
pub fn notify_flush(&self) {
self.notify_flush.notify_one();
}

/// Wait for a truncate event.
pub async fn wait_truncate(&self) {
self.notify_truncate.notified().await;
}
}

#[async_trait]
impl EventListener for FlushTruncateListener {
fn on_flush_success(&self, _region_id: RegionId) {}

fn on_write_stall(&self) {}

/// Calling this function will block the thread!
/// Notify the listener to perform a truncate region and block the flush region job.
async fn on_flush_begin(&self, region_id: RegionId) {
info!(
"Region {} begin do flush, notify region to truncate",
region_id
);
self.notify_truncate.notify_one();
self.notify_flush.notified().await;
}
}
Loading
Loading