Skip to content

Commit

Permalink
Merge pull request #2 from evenyag/fix/remove-compaction-file
Browse files Browse the repository at this point in the history
test: add test for compaction failure
  • Loading branch information
v0y4g3r authored Apr 3, 2024
2 parents 05a403b + 4d03758 commit c8ec6c6
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 23 deletions.
7 changes: 5 additions & 2 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,11 @@ impl CompactionTask for TwcsCompactionTask {
let notify = match self.handle_compaction().await {
Ok((added, deleted)) => {
info!(
"Compacted SST files, input: {:?}, output: {:?}, window: {:?}",
deleted, added, self.compaction_time_window
"Compacted SST files, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}",
deleted,
added,
self.compaction_time_window,
self.waiters.len(),
);

BackgroundNotify::CompactionFinished(CompactionFinished {
Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,11 @@ impl MitoEngine {
}),
})
}

/// Returns the purge scheduler.
pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
self.inner.workers.purge_scheduler()
}
}

#[cfg(test)]
Expand Down
73 changes: 73 additions & 0 deletions src/mito2/src/engine/compaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::ops::Range;
use std::sync::Arc;

use api::v1::{ColumnSchema, Rows};
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
Expand All @@ -23,8 +24,10 @@ use store_api::region_request::{
RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest, RegionRequest,
};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::Notify;

use crate::config::MitoConfig;
use crate::engine::listener::CompactionListener;
use crate::engine::MitoEngine;
use crate::test_util::{
build_rows_for_key, column_metadata_to_column_schema, put_rows, CreateRequestBuilder, TestEnv,
Expand Down Expand Up @@ -145,3 +148,73 @@ async fn test_compaction_region() {
let vec = collect_stream_ts(stream).await;
assert_eq!((0..25).map(|v| v * 1000).collect::<Vec<_>>(), vec);
}

// For issue https://github.com/GreptimeTeam/greptimedb/issues/3633
#[tokio::test]
async fn test_readonly_during_compaction() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let listener = Arc::new(CompactionListener::default());
let engine = env
.create_engine_with(
MitoConfig {
// Ensure there is only one background worker for purge task.
max_background_jobs: 1,
..Default::default()
},
None,
Some(listener.clone()),
)
.await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_files", "1")
.build();

let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 2 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
put_and_flush(&engine, region_id, &column_schemas, 10..20).await;

// Waits until the engine receives compaction finished request.
listener.wait_handle_finished().await;

// Sets the region to read only mode.
engine.set_writable(region_id, false).unwrap();
// Wakes up the listener.
listener.wake();

let notify = Arc::new(Notify::new());
// We already sets max background jobs to 1, so we can submit a task to the
// purge scheduler to ensure all purge tasks are finished.
let job_notify = notify.clone();
engine
.purge_scheduler()
.schedule(Box::pin(async move {
job_notify.notify_one();
}))
.unwrap();
notify.notified().await;

let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(
2,
scanner.num_files(),
"unexpected files: {:?}",
scanner.file_ids()
);
let stream = scanner.scan().await.unwrap();

let vec = collect_stream_ts(stream).await;
assert_eq!((0..20).map(|v| v * 1000).collect::<Vec<_>>(), vec);
}
63 changes: 44 additions & 19 deletions src/mito2/src/engine/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ use tokio::sync::Notify;
#[async_trait]
pub trait EventListener: Send + Sync {
/// Notifies the listener that a region is flushed successfully.
fn on_flush_success(&self, region_id: RegionId);
fn on_flush_success(&self, region_id: RegionId) {
let _ = region_id;
}

/// Notifies the listener that the engine is stalled.
fn on_write_stall(&self);
fn on_write_stall(&self) {}

/// Notifies the listener that the region starts to do flush.
async fn on_flush_begin(&self, region_id: RegionId);
async fn on_flush_begin(&self, region_id: RegionId) {
let _ = region_id;
}

/// Notifies the listener that the later drop task starts running.
/// Returns the gc interval if we want to override the default one.
Expand All @@ -46,6 +50,12 @@ pub trait EventListener: Send + Sync {
let _ = region_id;
let _ = removed;
}

/// Notifies the listener that the region is going to handle the compaction
/// finished request.
async fn on_handle_compaction_finished(&self, region_id: RegionId) {
let _ = region_id;
}
}

pub type EventListenerRef = Arc<dyn EventListener>;
Expand All @@ -70,10 +80,6 @@ impl EventListener for FlushListener {

self.notify.notify_one()
}

fn on_write_stall(&self) {}

async fn on_flush_begin(&self, _region_id: RegionId) {}
}

/// Listener to watch stall events.
Expand All @@ -98,8 +104,6 @@ impl EventListener for StallListener {

self.notify.notify_one();
}

async fn on_flush_begin(&self, _region_id: RegionId) {}
}

/// Listener to watch begin flush events.
Expand Down Expand Up @@ -130,10 +134,6 @@ impl FlushTruncateListener {

#[async_trait]
impl EventListener for FlushTruncateListener {
fn on_flush_success(&self, _region_id: RegionId) {}

fn on_write_stall(&self) {}

/// Calling this function will block the thread!
/// Notify the listener to perform a truncate region and block the flush region job.
async fn on_flush_begin(&self, region_id: RegionId) {
Expand Down Expand Up @@ -169,12 +169,6 @@ impl DropListener {

#[async_trait]
impl EventListener for DropListener {
fn on_flush_success(&self, _region_id: RegionId) {}

fn on_write_stall(&self) {}

async fn on_flush_begin(&self, _region_id: RegionId) {}

fn on_later_drop_begin(&self, _region_id: RegionId) -> Option<Duration> {
Some(self.gc_duration)
}
Expand All @@ -185,3 +179,34 @@ impl EventListener for DropListener {
self.notify.notify_one();
}
}

/// Listener on handling compaction requests.
#[derive(Default)]
pub struct CompactionListener {
handle_finished_notify: Notify,
blocker: Notify,
}

impl CompactionListener {
/// Waits for handling compaction finished request.
pub async fn wait_handle_finished(&self) {
self.handle_finished_notify.notified().await;
}

/// Wakes up the listener.
pub fn wake(&self) {
self.blocker.notify_one();
}
}

#[async_trait]
impl EventListener for CompactionListener {
async fn on_handle_compaction_finished(&self, region_id: RegionId) {
info!("Handle compaction finished request, region {region_id}");

self.handle_finished_notify.notify_one();

// Blocks current task.
self.blocker.notified().await;
}
}
2 changes: 1 addition & 1 deletion src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl TestEnv {
.unwrap()
}

/// Creates a new engine with specific config and manager/listener under this env.
/// Creates a new engine with specific config and manager/listener/purge_scheduler under this env.
pub async fn create_engine_with(
&mut self,
config: MitoConfig,
Expand Down
14 changes: 14 additions & 0 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ impl WorkerGroup {
cache_manager,
})
}

/// Returns the purge scheduler.
pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
&self.purge_scheduler
}
}

fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
Expand Down Expand Up @@ -819,6 +824,15 @@ impl WorkerListener {
let _ = region_id;
let _ = removed;
}

pub(crate) async fn on_handle_compaction_finished(&self, region_id: RegionId) {
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener.on_handle_compaction_finished(region_id).await;
}
// Avoid compiler warning.
let _ = region_id;
}
}

#[cfg(test)]
Expand Down
8 changes: 7 additions & 1 deletion src/mito2/src/worker/handle_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_telemetry::{error, info};
use common_telemetry::{error, info, warn};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

Expand Down Expand Up @@ -55,7 +55,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id: RegionId,
mut request: CompactionFinished,
) {
self.listener.on_handle_compaction_finished(region_id).await;

let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
warn!(
"Unable to finish the compaction task for a read only region {}",
region_id
);
return;
};

Expand Down
4 changes: 4 additions & 0 deletions src/mito2/src/worker/handle_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
mut request: FlushFinished,
) {
let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
warn!(
"Unable to finish the flush task for a read only region {}",
region_id
);
return;
};

Expand Down

0 comments on commit c8ec6c6

Please sign in to comment.