diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 20896efd25c5..76e3bd1bdf2b 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -37,7 +37,7 @@ use query::query_engine::DescribeResult; use query::QueryEngine; use session::context::QueryContextRef; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::{RegionEngine, RegionRole}; +use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; use store_api::region_request::RegionRequest; use store_api::storage::{RegionId, ScanRequest}; use table::TableRef; @@ -191,6 +191,13 @@ impl RegionEngine for MockRegionEngine { Ok(()) } + async fn set_readonly_gracefully( + &self, + _region_id: RegionId, + ) -> Result { + unimplemented!() + } + fn role(&self, _region_id: RegionId) -> Option { Some(RegionRole::Leader) } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 867d39a28219..485761af1861 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -24,7 +24,7 @@ use common_telemetry::{error, info}; use object_store::ObjectStore; use snafu::{ensure, OptionExt}; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::{RegionEngine, RegionRole}; +use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; use store_api::region_request::{ RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, RegionRequest, }; @@ -103,6 +103,19 @@ impl RegionEngine for FileRegionEngine { .map_err(BoxedError::new) } + async fn set_readonly_gracefully( + &self, + region_id: RegionId, + ) -> Result { + let exists = self.inner.get_region(region_id).await.is_some(); + + if exists { + Ok(SetReadonlyResponse::success(None)) + } else { + Ok(SetReadonlyResponse::NotFound) + } + } + fn role(&self, region_id: RegionId) -> Option { self.inner.state(region_id) } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 
7888f75be8dd..7abc0cfacf15 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -27,7 +27,7 @@ use common_query::Output; use common_recordbatch::SendableRecordBatchStream; use mito2::engine::MitoEngine; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::{RegionEngine, RegionRole}; +use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; use store_api::region_request::RegionRequest; use store_api::storage::{RegionId, ScanRequest}; use tokio::sync::RwLock; @@ -171,6 +171,13 @@ impl RegionEngine for MetricEngine { todo!() } + async fn set_readonly_gracefully( + &self, + region_id: RegionId, + ) -> std::result::Result { + self.inner.mito.set_readonly_gracefully(region_id).await + } + fn role(&self, region_id: RegionId) -> Option { todo!() } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 3115cb180d5d..558dec2a6c68 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -37,8 +37,9 @@ mod projection_test; #[cfg(test)] mod prune_test; #[cfg(test)] +mod set_readonly_test; +#[cfg(test)] mod truncate_test; - use std::sync::Arc; use async_trait::async_trait; @@ -49,7 +50,7 @@ use object_store::manager::ObjectStoreManagerRef; use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::{RegionEngine, RegionRole}; +use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; use store_api::region_request::RegionRequest; use store_api::storage::{RegionId, ScanRequest}; @@ -188,6 +189,16 @@ impl EngineInner { Ok(()) } + /// Sets read-only for a region and ensures no more writes in the region after it returns. + async fn set_readonly_gracefully(&self, region_id: RegionId) -> Result { + // Note: it acquires mutable ownership to ensure that no other thread is writing to the region. + // Therefore, we submit it to the worker. 
+ let (request, receiver) = WorkerRequest::new_set_readonly_gracefully(region_id); + self.workers.submit_to_worker(region_id, request).await?; + + receiver.await.context(RecvSnafu) + } + fn role(&self, region_id: RegionId) -> Option { self.workers.get_region(region_id).map(|region| { if region.is_writable() { @@ -261,6 +272,16 @@ impl RegionEngine for MitoEngine { .map_err(BoxedError::new) } + async fn set_readonly_gracefully( + &self, + region_id: RegionId, + ) -> Result { + self.inner + .set_readonly_gracefully(region_id) + .await + .map_err(BoxedError::new) + } + fn role(&self, region_id: RegionId) -> Option { self.inner.role(region_id) } diff --git a/src/mito2/src/engine/set_readonly_test.rs b/src/mito2/src/engine/set_readonly_test.rs new file mode 100644 index 000000000000..e658352ceb17 --- /dev/null +++ b/src/mito2/src/engine/set_readonly_test.rs @@ -0,0 +1,111 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use api::v1::Rows; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use store_api::region_engine::{RegionEngine, SetReadonlyResponse}; +use store_api::region_request::{RegionPutRequest, RegionRequest}; +use store_api::storage::RegionId; + +use crate::config::MitoConfig; +use crate::request::WorkerRequest; +use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv}; + +#[tokio::test] +async fn test_set_readonly_gracefully() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let result = engine.set_readonly_gracefully(region_id).await.unwrap(); + assert_eq!( + SetReadonlyResponse::Success { + last_entry_id: Some(0) + }, + result + ); + + // set readonly again. + let result = engine.set_readonly_gracefully(region_id).await.unwrap(); + assert_eq!( + SetReadonlyResponse::Success { + last_entry_id: Some(0) + }, + result + ); + + let rows = Rows { + schema: column_schemas, + rows: build_rows(0, 3), + }; + + let error = engine + .handle_request( + region_id, + RegionRequest::Put(RegionPutRequest { rows: rows.clone() }), + ) + .await + .unwrap_err(); + + assert_eq!(error.status_code(), StatusCode::RegionReadonly); + + engine.set_writable(region_id, true).unwrap(); + + put_rows(&engine, region_id, rows).await; + + let result = engine.set_readonly_gracefully(region_id).await.unwrap(); + + assert_eq!( + SetReadonlyResponse::Success { + last_entry_id: Some(1) + }, + result + ); +} + +#[tokio::test] +async fn test_set_readonly_gracefully_not_exist() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let non_exist_region_id = RegionId::new(1, 1); + + // For inner `handle_inter_command`. 
+ let (set_readonly_request, recv) = + WorkerRequest::new_set_readonly_gracefully(non_exist_region_id); + engine + .inner + .workers + .submit_to_worker(non_exist_region_id, set_readonly_request) + .await + .unwrap(); + let result = recv.await.unwrap(); + assert_eq!(SetReadonlyResponse::NotFound, result); + + // For fast-path. + let result = engine + .set_readonly_gracefully(non_exist_region_id) + .await + .unwrap(); + assert_eq!(SetReadonlyResponse::NotFound, result); +} diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 5ebd5fae110c..8bc4ceac75f8 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -32,6 +32,7 @@ use prost::Message; use smallvec::SmallVec; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadata}; +use store_api::region_engine::SetReadonlyResponse; use store_api::region_request::{ RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest, @@ -472,6 +473,14 @@ pub(crate) enum WorkerRequest { notify: BackgroundNotify, }, + /// Internal command that sets a region read-only gracefully. + SetReadonlyGracefully { + /// Id of the region to set read-only. + region_id: RegionId, + /// The sender of [SetReadonlyResponse]. + sender: Sender, + }, + /// Notify a worker to stop. Stop, } @@ -542,6 +551,17 @@ impl WorkerRequest { Ok((worker_request, receiver)) } + + pub(crate) fn new_set_readonly_gracefully( + region_id: RegionId, + ) -> (WorkerRequest, Receiver) { + let (sender, receiver) = oneshot::channel(); + + ( + WorkerRequest::SetReadonlyGracefully { region_id, sender }, + receiver, + ) + } } /// DDL request to a region. 
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 64f7472c3842..2ffc86d48f1e 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -36,9 +36,10 @@ use futures::future::try_join_all; use object_store::manager::ObjectStoreManagerRef; use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; +use store_api::region_engine::SetReadonlyResponse; use store_api::storage::RegionId; use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::{mpsc, oneshot, Mutex}; use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::CompactionScheduler; @@ -501,6 +502,9 @@ impl RegionWorkerLoop { // For background notify, we handle it directly. self.handle_background_notify(region_id, notify).await; } + WorkerRequest::SetReadonlyGracefully { region_id, sender } => { + self.set_readonly_gracefully(region_id, sender).await; + } // We receive a stop signal, but we still want to process remaining // requests. The worker thread will then check the running flag and // then exit. @@ -563,6 +567,22 @@ impl RegionWorkerLoop { BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await, } } + + /// Handles `set_readonly_gracefully`. 
+ async fn set_readonly_gracefully( + &mut self, + region_id: RegionId, + sender: oneshot::Sender, + ) { + if let Some(region) = self.regions.get_region(region_id) { + region.set_writable(false); + + let last_entry_id = region.version_control.current().last_entry_id; + let _ = sender.send(SetReadonlyResponse::success(Some(last_entry_id))); + } else { + let _ = sender.send(SetReadonlyResponse::NotFound); + } + } } impl RegionWorkerLoop { diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index f300931bb109..27c0d7a93a6c 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -23,10 +23,28 @@ use common_query::Output; use common_recordbatch::SendableRecordBatchStream; use serde::{Deserialize, Serialize}; +use crate::logstore::entry; use crate::metadata::RegionMetadataRef; use crate::region_request::RegionRequest; use crate::storage::{RegionId, ScanRequest}; +/// The result of setting readonly for the region. +#[derive(Debug, PartialEq, Eq)] +pub enum SetReadonlyResponse { + Success { + /// The `last_entry_id` of the region, if available (e.g., it is not available in the file engine). + last_entry_id: Option, + }, + NotFound, +} + +impl SetReadonlyResponse { + /// Returns a [SetReadonlyResponse::Success] with the `last_entry_id`. + pub fn success(last_entry_id: Option) -> Self { + Self::Success { last_entry_id } + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct GrantedRegion { pub region_id: RegionId, @@ -128,6 +146,14 @@ pub trait RegionEngine: Send + Sync { /// take effect. fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError>; + /// Sets readonly for a region gracefully. + /// + /// After the call returns, the engine ensures no more write operations will succeed in the region. + async fn set_readonly_gracefully( + &self, + region_id: RegionId, + ) -> Result; + /// Indicates region role. /// /// Returns the `None` if the region is not found.