Skip to content

Commit

Permalink
test: use EventListener to test the flush task during truncate
Browse files Browse the repository at this point in the history
  • Loading branch information
DevilExileSu committed Sep 14, 2023
1 parent 05d7425 commit d846c56
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 70 deletions.
18 changes: 0 additions & 18 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,6 @@ impl MitoEngine {
pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
self.inner.workers.get_region(id)
}

#[cfg(test)]
pub(crate) async fn handle_worker_request(
&self,
region_id: RegionId,
request: WorkerRequest,
) -> Result<()> {
self.inner.handle_worker_request(region_id, request).await
}
}

/// Inner struct of [MitoEngine].
Expand Down Expand Up @@ -166,15 +157,6 @@ impl EngineInner {
region.set_writable(writable);
Ok(())
}

#[cfg(test)]
pub(crate) async fn handle_worker_request(
&self,
region_id: RegionId,
request: WorkerRequest,
) -> Result<()> {
self.workers.submit_to_worker(region_id, request).await
}
}

#[async_trait]
Expand Down
42 changes: 42 additions & 0 deletions src/mito2/src/engine/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@
use std::sync::Arc;

use async_trait::async_trait;
use common_telemetry::info;
use store_api::storage::RegionId;
use tokio::sync::Notify;

/// Mito engine background event listener.
#[async_trait]
pub trait EventListener: Send + Sync {
/// Notifies the listener that a region is flushed successfully.
fn on_flush_success(&self, region_id: RegionId);

/// Notifies the listener that the engine is stalled.
fn on_write_stall(&self);

async fn on_handle_finished_begin(&self, region_id: RegionId);
}

pub type EventListenerRef = Arc<dyn EventListener>;
Expand All @@ -50,6 +54,7 @@ impl FlushListener {
}
}

#[async_trait]
impl EventListener for FlushListener {
fn on_flush_success(&self, region_id: RegionId) {
info!("Region {} flush successfully", region_id);
Expand All @@ -58,6 +63,8 @@ impl EventListener for FlushListener {
}

fn on_write_stall(&self) {}

async fn on_handle_finished_begin(&self, _region_id: RegionId) {}
}

/// Listener to watch stall events.
Expand All @@ -79,6 +86,7 @@ impl StallListener {
}
}

#[async_trait]
impl EventListener for StallListener {
fn on_flush_success(&self, _region_id: RegionId) {}

Expand All @@ -87,4 +95,38 @@ impl EventListener for StallListener {

self.notify.notify_one();
}

async fn on_handle_finished_begin(&self, _region_id: RegionId) {}
}

pub struct HandleFinishedListener {
notify: Notify,
}

impl HandleFinishedListener {
pub fn new() -> HandleFinishedListener {
HandleFinishedListener {
notify: Notify::new(),
}
}

pub fn notify(&self) {
self.notify.notify_one();
}

pub async fn wait(&self) {
self.notify.notified().await;
}
}

#[async_trait]
impl EventListener for HandleFinishedListener {
fn on_flush_success(&self, _region_id: RegionId) {}

fn on_write_stall(&self) {}

async fn on_handle_finished_begin(&self, region_id: RegionId) {
info!("Region {} begin handle finished flush", region_id);
self.wait().await;
}
}
84 changes: 32 additions & 52 deletions src/mito2/src/engine/truncate_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,25 @@
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use api::v1::Rows;
use common_recordbatch::RecordBatches;
use object_store::util::join_path;
use smallvec::SmallVec;
use common_telemetry::init_default_ut_logging;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
};
use store_api::storage::RegionId;
use tokio::sync::oneshot;

use super::ScanRequest;
use crate::config::MitoConfig;
use crate::request::{BackgroundNotify, FlushFinished, WorkerRequest};
use crate::sst::file::{FileId, FileMeta, FileTimeRange};
use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv};
use crate::engine::listener::HandleFinishedListener;
use crate::test_util::{
build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, MockWriteBufferManager,
TestEnv,
};

#[tokio::test]
async fn test_engine_truncate_region_basic() {
Expand Down Expand Up @@ -270,13 +271,21 @@ async fn test_engine_truncate_reopen() {

#[tokio::test]
async fn test_engine_truncate_during_flush() {
init_default_ut_logging();
let mut env = TestEnv::with_prefix("truncate-during-flush");
let engine = env.create_engine(MitoConfig::default()).await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(HandleFinishedListener::new());
let engine = env
.create_engine_with(
MitoConfig::default(),
write_buffer_manager.clone(),
Some(listener.clone()),
)
.await;

// Create the region.
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();

let column_schemas = rows_schema(&request);
engine
Expand All @@ -293,62 +302,33 @@ async fn test_engine_truncate_during_flush() {

let region = engine.get_region(region_id).unwrap();

// Create a parquet file.
// Simulate that the `do_flush()` function is currently being executed.
let file_id = FileId::random();
let file_name = format!("{}.parquet", file_id);
let file_meta = FileMeta {
region_id,
file_id,
time_range: FileTimeRange::default(),
level: 0,
file_size: 0,
};
env.get_object_store()
.unwrap()
.write(&join_path(&region_dir, &file_name), vec![])
.await
.unwrap();

let (sender, receiver) = oneshot::channel();
let version_data = region.version_control.current();
let entry_id = version_data.last_entry_id;
let sequence = version_data.committed_sequence;

let flushed_entry_id = region.version_control.current().last_entry_id;

let current_version = region.version_control.current().version;
assert_eq!(current_version.truncated_entry_id, None);
// Flush reigon.
let engine_cloned = engine.clone();
tokio::spawn(async move {
flush_region(&engine_cloned, region_id).await;
});

// Truncate the region.
// Truncate the region
engine
.handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
.await
.unwrap();

// The flush task is finished, and the `handle_flush_finished()` is executed.
let finished = FlushFinished {
region_id,
file_metas: vec![file_meta.clone()],
flushed_entry_id,
flushed_sequence: flushed_entry_id,
memtables_to_remove: SmallVec::new(),
file_purger: region.file_purger.clone(),
senders: vec![sender],
};

let worker_request = WorkerRequest::Background {
region_id,
notify: BackgroundNotify::FlushFinished(finished),
};

engine
.handle_worker_request(region_id, worker_request)
.await
.unwrap();
listener.notify();

let _ = receiver.await;
let version_data = region.version_control.current();
let truncated_entry_id = version_data.version.truncated_entry_id.unwrap();
let truncated_sequence = version_data.version.flushed_sequence;

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request.clone()).unwrap();
assert_eq!(0, scanner.num_files());
assert_eq!(entry_id, truncated_entry_id);
assert_eq!(sequence, truncated_sequence);

// Put data to the region.
let rows = Rows {
Expand Down
9 changes: 9 additions & 0 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,15 @@ impl WorkerListener {
listener.on_write_stall();
}
}

pub(crate) async fn on_handle_finishd_begin(&self, region_id: RegionId) {
#[cfg(test)]
if let Some(listener) = &self.listener {
listener.on_handle_finished_begin(region_id).await;
}
// Avoid compiler warning.
let _ = region_id;
}
}

#[cfg(test)]
Expand Down

0 comments on commit d846c56

Please sign in to comment.