diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index 15c295933735..414f2823ade1 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -210,7 +210,8 @@ impl RegionServerInner {
             | RegionRequest::Delete(_)
             | RegionRequest::Alter(_)
             | RegionRequest::Flush(_)
-            | RegionRequest::Compact(_) => RegionChange::None,
+            | RegionRequest::Compact(_)
+            | RegionRequest::Truncate(_) => RegionChange::None,
         };
 
         let engine = match &region_change {
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index 8200e1778b71..7cd1079d2d12 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -34,6 +34,8 @@ pub(crate) mod listener;
 mod open_test;
 #[cfg(test)]
 mod projection_test;
+#[cfg(test)]
+mod truncate_test;
 
 use std::sync::Arc;
diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs
index e50d3b5a222f..e591d2b5a576 100644
--- a/src/mito2/src/engine/listener.rs
+++ b/src/mito2/src/engine/listener.rs
@@ -16,17 +16,22 @@
 
 use std::sync::Arc;
 
+use async_trait::async_trait;
 use common_telemetry::info;
 use store_api::storage::RegionId;
 use tokio::sync::Notify;
 
 /// Mito engine background event listener.
+#[async_trait]
 pub trait EventListener: Send + Sync {
     /// Notifies the listener that a region is flushed successfully.
     fn on_flush_success(&self, region_id: RegionId);
 
     /// Notifies the listener that the engine is stalled.
     fn on_write_stall(&self);
+
+    /// Notifies the listener that the region starts to flush.
+    async fn on_flush_begin(&self, region_id: RegionId);
 }
 
 pub type EventListenerRef = Arc<dyn EventListener>;
@@ -50,6 +55,7 @@ impl FlushListener {
     }
 }
 
+#[async_trait]
 impl EventListener for FlushListener {
     fn on_flush_success(&self, region_id: RegionId) {
         info!("Region {} flush successfully", region_id);
@@ -58,6 +64,8 @@ impl EventListener for FlushListener {
     }
 
     fn on_write_stall(&self) {}
+
+    async fn on_flush_begin(&self, _region_id: RegionId) {}
 }
 
 /// Listener to watch stall events.
@@ -79,6 +87,7 @@ impl StallListener {
     }
 }
 
+#[async_trait]
 impl EventListener for StallListener {
     fn on_flush_success(&self, _region_id: RegionId) {}
 
@@ -87,4 +96,57 @@ impl EventListener for StallListener {
         self.notify.notify_one();
     }
+
+    async fn on_flush_begin(&self, _region_id: RegionId) {}
+}
+
+/// Listener to watch begin flush events.
+///
+/// Creates a background thread to execute the flush job, while the main thread calls
+/// `wait_truncate()` to block and wait for `on_flush_begin()`.
+/// When the background thread calls `on_flush_begin()`, the main thread is notified to truncate
+/// the region, and the background thread blocks and waits for `notify_flush()` before
+/// continuing the flush.
+pub struct FlushTruncateListener {
+    /// Notify flush operation.
+    notify_flush: Notify,
+    /// Notify truncate operation.
+    notify_truncate: Notify,
+}
+
+impl FlushTruncateListener {
+    /// Creates a new listener.
+    pub fn new() -> FlushTruncateListener {
+        FlushTruncateListener {
+            notify_flush: Notify::new(),
+            notify_truncate: Notify::new(),
+        }
+    }
+
+    /// Notify the flush job to proceed.
+    pub fn notify_flush(&self) {
+        self.notify_flush.notify_one();
+    }
+
+    /// Wait for a truncate event.
+    pub async fn wait_truncate(&self) {
+        self.notify_truncate.notified().await;
+    }
+}
+
+#[async_trait]
+impl EventListener for FlushTruncateListener {
+    fn on_flush_success(&self, _region_id: RegionId) {}
+
+    fn on_write_stall(&self) {}
+
+    /// Calling this function will block the thread!
+    /// Notify the listener to truncate the region and block the flush job.
+    async fn on_flush_begin(&self, region_id: RegionId) {
+        info!(
+            "Region {} begins to flush, notify the region to truncate",
+            region_id
+        );
+        self.notify_truncate.notify_one();
+        self.notify_flush.notified().await;
+    }
 }
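Note that `EventListener` is now an async trait: any implementation outside this file needs `#[async_trait]` and the new `on_flush_begin` hook. A minimal no-op implementation might look like the sketch below (`NoopListener` is illustrative, not a type added by this patch):

use std::sync::Arc;

use async_trait::async_trait;
use store_api::storage::RegionId;

/// Hypothetical listener that ignores every engine event.
struct NoopListener;

#[async_trait]
impl EventListener for NoopListener {
    fn on_flush_success(&self, _region_id: RegionId) {}

    fn on_write_stall(&self) {}

    // Returning immediately lets the flush proceed without any coordination.
    async fn on_flush_begin(&self, _region_id: RegionId) {}
}

// It can then be handed out wherever an EventListenerRef is expected:
// let listener: EventListenerRef = Arc::new(NoopListener);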
diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs
new file mode 100644
index 000000000000..c39f3a0e374b
--- /dev/null
+++ b/src/mito2/src/engine/truncate_test.rs
@@ -0,0 +1,354 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use api::v1::Rows;
+use common_recordbatch::RecordBatches;
+use common_telemetry::{info, init_default_ut_logging};
+use store_api::region_engine::RegionEngine;
+use store_api::region_request::{
+    RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
+};
+use store_api::storage::RegionId;
+
+use super::ScanRequest;
+use crate::config::MitoConfig;
+use crate::engine::listener::FlushTruncateListener;
+use crate::test_util::{
+    build_rows, put_rows, rows_schema, CreateRequestBuilder, MockWriteBufferManager, TestEnv,
+};
+
+#[tokio::test]
+async fn test_engine_truncate_region_basic() {
+    let mut env = TestEnv::with_prefix("truncate-basic");
+    let engine = env.create_engine(MitoConfig::default()).await;
+
+    // Create the region.
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new().build();
+
+    let column_schemas = rows_schema(&request);
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+
+    // Put data to the region.
+    let rows = Rows {
+        schema: column_schemas,
+        rows: build_rows(0, 3),
+    };
+    put_rows(&engine, region_id, rows).await;
+
+    // Scan the region.
+    let request = ScanRequest::default();
+    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let batches = RecordBatches::try_collect(stream).await.unwrap();
+    let expected = "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts                  |
++-------+---------+---------------------+
+| 0     | 0.0     | 1970-01-01T00:00:00 |
+| 1     | 1.0     | 1970-01-01T00:00:01 |
+| 2     | 2.0     | 1970-01-01T00:00:02 |
++-------+---------+---------------------+";
+    assert_eq!(expected, batches.pretty_print().unwrap());
+
+    // Truncate the region.
+    engine
+        .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
+        .await
+        .unwrap();
+
+    // Scan the region.
+    let request = ScanRequest::default();
+    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let batches = RecordBatches::try_collect(stream).await.unwrap();
+    let expected = "++\n++";
+    assert_eq!(expected, batches.pretty_print().unwrap());
+}
+
+#[tokio::test]
+async fn test_engine_put_data_after_truncate() {
+    let mut env = TestEnv::with_prefix("truncate-put");
+    let engine = env.create_engine(MitoConfig::default()).await;
+
+    // Create the region.
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new().build();
+    let column_schemas = rows_schema(&request);
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+
+    // Put data to the region.
+    let rows = Rows {
+        schema: column_schemas.clone(),
+        rows: build_rows(0, 3),
+    };
+    put_rows(&engine, region_id, rows).await;
+
+    // Scan the region.
+    let request = ScanRequest::default();
+    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let batches = RecordBatches::try_collect(stream).await.unwrap();
+    let expected = "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts                  |
++-------+---------+---------------------+
+| 0     | 0.0     | 1970-01-01T00:00:00 |
+| 1     | 1.0     | 1970-01-01T00:00:01 |
+| 2     | 2.0     | 1970-01-01T00:00:02 |
++-------+---------+---------------------+";
+    assert_eq!(expected, batches.pretty_print().unwrap());
+
+    // Truncate the region.
+    engine
+        .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
+        .await
+        .unwrap();
+
+    // Put data to the region again.
+    let rows = Rows {
+        schema: column_schemas,
+        rows: build_rows(5, 8),
+    };
+    put_rows(&engine, region_id, rows).await;
+
+    // Scan the region.
+    let request = ScanRequest::default();
+    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let batches = RecordBatches::try_collect(stream).await.unwrap();
+    let expected = "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts                  |
++-------+---------+---------------------+
+| 5     | 5.0     | 1970-01-01T00:00:05 |
+| 6     | 6.0     | 1970-01-01T00:00:06 |
+| 7     | 7.0     | 1970-01-01T00:00:07 |
++-------+---------+---------------------+";
+    assert_eq!(expected, batches.pretty_print().unwrap());
+}
+
+#[tokio::test]
+async fn test_engine_truncate_after_flush() {
+    let mut env = TestEnv::with_prefix("truncate-flush");
+    let engine = env.create_engine(MitoConfig::default()).await;
+
+    // Create the region.
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new().build();
+    let column_schemas = rows_schema(&request);
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+
+    // Put data to the region.
+    let rows = Rows {
+        schema: column_schemas.clone(),
+        rows: build_rows(0, 3),
+    };
+    put_rows(&engine, region_id, rows).await;
+
+    // Flush the region.
+    engine
+        .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
+        .await
+        .unwrap();
+
+    let request = ScanRequest::default();
+    let scanner = engine.scanner(region_id, request.clone()).unwrap();
+    assert_eq!(1, scanner.num_files());
+
+    // Truncate the region.
+    engine
+        .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
+        .await
+        .unwrap();
+
+    // Put data to the region.
+    let rows = Rows {
+        schema: column_schemas,
+        rows: build_rows(5, 8),
+    };
+    put_rows(&engine, region_id, rows).await;
+
+    // Scan the region.
+    let scanner = engine.scanner(region_id, request).unwrap();
+    assert_eq!(0, scanner.num_files());
+    let stream = scanner.scan().await.unwrap();
+    let batches = RecordBatches::try_collect(stream).await.unwrap();
+    let expected = "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts                  |
++-------+---------+---------------------+
+| 5     | 5.0     | 1970-01-01T00:00:05 |
+| 6     | 6.0     | 1970-01-01T00:00:06 |
+| 7     | 7.0     | 1970-01-01T00:00:07 |
++-------+---------+---------------------+";
+    assert_eq!(expected, batches.pretty_print().unwrap());
+}
+
+#[tokio::test]
+async fn test_engine_truncate_reopen() {
+    let mut env = TestEnv::with_prefix("truncate-reopen");
+    let engine = env.create_engine(MitoConfig::default()).await;
+
+    // Create the region.
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new().build();
+    let region_dir = request.region_dir.clone();
+
+    let column_schemas = rows_schema(&request);
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+
+    // Put data to the region.
+    let rows = Rows {
+        schema: column_schemas.clone(),
+        rows: build_rows(0, 3),
+    };
+    put_rows(&engine, region_id, rows).await;
+
+    let region = engine.get_region(region_id).unwrap();
+    let last_entry_id = region.version_control.current().last_entry_id;
+
+    // Truncate the region.
+    engine
+        .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
+        .await
+        .unwrap();
+
+    // Reopen the region again.
+    let engine = env.reopen_engine(engine, MitoConfig::default()).await;
+    engine
+        .handle_request(
+            region_id,
+            RegionRequest::Open(RegionOpenRequest {
+                engine: String::new(),
+                region_dir,
+                options: HashMap::default(),
+            }),
+        )
+        .await
+        .unwrap();
+
+    let region = engine.get_region(region_id).unwrap();
+    assert_eq!(last_entry_id, region.version().flushed_entry_id);
+
+    // Scan the region.
+    let request = ScanRequest::default();
+    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let batches = RecordBatches::try_collect(stream).await.unwrap();
+    let expected = "++\n++";
+    assert_eq!(expected, batches.pretty_print().unwrap());
+}
+
+#[tokio::test]
+async fn test_engine_truncate_during_flush() {
+    init_default_ut_logging();
+    let mut env = TestEnv::with_prefix("truncate-during-flush");
+    let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
+    let listener = Arc::new(FlushTruncateListener::new());
+    let engine = env
+        .create_engine_with(
+            MitoConfig::default(),
+            write_buffer_manager.clone(),
+            Some(listener.clone()),
+        )
+        .await;
+
+    // Create the region.
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new().build();
+    let region_dir = request.region_dir.clone();
+
+    let column_schemas = rows_schema(&request);
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+
+    // Put data to the region.
+    let rows = Rows {
+        schema: column_schemas.clone(),
+        rows: build_rows(0, 3),
+    };
+    put_rows(&engine, region_id, rows).await;
+
+    let region = engine.get_region(region_id).unwrap();
+
+    let version_data = region.version_control.current();
+    let entry_id = version_data.last_entry_id;
+    let sequence = version_data.committed_sequence;
+
+    // Flush the region.
+    let engine_cloned = engine.clone();
+    let flush_task = tokio::spawn(async move {
+        info!("do flush task!!!!");
+        engine_cloned
+            .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
+            .await
+    });
+
+    // Wait for the truncate before flushing the memtables.
+    listener.wait_truncate().await;
+
+    // Truncate the region.
+    engine
+        .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
+        .await
+        .unwrap();
+
+    // Notify the region to continue flushing.
+    listener.notify_flush();
+
+    // Wait for the flush task to finish.
+    let _err = flush_task.await.unwrap().unwrap_err();
+
+    // Check sequences and entry id.
+    let version_data = region.version_control.current();
+    let truncated_entry_id = version_data.version.truncated_entry_id;
+    let truncated_sequence = version_data.version.flushed_sequence;
+
+    let request = ScanRequest::default();
+    let scanner = engine.scanner(region_id, request.clone()).unwrap();
+    assert_eq!(0, scanner.num_files());
+    assert_eq!(Some(entry_id), truncated_entry_id);
+    assert_eq!(sequence, truncated_sequence);
+
+    // Reopen the engine.
+    let engine = env.reopen_engine(engine, MitoConfig::default()).await;
+    engine
+        .handle_request(
+            region_id,
+            RegionRequest::Open(RegionOpenRequest {
+                engine: String::new(),
+                region_dir,
+                options: HashMap::default(),
+            }),
+        )
+        .await
+        .unwrap();
+
+    let region = engine.get_region(region_id).unwrap();
+    let current_version = region.version_control.current().version;
+    assert_eq!(current_version.truncated_entry_id, Some(entry_id));
+}
diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index 3c776ee55039..3833d32db2f3 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -420,6 +420,12 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Region {} is truncating, location: {}", region_id, location))]
+    RegionTruncating {
+        region_id: RegionId,
+        location: Location,
+    },
+
     #[snafu(display(
         "Engine write buffer is full, rejecting write requests of region {}, location: {}",
         region_id,
@@ -540,6 +546,7 @@ impl ErrorExt for Error {
             FlushRegion { source, .. } => source.status_code(),
             RegionDropped { .. } => StatusCode::Cancelled,
             RegionClosed { .. } => StatusCode::Cancelled,
+            RegionTruncating { .. } => StatusCode::Cancelled,
             BuildCompactionPredicate { .. } => StatusCode::Internal,
             RejectWrite { .. } => StatusCode::StorageUnavailable,
             CompactRegion { source, .. } => source.status_code(),
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index 15dc80de0e14..1ae246ed208e 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -25,7 +25,9 @@ use store_api::storage::RegionId;
 use tokio::sync::mpsc;
 
 use crate::access_layer::AccessLayerRef;
-use crate::error::{Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, Result};
+use crate::error::{
+    Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatingSnafu, Result,
+};
 use crate::memtable::MemtableBuilderRef;
 use crate::read::Source;
 use crate::region::version::{VersionControlData, VersionRef};
@@ -38,6 +40,7 @@ use crate::schedule::scheduler::{Job, SchedulerRef};
 use crate::sst::file::{FileId, FileMeta};
 use crate::sst::file_purger::FilePurgerRef;
 use crate::sst::parquet::WriteOptions;
+use crate::worker::WorkerListener;
 
 /// Global write buffer (memtable) manager.
 ///
@@ -185,6 +188,7 @@ pub(crate) struct RegionFlushTask {
     pub(crate) access_layer: AccessLayerRef,
     pub(crate) memtable_builder: MemtableBuilderRef,
     pub(crate) file_purger: FilePurgerRef,
+    pub(crate) listener: WorkerListener,
 }
 
 impl RegionFlushTask {
@@ -226,6 +230,7 @@ impl RegionFlushTask {
     /// Runs the flush task.
     async fn do_flush(&mut self, version_data: VersionControlData) {
+        self.listener.on_flush_begin(self.region_id).await;
         let worker_request = match self.flush_memtables(&version_data.version).await {
             Ok(file_metas) => {
                 let memtables_to_remove = version_data
@@ -450,24 +455,33 @@ impl FlushScheduler {
     /// Notifies the scheduler that the region is dropped.
     pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
-        // Remove this region.
-        let Some(flush_status) = self.region_status.remove(&region_id) else {
-            return;
-        };
-
-        // Notifies all pending tasks.
-        flush_status.on_failure(Arc::new(RegionDroppedSnafu { region_id }.build()));
+        self.remove_region_on_failure(
+            region_id,
+            Arc::new(RegionDroppedSnafu { region_id }.build()),
+        );
     }
 
     /// Notifies the scheduler that the region is closed.
     pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
+        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
+    }
+
+    /// Notifies the scheduler that the region is truncating.
+    pub(crate) fn on_region_truncating(&mut self, region_id: RegionId) {
+        self.remove_region_on_failure(
+            region_id,
+            Arc::new(RegionTruncatingSnafu { region_id }.build()),
+        );
+    }
+
+    pub(crate) fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
         // Remove this region.
         let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };
 
         // Notifies all pending tasks.
-        flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
+        flush_status.on_failure(err);
     }
 
     /// Add ddl request to pending queue.
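With `on_region_truncating` in place, a flush that is still pending when a truncate arrives is failed with the new `RegionTruncating` error instead of completing. A caller-side sketch of what that looks like, assuming an `engine` and `region_id` as in the tests above (the assertion on the display string is illustrative; the message itself comes from the `error.rs` change):

use store_api::region_request::{RegionFlushRequest, RegionRequest};

// This flush raced with a concurrent truncate; the scheduler fails it with
// Error::RegionTruncating, which maps to StatusCode::Cancelled (see error.rs above).
let err = engine
    .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
    .await
    .unwrap_err();
assert!(err.to_string().contains("is truncating"));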
diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs
index 812082bb44d2..2ed8670b516d 100644
--- a/src/mito2/src/manifest/action.rs
+++ b/src/mito2/src/manifest/action.rs
@@ -35,6 +35,8 @@ pub enum RegionMetaAction {
     Edit(RegionEdit),
     /// Remove the region.
     Remove(RegionRemove),
+    /// Truncate the region.
+    Truncate(RegionTruncate),
 }
 
 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
@@ -57,6 +59,16 @@ pub struct RegionRemove {
     pub region_id: RegionId,
 }
 
+/// Last data truncated in the region.
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
+pub struct RegionTruncate {
+    pub region_id: RegionId,
+    /// Last WAL entry id of truncated data.
+    pub truncated_entry_id: EntryId,
+    /// Last sequence number of truncated data.
+    pub truncated_sequence: SequenceNumber,
+}
+
 /// The region manifest data.
 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
 pub struct RegionManifest {
@@ -70,6 +82,8 @@ pub struct RegionManifest {
     pub flushed_sequence: SequenceNumber,
     /// Current manifest version.
     pub manifest_version: ManifestVersion,
+    /// Last WAL entry id of truncated data.
+    pub truncated_entry_id: Option<EntryId>,
 }
 
 #[derive(Debug, Default)]
@@ -79,6 +93,7 @@ pub struct RegionManifestBuilder {
     flushed_entry_id: EntryId,
     flushed_sequence: SequenceNumber,
     manifest_version: ManifestVersion,
+    truncated_entry_id: Option<EntryId>,
 }
 
 impl RegionManifestBuilder {
@@ -91,6 +106,7 @@ impl RegionManifestBuilder {
                 flushed_entry_id: s.flushed_entry_id,
                 manifest_version: s.manifest_version,
                 flushed_sequence: s.flushed_sequence,
+                truncated_entry_id: s.truncated_entry_id,
             }
         } else {
             Default::default()
@@ -118,6 +134,14 @@ impl RegionManifestBuilder {
         }
     }
 
+    pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
+        self.manifest_version = manifest_version;
+        self.flushed_entry_id = truncate.truncated_entry_id;
+        self.flushed_sequence = truncate.truncated_sequence;
+        self.truncated_entry_id = Some(truncate.truncated_entry_id);
+        self.files.clear();
+    }
+
     /// Check if the builder keeps a [RegionMetadata](crate::metadata::RegionMetadata).
     pub fn contains_metadata(&self) -> bool {
         self.metadata.is_some()
@@ -131,6 +155,7 @@ impl RegionManifestBuilder {
             flushed_entry_id: self.flushed_entry_id,
             flushed_sequence: self.flushed_sequence,
             manifest_version: self.manifest_version,
+            truncated_entry_id: self.truncated_entry_id,
         })
     }
 }
diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs
index 53c6b0127cc5..e9c85eea3958 100644
--- a/src/mito2/src/manifest/manager.rs
+++ b/src/mito2/src/manifest/manager.rs
@@ -279,6 +279,9 @@ impl RegionManifestManagerInner {
                         options.manifest_dir, action
                     );
                 }
+                RegionMetaAction::Truncate(action) => {
+                    manifest_builder.apply_truncate(manifest_version, action);
+                }
             }
         }
 
@@ -329,6 +332,9 @@ impl RegionManifestManagerInner {
                         self.manifest.metadata.region_id, action
                     );
                 }
+                RegionMetaAction::Truncate(action) => {
+                    manifest_builder.apply_truncate(version, action);
+                }
             }
         }
         let new_manifest = manifest_builder.try_build()?;
@@ -400,6 +406,9 @@ impl RegionManifestManagerInner {
                         self.manifest.metadata.region_id, action
                     );
                 }
+                RegionMetaAction::Truncate(action) => {
+                    manifest_builder.apply_truncate(version, action);
+                }
             }
         }
         last_version = version;
diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs
index 9dd635b8a73a..e1b1c54cd82c 100644
--- a/src/mito2/src/manifest/tests/checkpoint.rs
+++ b/src/mito2/src/manifest/tests/checkpoint.rs
@@ -150,7 +150,7 @@ async fn manager_with_checkpoint_distance_1() {
         .await
         .unwrap();
     let raw_json = std::str::from_utf8(&raw_bytes).unwrap();
-    let expected_json = "{\"size\":790,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
+    let expected_json = "{\"size\":816,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
     assert_eq!(expected_json, raw_json);
 
     // reopen the manager
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs
index 91de9cf4761c..157eff1a4a5d 100644
--- a/src/mito2/src/region/opener.rs
+++ b/src/mito2/src/region/opener.rs
@@ -156,6 +156,7 @@ impl RegionOpener {
             .add_files(file_purger.clone(), manifest.files.values().cloned())
             .flushed_entry_id(manifest.flushed_entry_id)
             .flushed_sequence(manifest.flushed_sequence)
+            .truncated_entry_id(manifest.truncated_entry_id)
             .build();
         let flushed_entry_id = version.flushed_entry_id;
         let version_control = Arc::new(VersionControl::new(version));
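`apply_truncate` is what makes a manifest replay converge to the truncated state: it clears the file list, advances the flushed ids, and records the truncate point. A rough sketch of the effect, assuming a `builder: RegionManifestBuilder` already primed with region metadata (the concrete numbers are illustrative):

// Replaying a truncate action recorded by handle_truncate_request.
let truncate = RegionTruncate {
    region_id,
    truncated_entry_id: 42,
    truncated_sequence: 100,
};
builder.apply_truncate(manifest_version, truncate);

// After the call the rebuilt manifest has:
//   files: empty (all SSTs logically dropped),
//   flushed_entry_id: 42, flushed_sequence: 100,
//   truncated_entry_id: Some(42),
// so a region reopened from this manifest starts WAL replay after entry 42.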
diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs
index 502087f3eb15..87cc71bb29be 100644
--- a/src/mito2/src/region/version.rs
+++ b/src/mito2/src/region/version.rs
@@ -138,6 +138,29 @@ impl VersionControl {
         let mut version_data = self.data.write().unwrap();
         version_data.version = new_version;
     }
+
+    /// Truncates the current version.
+    pub(crate) fn truncate(
+        &self,
+        truncated_entry_id: EntryId,
+        truncated_sequence: SequenceNumber,
+        memtable_builder: &MemtableBuilderRef,
+    ) {
+        let version = self.current().version;
+
+        let new_mutable = memtable_builder.build(&version.metadata);
+        let new_version = Arc::new(
+            VersionBuilder::new(version.metadata.clone(), new_mutable)
+                .flushed_entry_id(truncated_entry_id)
+                .flushed_sequence(truncated_sequence)
+                .truncated_entry_id(Some(truncated_entry_id))
+                .build(),
+        );
+
+        let mut version_data = self.data.write().unwrap();
+        version_data.version.ssts.mark_all_deleted();
+        version_data.version = new_version;
+    }
 }
 
 pub(crate) type VersionControlRef = Arc<VersionControl>;
@@ -177,6 +200,10 @@ pub(crate) struct Version {
     pub(crate) flushed_entry_id: EntryId,
     /// Inclusive max sequence of flushed data.
     pub(crate) flushed_sequence: SequenceNumber,
+    /// Latest entry id at the time the region was truncated.
+    ///
+    /// Used to check whether a flush task started before the region was truncated.
+    pub(crate) truncated_entry_id: Option<EntryId>,
     // TODO(yingwen): RegionOptions.
 }
 
@@ -189,6 +216,7 @@ pub(crate) struct VersionBuilder {
     ssts: SstVersionRef,
     flushed_entry_id: EntryId,
     flushed_sequence: SequenceNumber,
+    truncated_entry_id: Option<EntryId>,
 }
 
 impl VersionBuilder {
@@ -200,6 +228,7 @@ impl VersionBuilder {
             ssts: Arc::new(SstVersion::new()),
             flushed_entry_id: 0,
             flushed_sequence: 0,
+            truncated_entry_id: None,
         }
     }
 
@@ -211,6 +240,7 @@ impl VersionBuilder {
             ssts: version.ssts.clone(),
             flushed_entry_id: version.flushed_entry_id,
             flushed_sequence: version.flushed_sequence,
+            truncated_entry_id: version.truncated_entry_id,
         }
     }
 
@@ -238,6 +268,12 @@ impl VersionBuilder {
         self
     }
 
+    /// Sets truncated entry id.
+    pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
+        self.truncated_entry_id = entry_id;
+        self
+    }
+
     /// Apply edit to the builder.
     pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self {
         if let Some(entry_id) = edit.flushed_entry_id {
@@ -287,6 +323,7 @@ impl VersionBuilder {
             ssts: self.ssts,
             flushed_entry_id: self.flushed_entry_id,
             flushed_sequence: self.flushed_sequence,
+            truncated_entry_id: self.truncated_entry_id,
         }
     }
 }
diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs
index e0fc4ad2e340..f4b4352cc493 100644
--- a/src/mito2/src/request.rs
+++ b/src/mito2/src/request.rs
@@ -35,7 +35,7 @@ use snafu::{ensure, OptionExt, ResultExt};
 use store_api::metadata::{ColumnMetadata, RegionMetadata};
 use store_api::region_request::{
     RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest,
-    RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
+    RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
 };
 use store_api::storage::{CompactionStrategy, RegionId, SequenceNumber};
 use tokio::sync::oneshot::{self, Receiver, Sender};
@@ -552,6 +552,11 @@ impl WorkerRequest {
                 sender: sender.into(),
                 request: DdlRequest::Compact(v),
             }),
+            RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
+                region_id,
+                sender: sender.into(),
+                request: DdlRequest::Truncate(v),
+            }),
         };
 
         Ok((worker_request, receiver))
@@ -568,6 +573,7 @@ pub(crate) enum DdlRequest {
     Alter(RegionAlterRequest),
     Flush(RegionFlushRequest),
     Compact(RegionCompactRequest),
+    Truncate(RegionTruncateRequest),
 }
 
 /// Sender and Ddl request.
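From a caller's perspective the new variant behaves like flush or compact: build the (empty) request, send it through `handle_request`, and the worker routes it as `DdlRequest::Truncate`. A minimal sketch, assuming an already-created mito `engine` and `region_id` as in the tests above:

use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionRequest, RegionTruncateRequest};

// Drop all data in the region; handle_truncate_request answers with Output::AffectedRows(0).
engine
    .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
    .await
    .unwrap();

// A follow-up scan sees an empty region ("++\n++" when pretty-printed).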
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index 3d824815ce4d..3cef393f7c87 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -21,6 +21,7 @@ mod handle_create;
 mod handle_drop;
 mod handle_flush;
 mod handle_open;
+mod handle_truncate;
 mod handle_write;
 
 use std::collections::hash_map::DefaultHasher;
@@ -503,6 +504,7 @@ impl RegionWorkerLoop {
                     self.handle_compaction_request(ddl.region_id, ddl.sender);
                     continue;
                 }
+                DdlRequest::Truncate(_) => self.handle_truncate_request(ddl.region_id).await,
             };
 
             ddl.sender.send(res);
@@ -540,7 +542,7 @@ impl RegionWorkerLoop {
 }
 
 /// Wrapper that only calls event listener in tests.
-#[derive(Default)]
+#[derive(Default, Clone)]
 pub(crate) struct WorkerListener {
     #[cfg(test)]
     listener: Option<EventListenerRef>,
@@ -571,6 +573,15 @@ impl WorkerListener {
             listener.on_write_stall();
         }
     }
+
+    pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
+        #[cfg(test)]
+        if let Some(listener) = &self.listener {
+            listener.on_flush_begin(region_id).await;
+        }
+        // Avoid compiler warning.
+        let _ = region_id;
+    }
 }
 
 #[cfg(test)]
diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs
index 36eaec042eb1..cb2f2746dc99 100644
--- a/src/mito2/src/worker/handle_flush.rs
+++ b/src/mito2/src/worker/handle_flush.rs
@@ -19,7 +19,7 @@ use common_time::util::current_time_millis;
 use store_api::logstore::LogStore;
 use store_api::storage::RegionId;
 
-use crate::error::Result;
+use crate::error::{RegionTruncatingSnafu, Result};
 use crate::flush::{FlushReason, RegionFlushTask};
 use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
 use crate::region::MitoRegionRef;
@@ -121,6 +121,7 @@ impl RegionWorkerLoop {
             access_layer: region.access_layer.clone(),
             memtable_builder: self.memtable_builder.clone(),
             file_purger: region.file_purger.clone(),
+            listener: self.listener.clone(),
         }
     }
 }
@@ -136,6 +137,15 @@ impl RegionWorkerLoop {
             return;
         };
 
+        // A flush task that started before the region was truncated fails immediately.
+        let version_data = region.version_control.current();
+        if let Some(truncated_entry_id) = version_data.version.truncated_entry_id {
+            if truncated_entry_id >= request.flushed_entry_id {
+                request.on_failure(RegionTruncatingSnafu { region_id }.build());
+                return;
+            }
+        }
+
         // Write region edit to manifest.
         let edit = RegionEdit {
             files_to_add: std::mem::take(&mut request.file_metas),
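The new check above is what resolves the truncate-vs-flush race: a flush that started before the truncate only covers data the truncate already dropped, so its `flushed_entry_id` is at most the recorded `truncated_entry_id` and the resulting edit must not reach the manifest. A standalone sketch of the comparison (made-up numbers; entry ids are plain u64 WAL offsets in store-api):

fn main() {
    // State recorded by the truncate that won the race.
    let truncated_entry_id: Option<u64> = Some(42);
    // Last WAL entry covered by the stale flush (it started before the truncate).
    let flushed_entry_id: u64 = 40;

    if let Some(truncated) = truncated_entry_id {
        if truncated >= flushed_entry_id {
            // Same branch as handle_flush_finished: fail the task with RegionTruncating.
            println!("reject stale flush result");
        }
    }
}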
diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs
new file mode 100644
index 000000000000..759f17b90cff
--- /dev/null
+++ b/src/mito2/src/worker/handle_truncate.rs
@@ -0,0 +1,66 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Handling truncate related requests.
+
+use common_query::Output;
+use common_telemetry::info;
+use store_api::logstore::LogStore;
+use store_api::storage::RegionId;
+
+use crate::error::Result;
+use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionTruncate};
+use crate::worker::RegionWorkerLoop;
+
+impl<S: LogStore> RegionWorkerLoop<S> {
+    pub(crate) async fn handle_truncate_request(&mut self, region_id: RegionId) -> Result<Output> {
+        let region = self.regions.writable_region(region_id)?;
+
+        info!("Try to truncate region {}", region_id);
+
+        let version_data = region.version_control.current();
+        let truncated_entry_id = version_data.last_entry_id;
+        let truncated_sequence = version_data.committed_sequence;
+
+        // Write the truncate action to the manifest.
+        let truncate = RegionTruncate {
+            region_id,
+            truncated_entry_id,
+            truncated_sequence,
+        };
+        let action_list =
+            RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
+        region.manifest_manager.update(action_list).await?;
+
+        // Notifies flush scheduler.
+        self.flush_scheduler.on_region_truncating(region_id);
+        // TODO(DevilExileSu): Notifies compaction scheduler.
+
+        // Reset region's version and mark all SSTs deleted.
+        region.version_control.truncate(
+            truncated_entry_id,
+            truncated_sequence,
+            &self.memtable_builder,
+        );
+
+        // Make all WAL data obsolete.
+        self.wal.obsolete(region_id, truncated_entry_id).await?;
+        info!(
+            "Complete truncating region: {}, entry id: {} and sequence: {}.",
+            region_id, truncated_entry_id, truncated_sequence
+        );
+
+        Ok(Output::AffectedRows(0))
+    }
+}
diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs
index 7fb65cdc6e18..9b9eb845eab8 100644
--- a/src/store-api/src/region_request.rs
+++ b/src/store-api/src/region_request.rs
@@ -38,6 +38,7 @@ pub enum RegionRequest {
     Alter(RegionAlterRequest),
     Flush(RegionFlushRequest),
     Compact(RegionCompactRequest),
+    Truncate(RegionTruncateRequest),
 }
 
 impl RegionRequest {
@@ -412,6 +413,9 @@ pub struct RegionFlushRequest {}
 #[derive(Debug)]
 pub struct RegionCompactRequest {}
 
+#[derive(Debug)]
+pub struct RegionTruncateRequest {}
+
 #[cfg(test)]
 mod tests {
     use api::v1::region::RegionColumnDef;