diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index c25775c944fe..94ad5870866f 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -417,7 +417,8 @@ impl RegionServerInner { | RegionRequest::Alter(_) | RegionRequest::Flush(_) | RegionRequest::Compact(_) - | RegionRequest::Truncate(_) => RegionChange::None, + | RegionRequest::Truncate(_) + | RegionRequest::Catchup(_) => RegionChange::None, }; let engine = match self.get_engine(region_id, ®ion_change)? { diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 72f169648f07..6da215361f83 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -131,6 +131,8 @@ impl RegionEngine for MetricEngine { RegionRequest::Flush(_) => todo!(), RegionRequest::Compact(_) => todo!(), RegionRequest::Truncate(_) => todo!(), + /// It always Ok(0), all data is latest. + RegionRequest::Catchup(_) => Ok(0), }; result.map_err(BoxedError::new) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index dfca088f2d96..2f758ee2233a 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -19,6 +19,8 @@ mod alter_test; #[cfg(test)] mod basic_test; #[cfg(test)] +mod catchup_test; +#[cfg(test)] mod close_test; #[cfg(test)] mod compaction_test; diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs new file mode 100644 index 000000000000..cc520d2357c8 --- /dev/null +++ b/src/mito2/src/engine/catchup_test.rs @@ -0,0 +1,419 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::assert_matches::assert_matches; +use std::collections::HashMap; + +use api::v1::Rows; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_recordbatch::RecordBatches; +use store_api::region_engine::{RegionEngine, SetReadonlyResponse}; +use store_api::region_request::{RegionCatchupRequest, RegionOpenRequest, RegionRequest}; +use store_api::storage::{RegionId, ScanRequest}; + +use crate::config::MitoConfig; +use crate::error::{self, Error}; +use crate::test_util::{ + build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv, +}; +use crate::wal::EntryId; + +fn get_last_entry_id(resp: SetReadonlyResponse) -> Option { + if let SetReadonlyResponse::Success { last_entry_id } = resp { + last_entry_id + } else { + unreachable!(); + } +} + +#[tokio::test] +async fn test_catchup_with_last_entry_id() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::with_prefix("last_entry_id"); + let leader_engine = env.create_engine(MitoConfig::default()).await; + let follower_engine = env.create_follower_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + + let column_schemas = rows_schema(&request); + leader_engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + follower_engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::default(), + skip_wal_replay: false, + }), + ) + .await + .unwrap(); + + // Ensures the mutable is empty. + let region = follower_engine.get_region(region_id).unwrap(); + assert!(region.version().memtables.mutable.is_empty()); + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + + put_rows(&leader_engine, region_id, rows).await; + + let resp = leader_engine + .set_readonly_gracefully(region_id) + .await + .unwrap(); + + let last_entry_id = get_last_entry_id(resp); + assert!(last_entry_id.is_some()); + + // Replays the memtable. + let resp = follower_engine + .handle_request( + region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: false, + entry_id: last_entry_id, + }), + ) + .await; + let region = follower_engine.get_region(region_id).unwrap(); + assert!(!region.is_writable()); + assert!(resp.is_ok()); + + // Scans + let request = ScanRequest::default(); + let stream = follower_engine + .handle_query(region_id, request) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); + + // Replays the memtable again, should be ok. + let resp = follower_engine + .handle_request( + region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: true, + entry_id: last_entry_id, + }), + ) + .await; + let region = follower_engine.get_region(region_id).unwrap(); + assert!(region.is_writable()); + assert!(resp.is_ok()); +} + +#[tokio::test] +async fn test_catchup_with_incorrect_last_entry_id() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::with_prefix("incorrect_last_entry_id"); + let leader_engine = env.create_engine(MitoConfig::default()).await; + let follower_engine = env.create_follower_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + + let column_schemas = rows_schema(&request); + leader_engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + follower_engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::default(), + skip_wal_replay: false, + }), + ) + .await + .unwrap(); + + // Ensures the mutable is empty. + let region = follower_engine.get_region(region_id).unwrap(); + assert!(region.version().memtables.mutable.is_empty()); + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + + put_rows(&leader_engine, region_id, rows).await; + + let resp = leader_engine + .set_readonly_gracefully(region_id) + .await + .unwrap(); + + let last_entry_id = get_last_entry_id(resp); + assert!(last_entry_id.is_some()); + + let incorrect_last_entry_id = last_entry_id.map(|e| e + 1); + + // Replays the memtable. + let err = follower_engine + .handle_request( + region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: false, + entry_id: incorrect_last_entry_id, + }), + ) + .await + .unwrap_err(); + let err = err.as_any().downcast_ref::().unwrap(); + + assert_matches!(err, error::Error::UnexpectedReplay { .. }); + + // It should ignore requests to writable regions. + region.set_writable(true); + let resp = follower_engine + .handle_request( + region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: false, + entry_id: incorrect_last_entry_id, + }), + ) + .await; + assert!(resp.is_ok()); +} + +#[tokio::test] +async fn test_catchup_without_last_entry_id() { + let mut env = TestEnv::with_prefix("without_last_entry_id"); + let leader_engine = env.create_engine(MitoConfig::default()).await; + let follower_engine = env.create_follower_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + + let column_schemas = rows_schema(&request); + leader_engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + follower_engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::default(), + skip_wal_replay: false, + }), + ) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas, + rows: build_rows(0, 3), + }; + put_rows(&leader_engine, region_id, rows).await; + + // Replays the memtable. + let resp = follower_engine + .handle_request( + region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: false, + entry_id: None, + }), + ) + .await; + assert!(resp.is_ok()); + let region = follower_engine.get_region(region_id).unwrap(); + assert!(!region.is_writable()); + + let request = ScanRequest::default(); + let stream = follower_engine + .handle_query(region_id, request) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); + + // Replays the memtable again, should be ok. + let resp = follower_engine + .handle_request( + region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: true, + entry_id: None, + }), + ) + .await; + assert!(resp.is_ok()); + let region = follower_engine.get_region(region_id).unwrap(); + assert!(region.is_writable()); +} + +#[tokio::test] +async fn test_catchup_with_manifest_update() { + let mut env = TestEnv::with_prefix("without_manifest_update"); + let leader_engine = env.create_engine(MitoConfig::default()).await; + let follower_engine = env.create_follower_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + + let column_schemas = rows_schema(&request); + leader_engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + follower_engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::default(), + skip_wal_replay: false, + }), + ) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&leader_engine, region_id, rows).await; + + // Triggers to create a new manifest file. + flush_region(&leader_engine, region_id, None).await; + + // Puts to WAL. + let rows = Rows { + schema: column_schemas, + rows: build_rows(3, 5), + }; + put_rows(&leader_engine, region_id, rows).await; + + // Triggers to create a new manifest file. + flush_region(&leader_engine, region_id, None).await; + + let region = follower_engine.get_region(region_id).unwrap(); + // Ensures the mutable is empty. + assert!(region.version().memtables.mutable.is_empty()); + + let manifest = region.manifest_manager.manifest().await; + assert_eq!(manifest.manifest_version, 0); + + let resp = follower_engine + .handle_request( + region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: false, + entry_id: None, + }), + ) + .await; + assert!(resp.is_ok()); + + // The inner region was replaced. We must get it again. + let region = follower_engine.get_region(region_id).unwrap(); + let manifest = region.manifest_manager.manifest().await; + assert_eq!(manifest.manifest_version, 2); + assert!(!region.is_writable()); + + let request = ScanRequest::default(); + let stream = follower_engine + .handle_query(region_id, request) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | +| 3 | 3.0 | 1970-01-01T00:00:03 | +| 4 | 4.0 | 1970-01-01T00:00:04 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); + + // Replays the memtable again, should be ok. + let resp = follower_engine + .handle_request( + region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: true, + entry_id: None, + }), + ) + .await; + let region = follower_engine.get_region(region_id).unwrap(); + assert!(resp.is_ok()); + assert!(region.is_writable()); +} + +#[tokio::test] +async fn test_catchup_not_exist() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let non_exist_region_id = RegionId::new(1, 1); + + let err = engine + .handle_request( + non_exist_region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: true, + entry_id: None, + }), + ) + .await + .unwrap_err(); + assert_matches!(err.status_code(), StatusCode::RegionNotFound); +} diff --git a/src/mito2/src/engine/set_readonly_test.rs b/src/mito2/src/engine/set_readonly_test.rs index e658352ceb17..9b92d7c23a17 100644 --- a/src/mito2/src/engine/set_readonly_test.rs +++ b/src/mito2/src/engine/set_readonly_test.rs @@ -20,7 +20,6 @@ use store_api::region_request::{RegionPutRequest, RegionRequest}; use store_api::storage::RegionId; use crate::config::MitoConfig; -use crate::request::WorkerRequest; use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv}; #[tokio::test] @@ -90,18 +89,6 @@ async fn test_set_readonly_gracefully_not_exist() { let non_exist_region_id = RegionId::new(1, 1); - // For inner `handle_inter_command`. - let (set_readonly_request, recv) = - WorkerRequest::new_set_readonly_gracefully(non_exist_region_id); - engine - .inner - .workers - .submit_to_worker(non_exist_region_id, set_readonly_request) - .await - .unwrap(); - let result = recv.await.unwrap(); - assert_eq!(SetReadonlyResponse::NotFound, result); - // For fast-path. let result = engine .set_readonly_gracefully(non_exist_region_id) diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index d29450e50a22..b1d48f8c654e 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -35,6 +35,17 @@ use crate::worker::WorkerId; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display( + "Failed to set region {} to writable, it was expected to replayed to {}, but actually replayed to {}", + region_id, expected_last_entry_id, replayed_last_entry_id + ))] + UnexpectedReplay { + location: Location, + region_id: RegionId, + expected_last_entry_id: u64, + replayed_last_entry_id: u64, + }, + #[snafu(display("OpenDAL operator failed"))] OpenDal { location: Location, @@ -392,6 +403,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Empty manifest directory, manifest_dir: {}", manifest_dir,))] + EmptyManifestDir { + manifest_dir: String, + location: Location, + }, + #[snafu(display("Failed to read arrow record batch from parquet file {}", path))] ArrowReader { path: String, @@ -435,7 +452,8 @@ impl ErrorExt for Error { | NewRecordBatch { .. } | RegionCorrupted { .. } | CreateDefault { .. } - | InvalidParquet { .. } => StatusCode::Unexpected, + | InvalidParquet { .. } + | UnexpectedReplay { .. } => StatusCode::Unexpected, RegionNotFound { .. } => StatusCode::RegionNotFound, ObjectStoreNotFound { .. } | InvalidScanIndex { .. } @@ -476,7 +494,7 @@ impl ErrorExt for Error { InvalidRegionRequest { source, .. } => source.status_code(), RegionReadonly { .. } => StatusCode::RegionReadonly, JsonOptions { .. } => StatusCode::InvalidArguments, - EmptyRegionDir { .. } => StatusCode::RegionNotFound, + EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound, ArrowReader { .. } => StatusCode::StorageUnavailable, } } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 1a32351547a4..10b275c48183 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -17,6 +17,7 @@ //! Mito is the a region engine to store timeseries data. #![feature(let_chains)] +#![feature(assert_matches)] #[cfg(any(test, feature = "test"))] #[cfg_attr(feature = "test", allow(unused))] diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 613253dffa4e..9cdafed29beb 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -16,17 +16,19 @@ use std::sync::Arc; use common_datasource::compression::CompressionType; use common_telemetry::{debug, info}; +use futures::TryStreamExt; use object_store::ObjectStore; +use snafu::{OptionExt, ResultExt}; use store_api::manifest::{ManifestVersion, MAX_VERSION, MIN_VERSION}; use store_api::metadata::RegionMetadataRef; use tokio::sync::RwLock; -use crate::error::Result; +use crate::error::{self, Result}; use crate::manifest::action::{ RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction, RegionMetaActionList, }; -use crate::manifest::storage::ManifestObjectStore; +use crate::manifest::storage::{file_version, is_delta_file, ManifestObjectStore}; /// Options for [RegionManifestManager]. #[derive(Debug, Clone)] @@ -160,6 +162,12 @@ impl RegionManifestManager { let inner = self.inner.read().await; inner.total_manifest_size() } + + /// Returns true if a newer version manifest file is found. + pub async fn has_update(&self) -> Result { + let inner = self.inner.read().await; + inner.has_update().await + } } #[cfg(test)] @@ -233,7 +241,7 @@ impl RegionManifestManagerInner { }) } - /// Open an existing manifest. + /// Opens an existing manifest. /// /// Returns `Ok(None)` if no such manifest. async fn open(options: RegionManifestOptions) -> Result> { @@ -323,7 +331,7 @@ impl RegionManifestManagerInner { Ok(()) } - /// Update the manifest. Return the current manifest version number. + /// Updates the manifest. Return the current manifest version number. async fn update(&mut self, action_list: RegionMetaActionList) -> Result { let version = self.increase_version(); self.store.save(version, &action_list.encode()?).await?; @@ -385,7 +393,7 @@ impl RegionManifestManagerInner { Ok(()) } - /// Make a new checkpoint. Return the fresh one if there are some actions to compact. + /// Makes a new checkpoint. Return the fresh one if there are some actions to compact. async fn do_checkpoint(&mut self) -> Result> { let last_checkpoint = Self::last_checkpoint(&mut self.store).await?; let current_version = self.last_version; @@ -459,7 +467,7 @@ impl RegionManifestManagerInner { Ok(Some(checkpoint)) } - /// Fetch the last [RegionCheckpoint] from storage. + /// Fetches the last [RegionCheckpoint] from storage. pub(crate) async fn last_checkpoint( store: &mut ManifestObjectStore, ) -> Result> { @@ -472,6 +480,37 @@ impl RegionManifestManagerInner { Ok(None) } } + + /// Returns true if a newer version manifest file is found. + /// + /// It is typically used in read-only regions to catch up with manifest. + pub(crate) async fn has_update(&self) -> Result { + let last_version = self.last_version; + + let streamer = + self.store + .manifest_lister() + .await? + .context(error::EmptyManifestDirSnafu { + manifest_dir: self.store.manifest_dir(), + })?; + + let need_update = streamer + .try_any(|entry| async move { + let file_name = entry.name(); + if is_delta_file(file_name) { + let version = file_version(file_name); + if version > last_version { + return true; + } + } + false + }) + .await + .context(error::OpenDalSnafu)?; + + Ok(need_update) + } } #[cfg(test)] diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index b470a1255aa1..32802c128b6f 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -21,7 +21,7 @@ use common_telemetry::debug; use futures::future::try_join_all; use futures::TryStreamExt; use lazy_static::lazy_static; -use object_store::{util, Entry, ErrorKind, ObjectStore}; +use object_store::{util, Entry, ErrorKind, Lister, ObjectStore}; use regex::Regex; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; @@ -148,6 +148,23 @@ impl ManifestObjectStore { format!("{}{}", self.path, LAST_CHECKPOINT_FILE) } + /// Returns the manifest dir + pub(crate) fn manifest_dir(&self) -> &str { + &self.path + } + + /// Returns a iterator of manifests. + pub(crate) async fn manifest_lister(&self) -> Result> { + match self.object_store.lister_with(&self.path).await { + Ok(streamer) => Ok(Some(streamer)), + Err(e) if e.kind() == ErrorKind::NotFound => { + debug!("Manifest directory does not exists: {}", self.path); + Ok(None) + } + Err(e) => Err(e).context(OpenDalSnafu)?, + } + } + /// Return all `R`s in the root directory that meet the `filter` conditions (that is, the `filter` closure returns `Some(R)`), /// and discard `R` that does not meet the conditions (that is, the `filter` closure returns `None`) /// Return an empty vector when directory is not found. @@ -155,13 +172,8 @@ impl ManifestObjectStore { where F: Fn(Entry) -> Option, { - let streamer = match self.object_store.lister_with(&self.path).await { - Ok(streamer) => streamer, - Err(e) if e.kind() == ErrorKind::NotFound => { - debug!("Manifest directory does not exists: {}", self.path); - return Ok(vec![]); - } - Err(e) => Err(e).context(OpenDalSnafu)?, + let Some(streamer) = self.manifest_lister().await? else { + return Ok(vec![]); }; streamer @@ -171,7 +183,12 @@ impl ManifestObjectStore { .context(OpenDalSnafu) } - /// Scan the manifest files in the range of [start, end) and return all manifest entries. + /// Sorts the manifest files. + fn sort_manifests(entries: &mut [(ManifestVersion, Entry)]) { + entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2)); + } + + /// Scans the manifest files in the range of [start, end) and return all manifest entries. pub async fn scan( &self, start: ManifestVersion, @@ -192,7 +209,7 @@ impl ManifestObjectStore { }) .await?; - entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2)); + Self::sort_manifests(&mut entries); Ok(entries) } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 99e8348dc7ab..4819a9bc11ed 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -126,6 +126,11 @@ impl MitoRegion { self.writable.load(Ordering::Relaxed) } + /// Returns the region dir. + pub(crate) fn region_dir(&self) -> &str { + self.access_layer.region_dir() + } + /// Sets the writable flag. pub(crate) fn set_writable(&self, writable: bool) { self.writable.store(writable, Ordering::Relaxed); diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index cf59c2b5b674..ffb3696a97e5 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -53,7 +53,7 @@ pub(crate) struct RegionOpener { object_store_manager: ObjectStoreManagerRef, region_dir: String, scheduler: SchedulerRef, - options: HashMap, + options: Option, cache_manager: Option, skip_wal_replay: bool, } @@ -74,7 +74,7 @@ impl RegionOpener { object_store_manager, region_dir: normalize_dir(region_dir), scheduler, - options: HashMap::new(), + options: None, cache_manager: None, skip_wal_replay: false, } @@ -86,9 +86,15 @@ impl RegionOpener { self } + /// Parses and sets options for the region. + pub(crate) fn parse_options(mut self, options: HashMap) -> Result { + self.options = Some(RegionOptions::try_from(&options)?); + Ok(self) + } + /// Sets options for the region. - pub(crate) fn options(mut self, value: HashMap) -> Self { - self.options = value; + pub(crate) fn options(mut self, options: RegionOptions) -> Self { + self.options = Some(options); self } @@ -108,9 +114,10 @@ impl RegionOpener { /// Opens the region if it already exists. /// /// # Panics - /// Panics if metadata is not set. + /// - Panics if metadata is not set. + /// - Panics if options is not set. pub(crate) async fn create_or_open( - self, + mut self, config: &MitoConfig, wal: &Wal, ) -> Result { @@ -145,9 +152,8 @@ impl RegionOpener { ); } } - let options = RegionOptions::try_from(&self.options)?; + let options = self.options.take().unwrap(); let wal_options = options.wal_options.clone(); - let object_store = self.object_store(&options.storage)?.clone(); // Create a manifest manager for this region and writes regions to the manifest file. @@ -218,7 +224,7 @@ impl RegionOpener { config: &MitoConfig, wal: &Wal, ) -> Result> { - let region_options = RegionOptions::try_from(&self.options)?; + let region_options = self.options.as_ref().unwrap().clone(); let wal_options = region_options.wal_options.clone(); let region_manifest_options = self.manifest_options(config, ®ion_options)?; @@ -358,13 +364,13 @@ pub(crate) fn check_recovered_region( } /// Replays the mutations from WAL and inserts mutations to memtable of given region. -async fn replay_memtable( +pub(crate) async fn replay_memtable( wal: &Wal, wal_options: &WalOptions, region_id: RegionId, flushed_entry_id: EntryId, version_control: &VersionControlRef, -) -> Result<()> { +) -> Result { let mut rows_replayed = 0; // Last entry id should start from flushed entry id since there might be no // data in the WAL. @@ -392,7 +398,7 @@ async fn replay_memtable( "Replay WAL for region: {}, rows recovered: {}, last entry id: {}", region_id, rows_replayed, last_entry_id ); - Ok(()) + Ok(last_entry_id) } /// Returns the directory to the manifest files. diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 311638792d99..8f04492ba234 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -32,9 +32,9 @@ use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::region_engine::SetReadonlyResponse; use store_api::region_request::{ - AffectedRows, RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, - RegionCreateRequest, RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, - RegionTruncateRequest, + AffectedRows, RegionAlterRequest, RegionCatchupRequest, RegionCloseRequest, + RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest, + RegionOpenRequest, RegionRequest, RegionTruncateRequest, }; use store_api::storage::{RegionId, SequenceNumber}; use tokio::sync::oneshot::{self, Receiver, Sender}; @@ -550,6 +550,11 @@ impl WorkerRequest { sender: sender.into(), request: DdlRequest::Truncate(v), }), + RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest { + region_id, + sender: sender.into(), + request: DdlRequest::Catchup(v), + }), }; Ok((worker_request, receiver)) @@ -578,6 +583,7 @@ pub(crate) enum DdlRequest { Flush(RegionFlushRequest), Compact(RegionCompactRequest), Truncate(RegionTruncateRequest), + Catchup(RegionCatchupRequest), } /// Sender and Ddl request. diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 63416a640f3b..b2006098b6a3 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -139,6 +139,14 @@ impl TestEnv { MitoEngine::new(config, logstore, object_store_manager) } + /// Creates a new engine with specific config and existing logstore and object store manager. + pub async fn create_follower_engine(&mut self, config: MitoConfig) -> MitoEngine { + let logstore = self.logstore.as_ref().unwrap().clone(); + let object_store_manager = self.object_store_manager.as_ref().unwrap().clone(); + + MitoEngine::new(config, logstore, object_store_manager) + } + /// Creates a new engine with specific config and manager/listener under this env. pub async fn create_engine_with( &mut self, @@ -221,6 +229,7 @@ impl TestEnv { ) } + /// Returns the log store and object store manager. async fn create_log_and_object_store_manager( &self, ) -> (RaftEngineLogStore, ObjectStoreManager) { diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 750e29b56751..c969fdd008c1 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -15,6 +15,7 @@ //! Structs and utilities for writing regions. mod handle_alter; +mod handle_catchup; mod handle_close; mod handle_compaction; mod handle_create; @@ -546,6 +547,7 @@ impl RegionWorkerLoop { continue; } DdlRequest::Truncate(_) => self.handle_truncate_request(ddl.region_id).await, + DdlRequest::Catchup(req) => self.handle_catchup_request(ddl.region_id, req).await, }; ddl.sender.send(res); diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs new file mode 100644 index 000000000000..e8f3655cf71a --- /dev/null +++ b/src/mito2/src/worker/handle_catchup.rs @@ -0,0 +1,95 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Handling catchup request. + +use std::sync::Arc; + +use snafu::ensure; +use store_api::logstore::LogStore; +use store_api::region_request::{AffectedRows, RegionCatchupRequest}; +use store_api::storage::RegionId; + +use crate::error::{self, Result}; +use crate::region::opener::{replay_memtable, RegionOpener}; +use crate::worker::RegionWorkerLoop; + +impl RegionWorkerLoop { + pub(crate) async fn handle_catchup_request( + &mut self, + region_id: RegionId, + request: RegionCatchupRequest, + ) -> Result { + let Some(region) = self.regions.get_region(region_id) else { + return error::RegionNotFoundSnafu { region_id }.fail(); + }; + + if region.is_writable() { + return Ok(0); + } + // Note: Currently, We protect the split brain by ensuring the mutable table is empty. + // It's expensive to execute catch-up requests without `set_writable=true` multiple times. + let is_mutable_empty = region.version().memtables.mutable.is_empty(); + + // Utilizes the short circuit evaluation. + let region = if !is_mutable_empty || region.manifest_manager.has_update().await? { + let reopened_region = Arc::new( + RegionOpener::new( + region_id, + region.region_dir(), + self.memtable_builder.clone(), + self.object_store_manager.clone(), + self.scheduler.clone(), + ) + .cache(Some(self.cache_manager.clone())) + .options(region.version().options.clone()) + .skip_wal_replay(true) + .open(&self.config, &self.wal) + .await?, + ); + debug_assert!(!reopened_region.is_writable()); + self.regions.insert_region(reopened_region.clone()); + + reopened_region + } else { + region + }; + + let flushed_entry_id = region.version_control.current().last_entry_id; + let last_entry_id = replay_memtable( + &self.wal, + ®ion.wal_options, + region_id, + flushed_entry_id, + ®ion.version_control, + ) + .await?; + if let Some(expected_last_entry_id) = request.entry_id { + ensure!( + expected_last_entry_id == last_entry_id, + error::UnexpectedReplaySnafu { + region_id, + expected_last_entry_id, + replayed_last_entry_id: last_entry_id, + } + ) + } + + if request.set_writable { + region.set_writable(true); + } + + Ok(0) + } +} diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 6d86ddb1887f..0af60793a0e9 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -63,7 +63,7 @@ impl RegionWorkerLoop { self.scheduler.clone(), ) .metadata(metadata) - .options(request.options) + .parse_options(request.options)? .cache(Some(self.cache_manager.clone())) .create_or_open(&self.config, &self.wal) .await?; diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 49100bee5199..a2a7f7d6609a 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -69,8 +69,8 @@ impl RegionWorkerLoop { self.object_store_manager.clone(), self.scheduler.clone(), ) - .options(request.options) .skip_wal_replay(request.skip_wal_replay) + .parse_options(request.options)? .cache(Some(self.cache_manager.clone())) .open(&self.config, &self.wal) .await?; diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index 5e738a00e5ad..88663ee0a310 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -15,7 +15,7 @@ pub use opendal::raw::oio::Pager; pub use opendal::raw::{normalize_path as raw_normalize_path, HttpClient}; pub use opendal::{ - services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Metakey, + services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Lister, Metakey, Operator as ObjectStore, Reader, Result, Writer, }; diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index cffd6cf61d18..7c0456f44726 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -21,6 +21,7 @@ use api::v1::{self, Rows, SemanticType}; use snafu::{ensure, OptionExt}; use strum::IntoStaticStr; +use crate::logstore::entry; use crate::metadata::{ ColumnMetadata, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, MetadataError, RegionMetadata, Result, @@ -42,6 +43,7 @@ pub enum RegionRequest { Flush(RegionFlushRequest), Compact(RegionCompactRequest), Truncate(RegionTruncateRequest), + Catchup(RegionCatchupRequest), } impl RegionRequest { @@ -59,6 +61,7 @@ impl RegionRequest { RegionRequest::Flush(_) => "flush", RegionRequest::Compact(_) => "compact", RegionRequest::Truncate(_) => "truncate", + RegionRequest::Catchup(_) => "catchup", } } @@ -453,6 +456,19 @@ pub struct RegionCompactRequest {} #[derive(Debug)] pub struct RegionTruncateRequest {} +/// Catchup region request. +/// +/// Makes a readonly region to catch up to leader region changes. +/// There is no effect if it operating on a leader region. +#[derive(Debug)] +pub struct RegionCatchupRequest { + /// Sets it to writable if it's available after it has caught up with all changes. + pub set_writable: bool, + /// The `entry_id` that was expected to reply to. + /// `None` stands replaying to latest. + pub entry_id: Option, +} + impl fmt::Display for RegionRequest { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -466,6 +482,7 @@ impl fmt::Display for RegionRequest { RegionRequest::Flush(_) => write!(f, "Flush"), RegionRequest::Compact(_) => write!(f, "Compact"), RegionRequest::Truncate(_) => write!(f, "Truncate"), + RegionRequest::Catchup(_) => write!(f, "Catchup"), } } }