Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 23, 2023
1 parent 66bf820 commit 058f354
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 107 deletions.
4 changes: 2 additions & 2 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use query::query_engine::DescribeResult;
use query::QueryEngine;
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResult};
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
use table::TableRef;
Expand Down Expand Up @@ -194,7 +194,7 @@ impl RegionEngine for MockRegionEngine {
async fn set_readonly_gracefully(
&self,
_region_id: RegionId,
) -> Result<SetReadonlyResult, BoxedError> {
) -> Result<SetReadonlyResponse, BoxedError> {
unimplemented!()
}

Expand Down
13 changes: 7 additions & 6 deletions src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use common_telemetry::{error, info};
use object_store::ObjectStore;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResult};
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::{
RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, RegionRequest,
};
Expand Down Expand Up @@ -106,13 +106,14 @@ impl RegionEngine for FileRegionEngine {
async fn set_readonly_gracefully(
&self,
region_id: RegionId,
) -> Result<SetReadonlyResult, BoxedError> {
) -> Result<SetReadonlyResponse, BoxedError> {
let exist = self.inner.get_region(region_id).await.is_some();

Ok(SetReadonlyResult {
last_entry_id: None,
exist,
})
if exist {
Ok(SetReadonlyResponse::success(None))
} else {
Ok(SetReadonlyResponse::NotFound)
}
}

fn role(&self, region_id: RegionId) -> Option<RegionRole> {
Expand Down
4 changes: 2 additions & 2 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use mito2::engine::MitoEngine;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResult};
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::RwLock;
Expand Down Expand Up @@ -176,7 +176,7 @@ impl RegionEngine for MetricEngine {
async fn set_readonly_gracefully(
&self,
region_id: RegionId,
) -> std::result::Result<SetReadonlyResult, BoxedError> {
) -> std::result::Result<SetReadonlyResponse, BoxedError> {
self.inner.mito.set_readonly_gracefully(region_id).await
}

Expand Down
35 changes: 11 additions & 24 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ mod create_test;
mod drop_test;
#[cfg(test)]
mod flush_test;
#[cfg(test)]
mod internal_command_test;
#[cfg(any(test, feature = "test"))]
pub mod listener;
#[cfg(test)]
Expand All @@ -39,6 +37,8 @@ mod projection_test;
#[cfg(test)]
mod prune_test;
#[cfg(test)]
mod set_readonly_gracefully_test;
#[cfg(test)]
mod truncate_test;
use std::sync::Arc;

Expand All @@ -50,7 +50,7 @@ use object_store::manager::ObjectStoreManagerRef;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResult};
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};

Expand Down Expand Up @@ -189,27 +189,14 @@ impl EngineInner {
Ok(())
}

async fn set_readonly_gracefully(&self, region_id: RegionId) -> Result<SetReadonlyResult> {
if let Some(region) = self.workers.get_region(region_id) {
if !region.is_writable() {
let last_entry_id = region.version_control.current().last_entry_id;

Ok(SetReadonlyResult {
last_entry_id: Some(last_entry_id),
exist: true,
})
} else {
let (request, receiver) = WorkerRequest::new_set_readonly_gracefully(region_id);
self.workers.submit_to_worker(region_id, request).await?;
/// Sets read-only for a region and ensures no more writes in the region after it returns.
async fn set_readonly_gracefully(&self, region_id: RegionId) -> Result<SetReadonlyResponse> {
// Notes: It acquires the mutable ownership to ensure no other threads,
// Therefore, we submit it to the worker.
let (request, receiver) = WorkerRequest::new_set_readonly_gracefully(region_id);
self.workers.submit_to_worker(region_id, request).await?;

receiver.await.context(RecvSnafu)
}
} else {
Ok(SetReadonlyResult {
last_entry_id: None,
exist: false,
})
}
receiver.await.context(RecvSnafu)
}

fn role(&self, region_id: RegionId) -> Option<RegionRole> {
Expand Down Expand Up @@ -288,7 +275,7 @@ impl RegionEngine for MitoEngine {
async fn set_readonly_gracefully(
&self,
region_id: RegionId,
) -> Result<SetReadonlyResult, BoxedError> {
) -> Result<SetReadonlyResponse, BoxedError> {
self.inner
.set_readonly_gracefully(region_id)
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::assert_matches::assert_matches;

use api::v1::Rows;
use common_error::ext::ErrorExt;
use store_api::region_engine::RegionEngine;
use store_api::region_engine::{RegionEngine, SetReadonlyResponse};
use store_api::region_request::{RegionPutRequest, RegionRequest};
use store_api::storage::RegionId;

Expand All @@ -39,21 +39,22 @@ async fn test_set_readonly_gracefully() {
.await
.unwrap();

let (set_readonly_request, recv) = WorkerRequest::new_set_readonly_gracefully(region_id);
engine
.inner
.workers
.submit_to_worker(region_id, set_readonly_request)
.await
.unwrap();
let result = recv.await.unwrap();
assert!(result.exist);
assert_eq!(result.last_entry_id.unwrap(), 0);

// For fast-path.
let result = engine.set_readonly_gracefully(region_id).await.unwrap();
assert!(result.exist);
assert_eq!(result.last_entry_id.unwrap(), 0);
assert_eq!(
SetReadonlyResponse::Success {
last_entry_id: Some(0)
},
result
);

// set readonly again.
let result = engine.set_readonly_gracefully(region_id).await.unwrap();
assert_eq!(
SetReadonlyResponse::Success {
last_entry_id: Some(0)
},
result
);

let rows = Rows {
schema: column_schemas,
Expand All @@ -77,8 +78,13 @@ async fn test_set_readonly_gracefully() {
put_rows(&engine, region_id, rows).await;

let result = engine.set_readonly_gracefully(region_id).await.unwrap();
assert!(result.exist);
assert_eq!(result.last_entry_id.unwrap(), 1);

assert_eq!(
SetReadonlyResponse::Success {
last_entry_id: Some(1)
},
result
);
}

#[tokio::test]
Expand All @@ -98,14 +104,12 @@ async fn test_set_readonly_gracefully_not_exist() {
.await
.unwrap();
let result = recv.await.unwrap();
assert!(!result.exist);
assert!(result.last_entry_id.is_none());
assert_eq!(SetReadonlyResponse::NotFound, result);

// For fast-path.
let result = engine
.set_readonly_gracefully(non_exist_region_id)
.await
.unwrap();
assert!(!result.exist);
assert!(result.last_entry_id.is_none());
assert_eq!(SetReadonlyResponse::NotFound, result);
}
25 changes: 6 additions & 19 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use prost::Message;
use smallvec::SmallVec;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::region_engine::SetReadonlyResult;
use store_api::region_engine::SetReadonlyResponse;
use store_api::region_request::{
RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest,
RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
Expand Down Expand Up @@ -469,11 +469,11 @@ pub(crate) enum WorkerRequest {
},

/// The internal commands.
Internal {
SetReadonlyGracefully {
/// Id of the region to send.
region_id: RegionId,

command: Command,
/// The sender of [SetReadonlyResponse].
sender: Sender<SetReadonlyResponse>,
},

/// Notify a worker to stop.
Expand Down Expand Up @@ -549,14 +549,11 @@ impl WorkerRequest {

pub(crate) fn new_set_readonly_gracefully(
region_id: RegionId,
) -> (WorkerRequest, Receiver<SetReadonlyResult>) {
) -> (WorkerRequest, Receiver<SetReadonlyResponse>) {
let (sender, receiver) = oneshot::channel();

(
WorkerRequest::Internal {
region_id,
command: Command::SetReadonlyGracefully(SetReadonlyGracefully { sender }),
},
WorkerRequest::SetReadonlyGracefully { region_id, sender },
receiver,
)
}
Expand Down Expand Up @@ -586,16 +583,6 @@ pub(crate) struct SenderDdlRequest {
pub(crate) request: DdlRequest,
}

#[derive(Debug)]
pub(crate) enum Command {
SetReadonlyGracefully(SetReadonlyGracefully),
}

#[derive(Debug)]
pub(crate) struct SetReadonlyGracefully {
pub(crate) sender: Sender<SetReadonlyResult>,
}

/// Notification from a background job.
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
Expand Down
44 changes: 18 additions & 26 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_engine::SetReadonlyResult;
use store_api::region_engine::SetReadonlyResponse;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, Mutex};
use tokio::sync::{mpsc, oneshot, Mutex};

use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
Expand All @@ -50,7 +50,7 @@ use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::memtable::MemtableBuilderRef;
use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
use crate::request::{
BackgroundNotify, Command, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::wal::Wal;
Expand Down Expand Up @@ -502,8 +502,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// For background notify, we handle it directly.
self.handle_background_notify(region_id, notify).await;
}
WorkerRequest::Internal { region_id, command } => {
self.handle_inter_command(region_id, command).await;
WorkerRequest::SetReadonlyGracefully { region_id, sender } => {
self.set_readonly_gracefully(region_id, sender).await;
}
// We receive a stop signal, but we still want to process remaining
// requests. The worker thread will then check the running flag and
Expand Down Expand Up @@ -568,27 +568,19 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
}

/// Handles internal command.
async fn handle_inter_command(&mut self, region_id: RegionId, command: Command) {
match command {
Command::SetReadonlyGracefully(req) => {
if let Some(region) = self.regions.get_region(region_id) {
if region.is_writable() {
region.set_writable(false);
}

let last_entry_id = region.version_control.current().last_entry_id;
let _ = req.sender.send(SetReadonlyResult {
last_entry_id: Some(last_entry_id),
exist: true,
});
} else {
let _ = req.sender.send(SetReadonlyResult {
last_entry_id: None,
exist: false,
});
}
}
/// Handles `set_readonly_gracefully`.
async fn set_readonly_gracefully(
&mut self,
region_id: RegionId,
sender: oneshot::Sender<SetReadonlyResponse>,
) {
if let Some(region) = self.regions.get_region(region_id) {
region.set_writable(false);

let last_entry_id = region.version_control.current().last_entry_id;
let _ = sender.send(SetReadonlyResponse::success(Some(last_entry_id)));
} else {
let _ = sender.send(SetReadonlyResponse::NotFound);
}
}
}
Expand Down
23 changes: 16 additions & 7 deletions src/store-api/src/region_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,26 @@ use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use serde::{Deserialize, Serialize};

use crate::logstore::entry;
use crate::metadata::RegionMetadataRef;
use crate::region_request::RegionRequest;
use crate::storage::{RegionId, ScanRequest};

/// The result of setting readonly for the region.
#[derive(Debug)]
pub struct SetReadonlyResult {
/// Returns `last_entry_id` of the region if available(e.g., It's not available in file engine).
pub last_entry_id: Option<u64>,
/// Returns true if the region exist.
pub exist: bool,
#[derive(Debug, PartialEq, Eq)]
pub enum SetReadonlyResponse {
Success {
/// Returns `last_entry_id` of the region if available(e.g., It's not available in file engine).
last_entry_id: Option<entry::Id>,
},
NotFound,
}

impl SetReadonlyResponse {
/// Returns a [SetReadonlyResponse::Success] with the `last_entry_id`.
pub fn success(last_entry_id: Option<entry::Id>) -> Self {
Self::Success { last_entry_id }
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -143,7 +152,7 @@ pub trait RegionEngine: Send + Sync {
async fn set_readonly_gracefully(
&self,
region_id: RegionId,
) -> Result<SetReadonlyResult, BoxedError>;
) -> Result<SetReadonlyResponse, BoxedError>;

/// Indicates region role.
///
Expand Down

0 comments on commit 058f354

Please sign in to comment.