Skip to content

Commit

Permalink
feat: add set_readonly_gracefully for engine (GreptimeTeam#2787)
Browse files Browse the repository at this point in the history
* feat: add set_readonly_gracefully for engine

* chore: apply suggestions from CR

* chore: rename to set_readonly_test

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored Nov 24, 2023
1 parent 3a4c9f2 commit 9e58bba
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 6 deletions.
9 changes: 8 additions & 1 deletion src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use query::query_engine::DescribeResult;
use query::QueryEngine;
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
use table::TableRef;
Expand Down Expand Up @@ -191,6 +191,13 @@ impl RegionEngine for MockRegionEngine {
Ok(())
}

async fn set_readonly_gracefully(
&self,
_region_id: RegionId,
) -> Result<SetReadonlyResponse, BoxedError> {
unimplemented!()
}

fn role(&self, _region_id: RegionId) -> Option<RegionRole> {
Some(RegionRole::Leader)
}
Expand Down
15 changes: 14 additions & 1 deletion src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use common_telemetry::{error, info};
use object_store::ObjectStore;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::{
RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, RegionRequest,
};
Expand Down Expand Up @@ -103,6 +103,19 @@ impl RegionEngine for FileRegionEngine {
.map_err(BoxedError::new)
}

async fn set_readonly_gracefully(
&self,
region_id: RegionId,
) -> Result<SetReadonlyResponse, BoxedError> {
let exists = self.inner.get_region(region_id).await.is_some();

if exists {
Ok(SetReadonlyResponse::success(None))
} else {
Ok(SetReadonlyResponse::NotFound)
}
}

fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.state(region_id)
}
Expand Down
9 changes: 8 additions & 1 deletion src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use mito2::engine::MitoEngine;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::RwLock;
Expand Down Expand Up @@ -171,6 +171,13 @@ impl RegionEngine for MetricEngine {
todo!()
}

async fn set_readonly_gracefully(
&self,
region_id: RegionId,
) -> std::result::Result<SetReadonlyResponse, BoxedError> {
self.inner.mito.set_readonly_gracefully(region_id).await
}

fn role(&self, region_id: RegionId) -> Option<RegionRole> {
todo!()
}
Expand Down
25 changes: 23 additions & 2 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ mod projection_test;
#[cfg(test)]
mod prune_test;
#[cfg(test)]
mod set_readonly_test;
#[cfg(test)]
mod truncate_test;

use std::sync::Arc;

use async_trait::async_trait;
Expand All @@ -49,7 +50,7 @@ use object_store::manager::ObjectStoreManagerRef;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};

Expand Down Expand Up @@ -188,6 +189,16 @@ impl EngineInner {
Ok(())
}

/// Sets read-only for a region and ensures no more writes in the region after it returns.
async fn set_readonly_gracefully(&self, region_id: RegionId) -> Result<SetReadonlyResponse> {
// Notes: It acquires the mutable ownership to ensure no other threads,
// Therefore, we submit it to the worker.
let (request, receiver) = WorkerRequest::new_set_readonly_gracefully(region_id);
self.workers.submit_to_worker(region_id, request).await?;

receiver.await.context(RecvSnafu)
}

fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.workers.get_region(region_id).map(|region| {
if region.is_writable() {
Expand Down Expand Up @@ -261,6 +272,16 @@ impl RegionEngine for MitoEngine {
.map_err(BoxedError::new)
}

async fn set_readonly_gracefully(
&self,
region_id: RegionId,
) -> Result<SetReadonlyResponse, BoxedError> {
self.inner
.set_readonly_gracefully(region_id)
.await
.map_err(BoxedError::new)
}

fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.role(region_id)
}
Expand Down
111 changes: 111 additions & 0 deletions src/mito2/src/engine/set_readonly_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use store_api::region_engine::{RegionEngine, SetReadonlyResponse};
use store_api::region_request::{RegionPutRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::config::MitoConfig;
use crate::request::WorkerRequest;
use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv};

#[tokio::test]
async fn test_set_readonly_gracefully() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();

let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

let result = engine.set_readonly_gracefully(region_id).await.unwrap();
assert_eq!(
SetReadonlyResponse::Success {
last_entry_id: Some(0)
},
result
);

// set readonly again.
let result = engine.set_readonly_gracefully(region_id).await.unwrap();
assert_eq!(
SetReadonlyResponse::Success {
last_entry_id: Some(0)
},
result
);

let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 3),
};

let error = engine
.handle_request(
region_id,
RegionRequest::Put(RegionPutRequest { rows: rows.clone() }),
)
.await
.unwrap_err();

assert_eq!(error.status_code(), StatusCode::RegionReadonly);

engine.set_writable(region_id, true).unwrap();

put_rows(&engine, region_id, rows).await;

let result = engine.set_readonly_gracefully(region_id).await.unwrap();

assert_eq!(
SetReadonlyResponse::Success {
last_entry_id: Some(1)
},
result
);
}

#[tokio::test]
async fn test_set_readonly_gracefully_not_exist() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let non_exist_region_id = RegionId::new(1, 1);

// For inner `handle_inter_command`.
let (set_readonly_request, recv) =
WorkerRequest::new_set_readonly_gracefully(non_exist_region_id);
engine
.inner
.workers
.submit_to_worker(non_exist_region_id, set_readonly_request)
.await
.unwrap();
let result = recv.await.unwrap();
assert_eq!(SetReadonlyResponse::NotFound, result);

// For fast-path.
let result = engine
.set_readonly_gracefully(non_exist_region_id)
.await
.unwrap();
assert_eq!(SetReadonlyResponse::NotFound, result);
}
20 changes: 20 additions & 0 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use prost::Message;
use smallvec::SmallVec;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::region_engine::SetReadonlyResponse;
use store_api::region_request::{
RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest,
RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
Expand Down Expand Up @@ -472,6 +473,14 @@ pub(crate) enum WorkerRequest {
notify: BackgroundNotify,
},

/// The internal commands.
SetReadonlyGracefully {
/// Id of the region to send.
region_id: RegionId,
/// The sender of [SetReadonlyResponse].
sender: Sender<SetReadonlyResponse>,
},

/// Notify a worker to stop.
Stop,
}
Expand Down Expand Up @@ -542,6 +551,17 @@ impl WorkerRequest {

Ok((worker_request, receiver))
}

pub(crate) fn new_set_readonly_gracefully(
region_id: RegionId,
) -> (WorkerRequest, Receiver<SetReadonlyResponse>) {
let (sender, receiver) = oneshot::channel();

(
WorkerRequest::SetReadonlyGracefully { region_id, sender },
receiver,
)
}
}

/// DDL request to a region.
Expand Down
22 changes: 21 additions & 1 deletion src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_engine::SetReadonlyResponse;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, Mutex};
use tokio::sync::{mpsc, oneshot, Mutex};

use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
Expand Down Expand Up @@ -501,6 +502,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// For background notify, we handle it directly.
self.handle_background_notify(region_id, notify).await;
}
WorkerRequest::SetReadonlyGracefully { region_id, sender } => {
self.set_readonly_gracefully(region_id, sender).await;
}
// We receive a stop signal, but we still want to process remaining
// requests. The worker thread will then check the running flag and
// then exit.
Expand Down Expand Up @@ -563,6 +567,22 @@ impl<S: LogStore> RegionWorkerLoop<S> {
BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
}
}

/// Handles `set_readonly_gracefully`.
async fn set_readonly_gracefully(
&mut self,
region_id: RegionId,
sender: oneshot::Sender<SetReadonlyResponse>,
) {
if let Some(region) = self.regions.get_region(region_id) {
region.set_writable(false);

let last_entry_id = region.version_control.current().last_entry_id;
let _ = sender.send(SetReadonlyResponse::success(Some(last_entry_id)));
} else {
let _ = sender.send(SetReadonlyResponse::NotFound);
}
}
}

impl<S> RegionWorkerLoop<S> {
Expand Down
26 changes: 26 additions & 0 deletions src/store-api/src/region_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,28 @@ use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use serde::{Deserialize, Serialize};

use crate::logstore::entry;
use crate::metadata::RegionMetadataRef;
use crate::region_request::RegionRequest;
use crate::storage::{RegionId, ScanRequest};

/// The result of setting readonly for the region.
#[derive(Debug, PartialEq, Eq)]
pub enum SetReadonlyResponse {
Success {
/// Returns `last_entry_id` of the region if available(e.g., It's not available in file engine).
last_entry_id: Option<entry::Id>,
},
NotFound,
}

impl SetReadonlyResponse {
/// Returns a [SetReadonlyResponse::Success] with the `last_entry_id`.
pub fn success(last_entry_id: Option<entry::Id>) -> Self {
Self::Success { last_entry_id }
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GrantedRegion {
pub region_id: RegionId,
Expand Down Expand Up @@ -128,6 +146,14 @@ pub trait RegionEngine: Send + Sync {
/// take effect.
fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError>;

/// Sets readonly for a region gracefully.
///
/// After the call returns, the engine ensures no more write operations will succeed in the region.
async fn set_readonly_gracefully(
&self,
region_id: RegionId,
) -> Result<SetReadonlyResponse, BoxedError>;

/// Indicates region role.
///
/// Returns the `None` if the region is not found.
Expand Down

0 comments on commit 9e58bba

Please sign in to comment.