Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mito): Add writable flag to region #2349

Merged
merged 17 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/common/error/src/status_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub enum StatusCode {
DatabaseNotFound = 4004,
RegionNotFound = 4005,
RegionAlreadyExists = 4006,
RegionReadonly = 4007,
// ====== End of catalog related status code =======

// ====== Begin of storage related status code =====
Expand Down Expand Up @@ -117,6 +118,7 @@ impl StatusCode {
| StatusCode::TableNotFound
| StatusCode::RegionNotFound
| StatusCode::RegionAlreadyExists
| StatusCode::RegionReadonly
| StatusCode::TableColumnNotFound
| StatusCode::TableColumnExists
| StatusCode::DatabaseNotFound
Expand Down Expand Up @@ -151,6 +153,7 @@ impl StatusCode {
| StatusCode::TableNotFound
| StatusCode::RegionNotFound
| StatusCode::RegionAlreadyExists
| StatusCode::RegionReadonly
| StatusCode::TableColumnNotFound
| StatusCode::TableColumnExists
| StatusCode::DatabaseNotFound
Expand Down Expand Up @@ -183,6 +186,7 @@ impl StatusCode {
v if v == StatusCode::RegionAlreadyExists as u32 => {
Some(StatusCode::RegionAlreadyExists)
}
v if v == StatusCode::RegionReadonly as u32 => Some(StatusCode::RegionReadonly),
v if v == StatusCode::TableColumnNotFound as u32 => {
Some(StatusCode::TableColumnNotFound)
}
Expand Down
8 changes: 3 additions & 5 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,16 @@ mod twcs;
use std::sync::Arc;
use std::time::Duration;

use common_query::Output;
use common_telemetry::debug;
pub use picker::CompactionPickerRef;
use store_api::storage::{CompactionStrategy, RegionId, TwcsOptions};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::mpsc;

use crate::access_layer::AccessLayerRef;
use crate::compaction::twcs::TwcsPicker;
use crate::error;
use crate::error::Result;
use crate::region::version::VersionRef;
use crate::request::WorkerRequest;
use crate::request::{OptionOutputTx, WorkerRequest};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::FilePurgerRef;

Expand All @@ -43,7 +41,7 @@ pub struct CompactionRequest {
pub(crate) ttl: Option<Duration>,
pub(crate) compaction_time_window: Option<i64>,
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
pub(crate) waiter: Option<oneshot::Sender<error::Result<Output>>>,
pub(crate) waiter: OptionOutputTx,
pub(crate) file_purger: FilePurgerRef,
}

Expand Down
15 changes: 7 additions & 8 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@ use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use tokio::sync::mpsc;
use tokio::sync::oneshot::Sender;

use crate::access_layer::AccessLayerRef;
use crate::compaction::output::CompactionOutput;
use crate::compaction::picker::{CompactionTask, Picker};
use crate::compaction::CompactionRequest;
use crate::error;
use crate::error::CompactRegionSnafu;
use crate::request::{BackgroundNotify, CompactionFailed, CompactionFinished, WorkerRequest};
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OptionOutputTx, WorkerRequest,
};
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::version::LevelMeta;
use crate::worker::send_result;

const MAX_PARALLEL_COMPACTION: usize = 8;

Expand Down Expand Up @@ -157,7 +157,7 @@ impl Picker for TwcsPicker {

if outputs.is_empty() && expired_ssts.is_empty() {
// Nothing to compact.
send_result(waiter, Ok(Output::AffectedRows(0)));
waiter.send(Ok(Output::AffectedRows(0)));
return None;
}
let task = TwcsCompactionTask {
Expand Down Expand Up @@ -228,7 +228,7 @@ pub(crate) struct TwcsCompactionTask {
/// Request sender to notify the worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
/// Sender that are used to notify waiters waiting for pending compaction tasks.
pub sender: Option<Sender<error::Result<Output>>>,
pub sender: OptionOutputTx,
}

impl Debug for TwcsCompactionTask {
Expand Down Expand Up @@ -321,11 +321,10 @@ impl TwcsCompactionTask {

/// Handles compaction failure, notifies all waiters.
fn on_failure(&mut self, err: Arc<error::Error>) {
if let Some(sender) = self.sender.take() {
let _ = sender.send(Err(err.clone()).context(CompactRegionSnafu {
self.sender
.send_mut(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
}

/// Notifies region worker to handle post-compaction tasks.
Expand Down
22 changes: 20 additions & 2 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ impl MitoEngine {
self.inner.workers.is_region_exists(region_id)
}

fn scan(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
/// Returns a scanner to scan for `request`.
fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
self.inner.handle_query(region_id, request)
}

Expand Down Expand Up @@ -143,6 +144,17 @@ impl EngineInner {

scan_region.scanner()
}

/// Set writable mode for a region.
fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<()> {
let region = self
.workers
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;

region.set_writable(writable);
Ok(())
}
}

#[async_trait]
Expand All @@ -168,7 +180,7 @@ impl RegionEngine for MitoEngine {
region_id: RegionId,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
self.scan(region_id, request)
self.scanner(region_id, request)
.map_err(BoxedError::new)?
.scan()
.await
Expand All @@ -191,6 +203,12 @@ impl RegionEngine for MitoEngine {
async fn stop(&self) -> std::result::Result<(), BoxedError> {
self.inner.stop().await.map_err(BoxedError::new)
}

fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> {
self.inner
.set_writable(region_id, writable)
.map_err(BoxedError::new)
}
}

// Tests methods.
Expand Down
4 changes: 3 additions & 1 deletion src/mito2/src/engine/alter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::test_util::{

async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) {
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
Expand Down Expand Up @@ -206,6 +206,8 @@ async fn test_put_after_alter() {
)
.await
.unwrap();
// Set writable.
engine.set_writable(region_id, true).unwrap();

// Put with old schema.
let rows = Rows {
Expand Down
10 changes: 5 additions & 5 deletions src/mito2/src/engine/flush_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async fn test_manual_flush() {
flush_region(&engine, region_id).await;

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
Expand Down Expand Up @@ -110,7 +110,7 @@ async fn test_flush_engine() {
listener.wait().await;

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(1, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
Expand Down Expand Up @@ -175,7 +175,7 @@ async fn test_write_stall() {
put_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(1, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
Expand Down Expand Up @@ -211,7 +211,7 @@ async fn test_flush_empty() {
flush_region(&engine, region_id).await;

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(0, scanner.num_files());
let stream = scanner.scan().await.unwrap();
Expand Down Expand Up @@ -255,7 +255,7 @@ async fn test_flush_reopen_region() {
};
check_region();

reopen_region(&engine, region_id, region_dir).await;
reopen_region(&engine, region_id, region_dir, true).await;
check_region();

// Puts again.
Expand Down
54 changes: 46 additions & 8 deletions src/mito2/src/engine/open_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,27 @@

use std::collections::HashMap;

use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionOpenRequest, RegionRequest};
use store_api::region_request::{RegionOpenRequest, RegionPutRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::config::MitoConfig;
use crate::test_util::{reopen_region, CreateRequestBuilder, TestEnv};
use crate::test_util::{
build_rows, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv,
};

#[tokio::test]
async fn test_engine_open_empty() {
let mut env = TestEnv::with_prefix("open-empty");
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let err = engine
.handle_request(
RegionId::new(1, 1),
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir: "empty".to_string(),
Expand All @@ -39,10 +43,9 @@ async fn test_engine_open_empty() {
)
.await
.unwrap_err();
assert!(
matches!(err.status_code(), StatusCode::RegionNotFound),
"unexpected err: {err}"
);
assert_eq!(StatusCode::RegionNotFound, err.status_code());
let err = engine.set_writable(region_id, true).unwrap_err();
assert_eq!(StatusCode::RegionNotFound, err.status_code());
}

#[tokio::test]
Expand Down Expand Up @@ -84,6 +87,41 @@ async fn test_engine_reopen_region() {
.await
.unwrap();

reopen_region(&engine, region_id, region_dir).await;
reopen_region(&engine, region_id, region_dir, false).await;
assert!(engine.is_region_exists(region_id));
}

#[tokio::test]
async fn test_engine_open_readonly() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

reopen_region(&engine, region_id, region_dir, false).await;

// Region is readonly.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 2),
};
let err = engine
.handle_request(
region_id,
RegionRequest::Put(RegionPutRequest { rows: rows.clone() }),
)
.await
.unwrap_err();
assert_eq!(StatusCode::RegionReadonly, err.status_code());

// Set writable and write.
engine.set_writable(region_id, true).unwrap();
put_rows(&engine, region_id, rows).await;
}
9 changes: 3 additions & 6 deletions src/mito2/src/engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ async fn test_region_replay() {
assert_eq!(0, rows);

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
let stream = scanner.scan().await.unwrap();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(42, batches.iter().map(|b| b.num_rows()).sum::<usize>());

Expand Down Expand Up @@ -216,8 +215,7 @@ async fn test_put_delete() {
delete_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
let stream = scanner.scan().await.unwrap();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
Expand Down Expand Up @@ -274,8 +272,7 @@ async fn test_put_overwrite() {
put_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
let stream = scanner.scan().await.unwrap();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
Expand Down
7 changes: 7 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,12 @@ pub enum Error {
source: store_api::metadata::MetadataError,
location: Location,
},

#[snafu(display("Region {} is read only, location: {}", region_id, location))]
RegionReadonly {
region_id: RegionId,
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -544,6 +550,7 @@ impl ErrorExt for Error {
CompactRegion { source, .. } => source.status_code(),
CompatReader { .. } => StatusCode::Unexpected,
InvalidRegionRequest { source, .. } => source.status_code(),
RegionReadonly { .. } => StatusCode::RegionReadonly,
}
}

Expand Down
Loading