diff --git a/Cargo.toml b/Cargo.toml index f5f4873b6792..87985d74935b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -191,7 +191,6 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git" rev = "abbd357c1e193cd270ea65ee7652334a150b628f" [profile.dev] -debug = 1 [profile.release] debug = 1 diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 6b820b85356d..3b8e825939c3 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -303,7 +303,8 @@ impl RegionServerInner { | RegionRequest::Alter(_) | RegionRequest::Flush(_) | RegionRequest::Compact(_) - | RegionRequest::Truncate(_) => RegionChange::None, + | RegionRequest::Truncate(_) + | RegionRequest::Catchup(_) => RegionChange::None, }; let engine = match ®ion_change { diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 35dd0ee65571..9e7acfc0fd64 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -128,6 +128,8 @@ impl RegionEngine for MetricEngine { RegionRequest::Flush(_) => todo!(), RegionRequest::Compact(_) => todo!(), RegionRequest::Truncate(_) => todo!(), + /// It always Ok(0), all data is latest. + RegionRequest::Catchup(_) => Ok(0), }; result.map_err(BoxedError::new) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index dfca088f2d96..2f758ee2233a 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -19,6 +19,8 @@ mod alter_test; #[cfg(test)] mod basic_test; #[cfg(test)] +mod catchup_test; +#[cfg(test)] mod close_test; #[cfg(test)] mod compaction_test; diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs new file mode 100644 index 000000000000..5040d3f200c4 --- /dev/null +++ b/src/mito2/src/engine/catchup_test.rs @@ -0,0 +1,333 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::assert_matches::assert_matches; +use std::collections::HashMap; + +use api::v1::Rows; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_recordbatch::RecordBatches; +use store_api::region_engine::{RegionEngine, SetReadonlyResponse}; +use store_api::region_request::{RegionCatchupRequest, RegionOpenRequest, RegionRequest}; +use store_api::storage::{RegionId, ScanRequest}; + +use crate::config::MitoConfig; +use crate::test_util::{ + build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv, +}; + +#[tokio::test] +async fn test_catchup_with_last_entry_id() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::with_prefix("last_entry_id"); + let leader_engine = env.create_engine(MitoConfig::default()).await; + let follower_engine = env.create_follower_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + + let column_schemas = rows_schema(&request); + leader_engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + follower_engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::default(), + skip_wal_replay: false, + }), + ) + .await + .unwrap(); + + // Ensures the mutable is empty. + let region = follower_engine.get_region(region_id).unwrap(); + assert!(region.version().memtables.mutable.is_empty()); + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + + put_rows(&leader_engine, region_id, rows).await; + + let resp = leader_engine + .set_readonly_gracefully(region_id) + .await + .unwrap(); + + let last_entry_id = if let SetReadonlyResponse::Success { last_entry_id } = resp { + last_entry_id + } else { + unreachable!(); + }; + assert!(last_entry_id.is_some()); + + // Replays the memtable. + let resp = follower_engine + .handle_request( + region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: true, + entry_id: last_entry_id, + }), + ) + .await; + let region = follower_engine.get_region(region_id).unwrap(); + assert!(!region.is_writable()); + assert!(resp.is_ok()); + + // Scans + let request = ScanRequest::default(); + let stream = follower_engine + .handle_query(region_id, request) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); + + // Replays the memtable again, should be ok. + let resp = follower_engine + .handle_request( + region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: true, + entry_id: last_entry_id, + }), + ) + .await; + let region = follower_engine.get_region(region_id).unwrap(); + assert!(region.is_writable()); + assert!(resp.is_ok()); +} + +#[tokio::test] +async fn test_catchup_without_last_entry_id() { + let mut env = TestEnv::new(); + let leader_engine = env.create_engine(MitoConfig::default()).await; + let follower_engine = env.create_follower_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + + let column_schemas = rows_schema(&request); + leader_engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + follower_engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::default(), + skip_wal_replay: false, + }), + ) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas, + rows: build_rows(0, 3), + }; + put_rows(&leader_engine, region_id, rows).await; + + // Replays the memtable. + let resp = follower_engine + .handle_request( + region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: false, + entry_id: None, + }), + ) + .await; + assert!(resp.is_ok()); + let region = follower_engine.get_region(region_id).unwrap(); + assert!(!region.is_writable()); + + let request = ScanRequest::default(); + let stream = follower_engine + .handle_query(region_id, request) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); + + // Replays the memtable again, should be ok. + let resp = follower_engine + .handle_request( + region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: true, + entry_id: None, + }), + ) + .await; + assert!(resp.is_ok()); + let region = follower_engine.get_region(region_id).unwrap(); + assert!(region.is_writable()); +} + +#[tokio::test] +async fn test_catchup_with_manifest_update() { + let mut env = TestEnv::new(); + let leader_engine = env.create_engine(MitoConfig::default()).await; + let follower_engine = env.create_follower_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + + let column_schemas = rows_schema(&request); + leader_engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + follower_engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::default(), + skip_wal_replay: false, + }), + ) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&leader_engine, region_id, rows).await; + + // Triggers to create a new manifest file. + flush_region(&leader_engine, region_id, None).await; + + // Puts to WAL. + let rows = Rows { + schema: column_schemas, + rows: build_rows(3, 5), + }; + put_rows(&leader_engine, region_id, rows).await; + + // Triggers to create a new manifest file. + flush_region(&leader_engine, region_id, None).await; + + let region = follower_engine.get_region(region_id).unwrap(); + // Ensures the mutable is empty. + assert!(region.version().memtables.mutable.is_empty()); + + let manifest = region.manifest_manager.manifest().await; + assert_eq!(manifest.manifest_version, 0); + + let resp = follower_engine + .handle_request( + region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: false, + entry_id: None, + }), + ) + .await; + assert!(resp.is_ok()); + + // The inner region was replaced. We must get it again. + let region = follower_engine.get_region(region_id).unwrap(); + let manifest = region.manifest_manager.manifest().await; + assert_eq!(manifest.manifest_version, 2); + assert!(!region.is_writable()); + + let request = ScanRequest::default(); + let stream = follower_engine + .handle_query(region_id, request) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | +| 3 | 3.0 | 1970-01-01T00:00:03 | +| 4 | 4.0 | 1970-01-01T00:00:04 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); + + // Replays the memtable again, should be ok. + let resp = follower_engine + .handle_request( + region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: true, + entry_id: None, + }), + ) + .await; + let region = follower_engine.get_region(region_id).unwrap(); + assert!(resp.is_ok()); + assert!(region.is_writable()); +} + +#[tokio::test] +async fn test_catchup_not_exist() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let non_exist_region_id = RegionId::new(1, 1); + + let err = engine + .handle_request( + non_exist_region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: true, + entry_id: None, + }), + ) + .await + .unwrap_err(); + assert_matches!(err.status_code(), StatusCode::RegionNotFound); +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index d29450e50a22..6c84e5d82e0a 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -35,6 +35,14 @@ use crate::worker::WorkerId; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display("Failed to set region {} to writable, it was expected to replayed to {}, but actually replayed to {}", region_id,expected_last_entry_id,replayed_last_entry_id))] + UnexpectedReplay { + location: Location, + region_id: RegionId, + expected_last_entry_id: u64, + replayed_last_entry_id: u64, + }, + #[snafu(display("OpenDAL operator failed"))] OpenDal { location: Location, @@ -435,7 +443,8 @@ impl ErrorExt for Error { | NewRecordBatch { .. } | RegionCorrupted { .. } | CreateDefault { .. } - | InvalidParquet { .. } => StatusCode::Unexpected, + | InvalidParquet { .. } + | UnexpectedReplay { .. } => StatusCode::Unexpected, RegionNotFound { .. } => StatusCode::RegionNotFound, ObjectStoreNotFound { .. } | InvalidScanIndex { .. } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 1a32351547a4..10b275c48183 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -17,6 +17,7 @@ //! Mito is the a region engine to store timeseries data. #![feature(let_chains)] +#![feature(assert_matches)] #[cfg(any(test, feature = "test"))] #[cfg_attr(feature = "test", allow(unused))] diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 613253dffa4e..f5839c2c4985 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -26,7 +26,7 @@ use crate::manifest::action::{ RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction, RegionMetaActionList, }; -use crate::manifest::storage::ManifestObjectStore; +use crate::manifest::storage::{file_version, is_delta_file, ManifestObjectStore}; /// Options for [RegionManifestManager]. #[derive(Debug, Clone)] @@ -160,6 +160,12 @@ impl RegionManifestManager { let inner = self.inner.read().await; inner.total_manifest_size() } + + /// Returns true if a newer version manifest file is found. + pub async fn check_update(&self) -> Result { + let inner = self.inner.read().await; + inner.check_update().await + } } #[cfg(test)] @@ -233,7 +239,7 @@ impl RegionManifestManagerInner { }) } - /// Open an existing manifest. + /// Opens an existing manifest. /// /// Returns `Ok(None)` if no such manifest. async fn open(options: RegionManifestOptions) -> Result> { @@ -323,7 +329,7 @@ impl RegionManifestManagerInner { Ok(()) } - /// Update the manifest. Return the current manifest version number. + /// Updates the manifest. Return the current manifest version number. async fn update(&mut self, action_list: RegionMetaActionList) -> Result { let version = self.increase_version(); self.store.save(version, &action_list.encode()?).await?; @@ -385,7 +391,7 @@ impl RegionManifestManagerInner { Ok(()) } - /// Make a new checkpoint. Return the fresh one if there are some actions to compact. + /// Makes a new checkpoint. Return the fresh one if there are some actions to compact. async fn do_checkpoint(&mut self) -> Result> { let last_checkpoint = Self::last_checkpoint(&mut self.store).await?; let current_version = self.last_version; @@ -459,7 +465,7 @@ impl RegionManifestManagerInner { Ok(Some(checkpoint)) } - /// Fetch the last [RegionCheckpoint] from storage. + /// Fetches the last [RegionCheckpoint] from storage. pub(crate) async fn last_checkpoint( store: &mut ManifestObjectStore, ) -> Result> { @@ -472,6 +478,28 @@ impl RegionManifestManagerInner { Ok(None) } } + + /// Returns true if a newer version manifest file is found. + /// + /// It is typically used in read-only regions to catch up with manifest. + pub(crate) async fn check_update(&self) -> Result { + let last_version = self.last_version; + let newer_manifests = self + .store + .get_paths(|entry| { + let file_name = entry.name(); + if is_delta_file(file_name) { + let version = file_version(file_name); + if version > last_version { + return Some((version, entry)); + } + } + None + }) + .await?; + + Ok(!newer_manifests.is_empty()) + } } #[cfg(test)] diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index b470a1255aa1..58755b1015b0 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -171,7 +171,12 @@ impl ManifestObjectStore { .context(OpenDalSnafu) } - /// Scan the manifest files in the range of [start, end) and return all manifest entries. + /// Sorts the manifest files. + fn sort_manifests(entries: &mut [(ManifestVersion, Entry)]) { + entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2)); + } + + /// Scans the manifest files in the range of [start, end) and return all manifest entries. pub async fn scan( &self, start: ManifestVersion, @@ -192,7 +197,7 @@ impl ManifestObjectStore { }) .await?; - entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2)); + Self::sort_manifests(&mut entries); Ok(entries) } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 083a42fb36c2..21d527d3cccb 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -123,6 +123,11 @@ impl MitoRegion { self.writable.load(Ordering::Relaxed) } + /// Returns the region dir. + pub(crate) fn region_dir(&self) -> &str { + self.access_layer.region_dir() + } + /// Sets the writable flag. pub(crate) fn set_writable(&self, writable: bool) { self.writable.store(writable, Ordering::Relaxed); diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index ea387ef2c552..e318154e649a 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -52,7 +52,7 @@ pub(crate) struct RegionOpener { object_store_manager: ObjectStoreManagerRef, region_dir: String, scheduler: SchedulerRef, - options: HashMap, + options: Option, cache_manager: Option, skip_wal_replay: bool, } @@ -73,7 +73,7 @@ impl RegionOpener { object_store_manager, region_dir: normalize_dir(region_dir), scheduler, - options: HashMap::new(), + options: None, cache_manager: None, skip_wal_replay: false, } @@ -85,9 +85,15 @@ impl RegionOpener { self } + /// Parses and sets options for the region. + pub(crate) fn parse_options(mut self, options: HashMap) -> Result { + self.options = Some(RegionOptions::try_from(&options)?); + Ok(self) + } + /// Sets options for the region. - pub(crate) fn options(mut self, value: HashMap) -> Self { - self.options = value; + pub(crate) fn options(mut self, options: RegionOptions) -> Self { + self.options = Some(options); self } @@ -109,7 +115,7 @@ impl RegionOpener { /// # Panics /// Panics if metadata is not set. pub(crate) async fn create_or_open( - self, + mut self, config: &MitoConfig, wal: &Wal, ) -> Result { @@ -144,7 +150,7 @@ impl RegionOpener { ); } } - let options = RegionOptions::try_from(&self.options)?; + let options = self.options.take().unwrap(); let object_store = self.object_store(&options.storage)?.clone(); // Create a manifest manager for this region and writes regions to the manifest file. @@ -214,7 +220,7 @@ impl RegionOpener { config: &MitoConfig, wal: &Wal, ) -> Result> { - let region_options = RegionOptions::try_from(&self.options)?; + let region_options = self.options.as_ref().unwrap().clone(); let region_manifest_options = self.manifest_options(config, ®ion_options)?; let Some(manifest_manager) = RegionManifestManager::open(region_manifest_options).await? else { @@ -233,6 +239,7 @@ impl RegionOpener { self.cache_manager.clone(), )); let mutable = self.memtable_builder.build(&metadata); + debug_assert!(mutable.is_empty()); let version = VersionBuilder::new(metadata, mutable) .add_files(file_purger.clone(), manifest.files.values().cloned()) .flushed_entry_id(manifest.flushed_entry_id) @@ -344,12 +351,12 @@ pub(crate) fn check_recovered_region( } /// Replays the mutations from WAL and inserts mutations to memtable of given region. -async fn replay_memtable( +pub(crate) async fn replay_memtable( wal: &Wal, region_id: RegionId, flushed_entry_id: EntryId, version_control: &VersionControlRef, -) -> Result<()> { +) -> Result { let mut rows_replayed = 0; // Last entry id should start from flushed entry id since there might be no // data in the WAL. @@ -377,7 +384,7 @@ async fn replay_memtable( "Replay WAL for region: {}, rows recovered: {}, last entry id: {}", region_id, rows_replayed, last_entry_id ); - Ok(()) + Ok(last_entry_id) } /// Returns the directory to the manifest files. diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 311638792d99..8f04492ba234 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -32,9 +32,9 @@ use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::region_engine::SetReadonlyResponse; use store_api::region_request::{ - AffectedRows, RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, - RegionCreateRequest, RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, - RegionTruncateRequest, + AffectedRows, RegionAlterRequest, RegionCatchupRequest, RegionCloseRequest, + RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest, + RegionOpenRequest, RegionRequest, RegionTruncateRequest, }; use store_api::storage::{RegionId, SequenceNumber}; use tokio::sync::oneshot::{self, Receiver, Sender}; @@ -550,6 +550,11 @@ impl WorkerRequest { sender: sender.into(), request: DdlRequest::Truncate(v), }), + RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest { + region_id, + sender: sender.into(), + request: DdlRequest::Catchup(v), + }), }; Ok((worker_request, receiver)) @@ -578,6 +583,7 @@ pub(crate) enum DdlRequest { Flush(RegionFlushRequest), Compact(RegionCompactRequest), Truncate(RegionTruncateRequest), + Catchup(RegionCatchupRequest), } /// Sender and Ddl request. diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 63416a640f3b..b2006098b6a3 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -139,6 +139,14 @@ impl TestEnv { MitoEngine::new(config, logstore, object_store_manager) } + /// Creates a new engine with specific config and existing logstore and object store manager. + pub async fn create_follower_engine(&mut self, config: MitoConfig) -> MitoEngine { + let logstore = self.logstore.as_ref().unwrap().clone(); + let object_store_manager = self.object_store_manager.as_ref().unwrap().clone(); + + MitoEngine::new(config, logstore, object_store_manager) + } + /// Creates a new engine with specific config and manager/listener under this env. pub async fn create_engine_with( &mut self, @@ -221,6 +229,7 @@ impl TestEnv { ) } + /// Returns the log store and object store manager. async fn create_log_and_object_store_manager( &self, ) -> (RaftEngineLogStore, ObjectStoreManager) { diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 750e29b56751..c969fdd008c1 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -15,6 +15,7 @@ //! Structs and utilities for writing regions. mod handle_alter; +mod handle_catchup; mod handle_close; mod handle_compaction; mod handle_create; @@ -546,6 +547,7 @@ impl RegionWorkerLoop { continue; } DdlRequest::Truncate(_) => self.handle_truncate_request(ddl.region_id).await, + DdlRequest::Catchup(req) => self.handle_catchup_request(ddl.region_id, req).await, }; ddl.sender.send(res); diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs new file mode 100644 index 000000000000..88e81285a123 --- /dev/null +++ b/src/mito2/src/worker/handle_catchup.rs @@ -0,0 +1,96 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Handling catchup request. + +use std::sync::Arc; + +use snafu::ensure; +use store_api::logstore::LogStore; +use store_api::region_request::{AffectedRows, RegionCatchupRequest}; +use store_api::storage::RegionId; + +use crate::error::{self, Result}; +use crate::region::opener::{replay_memtable, RegionOpener}; +use crate::worker::RegionWorkerLoop; + +impl RegionWorkerLoop { + pub(crate) async fn handle_catchup_request( + &mut self, + region_id: RegionId, + request: RegionCatchupRequest, + ) -> Result { + let Some(region) = self.regions.get_region(region_id) else { + return error::RegionNotFoundSnafu { region_id }.fail(); + }; + + if region.is_writable() { + return Ok(0); + } + // Note: Currently, We protect the split brain by ensuring the mutable table is empty. + // It's expensive to execute catch-up requests without `set_writable=true` multiple times. + let is_mutable_empty = region.version().memtables.mutable.is_empty(); + + let region = if !is_mutable_empty || region.manifest_manager.check_update().await? { + let reopened_region = Arc::new( + RegionOpener::new( + region_id, + region.region_dir(), + self.memtable_builder.clone(), + self.object_store_manager.clone(), + self.scheduler.clone(), + ) + .cache(Some(self.cache_manager.clone())) + .options(region.version().options.clone()) + .open(&self.config, &self.wal) + .await?, + ); + debug_assert!(!reopened_region.is_writable()); + self.regions.insert_region(reopened_region.clone()); + + reopened_region + } else { + region + }; + + let flushed_entry_id = region.version_control.current().last_entry_id; + + let last_entry_id = replay_memtable( + &self.wal, + region_id, + flushed_entry_id, + ®ion.version_control, + ) + .await?; + + let expected_last_entry_id = request.entry_id; + + if let Some(expected_last_entry_id) = expected_last_entry_id { + ensure!( + expected_last_entry_id == last_entry_id, + error::UnexpectedReplaySnafu { + region_id, + expected_last_entry_id, + replayed_last_entry_id: last_entry_id, + } + ) + } + + if request.set_writable { + region.set_writable(true); + } + + Ok(0) + } +} diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 6d86ddb1887f..0af60793a0e9 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -63,7 +63,7 @@ impl RegionWorkerLoop { self.scheduler.clone(), ) .metadata(metadata) - .options(request.options) + .parse_options(request.options)? .cache(Some(self.cache_manager.clone())) .create_or_open(&self.config, &self.wal) .await?; diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 49100bee5199..a2a7f7d6609a 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -69,8 +69,8 @@ impl RegionWorkerLoop { self.object_store_manager.clone(), self.scheduler.clone(), ) - .options(request.options) .skip_wal_replay(request.skip_wal_replay) + .parse_options(request.options)? .cache(Some(self.cache_manager.clone())) .open(&self.config, &self.wal) .await?; diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index daa2ae3753a5..4aae0990120b 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -21,6 +21,7 @@ use api::v1::{self, Rows, SemanticType}; use snafu::{ensure, OptionExt}; use strum::IntoStaticStr; +use crate::logstore::entry; use crate::metadata::{ ColumnMetadata, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, MetadataError, RegionMetadata, Result, @@ -42,6 +43,7 @@ pub enum RegionRequest { Flush(RegionFlushRequest), Compact(RegionCompactRequest), Truncate(RegionTruncateRequest), + Catchup(RegionCatchupRequest), } impl RegionRequest { @@ -59,6 +61,7 @@ impl RegionRequest { RegionRequest::Flush(_) => "flush", RegionRequest::Compact(_) => "compact", RegionRequest::Truncate(_) => "truncate", + &&RegionRequest::Catchup(_) => "catchup", } } @@ -453,6 +456,19 @@ pub struct RegionCompactRequest {} #[derive(Debug)] pub struct RegionTruncateRequest {} +/// Catchup region request. +/// +/// Makes a readonly region to catch up to leader region changes. +/// There is no effect if it operating on a leader region. +#[derive(Debug)] +pub struct RegionCatchupRequest { + /// Sets it to writable if it's available after it has caught up with all changes. + pub set_writable: bool, + /// The `entry_id` that was expected to reply to. + /// `None` stands replaying to latest. + pub entry_id: Option, +} + impl fmt::Display for RegionRequest { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -466,6 +482,7 @@ impl fmt::Display for RegionRequest { RegionRequest::Flush(_) => write!(f, "Flush"), RegionRequest::Compact(_) => write!(f, "Compact"), RegionRequest::Truncate(_) => write!(f, "Truncate"), + &RegionRequest::Catchup(_) => write!(f, "Catchup"), } } }