Skip to content

Commit

Permalink
feat: add set_readonly_gracefully for engine
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 21, 2023
1 parent 20f0121 commit 18f2217
Show file tree
Hide file tree
Showing 9 changed files with 256 additions and 6 deletions.
9 changes: 8 additions & 1 deletion src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use query::query_engine::DescribeResult;
use query::QueryEngine;
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResult};
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
use table::TableRef;
Expand Down Expand Up @@ -191,6 +191,13 @@ impl RegionEngine for MockRegionEngine {
Ok(())
}

async fn set_readonly_gracefully(
&self,
_region_id: RegionId,
) -> Result<SetReadonlyResult, BoxedError> {
unimplemented!()
}

fn role(&self, _region_id: RegionId) -> Option<RegionRole> {
Some(RegionRole::Leader)
}
Expand Down
14 changes: 13 additions & 1 deletion src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use common_telemetry::{error, info};
use object_store::ObjectStore;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResult};
use store_api::region_request::{
RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, RegionRequest,
};
Expand Down Expand Up @@ -103,6 +103,18 @@ impl RegionEngine for FileRegionEngine {
.map_err(BoxedError::new)
}

async fn set_readonly_gracefully(
&self,
region_id: RegionId,
) -> Result<SetReadonlyResult, BoxedError> {
let exist = self.inner.get_region(region_id).await.is_some();

Ok(SetReadonlyResult {
last_entry_id: None,
exist,
})
}

fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.state(region_id)
}
Expand Down
9 changes: 8 additions & 1 deletion src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use mito2::engine::MitoEngine;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResult};
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::RwLock;
Expand Down Expand Up @@ -173,6 +173,13 @@ impl RegionEngine for MetricEngine {
todo!()
}

async fn set_readonly_gracefully(
&self,
region_id: RegionId,
) -> std::result::Result<SetReadonlyResult, BoxedError> {
self.inner.mito.set_readonly_gracefully(region_id).await
}

fn role(&self, region_id: RegionId) -> Option<RegionRole> {
todo!()
}
Expand Down
38 changes: 36 additions & 2 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ mod create_test;
mod drop_test;
#[cfg(test)]
mod flush_test;
#[cfg(test)]
mod internal_command_test;
#[cfg(any(test, feature = "test"))]
pub mod listener;
#[cfg(test)]
Expand All @@ -38,7 +40,6 @@ mod projection_test;
mod prune_test;
#[cfg(test)]
mod truncate_test;

use std::sync::Arc;

use async_trait::async_trait;
Expand All @@ -49,7 +50,7 @@ use object_store::manager::ObjectStoreManagerRef;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResult};
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};

Expand Down Expand Up @@ -188,6 +189,29 @@ impl EngineInner {
Ok(())
}

async fn set_readonly_gracefully(&self, region_id: RegionId) -> Result<SetReadonlyResult> {
if let Some(region) = self.workers.get_region(region_id) {
if !region.is_writable() {
let last_entry_id = region.version_control.current().last_entry_id;

Ok(SetReadonlyResult {
last_entry_id: Some(last_entry_id),
exist: true,
})
} else {
let (request, receiver) = WorkerRequest::new_set_readonly_gracefully(region_id);
self.workers.submit_to_worker(region_id, request).await?;

receiver.await.context(RecvSnafu)
}
} else {
Ok(SetReadonlyResult {
last_entry_id: None,
exist: false,
})
}
}

fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.workers.get_region(region_id).map(|region| {
if region.is_writable() {
Expand Down Expand Up @@ -261,6 +285,16 @@ impl RegionEngine for MitoEngine {
.map_err(BoxedError::new)
}

async fn set_readonly_gracefully(
&self,
region_id: RegionId,
) -> Result<SetReadonlyResult, BoxedError> {
self.inner
.set_readonly_gracefully(region_id)
.await
.map_err(BoxedError::new)
}

fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.role(region_id)
}
Expand Down
111 changes: 111 additions & 0 deletions src/mito2/src/engine/internal_command_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;

use api::v1::Rows;
use common_error::ext::ErrorExt;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionPutRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::config::MitoConfig;
use crate::error::Error;
use crate::request::WorkerRequest;
use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv};

#[tokio::test]
async fn test_set_readonly_gracefully() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();

let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

let (set_readonly_request, recv) = WorkerRequest::new_set_readonly_gracefully(region_id);
engine
.inner
.workers
.submit_to_worker(region_id, set_readonly_request)
.await
.unwrap();
let result = recv.await.unwrap();
assert!(result.exist);
assert_eq!(result.last_entry_id.unwrap(), 0);

// For fast-path.
let result = engine.set_readonly_gracefully(region_id).await.unwrap();
assert!(result.exist);
assert_eq!(result.last_entry_id.unwrap(), 0);

let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 3),
};

let error = engine
.handle_request(
region_id,
RegionRequest::Put(RegionPutRequest { rows: rows.clone() }),
)
.await
.unwrap_err();

let error = error.as_any().downcast_ref::<Error>().unwrap();

assert_matches!(error, Error::RegionReadonly { .. });

engine.set_writable(region_id, true).unwrap();

put_rows(&engine, region_id, rows).await;

let result = engine.set_readonly_gracefully(region_id).await.unwrap();
assert!(result.exist);
assert_eq!(result.last_entry_id.unwrap(), 1);
}

#[tokio::test]
async fn test_set_readonly_gracefully_not_exist() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let non_exist_region_id = RegionId::new(1, 1);

// For inner `handle_inter_command`.
let (set_readonly_request, recv) =
WorkerRequest::new_set_readonly_gracefully(non_exist_region_id);
engine
.inner
.workers
.submit_to_worker(non_exist_region_id, set_readonly_request)
.await
.unwrap();
let result = recv.await.unwrap();
assert!(!result.exist);
assert!(result.last_entry_id.is_none());

// For fast-path.
let result = engine
.set_readonly_gracefully(non_exist_region_id)
.await
.unwrap();
assert!(!result.exist);
assert!(result.last_entry_id.is_none());
}
1 change: 1 addition & 0 deletions src/mito2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! Mito is the a region engine to store timeseries data.
#![feature(let_chains)]
#![feature(assert_matches)]

#[cfg(any(test, feature = "test"))]
#[cfg_attr(feature = "test", allow(unused))]
Expand Down
33 changes: 33 additions & 0 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use prost::Message;
use smallvec::SmallVec;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::region_engine::SetReadonlyResult;
use store_api::region_request::{
RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest,
RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
Expand Down Expand Up @@ -467,6 +468,14 @@ pub(crate) enum WorkerRequest {
notify: BackgroundNotify,
},

/// The internal commands.
Internal {
/// Id of the region to send.
region_id: RegionId,

command: Command,
},

/// Notify a worker to stop.
Stop,
}
Expand Down Expand Up @@ -537,6 +546,20 @@ impl WorkerRequest {

Ok((worker_request, receiver))
}

pub(crate) fn new_set_readonly_gracefully(
region_id: RegionId,
) -> (WorkerRequest, Receiver<SetReadonlyResult>) {
let (sender, receiver) = oneshot::channel();

(
WorkerRequest::Internal {
region_id,
command: Command::SetReadonlyGracefully(SetReadonlyGracefully { sender }),
},
receiver,
)
}
}

/// DDL request to a region.
Expand All @@ -563,6 +586,16 @@ pub(crate) struct SenderDdlRequest {
pub(crate) request: DdlRequest,
}

#[derive(Debug)]
pub(crate) enum Command {
SetReadonlyGracefully(SetReadonlyGracefully),
}

#[derive(Debug)]
pub(crate) struct SetReadonlyGracefully {
pub(crate) sender: Sender<SetReadonlyResult>,
}

/// Notification from a background job.
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
Expand Down
30 changes: 29 additions & 1 deletion src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_engine::SetReadonlyResult;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, Mutex};
Expand All @@ -49,7 +50,7 @@ use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::memtable::MemtableBuilderRef;
use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
use crate::request::{
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
BackgroundNotify, Command, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::wal::Wal;
Expand Down Expand Up @@ -501,6 +502,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// For background notify, we handle it directly.
self.handle_background_notify(region_id, notify).await;
}
WorkerRequest::Internal { region_id, command } => {
self.handle_inter_command(region_id, command).await;
}
// We receive a stop signal, but we still want to process remaining
// requests. The worker thread will then check the running flag and
// then exit.
Expand Down Expand Up @@ -563,6 +567,30 @@ impl<S: LogStore> RegionWorkerLoop<S> {
BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
}
}

/// Handles internal command.
async fn handle_inter_command(&mut self, region_id: RegionId, command: Command) {
match command {
Command::SetReadonlyGracefully(req) => {
if let Some(region) = self.regions.get_region(region_id) {
if region.is_writable() {
region.set_writable(false);
}

let last_entry_id = region.version_control.current().last_entry_id;
let _ = req.sender.send(SetReadonlyResult {
last_entry_id: Some(last_entry_id),
exist: true,
});
} else {
let _ = req.sender.send(SetReadonlyResult {
last_entry_id: None,
exist: false,
});
}
}
}
}
}

impl<S> RegionWorkerLoop<S> {
Expand Down
Loading

0 comments on commit 18f2217

Please sign in to comment.