Skip to content

Commit

Permalink
fix: fix listener error
Browse files Browse the repository at this point in the history
  • Loading branch information
DevilExileSu committed Sep 14, 2023
1 parent d846c56 commit 4aa297a
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 48 deletions.
40 changes: 23 additions & 17 deletions src/mito2/src/engine/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub trait EventListener: Send + Sync {
/// Notifies the listener that the engine is stalled.
fn on_write_stall(&self);

async fn on_handle_finished_begin(&self, region_id: RegionId);
async fn on_flush_begin(&self, region_id: RegionId);
}

pub type EventListenerRef = Arc<dyn EventListener>;
Expand Down Expand Up @@ -64,7 +64,7 @@ impl EventListener for FlushListener {

fn on_write_stall(&self) {}

async fn on_handle_finished_begin(&self, _region_id: RegionId) {}
async fn on_flush_begin(&self, _region_id: RegionId) {}
}

/// Listener to watch stall events.
Expand Down Expand Up @@ -96,37 +96,43 @@ impl EventListener for StallListener {
self.notify.notify_one();
}

async fn on_handle_finished_begin(&self, _region_id: RegionId) {}
async fn on_flush_begin(&self, _region_id: RegionId) {}
}

pub struct HandleFinishedListener {
notify: Notify,
pub struct FlushTruncateListener {
notify_flush: Notify,
notify_truncate: Notify,
}

impl HandleFinishedListener {
pub fn new() -> HandleFinishedListener {
HandleFinishedListener {
notify: Notify::new(),
impl FlushTruncateListener {
pub fn new() -> FlushTruncateListener {
FlushTruncateListener {
notify_flush: Notify::new(),
notify_truncate: Notify::new(),
}
}

pub fn notify(&self) {
self.notify.notify_one();
pub fn notify_flush(&self) {
self.notify_flush.notify_one();
}

pub async fn wait(&self) {
self.notify.notified().await;
pub async fn wait_truncate(&self) {
self.notify_truncate.notified().await;
}
}

#[async_trait]
impl EventListener for HandleFinishedListener {
impl EventListener for FlushTruncateListener {
fn on_flush_success(&self, _region_id: RegionId) {}

fn on_write_stall(&self) {}

async fn on_handle_finished_begin(&self, region_id: RegionId) {
info!("Region {} begin handle finished flush", region_id);
self.wait().await;
async fn on_flush_begin(&self, region_id: RegionId) {
info!(
"Region {} begin do flush, notify region to truncate",
region_id
);
self.notify_truncate.notify_one();
self.notify_flush.notified().await;
}
}
59 changes: 36 additions & 23 deletions src/mito2/src/engine/truncate_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::time::Duration;

use api::v1::Rows;
use common_recordbatch::RecordBatches;
use common_telemetry::init_default_ut_logging;
use common_telemetry::{info, init_default_ut_logging};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
Expand All @@ -27,10 +27,9 @@ use store_api::storage::RegionId;

use super::ScanRequest;
use crate::config::MitoConfig;
use crate::engine::listener::HandleFinishedListener;
use crate::engine::listener::FlushTruncateListener;
use crate::test_util::{
build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, MockWriteBufferManager,
TestEnv,
build_rows, put_rows, rows_schema, CreateRequestBuilder, MockWriteBufferManager, TestEnv,
};

#[tokio::test]
Expand Down Expand Up @@ -174,7 +173,7 @@ async fn test_engine_truncate_after_flush() {
.unwrap();

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request.clone()).unwrap();
let scanner = engine.scanner(region_id, request.clone()).unwrap();
assert_eq!(1, scanner.num_files());

// Truncate the region.
Expand Down Expand Up @@ -208,7 +207,7 @@ async fn test_engine_truncate_after_flush() {

tokio::time::sleep(Duration::from_millis(100)).await;

let scanner = engine.scan(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(0, scanner.num_files());
}

Expand Down Expand Up @@ -274,7 +273,7 @@ async fn test_engine_truncate_during_flush() {
init_default_ut_logging();
let mut env = TestEnv::with_prefix("truncate-during-flush");
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(HandleFinishedListener::new());
let listener = Arc::new(FlushTruncateListener::new());
let engine = env
.create_engine_with(
MitoConfig::default(),
Expand All @@ -286,6 +285,7 @@ async fn test_engine_truncate_during_flush() {
// Create the region.
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();

let column_schemas = rows_schema(&request);
engine
Expand All @@ -308,41 +308,54 @@ async fn test_engine_truncate_during_flush() {

// Flush reigon.
let engine_cloned = engine.clone();
tokio::spawn(async move {
flush_region(&engine_cloned, region_id).await;
let flush_task = tokio::spawn(async move {
info!("do flush task!!!!");
engine_cloned
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.await
});

// Truncate the region
// Wait truncate before flush memtable.
listener.wait_truncate().await;

// Truncate the region.
engine
.handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
.await
.unwrap();

listener.notify();
// Notify region to continue flushing.
listener.notify_flush();

// Wait handle flushed finish.
let _err = flush_task.await.unwrap().unwrap_err();

// Check sequences and entry id.
let version_data = region.version_control.current();
let truncated_entry_id = version_data.version.truncated_entry_id.unwrap();
let truncated_entry_id = version_data.version.truncated_entry_id;
let truncated_sequence = version_data.version.flushed_sequence;

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request.clone()).unwrap();
let scanner = engine.scanner(region_id, request.clone()).unwrap();
assert_eq!(0, scanner.num_files());
assert_eq!(entry_id, truncated_entry_id);
assert_eq!(Some(entry_id), truncated_entry_id);
assert_eq!(sequence, truncated_sequence);

// Put data to the region.
let rows = Rows {
schema: column_schemas,
rows: build_rows(5, 8),
};
put_rows(&engine, region_id, rows).await;

// Flush the region.
// Reopen the engine.
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
engine
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::default(),
}),
)
.await
.unwrap();

let region = engine.get_region(region_id).unwrap();
let current_version = region.version_control.current().version;
assert_eq!(current_version.truncated_entry_id, None);
}
3 changes: 3 additions & 0 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::{FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::parquet::WriteOptions;
use crate::worker::WorkerListener;

/// Global write buffer (memtable) manager.
///
Expand Down Expand Up @@ -186,6 +187,7 @@ pub(crate) struct RegionFlushTask {
pub(crate) access_layer: AccessLayerRef,
pub(crate) memtable_builder: MemtableBuilderRef,
pub(crate) file_purger: FilePurgerRef,
pub(crate) listener: WorkerListener,
}

impl RegionFlushTask {
Expand Down Expand Up @@ -227,6 +229,7 @@ impl RegionFlushTask {

/// Runs the flush task.
async fn do_flush(&mut self, version_data: VersionControlData) {
self.listener.on_flush_begin(self.region_id).await;
let worker_request = match self.flush_memtables(&version_data.version).await {
Ok(file_metas) => {
let memtables_to_remove = version_data
Expand Down
3 changes: 2 additions & 1 deletion src/mito2/src/region/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl VersionControl {
VersionBuilder::from_version(version)
.apply_edit(edit, purger)
.remove_memtables(memtables_to_remove)
.truncated_entry_id(None)
.build(),
);

Expand Down Expand Up @@ -240,7 +241,7 @@ impl VersionBuilder {
ssts: version.ssts.clone(),
flushed_entry_id: version.flushed_entry_id,
flushed_sequence: version.flushed_sequence,
truncated_entry_id: None,
truncated_entry_id: version.truncated_entry_id,
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ impl<S> RegionWorkerLoop<S> {
}

/// Wrapper that only calls event listener in tests.
#[derive(Default)]
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
#[cfg(test)]
listener: Option<crate::engine::listener::EventListenerRef>,
Expand Down Expand Up @@ -574,10 +574,10 @@ impl WorkerListener {
}
}

pub(crate) async fn on_handle_finishd_begin(&self, region_id: RegionId) {
pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
#[cfg(test)]
if let Some(listener) = &self.listener {
listener.on_handle_finished_begin(region_id).await;
listener.on_flush_begin(region_id).await;
}
// Avoid compiler warning.
let _ = region_id;
Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/worker/handle_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region.file_purger.clone(),
);
region.update_flush_millis();
info!(
"truncated_entry_id = {:?}",
region.version_control.current().version.truncated_entry_id
);

// Delete wal.
info!(
Expand Down Expand Up @@ -197,6 +201,7 @@ impl<S> RegionWorkerLoop<S> {
access_layer: region.access_layer.clone(),
memtable_builder: self.memtable_builder.clone(),
file_purger: region.file_purger.clone(),
listener: self.listener.clone(),
}
}
}
7 changes: 3 additions & 4 deletions src/mito2/src/worker/handle_truncate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ use common_telemetry::info;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

use crate::error::{RegionNotFoundSnafu, Result};
use crate::error::Result;
use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionTruncate};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_truncate_request(&mut self, region_id: RegionId) -> Result<Output> {
let Some(region) = self.regions.get_region(region_id) else {
return RegionNotFoundSnafu { region_id }.fail();
};
let region = self.regions.writable_region(region_id)?;

info!("Try to truncate region {}", region_id);

let version_data = region.version_control.current();
Expand Down

0 comments on commit 4aa297a

Please sign in to comment.