From fda18bd116a4541c4d49d4c798e962dba926938a Mon Sep 17 00:00:00 2001 From: Vanish Date: Tue, 5 Sep 2023 23:14:05 +0800 Subject: [PATCH 01/20] feat: implement truncate region for mito2. --- src/datanode/src/region_server.rs | 3 +- src/mito2/src/engine.rs | 2 + src/mito2/src/engine/truncate_test.rs | 272 ++++++++++++++++++++++++ src/mito2/src/error.rs | 7 + src/mito2/src/flush.rs | 15 +- src/mito2/src/manifest/action.rs | 12 ++ src/mito2/src/manifest/manager.rs | 9 + src/mito2/src/region/version.rs | 17 ++ src/mito2/src/request.rs | 8 +- src/mito2/src/test_util.rs | 5 + src/mito2/src/worker.rs | 2 + src/mito2/src/worker/handle_truncate.rs | 51 +++++ src/store-api/src/region_request.rs | 4 + 13 files changed, 404 insertions(+), 3 deletions(-) create mode 100644 src/mito2/src/engine/truncate_test.rs create mode 100644 src/mito2/src/worker/handle_truncate.rs diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 15c295933735..414f2823ade1 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -210,7 +210,8 @@ impl RegionServerInner { | RegionRequest::Delete(_) | RegionRequest::Alter(_) | RegionRequest::Flush(_) - | RegionRequest::Compact(_) => RegionChange::None, + | RegionRequest::Compact(_) + | RegionRequest::Truncate(_) => RegionChange::None, }; let engine = match ®ion_change { diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 8200e1778b71..7cd1079d2d12 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -34,6 +34,8 @@ pub(crate) mod listener; mod open_test; #[cfg(test)] mod projection_test; +#[cfg(test)] +mod truncate_test; use std::sync::Arc; diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs new file mode 100644 index 000000000000..8673bc57db39 --- /dev/null +++ b/src/mito2/src/engine/truncate_test.rs @@ -0,0 +1,272 @@ +use std::collections::HashMap; +use std::time::Duration; + +use api::v1::Rows; +use common_recordbatch::RecordBatches; +use common_telemetry::init_default_ut_logging; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{ + RegionCloseRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest, +}; +use store_api::storage::RegionId; + +use super::ScanRequest; +use crate::config::MitoConfig; +use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv}; + +fn has_parquet_file(sst_dir: &str) -> bool { + for entry in std::fs::read_dir(sst_dir).unwrap() { + let entry = entry.unwrap(); + let path = entry.path(); + if !path.is_dir() { + assert_eq!("parquet", path.extension().unwrap()); + return true; + } + } + + false +} + +#[tokio::test] +async fn test_engine_truncate_region_basic() { + let mut env = TestEnv::with_prefix("truncate-basic"); + let engine = env.create_engine(MitoConfig::default()).await; + + // Create the region. + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Put data to the region. + let rows = Rows { + schema: column_schemas, + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + // Scan the region. + let request = ScanRequest::default(); + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); + + // Truncate the region. + engine + .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .await + .unwrap(); + + // Scan the region. + let request = ScanRequest::default(); + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "++\n++"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} + +#[tokio::test] +async fn test_engine_put_data_after_truncate() { + let mut env = TestEnv::with_prefix("truncate-put"); + let engine = env.create_engine(MitoConfig::default()).await; + + // Create the region. + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Put data to the region. + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + // Scan the region. + let request = ScanRequest::default(); + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); + + // Truncate the region. + engine + .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .await + .unwrap(); + + // Put data to the region again. + let rows = Rows { + schema: column_schemas, + rows: build_rows(5, 8), + }; + put_rows(&engine, region_id, rows).await; + + // Scan the region. + let request = ScanRequest::default(); + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 5 | 5.0 | 1970-01-01T00:00:05 | +| 6 | 6.0 | 1970-01-01T00:00:06 | +| 7 | 7.0 | 1970-01-01T00:00:07 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} + +#[tokio::test] +async fn test_engine_truncate_after_flush() { + init_default_ut_logging(); + let mut env = TestEnv::with_prefix("truncate-flush"); + let region_dir; + { + let engine = env.create_engine(MitoConfig::default()).await; + + // Create the region. + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Put data to the region. + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + // Flush the region. + engine + .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {})) + .await + .unwrap(); + + let region = engine.get_region(region_id).unwrap(); + region_dir = env + .get_data_path() + .join(region.access_layer.region_dir()) + .display() + .to_string(); + assert!(has_parquet_file(®ion_dir)); + + // Truncate the region. + engine + .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .await + .unwrap(); + + // Put data to the region. + let rows = Rows { + schema: column_schemas, + rows: build_rows(5, 8), + }; + put_rows(&engine, region_id, rows).await; + + // Scan the region. + let request = ScanRequest::default(); + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 5 | 5.0 | 1970-01-01T00:00:05 | +| 6 | 6.0 | 1970-01-01T00:00:06 | +| 7 | 7.0 | 1970-01-01T00:00:07 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); + } + tokio::time::sleep(Duration::from_millis(100)).await; + assert!(!has_parquet_file(®ion_dir)); +} + +#[tokio::test] +async fn test_engine_truncate_reopen() { + let mut env = TestEnv::with_prefix("truncate-reopen"); + let engine = env.create_engine(MitoConfig::default()).await; + + // Create the region. + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Put data to the region. + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + // Flush the region. + engine + .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {})) + .await + .unwrap(); + + // Truncate the region + engine + .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .await + .unwrap(); + + // Close the region. + engine + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); + + // Reopen the region again. + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::default(), + }), + ) + .await + .unwrap(); + + // Scan the region. + let request = ScanRequest::default(); + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "++\n++"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 3c776ee55039..3833d32db2f3 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -420,6 +420,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Region {} is truncating, location: {}", region_id, location))] + RegionTruncating { + region_id: RegionId, + location: Location, + }, + #[snafu(display( "Engine write buffer is full, rejecting write requests of region {}, location: {}", region_id, @@ -540,6 +546,7 @@ impl ErrorExt for Error { FlushRegion { source, .. } => source.status_code(), RegionDropped { .. } => StatusCode::Cancelled, RegionClosed { .. } => StatusCode::Cancelled, + RegionTruncating { .. } => StatusCode::Cancelled, BuildCompactionPredicate { .. } => StatusCode::Internal, RejectWrite { .. } => StatusCode::StorageUnavailable, CompactRegion { source, .. } => source.status_code(), diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 15dc80de0e14..588c5b26c324 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -25,7 +25,9 @@ use store_api::storage::RegionId; use tokio::sync::mpsc; use crate::access_layer::AccessLayerRef; -use crate::error::{Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, Result}; +use crate::error::{ + Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatingSnafu, Result, +}; use crate::memtable::MemtableBuilderRef; use crate::read::Source; use crate::region::version::{VersionControlData, VersionRef}; @@ -470,6 +472,17 @@ impl FlushScheduler { flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build())); } + /// Notifies the scheduler that the region is truncating. + pub(crate) fn on_region_truncating(&mut self, region_id: RegionId) { + // Remove this region. + let Some(flush_status) = self.region_status.remove(®ion_id) else { + return; + }; + + // Notifies all pending tasks. + flush_status.on_failure(Arc::new(RegionTruncatingSnafu { region_id }.build())); + } + /// Add ddl request to pending queue. /// /// # Panics diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 812082bb44d2..05863b41cba3 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -35,6 +35,8 @@ pub enum RegionMetaAction { Edit(RegionEdit), /// Remove the region. Remove(RegionRemove), + /// Truncate the region. + Truncate(RegionTruncate), } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] @@ -57,6 +59,11 @@ pub struct RegionRemove { pub region_id: RegionId, } +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub struct RegionTruncate { + pub region_id: RegionId, +} + /// The region manifest data. #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct RegionManifest { @@ -118,6 +125,11 @@ impl RegionManifestBuilder { } } + pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, _truncate: RegionTruncate) { + self.manifest_version = manifest_version; + self.files.clear(); + } + /// Check if the builder keeps a [RegionMetadata](crate::metadata::RegionMetadata). pub fn contains_metadata(&self) -> bool { self.metadata.is_some() diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 53c6b0127cc5..e9c85eea3958 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -279,6 +279,9 @@ impl RegionManifestManagerInner { options.manifest_dir, action ); } + RegionMetaAction::Truncate(action) => { + manifest_builder.apply_truncate(manifest_version, action); + } } } } @@ -329,6 +332,9 @@ impl RegionManifestManagerInner { self.manifest.metadata.region_id, action ); } + RegionMetaAction::Truncate(action) => { + manifest_builder.apply_truncate(version, action); + } } } let new_manifest = manifest_builder.try_build()?; @@ -400,6 +406,9 @@ impl RegionManifestManagerInner { self.manifest.metadata.region_id, action ); } + RegionMetaAction::Truncate(action) => { + manifest_builder.apply_truncate(version, action); + } } } last_version = version; diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 502087f3eb15..7f9c8a39a84b 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -138,6 +138,23 @@ impl VersionControl { let mut version_data = self.data.write().unwrap(); version_data.version = new_version; } + + /// Reset current version. + pub(crate) fn reset(&self, flushed_entry_id: u64, memtable_builder: &MemtableBuilderRef) { + let version = self.current().version; + + let new_mutable = memtable_builder.build(&version.metadata); + let new_memtables = MemtableVersion::new(new_mutable); + + let mut version_builder = VersionBuilder::from_version(version).memtables(new_memtables); + version_builder.flushed_entry_id = flushed_entry_id; + version_builder.ssts = Arc::new(SstVersion::new()); + let new_version = Arc::new(version_builder.build()); + + let mut version_data = self.data.write().unwrap(); + version_data.version.ssts.mark_all_deleted(); + version_data.version = new_version; + } } pub(crate) type VersionControlRef = Arc; diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index e0fc4ad2e340..f4b4352cc493 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -35,7 +35,7 @@ use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::region_request::{ RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, - RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, + RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest, }; use store_api::storage::{CompactionStrategy, RegionId, SequenceNumber}; use tokio::sync::oneshot::{self, Receiver, Sender}; @@ -552,6 +552,11 @@ impl WorkerRequest { sender: sender.into(), request: DdlRequest::Compact(v), }), + RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest { + region_id, + sender: sender.into(), + request: DdlRequest::Truncate(v), + }), }; Ok((worker_request, receiver)) @@ -568,6 +573,7 @@ pub(crate) enum DdlRequest { Alter(RegionAlterRequest), Flush(RegionFlushRequest), Compact(RegionCompactRequest), + Truncate(RegionTruncateRequest), } /// Sender and Ddl request. diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 6fbf197ab9b4..6052dd9d78f1 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -15,6 +15,7 @@ //! Utilities for testing. use std::collections::HashMap; +use std::path::PathBuf; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; @@ -195,6 +196,10 @@ impl TestEnv { RegionManifestManager::open(manifest_opts).await } } + + pub fn get_data_path(&self) -> PathBuf { + self.data_home.path().join("data") + } } /// Builder to mock a [RegionCreateRequest]. diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 3d824815ce4d..6ca21a3bf902 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -21,6 +21,7 @@ mod handle_create; mod handle_drop; mod handle_flush; mod handle_open; +mod handle_truncate; mod handle_write; use std::collections::hash_map::DefaultHasher; @@ -503,6 +504,7 @@ impl RegionWorkerLoop { self.handle_compaction_request(ddl.region_id, ddl.sender); continue; } + DdlRequest::Truncate(_) => self.handle_truncate_request(ddl.region_id).await, }; ddl.sender.send(res); diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs new file mode 100644 index 000000000000..9391bab5d5b0 --- /dev/null +++ b/src/mito2/src/worker/handle_truncate.rs @@ -0,0 +1,51 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Handling flush related requests. + +use common_query::Output; +use common_telemetry::info; +use store_api::logstore::LogStore; +use store_api::storage::RegionId; + +use crate::error::{RegionNotFoundSnafu, Result}; +use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionTruncate}; +use crate::worker::RegionWorkerLoop; + +impl RegionWorkerLoop { + pub(crate) async fn handle_truncate_request(&mut self, region_id: RegionId) -> Result { + let Some(region) = self.regions.get_region(region_id) else { + return RegionNotFoundSnafu { region_id }.fail(); + }; + info!("Try to truncate region {}", region_id); + + // Write region truncated to manifest. + let truncate = RegionTruncate { region_id }; + let action_list = + RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone())); + region.manifest_manager.update(action_list).await?; + + // Notifies flush scheduler. + self.flush_scheduler.on_region_truncating(region_id); + + // Maek all data obsolete. + let version_data = region.version_control.current(); + let commited_sequence = version_data.committed_sequence; + self.wal.obsolete(region_id, commited_sequence).await?; + + // Reset region's version and mark all SSTs deleted. + region.version_control.reset(0, &self.memtable_builder); + Ok(Output::AffectedRows(0)) + } +} diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 7fb65cdc6e18..9b9eb845eab8 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -38,6 +38,7 @@ pub enum RegionRequest { Alter(RegionAlterRequest), Flush(RegionFlushRequest), Compact(RegionCompactRequest), + Truncate(RegionTruncateRequest), } impl RegionRequest { @@ -412,6 +413,9 @@ pub struct RegionFlushRequest {} #[derive(Debug)] pub struct RegionCompactRequest {} +#[derive(Debug)] +pub struct RegionTruncateRequest {} + #[cfg(test)] mod tests { use api::v1::region::RegionColumnDef; From 09f42b50c442ff38fbb312bed3d46791382d353b Mon Sep 17 00:00:00 2001 From: Vanish Date: Tue, 5 Sep 2023 23:40:50 +0800 Subject: [PATCH 02/20] chore: add license header and fix typos --- src/mito2/src/engine/truncate_test.rs | 14 ++++++++++++++ src/mito2/src/worker/handle_truncate.rs | 6 +++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 8673bc57db39..31059dce4026 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -1,3 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use std::collections::HashMap; use std::time::Duration; diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 9391bab5d5b0..1b9caadf2bec 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Handling flush related requests. +//! Handling truncate related requests. use common_query::Output; use common_telemetry::info; @@ -41,8 +41,8 @@ impl RegionWorkerLoop { // Maek all data obsolete. let version_data = region.version_control.current(); - let commited_sequence = version_data.committed_sequence; - self.wal.obsolete(region_id, commited_sequence).await?; + let committed_sequence = version_data.committed_sequence; + self.wal.obsolete(region_id, committed_sequence).await?; // Reset region's version and mark all SSTs deleted. region.version_control.reset(0, &self.memtable_builder); From 72a3c8d9399a3916553c52fa82d388aee0ce1b4c Mon Sep 17 00:00:00 2001 From: Vanish Date: Wed, 6 Sep 2023 21:34:47 +0800 Subject: [PATCH 03/20] Update src/mito2/src/worker/handle_truncate.rs Co-authored-by: Yingwen --- src/mito2/src/worker/handle_truncate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 1b9caadf2bec..690d3b90e25e 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -39,7 +39,7 @@ impl RegionWorkerLoop { // Notifies flush scheduler. self.flush_scheduler.on_region_truncating(region_id); - // Maek all data obsolete. + // Make all data obsolete. let version_data = region.version_control.current(); let committed_sequence = version_data.committed_sequence; self.wal.obsolete(region_id, committed_sequence).await?; From 3b08746da00b44f0554428d5c585cad588a385a0 Mon Sep 17 00:00:00 2001 From: Vanish Date: Wed, 6 Sep 2023 22:51:13 +0800 Subject: [PATCH 04/20] cr --- src/mito2/src/engine/truncate_test.rs | 121 +++++++++++------------- src/mito2/src/flush.rs | 28 +++--- src/mito2/src/manifest/action.rs | 4 +- src/mito2/src/region/version.rs | 11 +-- src/mito2/src/worker/handle_truncate.rs | 19 ++-- 5 files changed, 87 insertions(+), 96 deletions(-) diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 31059dce4026..6524ee8454b0 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -28,19 +28,6 @@ use super::ScanRequest; use crate::config::MitoConfig; use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv}; -fn has_parquet_file(sst_dir: &str) -> bool { - for entry in std::fs::read_dir(sst_dir).unwrap() { - let entry = entry.unwrap(); - let path = entry.path(); - if !path.is_dir() { - assert_eq!("parquet", path.extension().unwrap()); - return true; - } - } - - false -} - #[tokio::test] async fn test_engine_truncate_region_basic() { let mut env = TestEnv::with_prefix("truncate-basic"); @@ -158,58 +145,54 @@ async fn test_engine_put_data_after_truncate() { async fn test_engine_truncate_after_flush() { init_default_ut_logging(); let mut env = TestEnv::with_prefix("truncate-flush"); - let region_dir; - { - let engine = env.create_engine(MitoConfig::default()).await; - - // Create the region. - let region_id = RegionId::new(1, 1); - let request = CreateRequestBuilder::new().build(); - let column_schemas = rows_schema(&request); - engine - .handle_request(region_id, RegionRequest::Create(request)) - .await - .unwrap(); - - // Put data to the region. - let rows = Rows { - schema: column_schemas.clone(), - rows: build_rows(0, 3), - }; - put_rows(&engine, region_id, rows).await; - - // Flush the region. - engine - .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {})) - .await - .unwrap(); - - let region = engine.get_region(region_id).unwrap(); - region_dir = env - .get_data_path() - .join(region.access_layer.region_dir()) - .display() - .to_string(); - assert!(has_parquet_file(®ion_dir)); - - // Truncate the region. - engine - .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) - .await - .unwrap(); - - // Put data to the region. - let rows = Rows { - schema: column_schemas, - rows: build_rows(5, 8), - }; - put_rows(&engine, region_id, rows).await; - - // Scan the region. - let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); - let batches = RecordBatches::try_collect(stream).await.unwrap(); - let expected = "\ + let engine = env.create_engine(MitoConfig::default()).await; + + // Create the region. + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Put data to the region. + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + // Flush the region. + engine + .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {})) + .await + .unwrap(); + + let request = ScanRequest::default(); + let scanner = engine.scan(region_id, request.clone()).unwrap(); + assert_eq!(1, scanner.num_files()); + + // Truncate the region. + engine + .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .await + .unwrap(); + + // Put data to the region. + let rows = Rows { + schema: column_schemas, + rows: build_rows(5, 8), + }; + put_rows(&engine, region_id, rows).await; + + // Scan the region. + let stream = engine + .handle_query(region_id, request.clone()) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ +-------+---------+---------------------+ | tag_0 | field_0 | ts | +-------+---------+---------------------+ @@ -217,10 +200,12 @@ async fn test_engine_truncate_after_flush() { | 6 | 6.0 | 1970-01-01T00:00:06 | | 7 | 7.0 | 1970-01-01T00:00:07 | +-------+---------+---------------------+"; - assert_eq!(expected, batches.pretty_print().unwrap()); - } + assert_eq!(expected, batches.pretty_print().unwrap()); + tokio::time::sleep(Duration::from_millis(100)).await; - assert!(!has_parquet_file(®ion_dir)); + + let scanner = engine.scan(region_id, request).unwrap(); + assert_eq!(0, scanner.num_files()); } #[tokio::test] diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 588c5b26c324..06580794be45 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -452,35 +452,33 @@ impl FlushScheduler { /// Notifies the scheduler that the region is dropped. pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) { - // Remove this region. - let Some(flush_status) = self.region_status.remove(®ion_id) else { - return; - }; - - // Notifies all pending tasks. - flush_status.on_failure(Arc::new(RegionDroppedSnafu { region_id }.build())); + self.remove_region_on_failure( + region_id, + Arc::new(RegionDroppedSnafu { region_id }.build()), + ); } /// Notifies the scheduler that the region is closed. pub(crate) fn on_region_closed(&mut self, region_id: RegionId) { - // Remove this region. - let Some(flush_status) = self.region_status.remove(®ion_id) else { - return; - }; - - // Notifies all pending tasks. - flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build())); + self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build())); } /// Notifies the scheduler that the region is truncating. pub(crate) fn on_region_truncating(&mut self, region_id: RegionId) { + self.remove_region_on_failure( + region_id, + Arc::new(RegionTruncatingSnafu { region_id }.build()), + ); + } + + pub(crate) fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc) { // Remove this region. let Some(flush_status) = self.region_status.remove(®ion_id) else { return; }; // Notifies all pending tasks. - flush_status.on_failure(Arc::new(RegionTruncatingSnafu { region_id }.build())); + flush_status.on_failure(err); } /// Add ddl request to pending queue. diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 05863b41cba3..949898236c3b 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -62,6 +62,7 @@ pub struct RegionRemove { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct RegionTruncate { pub region_id: RegionId, + pub flushed_entry_id: EntryId, } /// The region manifest data. @@ -125,8 +126,9 @@ impl RegionManifestBuilder { } } - pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, _truncate: RegionTruncate) { + pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) { self.manifest_version = manifest_version; + self.flushed_entry_id = truncate.flushed_entry_id; self.files.clear(); } diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 7f9c8a39a84b..78b3eca6d832 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -144,12 +144,11 @@ impl VersionControl { let version = self.current().version; let new_mutable = memtable_builder.build(&version.metadata); - let new_memtables = MemtableVersion::new(new_mutable); - - let mut version_builder = VersionBuilder::from_version(version).memtables(new_memtables); - version_builder.flushed_entry_id = flushed_entry_id; - version_builder.ssts = Arc::new(SstVersion::new()); - let new_version = Arc::new(version_builder.build()); + let new_version = Arc::new( + VersionBuilder::new(version.metadata.clone(), new_mutable) + .flushed_entry_id(flushed_entry_id) + .build(), + ); let mut version_data = self.data.write().unwrap(); version_data.version.ssts.mark_all_deleted(); diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 690d3b90e25e..2702c94ea7d9 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -30,8 +30,13 @@ impl RegionWorkerLoop { }; info!("Try to truncate region {}", region_id); + let version_data = region.version_control.current(); + // Write region truncated to manifest. - let truncate = RegionTruncate { region_id }; + let truncate = RegionTruncate { + region_id, + flushed_entry_id: version_data.last_entry_id, + }; let action_list = RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone())); region.manifest_manager.update(action_list).await?; @@ -39,13 +44,15 @@ impl RegionWorkerLoop { // Notifies flush scheduler. self.flush_scheduler.on_region_truncating(region_id); + // Reset region's version and mark all SSTs deleted. + let entry_id = version_data.last_entry_id; + region + .version_control + .reset(entry_id, &self.memtable_builder); + // Make all data obsolete. - let version_data = region.version_control.current(); - let committed_sequence = version_data.committed_sequence; - self.wal.obsolete(region_id, committed_sequence).await?; + self.wal.obsolete(region_id, entry_id).await?; - // Reset region's version and mark all SSTs deleted. - region.version_control.reset(0, &self.memtable_builder); Ok(Output::AffectedRows(0)) } } From fa3fd6086eb9b1692c904e5ea7fc865c298c33a4 Mon Sep 17 00:00:00 2001 From: Vanish Date: Fri, 8 Sep 2023 02:10:41 +0800 Subject: [PATCH 05/20] chore: consider the flush task being executed before truncating the region. --- src/mito2/src/engine.rs | 18 +++++ src/mito2/src/engine/truncate_test.rs | 92 ++++++++++++++++++++++--- src/mito2/src/manifest/action.rs | 5 ++ src/mito2/src/worker/handle_flush.rs | 13 +++- src/mito2/src/worker/handle_truncate.rs | 13 ++-- 5 files changed, 127 insertions(+), 14 deletions(-) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 7cd1079d2d12..fe44d3acc818 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -91,6 +91,15 @@ impl MitoEngine { pub(crate) fn get_region(&self, id: RegionId) -> Option { self.inner.workers.get_region(id) } + + #[cfg(test)] + pub(crate) async fn handle_worker_request( + &self, + region_id: RegionId, + request: WorkerRequest, + ) -> Result<()> { + self.inner.handle_worker_request(region_id, request).await + } } /// Inner struct of [MitoEngine]. @@ -159,6 +168,15 @@ impl EngineInner { region.set_writable(writable); Ok(()) } + + #[cfg(test)] + pub(crate) async fn handle_worker_request( + &self, + region_id: RegionId, + request: WorkerRequest, + ) -> Result<()> { + self.workers.submit_to_worker(region_id, request).await + } } #[async_trait] diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 6524ee8454b0..c368212801fa 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -17,15 +17,19 @@ use std::time::Duration; use api::v1::Rows; use common_recordbatch::RecordBatches; -use common_telemetry::init_default_ut_logging; +use object_store::util::join_path; +use smallvec::SmallVec; use store_api::region_engine::RegionEngine; use store_api::region_request::{ RegionCloseRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest, }; use store_api::storage::RegionId; +use tokio::sync::oneshot; use super::ScanRequest; use crate::config::MitoConfig; +use crate::request::{BackgroundNotify, FlushFinished, WorkerRequest}; +use crate::sst::file::{FileId, FileMeta, FileTimeRange}; use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv}; #[tokio::test] @@ -143,7 +147,6 @@ async fn test_engine_put_data_after_truncate() { #[tokio::test] async fn test_engine_truncate_after_flush() { - init_default_ut_logging(); let mut env = TestEnv::with_prefix("truncate-flush"); let engine = env.create_engine(MitoConfig::default()).await; @@ -231,12 +234,6 @@ async fn test_engine_truncate_reopen() { }; put_rows(&engine, region_id, rows).await; - // Flush the region. - engine - .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {})) - .await - .unwrap(); - // Truncate the region engine .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) @@ -269,3 +266,82 @@ async fn test_engine_truncate_reopen() { let expected = "++\n++"; assert_eq!(expected, batches.pretty_print().unwrap()); } + +#[tokio::test] +async fn test_engine_truncate_during_flush() { + let mut env = TestEnv::with_prefix("truncate-reopen"); + let engine = env.create_engine(MitoConfig::default()).await; + + // Create the region. + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Put data to the region. + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + let region = engine.get_region(region_id).unwrap(); + + // Create a parquet file. + // Simulate that the `do_flush()` function is currently being executed. + let file_id = FileId::random(); + let file_name = format!("{}.parquet", file_id); + let file_meta = FileMeta { + region_id, + file_id, + time_range: FileTimeRange::default(), + level: 0, + file_size: 0, + }; + env.get_object_store() + .unwrap() + .write(&join_path(®ion_dir, &file_name), vec![]) + .await + .unwrap(); + + let (sender, receiver) = oneshot::channel(); + + let flushed_entry_id = region.version_control.current().last_entry_id; + + // Truncate the region. + engine + .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .await + .unwrap(); + + // The flush task is finished, and the `handle_flush_finished()` is executed. + let finished = FlushFinished { + region_id, + file_metas: vec![file_meta.clone()], + flushed_entry_id, + memtables_to_remove: SmallVec::new(), + file_purger: region.file_purger.clone(), + senders: vec![sender], + }; + + let worker_request = WorkerRequest::Background { + region_id, + notify: BackgroundNotify::FlushFinished(finished), + }; + + engine + .handle_worker_request(region_id, worker_request) + .await + .unwrap(); + + let _ = receiver.await.unwrap(); + + let request = ScanRequest::default(); + let scanner = engine.scan(region_id, request.clone()).unwrap(); + assert_eq!(0, scanner.num_files()); +} diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 949898236c3b..9df52e65b913 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; +use common_telemetry::info; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::manifest::ManifestVersion; @@ -129,6 +130,10 @@ impl RegionManifestBuilder { pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) { self.manifest_version = manifest_version; self.flushed_entry_id = truncate.flushed_entry_id; + info!( + "Truncate region {} to entry {}", + truncate.region_id, truncate.flushed_entry_id + ); self.files.clear(); } diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 36eaec042eb1..9bf32cdfd092 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -19,7 +19,7 @@ use common_time::util::current_time_millis; use store_api::logstore::LogStore; use store_api::storage::RegionId; -use crate::error::Result; +use crate::error::{RegionTruncatingSnafu, Result}; use crate::flush::{FlushReason, RegionFlushTask}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::region::MitoRegionRef; @@ -136,6 +136,17 @@ impl RegionWorkerLoop { return; }; + // The flush task before truncating the region. + let version_data = region.version_control.current(); + if version_data.version.flushed_entry_id >= request.flushed_entry_id { + info!( + "Truncate region {} to {}", + region_id, request.flushed_entry_id + ); + request.on_failure(RegionTruncatingSnafu { region_id }.build()); + return; + } + // Write region edit to manifest. let edit = RegionEdit { files_to_add: std::mem::take(&mut request.file_metas), diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 2702c94ea7d9..cb4b18dcc091 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -31,27 +31,30 @@ impl RegionWorkerLoop { info!("Try to truncate region {}", region_id); let version_data = region.version_control.current(); + let entry_id = version_data.last_entry_id; + + // Notifies flush scheduler. + self.flush_scheduler.on_region_truncating(region_id); + + // TODO(DevilExileSu): Consider compaction tasks during truncate. // Write region truncated to manifest. let truncate = RegionTruncate { region_id, - flushed_entry_id: version_data.last_entry_id, + flushed_entry_id: entry_id, }; let action_list = RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone())); region.manifest_manager.update(action_list).await?; - // Notifies flush scheduler. - self.flush_scheduler.on_region_truncating(region_id); - // Reset region's version and mark all SSTs deleted. - let entry_id = version_data.last_entry_id; region .version_control .reset(entry_id, &self.memtable_builder); // Make all data obsolete. self.wal.obsolete(region_id, entry_id).await?; + info!("Done truncate"); Ok(Output::AffectedRows(0)) } From 778f7288f7dd994ca9c1c8c5e243ecb7a355f4e7 Mon Sep 17 00:00:00 2001 From: Vanish Date: Mon, 11 Sep 2023 15:33:19 +0800 Subject: [PATCH 06/20] test --- src/mito2/src/engine/truncate_test.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index c368212801fa..0632fe7f235b 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -21,7 +21,7 @@ use object_store::util::join_path; use smallvec::SmallVec; use store_api::region_engine::RegionEngine; use store_api::region_request::{ - RegionCloseRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest, + RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest, }; use store_api::storage::RegionId; use tokio::sync::oneshot; @@ -103,7 +103,7 @@ async fn test_engine_put_data_after_truncate() { }; put_rows(&engine, region_id, rows).await; - // Scan the region. + // Scan the region.mut let request = ScanRequest::default(); let stream = engine.handle_query(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); @@ -234,19 +234,17 @@ async fn test_engine_truncate_reopen() { }; put_rows(&engine, region_id, rows).await; + let region = engine.get_region(region_id).unwrap(); + let last_entry_id = region.version_control.current().last_entry_id; + // Truncate the region engine .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) .await .unwrap(); - // Close the region. - engine - .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) - .await - .unwrap(); - // Reopen the region again. + let engine = env.reopen_engine(engine, MitoConfig::default()).await; engine .handle_request( region_id, @@ -259,6 +257,9 @@ async fn test_engine_truncate_reopen() { .await .unwrap(); + let region = engine.get_region(region_id).unwrap(); + assert_eq!(last_entry_id, region.version().flushed_entry_id); + // Scan the region. let request = ScanRequest::default(); let stream = engine.handle_query(region_id, request).await.unwrap(); From 996834d0edb1b4a5184b78a08e46561e518148db Mon Sep 17 00:00:00 2001 From: Vanish Date: Tue, 12 Sep 2023 16:04:45 +0800 Subject: [PATCH 07/20] feat: check flush and compaction tasks --- src/mito2/src/compaction/twcs.rs | 4 + src/mito2/src/engine/truncate_test.rs | 99 ++++++++++++++++++++++- src/mito2/src/manifest/action.rs | 5 -- src/mito2/src/manifest/manager.rs | 5 ++ src/mito2/src/region/version.rs | 43 +++++++++- src/mito2/src/request.rs | 3 + src/mito2/src/worker/handle_compaction.rs | 11 +++ src/mito2/src/worker/handle_flush.rs | 12 ++- src/mito2/src/worker/handle_truncate.rs | 5 +- 9 files changed, 168 insertions(+), 19 deletions(-) diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 1b6293b9c82a..19cc923d279b 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -24,6 +24,7 @@ use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; use common_time::Timestamp; use snafu::ResultExt; +use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; use tokio::sync::mpsc; @@ -171,6 +172,7 @@ impl Picker for TwcsPicker { request_sender, sender: waiter, file_purger, + last_truncate_manifest_version: current_version.last_truncate_manifest_version, }; Some(Box::new(task)) } @@ -229,6 +231,7 @@ pub(crate) struct TwcsCompactionTask { pub(crate) request_sender: mpsc::Sender, /// Sender that are used to notify waiters waiting for pending compaction tasks. pub sender: OptionOutputTx, + pub last_truncate_manifest_version: Option, } impl Debug for TwcsCompactionTask { @@ -354,6 +357,7 @@ impl CompactionTask for TwcsCompactionTask { compacted_files: deleted, sender: self.sender.take(), file_purger: self.file_purger.clone(), + last_truncate_manifest_version: self.last_truncate_manifest_version, }) } Err(e) => { diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 0632fe7f235b..2aca44b73e60 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -28,7 +28,7 @@ use tokio::sync::oneshot; use super::ScanRequest; use crate::config::MitoConfig; -use crate::request::{BackgroundNotify, FlushFinished, WorkerRequest}; +use crate::request::{BackgroundNotify, CompactionFinished, FlushFinished, WorkerRequest}; use crate::sst::file::{FileId, FileMeta, FileTimeRange}; use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv}; @@ -270,7 +270,7 @@ async fn test_engine_truncate_reopen() { #[tokio::test] async fn test_engine_truncate_during_flush() { - let mut env = TestEnv::with_prefix("truncate-reopen"); + let mut env = TestEnv::with_prefix("truncate-during-flush"); let engine = env.create_engine(MitoConfig::default()).await; // Create the region. @@ -314,6 +314,10 @@ async fn test_engine_truncate_during_flush() { let flushed_entry_id = region.version_control.current().last_entry_id; + let current_version = region.version_control.current().version; + assert_eq!(current_version.truncate_entry_id, None); + assert_eq!(current_version.last_truncate_manifest_version, None); + // Truncate the region. engine .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) @@ -340,9 +344,98 @@ async fn test_engine_truncate_during_flush() { .await .unwrap(); - let _ = receiver.await.unwrap(); + let _ = receiver.await; let request = ScanRequest::default(); let scanner = engine.scan(region_id, request.clone()).unwrap(); assert_eq!(0, scanner.num_files()); + + // Put data to the region. + let rows = Rows { + schema: column_schemas, + rows: build_rows(5, 8), + }; + put_rows(&engine, region_id, rows).await; + + // Flush the region. + engine + .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {})) + .await + .unwrap(); + + let current_version = region.version_control.current().version; + assert_eq!(current_version.truncate_entry_id, None); + assert_eq!(current_version.last_truncate_manifest_version, Some(0)); +} + +#[tokio::test] +async fn test_engine_truncate_during_compaction() { + let mut env = TestEnv::with_prefix("truncate-during-compaction"); + let engine = env.create_engine(MitoConfig::default()).await; + + // Create the region. + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let region = engine.get_region(region_id).unwrap(); + + // Create a parquet file. + // Simulate that the `handle_compaction()` function is currently being executed. + let file_id = FileId::random(); + let file_name = format!("{}.parquet", file_id); + let file_meta = FileMeta { + region_id, + file_id, + time_range: FileTimeRange::default(), + level: 0, + file_size: 0, + }; + env.get_object_store() + .unwrap() + .write(&join_path(®ion_dir, &file_name), vec![]) + .await + .unwrap(); + + let (sender, receiver) = oneshot::channel(); + + // Truncate the region. + engine + .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .await + .unwrap(); + + // The compaction task is finished, and the `handle_compaction_finished()` is executed. + let finished = CompactionFinished { + region_id, + compaction_outputs: vec![file_meta], + compacted_files: vec![], + sender: Some(sender), + file_purger: region.file_purger.clone(), + last_truncate_manifest_version: None, + }; + + let worker_request = WorkerRequest::Background { + region_id, + notify: BackgroundNotify::CompactionFinished(finished), + }; + + engine + .handle_worker_request(region_id, worker_request) + .await + .unwrap(); + + let _ = receiver.await; + + let request = ScanRequest::default(); + let scanner = engine.scan(region_id, request.clone()).unwrap(); + assert_eq!(0, scanner.num_files()); + + let current_version = region.version_control.current().version; + assert_eq!(current_version.last_truncate_manifest_version, Some(0)); } diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 9df52e65b913..949898236c3b 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -16,7 +16,6 @@ use std::collections::HashMap; -use common_telemetry::info; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::manifest::ManifestVersion; @@ -130,10 +129,6 @@ impl RegionManifestBuilder { pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) { self.manifest_version = manifest_version; self.flushed_entry_id = truncate.flushed_entry_id; - info!( - "Truncate region {} to entry {}", - truncate.region_id, truncate.flushed_entry_id - ); self.files.clear(); } diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index e9c85eea3958..78b2b71954ea 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -149,6 +149,11 @@ impl RegionManifestManager { inner.manifest.clone() } + pub async fn manifest_version(&self) -> ManifestVersion { + let inner = self.inner.read().await; + inner.last_version + } + #[cfg(test)] pub async fn store(&self) -> ManifestObjectStore { let inner = self.inner.read().await; diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 78b3eca6d832..5ead6ab9aabd 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -25,6 +25,7 @@ use std::sync::{Arc, RwLock}; +use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; use store_api::storage::SequenceNumber; @@ -139,14 +140,21 @@ impl VersionControl { version_data.version = new_version; } - /// Reset current version. - pub(crate) fn reset(&self, flushed_entry_id: u64, memtable_builder: &MemtableBuilderRef) { + /// Truncate current version. + pub(crate) fn truncate( + &self, + flushed_entry_id: u64, + manifest_version: ManifestVersion, + memtable_builder: &MemtableBuilderRef, + ) { let version = self.current().version; let new_mutable = memtable_builder.build(&version.metadata); let new_version = Arc::new( VersionBuilder::new(version.metadata.clone(), new_mutable) .flushed_entry_id(flushed_entry_id) + .truncate_entry_id(Some(flushed_entry_id)) + .last_truncate_manifest_version(Some(manifest_version)) .build(), ); @@ -193,6 +201,14 @@ pub(crate) struct Version { pub(crate) flushed_entry_id: EntryId, /// Inclusive max sequence of flushed data. pub(crate) flushed_sequence: SequenceNumber, + /// Latest entry id during the truncating table. + /// + /// Used to check if it is a flush task during the truncation table. + pub(crate) truncate_entry_id: Option, + /// Last truncate table `ManifestVersion` + /// + /// Used to check if it is a compaction task during the truncation table. + pub(crate) last_truncate_manifest_version: Option, // TODO(yingwen): RegionOptions. } @@ -205,6 +221,8 @@ pub(crate) struct VersionBuilder { ssts: SstVersionRef, flushed_entry_id: EntryId, flushed_sequence: SequenceNumber, + truncate_entry_id: Option, + last_truncate_manifest_version: Option, } impl VersionBuilder { @@ -216,6 +234,8 @@ impl VersionBuilder { ssts: Arc::new(SstVersion::new()), flushed_entry_id: 0, flushed_sequence: 0, + truncate_entry_id: None, + last_truncate_manifest_version: None, } } @@ -227,6 +247,8 @@ impl VersionBuilder { ssts: version.ssts.clone(), flushed_entry_id: version.flushed_entry_id, flushed_sequence: version.flushed_sequence, + truncate_entry_id: None, + last_truncate_manifest_version: version.last_truncate_manifest_version, } } @@ -254,6 +276,21 @@ impl VersionBuilder { self } + /// Sets truncated entty id. + pub(crate) fn truncate_entry_id(mut self, entry_id: Option) -> Self { + self.truncate_entry_id = entry_id; + self + } + + /// Sets last truncate manifest version. + pub(crate) fn last_truncate_manifest_version( + mut self, + manifest_version: Option, + ) -> Self { + self.last_truncate_manifest_version = manifest_version; + self + } + /// Apply edit to the builder. pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self { if let Some(entry_id) = edit.flushed_entry_id { @@ -303,6 +340,8 @@ impl VersionBuilder { ssts: self.ssts, flushed_entry_id: self.flushed_entry_id, flushed_sequence: self.flushed_sequence, + truncate_entry_id: self.truncate_entry_id, + last_truncate_manifest_version: self.last_truncate_manifest_version, } } } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index f4b4352cc493..61964b5f6da6 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -32,6 +32,7 @@ use datatypes::prelude::DataType; use prost::Message; use smallvec::SmallVec; use snafu::{ensure, OptionExt, ResultExt}; +use store_api::manifest::ManifestVersion; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::region_request::{ RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, @@ -665,6 +666,8 @@ pub(crate) struct CompactionFinished { pub(crate) sender: OptionOutputTx, /// File purger for cleaning files on failure. pub(crate) file_purger: FilePurgerRef, + /// Last truncate `ManifestVersion` before requesting compaction task. + pub(crate) last_truncate_manifest_version: Option, } impl CompactionFinished { diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 101e9011ebad..7d9b48c08311 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -17,6 +17,7 @@ use store_api::logstore::LogStore; use store_api::storage::RegionId; use crate::compaction::CompactionRequest; +use crate::error::RegionTruncatingSnafu; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::region::MitoRegionRef; use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx}; @@ -54,6 +55,16 @@ impl RegionWorkerLoop { return; }; + let version_data = region.version_control.current(); + if version_data + .version + .last_truncate_manifest_version + .ne(&request.last_truncate_manifest_version) + { + request.on_failure(RegionTruncatingSnafu { region_id }.build()); + return; + } + // Write region edit to manifest. let edit = RegionEdit { files_to_add: std::mem::take(&mut request.compaction_outputs), diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 9bf32cdfd092..702f4b98e4e3 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -138,13 +138,11 @@ impl RegionWorkerLoop { // The flush task before truncating the region. let version_data = region.version_control.current(); - if version_data.version.flushed_entry_id >= request.flushed_entry_id { - info!( - "Truncate region {} to {}", - region_id, request.flushed_entry_id - ); - request.on_failure(RegionTruncatingSnafu { region_id }.build()); - return; + if let Some(truncate_entry_id) = version_data.version.truncate_entry_id { + if truncate_entry_id >= request.flushed_entry_id { + request.on_failure(RegionTruncatingSnafu { region_id }.build()); + return; + } } // Write region edit to manifest. diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index cb4b18dcc091..e16bb28b19a1 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -36,7 +36,8 @@ impl RegionWorkerLoop { // Notifies flush scheduler. self.flush_scheduler.on_region_truncating(region_id); - // TODO(DevilExileSu): Consider compaction tasks during truncate. + // TODO(DevilExileSu): Notifies compaction scheduler. + let last_manifest_version = region.manifest_manager.manifest_version().await; // Write region truncated to manifest. let truncate = RegionTruncate { @@ -50,7 +51,7 @@ impl RegionWorkerLoop { // Reset region's version and mark all SSTs deleted. region .version_control - .reset(entry_id, &self.memtable_builder); + .truncate(entry_id, last_manifest_version, &self.memtable_builder); // Make all data obsolete. self.wal.obsolete(region_id, entry_id).await?; From 86596f82b7db4dce650c9a5923aed21a25db5463 Mon Sep 17 00:00:00 2001 From: Vanish Date: Tue, 12 Sep 2023 21:13:56 +0800 Subject: [PATCH 08/20] chore: remove useless changes --- src/mito2/src/compaction/twcs.rs | 4 -- src/mito2/src/engine/truncate_test.rs | 79 +---------------------- src/mito2/src/manifest/manager.rs | 5 -- src/mito2/src/region/version.rs | 26 +------- src/mito2/src/request.rs | 3 - src/mito2/src/test_util.rs | 5 -- src/mito2/src/worker/handle_compaction.rs | 11 ---- src/mito2/src/worker/handle_truncate.rs | 3 +- 8 files changed, 5 insertions(+), 131 deletions(-) diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 19cc923d279b..1b6293b9c82a 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -24,7 +24,6 @@ use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; use common_time::Timestamp; use snafu::ResultExt; -use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; use tokio::sync::mpsc; @@ -172,7 +171,6 @@ impl Picker for TwcsPicker { request_sender, sender: waiter, file_purger, - last_truncate_manifest_version: current_version.last_truncate_manifest_version, }; Some(Box::new(task)) } @@ -231,7 +229,6 @@ pub(crate) struct TwcsCompactionTask { pub(crate) request_sender: mpsc::Sender, /// Sender that are used to notify waiters waiting for pending compaction tasks. pub sender: OptionOutputTx, - pub last_truncate_manifest_version: Option, } impl Debug for TwcsCompactionTask { @@ -357,7 +354,6 @@ impl CompactionTask for TwcsCompactionTask { compacted_files: deleted, sender: self.sender.take(), file_purger: self.file_purger.clone(), - last_truncate_manifest_version: self.last_truncate_manifest_version, }) } Err(e) => { diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 2aca44b73e60..6f3e6c9bb267 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -28,7 +28,7 @@ use tokio::sync::oneshot; use super::ScanRequest; use crate::config::MitoConfig; -use crate::request::{BackgroundNotify, CompactionFinished, FlushFinished, WorkerRequest}; +use crate::request::{BackgroundNotify, FlushFinished, WorkerRequest}; use crate::sst::file::{FileId, FileMeta, FileTimeRange}; use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv}; @@ -103,7 +103,7 @@ async fn test_engine_put_data_after_truncate() { }; put_rows(&engine, region_id, rows).await; - // Scan the region.mut + // Scan the region let request = ScanRequest::default(); let stream = engine.handle_query(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); @@ -316,7 +316,6 @@ async fn test_engine_truncate_during_flush() { let current_version = region.version_control.current().version; assert_eq!(current_version.truncate_entry_id, None); - assert_eq!(current_version.last_truncate_manifest_version, None); // Truncate the region. engine @@ -329,6 +328,7 @@ async fn test_engine_truncate_during_flush() { region_id, file_metas: vec![file_meta.clone()], flushed_entry_id, + flushed_sequence: flushed_entry_id, memtables_to_remove: SmallVec::new(), file_purger: region.file_purger.clone(), senders: vec![sender], @@ -365,77 +365,4 @@ async fn test_engine_truncate_during_flush() { let current_version = region.version_control.current().version; assert_eq!(current_version.truncate_entry_id, None); - assert_eq!(current_version.last_truncate_manifest_version, Some(0)); -} - -#[tokio::test] -async fn test_engine_truncate_during_compaction() { - let mut env = TestEnv::with_prefix("truncate-during-compaction"); - let engine = env.create_engine(MitoConfig::default()).await; - - // Create the region. - let region_id = RegionId::new(1, 1); - let request = CreateRequestBuilder::new().build(); - let region_dir = request.region_dir.clone(); - - engine - .handle_request(region_id, RegionRequest::Create(request)) - .await - .unwrap(); - - let region = engine.get_region(region_id).unwrap(); - - // Create a parquet file. - // Simulate that the `handle_compaction()` function is currently being executed. - let file_id = FileId::random(); - let file_name = format!("{}.parquet", file_id); - let file_meta = FileMeta { - region_id, - file_id, - time_range: FileTimeRange::default(), - level: 0, - file_size: 0, - }; - env.get_object_store() - .unwrap() - .write(&join_path(®ion_dir, &file_name), vec![]) - .await - .unwrap(); - - let (sender, receiver) = oneshot::channel(); - - // Truncate the region. - engine - .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) - .await - .unwrap(); - - // The compaction task is finished, and the `handle_compaction_finished()` is executed. - let finished = CompactionFinished { - region_id, - compaction_outputs: vec![file_meta], - compacted_files: vec![], - sender: Some(sender), - file_purger: region.file_purger.clone(), - last_truncate_manifest_version: None, - }; - - let worker_request = WorkerRequest::Background { - region_id, - notify: BackgroundNotify::CompactionFinished(finished), - }; - - engine - .handle_worker_request(region_id, worker_request) - .await - .unwrap(); - - let _ = receiver.await; - - let request = ScanRequest::default(); - let scanner = engine.scan(region_id, request.clone()).unwrap(); - assert_eq!(0, scanner.num_files()); - - let current_version = region.version_control.current().version; - assert_eq!(current_version.last_truncate_manifest_version, Some(0)); } diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 78b2b71954ea..e9c85eea3958 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -149,11 +149,6 @@ impl RegionManifestManager { inner.manifest.clone() } - pub async fn manifest_version(&self) -> ManifestVersion { - let inner = self.inner.read().await; - inner.last_version - } - #[cfg(test)] pub async fn store(&self) -> ManifestObjectStore { let inner = self.inner.read().await; diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 5ead6ab9aabd..60df423e1288 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -25,7 +25,6 @@ use std::sync::{Arc, RwLock}; -use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; use store_api::storage::SequenceNumber; @@ -141,12 +140,7 @@ impl VersionControl { } /// Truncate current version. - pub(crate) fn truncate( - &self, - flushed_entry_id: u64, - manifest_version: ManifestVersion, - memtable_builder: &MemtableBuilderRef, - ) { + pub(crate) fn truncate(&self, flushed_entry_id: u64, memtable_builder: &MemtableBuilderRef) { let version = self.current().version; let new_mutable = memtable_builder.build(&version.metadata); @@ -154,7 +148,6 @@ impl VersionControl { VersionBuilder::new(version.metadata.clone(), new_mutable) .flushed_entry_id(flushed_entry_id) .truncate_entry_id(Some(flushed_entry_id)) - .last_truncate_manifest_version(Some(manifest_version)) .build(), ); @@ -205,10 +198,6 @@ pub(crate) struct Version { /// /// Used to check if it is a flush task during the truncation table. pub(crate) truncate_entry_id: Option, - /// Last truncate table `ManifestVersion` - /// - /// Used to check if it is a compaction task during the truncation table. - pub(crate) last_truncate_manifest_version: Option, // TODO(yingwen): RegionOptions. } @@ -222,7 +211,6 @@ pub(crate) struct VersionBuilder { flushed_entry_id: EntryId, flushed_sequence: SequenceNumber, truncate_entry_id: Option, - last_truncate_manifest_version: Option, } impl VersionBuilder { @@ -235,7 +223,6 @@ impl VersionBuilder { flushed_entry_id: 0, flushed_sequence: 0, truncate_entry_id: None, - last_truncate_manifest_version: None, } } @@ -248,7 +235,6 @@ impl VersionBuilder { flushed_entry_id: version.flushed_entry_id, flushed_sequence: version.flushed_sequence, truncate_entry_id: None, - last_truncate_manifest_version: version.last_truncate_manifest_version, } } @@ -282,15 +268,6 @@ impl VersionBuilder { self } - /// Sets last truncate manifest version. - pub(crate) fn last_truncate_manifest_version( - mut self, - manifest_version: Option, - ) -> Self { - self.last_truncate_manifest_version = manifest_version; - self - } - /// Apply edit to the builder. pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self { if let Some(entry_id) = edit.flushed_entry_id { @@ -341,7 +318,6 @@ impl VersionBuilder { flushed_entry_id: self.flushed_entry_id, flushed_sequence: self.flushed_sequence, truncate_entry_id: self.truncate_entry_id, - last_truncate_manifest_version: self.last_truncate_manifest_version, } } } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 61964b5f6da6..f4b4352cc493 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -32,7 +32,6 @@ use datatypes::prelude::DataType; use prost::Message; use smallvec::SmallVec; use snafu::{ensure, OptionExt, ResultExt}; -use store_api::manifest::ManifestVersion; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::region_request::{ RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, @@ -666,8 +665,6 @@ pub(crate) struct CompactionFinished { pub(crate) sender: OptionOutputTx, /// File purger for cleaning files on failure. pub(crate) file_purger: FilePurgerRef, - /// Last truncate `ManifestVersion` before requesting compaction task. - pub(crate) last_truncate_manifest_version: Option, } impl CompactionFinished { diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 6052dd9d78f1..6fbf197ab9b4 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -15,7 +15,6 @@ //! Utilities for testing. use std::collections::HashMap; -use std::path::PathBuf; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; @@ -196,10 +195,6 @@ impl TestEnv { RegionManifestManager::open(manifest_opts).await } } - - pub fn get_data_path(&self) -> PathBuf { - self.data_home.path().join("data") - } } /// Builder to mock a [RegionCreateRequest]. diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 7d9b48c08311..101e9011ebad 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -17,7 +17,6 @@ use store_api::logstore::LogStore; use store_api::storage::RegionId; use crate::compaction::CompactionRequest; -use crate::error::RegionTruncatingSnafu; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::region::MitoRegionRef; use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx}; @@ -55,16 +54,6 @@ impl RegionWorkerLoop { return; }; - let version_data = region.version_control.current(); - if version_data - .version - .last_truncate_manifest_version - .ne(&request.last_truncate_manifest_version) - { - request.on_failure(RegionTruncatingSnafu { region_id }.build()); - return; - } - // Write region edit to manifest. let edit = RegionEdit { files_to_add: std::mem::take(&mut request.compaction_outputs), diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index e16bb28b19a1..455ac59dbe85 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -37,7 +37,6 @@ impl RegionWorkerLoop { self.flush_scheduler.on_region_truncating(region_id); // TODO(DevilExileSu): Notifies compaction scheduler. - let last_manifest_version = region.manifest_manager.manifest_version().await; // Write region truncated to manifest. let truncate = RegionTruncate { @@ -51,7 +50,7 @@ impl RegionWorkerLoop { // Reset region's version and mark all SSTs deleted. region .version_control - .truncate(entry_id, last_manifest_version, &self.memtable_builder); + .truncate(entry_id, &self.memtable_builder); // Make all data obsolete. self.wal.obsolete(region_id, entry_id).await?; From cd18bb022ad053731fad6439588e3f571feeace3 Mon Sep 17 00:00:00 2001 From: Vanish Date: Tue, 12 Sep 2023 22:29:56 +0800 Subject: [PATCH 09/20] Update src/mito2/src/manifest/action.rs Co-authored-by: Yingwen --- src/mito2/src/manifest/action.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 949898236c3b..29552fbfde81 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -62,7 +62,7 @@ pub struct RegionRemove { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct RegionTruncate { pub region_id: RegionId, - pub flushed_entry_id: EntryId, + pub truncated_entry_id: EntryId, } /// The region manifest data. From c7bd72943cdaf89b93bb76fb9f60d08c0e15aee8 Mon Sep 17 00:00:00 2001 From: Vanish Date: Tue, 12 Sep 2023 22:35:04 +0800 Subject: [PATCH 10/20] Update src/mito2/src/worker/handle_flush.rs Co-authored-by: Yingwen --- src/mito2/src/worker/handle_flush.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 702f4b98e4e3..39ecae956a14 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -138,8 +138,8 @@ impl RegionWorkerLoop { // The flush task before truncating the region. let version_data = region.version_control.current(); - if let Some(truncate_entry_id) = version_data.version.truncate_entry_id { - if truncate_entry_id >= request.flushed_entry_id { + if let Some(truncated_entry_id) = version_data.version.truncated_entry_id { + if truncated_entry_id >= request.flushed_entry_id { request.on_failure(RegionTruncatingSnafu { region_id }.build()); return; } From 6286893ab01d4e65a1d211c787bb2781f84f6ee4 Mon Sep 17 00:00:00 2001 From: Vanish Date: Tue, 12 Sep 2023 23:11:06 +0800 Subject: [PATCH 11/20] chore: CR, consider sequence number --- src/mito2/src/engine/truncate_test.rs | 4 ++-- src/mito2/src/manifest/action.rs | 4 +++- src/mito2/src/region/version.rs | 26 +++++++++++++++---------- src/mito2/src/worker/handle_truncate.rs | 25 +++++++++++++----------- 4 files changed, 35 insertions(+), 24 deletions(-) diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 6f3e6c9bb267..f6b70a58b572 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -315,7 +315,7 @@ async fn test_engine_truncate_during_flush() { let flushed_entry_id = region.version_control.current().last_entry_id; let current_version = region.version_control.current().version; - assert_eq!(current_version.truncate_entry_id, None); + assert_eq!(current_version.truncated_entry_id, None); // Truncate the region. engine @@ -364,5 +364,5 @@ async fn test_engine_truncate_during_flush() { .unwrap(); let current_version = region.version_control.current().version; - assert_eq!(current_version.truncate_entry_id, None); + assert_eq!(current_version.truncated_entry_id, None); } diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 29552fbfde81..89dfeccf0de6 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -63,6 +63,7 @@ pub struct RegionRemove { pub struct RegionTruncate { pub region_id: RegionId, pub truncated_entry_id: EntryId, + pub truncated_sequence: SequenceNumber, } /// The region manifest data. @@ -128,7 +129,8 @@ impl RegionManifestBuilder { pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) { self.manifest_version = manifest_version; - self.flushed_entry_id = truncate.flushed_entry_id; + self.flushed_entry_id = truncate.truncated_entry_id; + self.flushed_sequence = truncate.truncated_sequence; self.files.clear(); } diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 60df423e1288..f9fc2ee83727 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -140,14 +140,20 @@ impl VersionControl { } /// Truncate current version. - pub(crate) fn truncate(&self, flushed_entry_id: u64, memtable_builder: &MemtableBuilderRef) { + pub(crate) fn truncate( + &self, + truncated_entry_id: EntryId, + truncated_sequence: SequenceNumber, + memtable_builder: &MemtableBuilderRef, + ) { let version = self.current().version; let new_mutable = memtable_builder.build(&version.metadata); let new_version = Arc::new( VersionBuilder::new(version.metadata.clone(), new_mutable) - .flushed_entry_id(flushed_entry_id) - .truncate_entry_id(Some(flushed_entry_id)) + .flushed_entry_id(truncated_entry_id) + .flushed_sequence(truncated_sequence) + .truncated_entry_id(Some(truncated_entry_id)) .build(), ); @@ -197,7 +203,7 @@ pub(crate) struct Version { /// Latest entry id during the truncating table. /// /// Used to check if it is a flush task during the truncation table. - pub(crate) truncate_entry_id: Option, + pub(crate) truncated_entry_id: Option, // TODO(yingwen): RegionOptions. } @@ -210,7 +216,7 @@ pub(crate) struct VersionBuilder { ssts: SstVersionRef, flushed_entry_id: EntryId, flushed_sequence: SequenceNumber, - truncate_entry_id: Option, + truncated_entry_id: Option, } impl VersionBuilder { @@ -222,7 +228,7 @@ impl VersionBuilder { ssts: Arc::new(SstVersion::new()), flushed_entry_id: 0, flushed_sequence: 0, - truncate_entry_id: None, + truncated_entry_id: None, } } @@ -234,7 +240,7 @@ impl VersionBuilder { ssts: version.ssts.clone(), flushed_entry_id: version.flushed_entry_id, flushed_sequence: version.flushed_sequence, - truncate_entry_id: None, + truncated_entry_id: None, } } @@ -263,8 +269,8 @@ impl VersionBuilder { } /// Sets truncated entty id. - pub(crate) fn truncate_entry_id(mut self, entry_id: Option) -> Self { - self.truncate_entry_id = entry_id; + pub(crate) fn truncated_entry_id(mut self, entry_id: Option) -> Self { + self.truncated_entry_id = entry_id; self } @@ -317,7 +323,7 @@ impl VersionBuilder { ssts: self.ssts, flushed_entry_id: self.flushed_entry_id, flushed_sequence: self.flushed_sequence, - truncate_entry_id: self.truncate_entry_id, + truncated_entry_id: self.truncated_entry_id, } } } diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 455ac59dbe85..42a418893846 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -31,29 +31,32 @@ impl RegionWorkerLoop { info!("Try to truncate region {}", region_id); let version_data = region.version_control.current(); - let entry_id = version_data.last_entry_id; - - // Notifies flush scheduler. - self.flush_scheduler.on_region_truncating(region_id); - - // TODO(DevilExileSu): Notifies compaction scheduler. + let truncated_entry_id = version_data.last_entry_id; + let truncated_sequence = version_data.committed_sequence; // Write region truncated to manifest. let truncate = RegionTruncate { region_id, - flushed_entry_id: entry_id, + truncated_entry_id, + truncated_sequence, }; let action_list = RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone())); region.manifest_manager.update(action_list).await?; + // Notifies flush scheduler. + self.flush_scheduler.on_region_truncating(region_id); + // TODO(DevilExileSu): Notifies compaction scheduler. + // Reset region's version and mark all SSTs deleted. - region - .version_control - .truncate(entry_id, &self.memtable_builder); + region.version_control.truncate( + truncated_entry_id, + truncated_sequence, + &self.memtable_builder, + ); // Make all data obsolete. - self.wal.obsolete(region_id, entry_id).await?; + self.wal.obsolete(region_id, truncated_entry_id).await?; info!("Done truncate"); Ok(Output::AffectedRows(0)) From 07e184c1465e07c023043325ed516b7d3d565512 Mon Sep 17 00:00:00 2001 From: Vanish Date: Wed, 13 Sep 2023 16:36:00 +0800 Subject: [PATCH 12/20] test: use EventListener to test the flush task during truncate --- src/mito2/src/engine.rs | 18 ------ src/mito2/src/engine/listener.rs | 42 ++++++++++++++ src/mito2/src/engine/truncate_test.rs | 84 ++++++++++----------------- src/mito2/src/worker.rs | 9 +++ 4 files changed, 83 insertions(+), 70 deletions(-) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index fe44d3acc818..7cd1079d2d12 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -91,15 +91,6 @@ impl MitoEngine { pub(crate) fn get_region(&self, id: RegionId) -> Option { self.inner.workers.get_region(id) } - - #[cfg(test)] - pub(crate) async fn handle_worker_request( - &self, - region_id: RegionId, - request: WorkerRequest, - ) -> Result<()> { - self.inner.handle_worker_request(region_id, request).await - } } /// Inner struct of [MitoEngine]. @@ -168,15 +159,6 @@ impl EngineInner { region.set_writable(writable); Ok(()) } - - #[cfg(test)] - pub(crate) async fn handle_worker_request( - &self, - region_id: RegionId, - request: WorkerRequest, - ) -> Result<()> { - self.workers.submit_to_worker(region_id, request).await - } } #[async_trait] diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs index e50d3b5a222f..2c0f689b142b 100644 --- a/src/mito2/src/engine/listener.rs +++ b/src/mito2/src/engine/listener.rs @@ -16,17 +16,21 @@ use std::sync::Arc; +use async_trait::async_trait; use common_telemetry::info; use store_api::storage::RegionId; use tokio::sync::Notify; /// Mito engine background event listener. +#[async_trait] pub trait EventListener: Send + Sync { /// Notifies the listener that a region is flushed successfully. fn on_flush_success(&self, region_id: RegionId); /// Notifies the listener that the engine is stalled. fn on_write_stall(&self); + + async fn on_handle_finished_begin(&self, region_id: RegionId); } pub type EventListenerRef = Arc; @@ -50,6 +54,7 @@ impl FlushListener { } } +#[async_trait] impl EventListener for FlushListener { fn on_flush_success(&self, region_id: RegionId) { info!("Region {} flush successfully", region_id); @@ -58,6 +63,8 @@ impl EventListener for FlushListener { } fn on_write_stall(&self) {} + + async fn on_handle_finished_begin(&self, _region_id: RegionId) {} } /// Listener to watch stall events. @@ -79,6 +86,7 @@ impl StallListener { } } +#[async_trait] impl EventListener for StallListener { fn on_flush_success(&self, _region_id: RegionId) {} @@ -87,4 +95,38 @@ impl EventListener for StallListener { self.notify.notify_one(); } + + async fn on_handle_finished_begin(&self, _region_id: RegionId) {} +} + +pub struct HandleFinishedListener { + notify: Notify, +} + +impl HandleFinishedListener { + pub fn new() -> HandleFinishedListener { + HandleFinishedListener { + notify: Notify::new(), + } + } + + pub fn notify(&self) { + self.notify.notify_one(); + } + + pub async fn wait(&self) { + self.notify.notified().await; + } +} + +#[async_trait] +impl EventListener for HandleFinishedListener { + fn on_flush_success(&self, _region_id: RegionId) {} + + fn on_write_stall(&self) {} + + async fn on_handle_finished_begin(&self, region_id: RegionId) { + info!("Region {} begin handle finished flush", region_id); + self.wait().await; + } } diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index f6b70a58b572..c89e438910f7 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -13,24 +13,25 @@ // limitations under the License. use std::collections::HashMap; +use std::sync::Arc; use std::time::Duration; use api::v1::Rows; use common_recordbatch::RecordBatches; -use object_store::util::join_path; -use smallvec::SmallVec; +use common_telemetry::init_default_ut_logging; use store_api::region_engine::RegionEngine; use store_api::region_request::{ RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest, }; use store_api::storage::RegionId; -use tokio::sync::oneshot; use super::ScanRequest; use crate::config::MitoConfig; -use crate::request::{BackgroundNotify, FlushFinished, WorkerRequest}; -use crate::sst::file::{FileId, FileMeta, FileTimeRange}; -use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv}; +use crate::engine::listener::HandleFinishedListener; +use crate::test_util::{ + build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, MockWriteBufferManager, + TestEnv, +}; #[tokio::test] async fn test_engine_truncate_region_basic() { @@ -270,13 +271,21 @@ async fn test_engine_truncate_reopen() { #[tokio::test] async fn test_engine_truncate_during_flush() { + init_default_ut_logging(); let mut env = TestEnv::with_prefix("truncate-during-flush"); - let engine = env.create_engine(MitoConfig::default()).await; + let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); + let listener = Arc::new(HandleFinishedListener::new()); + let engine = env + .create_engine_with( + MitoConfig::default(), + write_buffer_manager.clone(), + Some(listener.clone()), + ) + .await; // Create the region. let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); - let region_dir = request.region_dir.clone(); let column_schemas = rows_schema(&request); engine @@ -293,62 +302,33 @@ async fn test_engine_truncate_during_flush() { let region = engine.get_region(region_id).unwrap(); - // Create a parquet file. - // Simulate that the `do_flush()` function is currently being executed. - let file_id = FileId::random(); - let file_name = format!("{}.parquet", file_id); - let file_meta = FileMeta { - region_id, - file_id, - time_range: FileTimeRange::default(), - level: 0, - file_size: 0, - }; - env.get_object_store() - .unwrap() - .write(&join_path(®ion_dir, &file_name), vec![]) - .await - .unwrap(); - - let (sender, receiver) = oneshot::channel(); + let version_data = region.version_control.current(); + let entry_id = version_data.last_entry_id; + let sequence = version_data.committed_sequence; - let flushed_entry_id = region.version_control.current().last_entry_id; - - let current_version = region.version_control.current().version; - assert_eq!(current_version.truncated_entry_id, None); + // Flush reigon. + let engine_cloned = engine.clone(); + tokio::spawn(async move { + flush_region(&engine_cloned, region_id).await; + }); - // Truncate the region. + // Truncate the region engine .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) .await .unwrap(); - // The flush task is finished, and the `handle_flush_finished()` is executed. - let finished = FlushFinished { - region_id, - file_metas: vec![file_meta.clone()], - flushed_entry_id, - flushed_sequence: flushed_entry_id, - memtables_to_remove: SmallVec::new(), - file_purger: region.file_purger.clone(), - senders: vec![sender], - }; - - let worker_request = WorkerRequest::Background { - region_id, - notify: BackgroundNotify::FlushFinished(finished), - }; - - engine - .handle_worker_request(region_id, worker_request) - .await - .unwrap(); + listener.notify(); - let _ = receiver.await; + let version_data = region.version_control.current(); + let truncated_entry_id = version_data.version.truncated_entry_id.unwrap(); + let truncated_sequence = version_data.version.flushed_sequence; let request = ScanRequest::default(); let scanner = engine.scan(region_id, request.clone()).unwrap(); assert_eq!(0, scanner.num_files()); + assert_eq!(entry_id, truncated_entry_id); + assert_eq!(sequence, truncated_sequence); // Put data to the region. let rows = Rows { diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 6ca21a3bf902..04d838f29b4b 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -573,6 +573,15 @@ impl WorkerListener { listener.on_write_stall(); } } + + pub(crate) async fn on_handle_finishd_begin(&self, region_id: RegionId) { + #[cfg(test)] + if let Some(listener) = &self.listener { + listener.on_handle_finished_begin(region_id).await; + } + // Avoid compiler warning. + let _ = region_id; + } } #[cfg(test)] From 100230beb1c955ce2c335b382b53bf3b0420cdcd Mon Sep 17 00:00:00 2001 From: Vanish Date: Thu, 14 Sep 2023 00:24:17 +0800 Subject: [PATCH 13/20] fix: fix listener error --- src/mito2/src/engine/listener.rs | 40 ++++++++++------- src/mito2/src/engine/truncate_test.rs | 59 +++++++++++++++---------- src/mito2/src/flush.rs | 3 ++ src/mito2/src/region/version.rs | 3 +- src/mito2/src/worker.rs | 6 +-- src/mito2/src/worker/handle_flush.rs | 1 + src/mito2/src/worker/handle_truncate.rs | 7 ++- 7 files changed, 71 insertions(+), 48 deletions(-) diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs index 2c0f689b142b..2e7632b04d2c 100644 --- a/src/mito2/src/engine/listener.rs +++ b/src/mito2/src/engine/listener.rs @@ -30,7 +30,7 @@ pub trait EventListener: Send + Sync { /// Notifies the listener that the engine is stalled. fn on_write_stall(&self); - async fn on_handle_finished_begin(&self, region_id: RegionId); + async fn on_flush_begin(&self, region_id: RegionId); } pub type EventListenerRef = Arc; @@ -64,7 +64,7 @@ impl EventListener for FlushListener { fn on_write_stall(&self) {} - async fn on_handle_finished_begin(&self, _region_id: RegionId) {} + async fn on_flush_begin(&self, _region_id: RegionId) {} } /// Listener to watch stall events. @@ -96,37 +96,43 @@ impl EventListener for StallListener { self.notify.notify_one(); } - async fn on_handle_finished_begin(&self, _region_id: RegionId) {} + async fn on_flush_begin(&self, _region_id: RegionId) {} } -pub struct HandleFinishedListener { - notify: Notify, +pub struct FlushTruncateListener { + notify_flush: Notify, + notify_truncate: Notify, } -impl HandleFinishedListener { - pub fn new() -> HandleFinishedListener { - HandleFinishedListener { - notify: Notify::new(), +impl FlushTruncateListener { + pub fn new() -> FlushTruncateListener { + FlushTruncateListener { + notify_flush: Notify::new(), + notify_truncate: Notify::new(), } } - pub fn notify(&self) { - self.notify.notify_one(); + pub fn notify_flush(&self) { + self.notify_flush.notify_one(); } - pub async fn wait(&self) { - self.notify.notified().await; + pub async fn wait_truncate(&self) { + self.notify_truncate.notified().await; } } #[async_trait] -impl EventListener for HandleFinishedListener { +impl EventListener for FlushTruncateListener { fn on_flush_success(&self, _region_id: RegionId) {} fn on_write_stall(&self) {} - async fn on_handle_finished_begin(&self, region_id: RegionId) { - info!("Region {} begin handle finished flush", region_id); - self.wait().await; + async fn on_flush_begin(&self, region_id: RegionId) { + info!( + "Region {} begin do flush, notify region to truncate", + region_id + ); + self.notify_truncate.notify_one(); + self.notify_flush.notified().await; } } diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index c89e438910f7..af90bd1a0d5f 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -18,7 +18,7 @@ use std::time::Duration; use api::v1::Rows; use common_recordbatch::RecordBatches; -use common_telemetry::init_default_ut_logging; +use common_telemetry::{info, init_default_ut_logging}; use store_api::region_engine::RegionEngine; use store_api::region_request::{ RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest, @@ -27,10 +27,9 @@ use store_api::storage::RegionId; use super::ScanRequest; use crate::config::MitoConfig; -use crate::engine::listener::HandleFinishedListener; +use crate::engine::listener::FlushTruncateListener; use crate::test_util::{ - build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, MockWriteBufferManager, - TestEnv, + build_rows, put_rows, rows_schema, CreateRequestBuilder, MockWriteBufferManager, TestEnv, }; #[tokio::test] @@ -174,7 +173,7 @@ async fn test_engine_truncate_after_flush() { .unwrap(); let request = ScanRequest::default(); - let scanner = engine.scan(region_id, request.clone()).unwrap(); + let scanner = engine.scanner(region_id, request.clone()).unwrap(); assert_eq!(1, scanner.num_files()); // Truncate the region. @@ -208,7 +207,7 @@ async fn test_engine_truncate_after_flush() { tokio::time::sleep(Duration::from_millis(100)).await; - let scanner = engine.scan(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).unwrap(); assert_eq!(0, scanner.num_files()); } @@ -274,7 +273,7 @@ async fn test_engine_truncate_during_flush() { init_default_ut_logging(); let mut env = TestEnv::with_prefix("truncate-during-flush"); let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); - let listener = Arc::new(HandleFinishedListener::new()); + let listener = Arc::new(FlushTruncateListener::new()); let engine = env .create_engine_with( MitoConfig::default(), @@ -286,6 +285,7 @@ async fn test_engine_truncate_during_flush() { // Create the region. let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); let column_schemas = rows_schema(&request); engine @@ -308,41 +308,54 @@ async fn test_engine_truncate_during_flush() { // Flush reigon. let engine_cloned = engine.clone(); - tokio::spawn(async move { - flush_region(&engine_cloned, region_id).await; + let flush_task = tokio::spawn(async move { + info!("do flush task!!!!"); + engine_cloned + .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {})) + .await }); - // Truncate the region + // Wait truncate before flush memtable. + listener.wait_truncate().await; + + // Truncate the region. engine .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) .await .unwrap(); - listener.notify(); + // Notify region to continue flushing. + listener.notify_flush(); + + // Wait handle flushed finish. + let _err = flush_task.await.unwrap().unwrap_err(); + // Check sequences and entry id. let version_data = region.version_control.current(); - let truncated_entry_id = version_data.version.truncated_entry_id.unwrap(); + let truncated_entry_id = version_data.version.truncated_entry_id; let truncated_sequence = version_data.version.flushed_sequence; let request = ScanRequest::default(); - let scanner = engine.scan(region_id, request.clone()).unwrap(); + let scanner = engine.scanner(region_id, request.clone()).unwrap(); assert_eq!(0, scanner.num_files()); - assert_eq!(entry_id, truncated_entry_id); + assert_eq!(Some(entry_id), truncated_entry_id); assert_eq!(sequence, truncated_sequence); - // Put data to the region. - let rows = Rows { - schema: column_schemas, - rows: build_rows(5, 8), - }; - put_rows(&engine, region_id, rows).await; - - // Flush the region. + // Reopen the engine. + let engine = env.reopen_engine(engine, MitoConfig::default()).await; engine - .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {})) + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::default(), + }), + ) .await .unwrap(); + let region = engine.get_region(region_id).unwrap(); let current_version = region.version_control.current().version; assert_eq!(current_version.truncated_entry_id, None); } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 06580794be45..1ae246ed208e 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -40,6 +40,7 @@ use crate::schedule::scheduler::{Job, SchedulerRef}; use crate::sst::file::{FileId, FileMeta}; use crate::sst::file_purger::FilePurgerRef; use crate::sst::parquet::WriteOptions; +use crate::worker::WorkerListener; /// Global write buffer (memtable) manager. /// @@ -187,6 +188,7 @@ pub(crate) struct RegionFlushTask { pub(crate) access_layer: AccessLayerRef, pub(crate) memtable_builder: MemtableBuilderRef, pub(crate) file_purger: FilePurgerRef, + pub(crate) listener: WorkerListener, } impl RegionFlushTask { @@ -228,6 +230,7 @@ impl RegionFlushTask { /// Runs the flush task. async fn do_flush(&mut self, version_data: VersionControlData) { + self.listener.on_flush_begin(self.region_id).await; let worker_request = match self.flush_memtables(&version_data.version).await { Ok(file_metas) => { let memtables_to_remove = version_data diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index f9fc2ee83727..0702a258798e 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -105,6 +105,7 @@ impl VersionControl { VersionBuilder::from_version(version) .apply_edit(edit, purger) .remove_memtables(memtables_to_remove) + .truncated_entry_id(None) .build(), ); @@ -240,7 +241,7 @@ impl VersionBuilder { ssts: version.ssts.clone(), flushed_entry_id: version.flushed_entry_id, flushed_sequence: version.flushed_sequence, - truncated_entry_id: None, + truncated_entry_id: version.truncated_entry_id, } } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 04d838f29b4b..3cef393f7c87 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -542,7 +542,7 @@ impl RegionWorkerLoop { } /// Wrapper that only calls event listener in tests. -#[derive(Default)] +#[derive(Default, Clone)] pub(crate) struct WorkerListener { #[cfg(test)] listener: Option, @@ -574,10 +574,10 @@ impl WorkerListener { } } - pub(crate) async fn on_handle_finishd_begin(&self, region_id: RegionId) { + pub(crate) async fn on_flush_begin(&self, region_id: RegionId) { #[cfg(test)] if let Some(listener) = &self.listener { - listener.on_handle_finished_begin(region_id).await; + listener.on_flush_begin(region_id).await; } // Avoid compiler warning. let _ = region_id; diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 39ecae956a14..fa29b4f65472 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -121,6 +121,7 @@ impl RegionWorkerLoop { access_layer: region.access_layer.clone(), memtable_builder: self.memtable_builder.clone(), file_purger: region.file_purger.clone(), + listener: self.listener.clone(), } } } diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 42a418893846..4ee6dda2cdfe 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -19,15 +19,14 @@ use common_telemetry::info; use store_api::logstore::LogStore; use store_api::storage::RegionId; -use crate::error::{RegionNotFoundSnafu, Result}; +use crate::error::Result; use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionTruncate}; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { pub(crate) async fn handle_truncate_request(&mut self, region_id: RegionId) -> Result { - let Some(region) = self.regions.get_region(region_id) else { - return RegionNotFoundSnafu { region_id }.fail(); - }; + let region = self.regions.writable_region(region_id)?; + info!("Try to truncate region {}", region_id); let version_data = region.version_control.current(); From f323daebd044acf63f0b2221563306ad554455d9 Mon Sep 17 00:00:00 2001 From: Vanish Date: Thu, 14 Sep 2023 15:05:30 +0800 Subject: [PATCH 14/20] Update src/mito2/src/engine/truncate_test.rs Co-authored-by: Yingwen --- src/mito2/src/engine/truncate_test.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index af90bd1a0d5f..83cdfbc279d4 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -190,10 +190,9 @@ async fn test_engine_truncate_after_flush() { put_rows(&engine, region_id, rows).await; // Scan the region. - let stream = engine - .handle_query(region_id, request.clone()) - .await - .unwrap(); + let scanner = engine.scanner(region_id, request).unwrap(); + assert_eq!(0, scanner.num_files()); + let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ @@ -204,11 +203,6 @@ async fn test_engine_truncate_after_flush() { | 7 | 7.0 | 1970-01-01T00:00:07 | +-------+---------+---------------------+"; assert_eq!(expected, batches.pretty_print().unwrap()); - - tokio::time::sleep(Duration::from_millis(100)).await; - - let scanner = engine.scanner(region_id, request).unwrap(); - assert_eq!(0, scanner.num_files()); } #[tokio::test] From ee38e98080b1ae100cec2f9ce33567a06cf58fe4 Mon Sep 17 00:00:00 2001 From: Vanish Date: Thu, 14 Sep 2023 17:38:59 +0800 Subject: [PATCH 15/20] chore: cr --- src/mito2/src/engine/truncate_test.rs | 3 +-- src/mito2/src/manifest/action.rs | 6 ++++++ src/mito2/src/manifest/tests/checkpoint.rs | 2 +- src/mito2/src/region/opener.rs | 1 + src/mito2/src/worker/handle_truncate.rs | 5 ++++- 5 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 83cdfbc279d4..c39f3a0e374b 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; use std::sync::Arc; -use std::time::Duration; use api::v1::Rows; use common_recordbatch::RecordBatches; @@ -351,5 +350,5 @@ async fn test_engine_truncate_during_flush() { let region = engine.get_region(region_id).unwrap(); let current_version = region.version_control.current().version; - assert_eq!(current_version.truncated_entry_id, None); + assert_eq!(current_version.truncated_entry_id, Some(entry_id)); } diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 89dfeccf0de6..68899c788771 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -79,6 +79,8 @@ pub struct RegionManifest { pub flushed_sequence: SequenceNumber, /// Current manifest version. pub manifest_version: ManifestVersion, + /// Last WAL entry id of truncated data. + pub truncated_entry_id: Option, } #[derive(Debug, Default)] @@ -88,6 +90,7 @@ pub struct RegionManifestBuilder { flushed_entry_id: EntryId, flushed_sequence: SequenceNumber, manifest_version: ManifestVersion, + truncated_entry_id: Option, } impl RegionManifestBuilder { @@ -100,6 +103,7 @@ impl RegionManifestBuilder { flushed_entry_id: s.flushed_entry_id, manifest_version: s.manifest_version, flushed_sequence: s.flushed_sequence, + truncated_entry_id: s.truncated_entry_id, } } else { Default::default() @@ -131,6 +135,7 @@ impl RegionManifestBuilder { self.manifest_version = manifest_version; self.flushed_entry_id = truncate.truncated_entry_id; self.flushed_sequence = truncate.truncated_sequence; + self.truncated_entry_id = Some(truncate.truncated_entry_id); self.files.clear(); } @@ -147,6 +152,7 @@ impl RegionManifestBuilder { flushed_entry_id: self.flushed_entry_id, flushed_sequence: self.flushed_sequence, manifest_version: self.manifest_version, + truncated_entry_id: self.truncated_entry_id, }) } } diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index 9dd635b8a73a..e1b1c54cd82c 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -150,7 +150,7 @@ async fn manager_with_checkpoint_distance_1() { .await .unwrap(); let raw_json = std::str::from_utf8(&raw_bytes).unwrap(); - let expected_json = "{\"size\":790,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}"; + let expected_json = "{\"size\":816,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}"; assert_eq!(expected_json, raw_json); // reopen the manager diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 91de9cf4761c..157eff1a4a5d 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -156,6 +156,7 @@ impl RegionOpener { .add_files(file_purger.clone(), manifest.files.values().cloned()) .flushed_entry_id(manifest.flushed_entry_id) .flushed_sequence(manifest.flushed_sequence) + .truncated_entry_id(manifest.truncated_entry_id) .build(); let flushed_entry_id = version.flushed_entry_id; let version_control = Arc::new(VersionControl::new(version)); diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 4ee6dda2cdfe..2afa76fd754b 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -56,7 +56,10 @@ impl RegionWorkerLoop { // Make all data obsolete. self.wal.obsolete(region_id, truncated_entry_id).await?; - info!("Done truncate"); + info!( + "Complete truncate region: {}, entry id: {} and sequence: {} are truncated", + region_id, truncated_entry_id, truncated_sequence + ); Ok(Output::AffectedRows(0)) } From a60fe3191701626e83cd44610b27c569fae07657 Mon Sep 17 00:00:00 2001 From: Vanish Date: Fri, 15 Sep 2023 16:52:10 +0800 Subject: [PATCH 16/20] fix: remove set None --- src/mito2/src/region/version.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 0702a258798e..d59dd66de1b6 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -105,7 +105,6 @@ impl VersionControl { VersionBuilder::from_version(version) .apply_edit(edit, purger) .remove_memtables(memtables_to_remove) - .truncated_entry_id(None) .build(), ); From 640958ccad294000869872c7ad5f21def18630d4 Mon Sep 17 00:00:00 2001 From: Vanish Date: Fri, 15 Sep 2023 19:59:26 +0800 Subject: [PATCH 17/20] Update src/mito2/src/region/version.rs Co-authored-by: dennis zhuang --- src/mito2/src/region/version.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index d59dd66de1b6..87cc71bb29be 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -202,7 +202,7 @@ pub(crate) struct Version { pub(crate) flushed_sequence: SequenceNumber, /// Latest entry id during the truncating table. /// - /// Used to check if it is a flush task during the truncation table. + /// Used to check if it is a flush task during the truncating table. pub(crate) truncated_entry_id: Option, // TODO(yingwen): RegionOptions. } From 4a132d9e726c733878a967f65f57826cab981947 Mon Sep 17 00:00:00 2001 From: Vanish Date: Fri, 15 Sep 2023 19:59:37 +0800 Subject: [PATCH 18/20] Update src/mito2/src/worker/handle_flush.rs Co-authored-by: dennis zhuang --- src/mito2/src/worker/handle_flush.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index fa29b4f65472..cb2f2746dc99 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -137,7 +137,7 @@ impl RegionWorkerLoop { return; }; - // The flush task before truncating the region. + // The flush task before truncating the region fails immediately. let version_data = region.version_control.current(); if let Some(truncated_entry_id) = version_data.version.truncated_entry_id { if truncated_entry_id >= request.flushed_entry_id { From 74336fb87bbb5f1636f3699deb79a624c3b9f442 Mon Sep 17 00:00:00 2001 From: Vanish Date: Fri, 15 Sep 2023 19:59:48 +0800 Subject: [PATCH 19/20] Update src/mito2/src/worker/handle_truncate.rs Co-authored-by: dennis zhuang --- src/mito2/src/worker/handle_truncate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 2afa76fd754b..759f17b90cff 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -57,7 +57,7 @@ impl RegionWorkerLoop { // Make all data obsolete. self.wal.obsolete(region_id, truncated_entry_id).await?; info!( - "Complete truncate region: {}, entry id: {} and sequence: {} are truncated", + "Complete truncating region: {}, entry id: {} and sequence: {}.", region_id, truncated_entry_id, truncated_sequence ); From 98c25f7cb830b4689c76c7b134beb5afccf2deb9 Mon Sep 17 00:00:00 2001 From: Vanish Date: Fri, 15 Sep 2023 21:06:14 +0800 Subject: [PATCH 20/20] doc: add some doc for FlushTruncateListener and RegionTruncate --- src/mito2/src/engine/listener.rs | 14 ++++++++++++++ src/mito2/src/manifest/action.rs | 3 +++ 2 files changed, 17 insertions(+) diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs index 2e7632b04d2c..e591d2b5a576 100644 --- a/src/mito2/src/engine/listener.rs +++ b/src/mito2/src/engine/listener.rs @@ -30,6 +30,7 @@ pub trait EventListener: Send + Sync { /// Notifies the listener that the engine is stalled. fn on_write_stall(&self); + /// Notifies the listener that the region starts to do flush. async fn on_flush_begin(&self, region_id: RegionId); } @@ -99,12 +100,21 @@ impl EventListener for StallListener { async fn on_flush_begin(&self, _region_id: RegionId) {} } +/// Listener to watch begin flush events. +/// +/// Crate a background thread to execute flush region, and the main thread calls `wait_truncate()` +/// to block and wait for `on_flush_region()`. +/// When the background thread calls `on_flush_begin()`, the main thread is notified to truncate +/// region, and background thread thread blocks and waits for `notify_flush()` to continue flushing. pub struct FlushTruncateListener { + /// Notify flush operation. notify_flush: Notify, + /// Notify truncate operation. notify_truncate: Notify, } impl FlushTruncateListener { + /// Creates a new listener. pub fn new() -> FlushTruncateListener { FlushTruncateListener { notify_flush: Notify::new(), @@ -112,10 +122,12 @@ impl FlushTruncateListener { } } + /// Notify flush region to proceed. pub fn notify_flush(&self) { self.notify_flush.notify_one(); } + /// Wait for a truncate event. pub async fn wait_truncate(&self) { self.notify_truncate.notified().await; } @@ -127,6 +139,8 @@ impl EventListener for FlushTruncateListener { fn on_write_stall(&self) {} + /// Calling this function will block the thread! + /// Notify the listener to perform a truncate region and block the flush region job. async fn on_flush_begin(&self, region_id: RegionId) { info!( "Region {} begin do flush, notify region to truncate", diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 68899c788771..2ed8670b516d 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -59,10 +59,13 @@ pub struct RegionRemove { pub region_id: RegionId, } +/// Last data truncated in the region. #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct RegionTruncate { pub region_id: RegionId, + /// Last WAL entry id of truncated data. pub truncated_entry_id: EntryId, + // Last sequence number of truncated data. pub truncated_sequence: SequenceNumber, }