diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 20c7e0d050ae..38cbc6e4ac4e 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -13,6 +13,7 @@ // limitations under the License. mod alter; +mod catchup; mod close; mod create; mod drop; @@ -147,8 +148,7 @@ impl RegionEngine for MetricEngine { | RegionRequest::Flush(_) | RegionRequest::Compact(_) | RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(), - // It always Ok(0), all data is the latest. - RegionRequest::Catchup(_) => Ok(0), + RegionRequest::Catchup(ref req) => self.inner.catchup_region(region_id, *req).await, }; result.map_err(BoxedError::new).map(|rows| RegionResponse { diff --git a/src/metric-engine/src/engine/catchup.rs b/src/metric-engine/src/engine/catchup.rs new file mode 100644 index 000000000000..0e6aee1e3883 --- /dev/null +++ b/src/metric-engine/src/engine/catchup.rs @@ -0,0 +1,61 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::ResultExt; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{AffectedRows, RegionCatchupRequest, RegionRequest}; +use store_api::storage::RegionId; + +use crate::engine::MetricEngineInner; +use crate::error::{MitoCatchupOperationSnafu, Result, UnsupportedRegionRequestSnafu}; +use crate::utils; + +impl MetricEngineInner { + pub async fn catchup_region( + &self, + region_id: RegionId, + req: RegionCatchupRequest, + ) -> Result { + if !self.is_physical_region(region_id) { + return UnsupportedRegionRequestSnafu { + request: RegionRequest::Catchup(req), + } + .fail(); + } + let metadata_region_id = utils::to_metadata_region_id(region_id); + // TODO(weny): improve the catchup, we can read the wal entries only once. + self.mito + .handle_request( + metadata_region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: req.set_writable, + entry_id: None, + }), + ) + .await + .context(MitoCatchupOperationSnafu)?; + + self.mito + .handle_request( + region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: req.set_writable, + entry_id: req.entry_id, + }), + ) + .await + .context(MitoCatchupOperationSnafu) + .map(|response| response.affected_rows) + } +} diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index f52bc6e3cc72..340f4f19bcfa 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -121,6 +121,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Mito catchup operation fails"))] + MitoCatchupOperation { + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to collect record batch stream"))] CollectRecordBatchStream { source: common_recordbatch::error::Error, @@ -267,7 +274,8 @@ impl ErrorExt for Error { | OpenMitoRegion { source, .. } | CloseMitoRegion { source, .. } | MitoReadOperation { source, .. } - | MitoWriteOperation { source, .. } => source.status_code(), + | MitoWriteOperation { source, .. } + | MitoCatchupOperation { source, .. } => source.status_code(), CollectRecordBatchStream { source, .. } => source.status_code(), diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 70271f017b35..1452fcbe6125 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -666,7 +666,7 @@ pub struct RegionTruncateRequest {} /// /// Makes a readonly region to catch up to leader region changes. /// There is no effect if it operating on a leader region. -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub struct RegionCatchupRequest { /// Sets it to writable if it's available after it has caught up with all changes. pub set_writable: bool, diff --git a/tests-fuzz/targets/unstable/fuzz_create_table_standalone.rs b/tests-fuzz/targets/unstable/fuzz_create_table_standalone.rs index ad0a64f7d28c..5f499be33d8f 100644 --- a/tests-fuzz/targets/unstable/fuzz_create_table_standalone.rs +++ b/tests-fuzz/targets/unstable/fuzz_create_table_standalone.rs @@ -76,7 +76,6 @@ impl Arbitrary<'_> for FuzzInput { const DEFAULT_TEMPLATE: &str = "standalone.template.toml"; const DEFAULT_CONFIG_NAME: &str = "standalone.template.toml"; const DEFAULT_ROOT_DIR: &str = "/tmp/unstable_greptime/"; -const DEFAULT_DATA_HOME: &str = "/tmp/unstable_greptime/datahome/"; const DEFAULT_MYSQL_URL: &str = "127.0.0.1:4002"; const DEFAULT_HTTP_HEALTH_URL: &str = "http://127.0.0.1:4000/health"; @@ -94,11 +93,11 @@ fn generate_create_table_expr(rng: &mut R) -> CreateTableExpr create_table_generator.generate(rng).unwrap() } -async fn connect_mysql(addr: &str) -> Pool { +async fn connect_mysql(addr: &str, database: &str) -> Pool { loop { match MySqlPoolOptions::new() .acquire_timeout(Duration::from_secs(30)) - .connect(&format!("mysql://{addr}/public")) + .connect(&format!("mysql://{addr}/{database}")) .await { Ok(mysql) => return mysql, @@ -109,6 +108,8 @@ async fn connect_mysql(addr: &str) -> Pool { } } +const FUZZ_TESTS_DATABASE: &str = "fuzz_tests"; + async fn execute_unstable_create_table( unstable_process_controller: Arc, rx: watch::Receiver, @@ -117,10 +118,20 @@ async fn execute_unstable_create_table( // Starts the unstable process. let moved_unstable_process_controller = unstable_process_controller.clone(); let handler = tokio::spawn(async move { moved_unstable_process_controller.start().await }); + let mysql_public = connect_mysql(DEFAULT_MYSQL_URL, "public").await; + loop { + let sql = format!("CREATE DATABASE IF NOT EXISTS {FUZZ_TESTS_DATABASE}"); + match sqlx::query(&sql).execute(&mysql_public).await { + Ok(result) => { + info!("Create database: {}, result: {result:?}", sql); + break; + } + Err(err) => warn!("Failed to create database: {}, error: {err}", sql), + } + } + let mysql = connect_mysql(DEFAULT_MYSQL_URL, FUZZ_TESTS_DATABASE).await; let mut rng = ChaChaRng::seed_from_u64(input.seed); - let mysql = connect_mysql(DEFAULT_MYSQL_URL).await; let ctx = FuzzContext { greptime: mysql }; - let mut table_states = HashMap::new(); for _ in 0..input.num { @@ -140,7 +151,7 @@ async fn execute_unstable_create_table( Ok(result) => { let state = *rx.borrow(); table_states.insert(table_name, state); - validate_columns(&ctx.greptime, &table_ctx).await; + validate_columns(&ctx.greptime, FUZZ_TESTS_DATABASE, &table_ctx).await; info!("Create table: {sql}, result: {result:?}"); } Err(err) => { @@ -163,13 +174,13 @@ async fn execute_unstable_create_table( } loop { - let sql = "DROP DATABASE IF EXISTS public"; - match sqlx::query(sql).execute(&ctx.greptime).await { + let sql = format!("DROP DATABASE IF EXISTS {FUZZ_TESTS_DATABASE}"); + match sqlx::query(&sql).execute(&mysql_public).await { Ok(result) => { - info!("Drop table: {}, result: {result:?}", sql); + info!("Drop database: {}, result: {result:?}", sql); break; } - Err(err) => warn!("Failed to drop table: {}, error: {err}", sql), + Err(err) => warn!("Failed to drop database: {}, error: {err}", sql), } } // Cleans up @@ -180,9 +191,9 @@ async fn execute_unstable_create_table( Ok(()) } -async fn validate_columns(client: &Pool, table_ctx: &TableContext) { +async fn validate_columns(client: &Pool, schema_name: &str, table_ctx: &TableContext) { loop { - match validator::column::fetch_columns(client, "public".into(), table_ctx.name.clone()) + match validator::column::fetch_columns(client, schema_name.into(), table_ctx.name.clone()) .await { Ok(mut column_entries) => { @@ -207,6 +218,8 @@ fuzz_target!(|input: FuzzInput| { let root_dir = variables.root_dir.unwrap_or(DEFAULT_ROOT_DIR.to_string()); create_dir_all(&root_dir).unwrap(); let output_config_path = format!("{root_dir}{DEFAULT_CONFIG_NAME}"); + let data_home = format!("{root_dir}datahome"); + let mut conf_path = get_conf_path(); conf_path.push(DEFAULT_TEMPLATE); let template_path = conf_path.to_str().unwrap().to_string(); @@ -216,15 +229,9 @@ fuzz_target!(|input: FuzzInput| { struct Context { data_home: String, } - write_config_file( - &template_path, - &Context { - data_home: DEFAULT_DATA_HOME.to_string(), - }, - &output_config_path, - ) - .await - .unwrap(); + write_config_file(&template_path, &Context { data_home }, &output_config_path) + .await + .unwrap(); let args = vec![ "standalone".to_string(),