Skip to content

Commit

Permalink
Merge branch 'main' into opendal-0460
Browse files Browse the repository at this point in the history
  • Loading branch information
tisonkun authored May 27, 2024
2 parents a914641 + 20ce7d4 commit f9d5f78
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 25 deletions.
4 changes: 2 additions & 2 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

mod alter;
mod catchup;
mod close;
mod create;
mod drop;
Expand Down Expand Up @@ -147,8 +148,7 @@ impl RegionEngine for MetricEngine {
| RegionRequest::Flush(_)
| RegionRequest::Compact(_)
| RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
// It always Ok(0), all data is the latest.
RegionRequest::Catchup(_) => Ok(0),
RegionRequest::Catchup(ref req) => self.inner.catchup_region(region_id, *req).await,
};

result.map_err(BoxedError::new).map(|rows| RegionResponse {
Expand Down
61 changes: 61 additions & 0 deletions src/metric-engine/src/engine/catchup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use snafu::ResultExt;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionCatchupRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::engine::MetricEngineInner;
use crate::error::{MitoCatchupOperationSnafu, Result, UnsupportedRegionRequestSnafu};
use crate::utils;

impl MetricEngineInner {
pub async fn catchup_region(
&self,
region_id: RegionId,
req: RegionCatchupRequest,
) -> Result<AffectedRows> {
if !self.is_physical_region(region_id) {
return UnsupportedRegionRequestSnafu {
request: RegionRequest::Catchup(req),
}
.fail();
}
let metadata_region_id = utils::to_metadata_region_id(region_id);
// TODO(weny): improve the catchup, we can read the wal entries only once.
self.mito
.handle_request(
metadata_region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: req.set_writable,
entry_id: None,
}),
)
.await
.context(MitoCatchupOperationSnafu)?;

self.mito
.handle_request(
region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: req.set_writable,
entry_id: req.entry_id,
}),
)
.await
.context(MitoCatchupOperationSnafu)
.map(|response| response.affected_rows)
}
}
10 changes: 9 additions & 1 deletion src/metric-engine/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Mito catchup operation fails"))]
MitoCatchupOperation {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to collect record batch stream"))]
CollectRecordBatchStream {
source: common_recordbatch::error::Error,
Expand Down Expand Up @@ -267,7 +274,8 @@ impl ErrorExt for Error {
| OpenMitoRegion { source, .. }
| CloseMitoRegion { source, .. }
| MitoReadOperation { source, .. }
| MitoWriteOperation { source, .. } => source.status_code(),
| MitoWriteOperation { source, .. }
| MitoCatchupOperation { source, .. } => source.status_code(),

CollectRecordBatchStream { source, .. } => source.status_code(),

Expand Down
2 changes: 1 addition & 1 deletion src/store-api/src/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ pub struct RegionTruncateRequest {}
///
/// Makes a readonly region to catch up to leader region changes.
/// There is no effect if it operating on a leader region.
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub struct RegionCatchupRequest {
/// Sets it to writable if it's available after it has caught up with all changes.
pub set_writable: bool,
Expand Down
49 changes: 28 additions & 21 deletions tests-fuzz/targets/unstable/fuzz_create_table_standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ impl Arbitrary<'_> for FuzzInput {
const DEFAULT_TEMPLATE: &str = "standalone.template.toml";
const DEFAULT_CONFIG_NAME: &str = "standalone.template.toml";
const DEFAULT_ROOT_DIR: &str = "/tmp/unstable_greptime/";
const DEFAULT_DATA_HOME: &str = "/tmp/unstable_greptime/datahome/";
const DEFAULT_MYSQL_URL: &str = "127.0.0.1:4002";
const DEFAULT_HTTP_HEALTH_URL: &str = "http://127.0.0.1:4000/health";

Expand All @@ -94,11 +93,11 @@ fn generate_create_table_expr<R: Rng + 'static>(rng: &mut R) -> CreateTableExpr
create_table_generator.generate(rng).unwrap()
}

async fn connect_mysql(addr: &str) -> Pool<MySql> {
async fn connect_mysql(addr: &str, database: &str) -> Pool<MySql> {
loop {
match MySqlPoolOptions::new()
.acquire_timeout(Duration::from_secs(30))
.connect(&format!("mysql://{addr}/public"))
.connect(&format!("mysql://{addr}/{database}"))
.await
{
Ok(mysql) => return mysql,
Expand All @@ -109,6 +108,8 @@ async fn connect_mysql(addr: &str) -> Pool<MySql> {
}
}

const FUZZ_TESTS_DATABASE: &str = "fuzz_tests";

async fn execute_unstable_create_table(
unstable_process_controller: Arc<UnstableProcessController>,
rx: watch::Receiver<ProcessState>,
Expand All @@ -117,10 +118,20 @@ async fn execute_unstable_create_table(
// Starts the unstable process.
let moved_unstable_process_controller = unstable_process_controller.clone();
let handler = tokio::spawn(async move { moved_unstable_process_controller.start().await });
let mysql_public = connect_mysql(DEFAULT_MYSQL_URL, "public").await;
loop {
let sql = format!("CREATE DATABASE IF NOT EXISTS {FUZZ_TESTS_DATABASE}");
match sqlx::query(&sql).execute(&mysql_public).await {
Ok(result) => {
info!("Create database: {}, result: {result:?}", sql);
break;
}
Err(err) => warn!("Failed to create database: {}, error: {err}", sql),
}
}
let mysql = connect_mysql(DEFAULT_MYSQL_URL, FUZZ_TESTS_DATABASE).await;
let mut rng = ChaChaRng::seed_from_u64(input.seed);
let mysql = connect_mysql(DEFAULT_MYSQL_URL).await;
let ctx = FuzzContext { greptime: mysql };

let mut table_states = HashMap::new();

for _ in 0..input.num {
Expand All @@ -140,7 +151,7 @@ async fn execute_unstable_create_table(
Ok(result) => {
let state = *rx.borrow();
table_states.insert(table_name, state);
validate_columns(&ctx.greptime, &table_ctx).await;
validate_columns(&ctx.greptime, FUZZ_TESTS_DATABASE, &table_ctx).await;
info!("Create table: {sql}, result: {result:?}");
}
Err(err) => {
Expand All @@ -163,13 +174,13 @@ async fn execute_unstable_create_table(
}

loop {
let sql = "DROP DATABASE IF EXISTS public";
match sqlx::query(sql).execute(&ctx.greptime).await {
let sql = format!("DROP DATABASE IF EXISTS {FUZZ_TESTS_DATABASE}");
match sqlx::query(&sql).execute(&mysql_public).await {
Ok(result) => {
info!("Drop table: {}, result: {result:?}", sql);
info!("Drop database: {}, result: {result:?}", sql);
break;
}
Err(err) => warn!("Failed to drop table: {}, error: {err}", sql),
Err(err) => warn!("Failed to drop database: {}, error: {err}", sql),
}
}
// Cleans up
Expand All @@ -180,9 +191,9 @@ async fn execute_unstable_create_table(
Ok(())
}

async fn validate_columns(client: &Pool<MySql>, table_ctx: &TableContext) {
async fn validate_columns(client: &Pool<MySql>, schema_name: &str, table_ctx: &TableContext) {
loop {
match validator::column::fetch_columns(client, "public".into(), table_ctx.name.clone())
match validator::column::fetch_columns(client, schema_name.into(), table_ctx.name.clone())
.await
{
Ok(mut column_entries) => {
Expand All @@ -207,6 +218,8 @@ fuzz_target!(|input: FuzzInput| {
let root_dir = variables.root_dir.unwrap_or(DEFAULT_ROOT_DIR.to_string());
create_dir_all(&root_dir).unwrap();
let output_config_path = format!("{root_dir}{DEFAULT_CONFIG_NAME}");
let data_home = format!("{root_dir}datahome");

let mut conf_path = get_conf_path();
conf_path.push(DEFAULT_TEMPLATE);
let template_path = conf_path.to_str().unwrap().to_string();
Expand All @@ -216,15 +229,9 @@ fuzz_target!(|input: FuzzInput| {
struct Context {
data_home: String,
}
write_config_file(
&template_path,
&Context {
data_home: DEFAULT_DATA_HOME.to_string(),
},
&output_config_path,
)
.await
.unwrap();
write_config_file(&template_path, &Context { data_home }, &output_config_path)
.await
.unwrap();

let args = vec![
"standalone".to_string(),
Expand Down

0 comments on commit f9d5f78

Please sign in to comment.