From 048368fd876077ea2c34d53ebf92e409c111e31e Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 27 May 2024 18:26:50 +0900 Subject: [PATCH] feat: invoke `flush_table` and `compact_table` in fuzz tests (#4045) * feat: invoke `flush_table` and `compact_table` in fuzz tests * feat: support to flush and compact physical metric table * fix: avoid to create tables with the same name * feat: validate values after flushing or compacting table --- src/metric-engine/src/engine.rs | 34 +++- src/metric-engine/src/error.rs | 18 +- tests-fuzz/src/ir.rs | 7 +- tests-fuzz/src/utils.rs | 28 +++ tests-fuzz/targets/fuzz_insert.rs | 10 +- .../targets/fuzz_insert_logical_table.rs | 164 +++++++++++------- 6 files changed, 188 insertions(+), 73 deletions(-) diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 38cbc6e4ac4e..e5b4bf2faca3 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -34,6 +34,7 @@ use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_recordbatch::SendableRecordBatchStream; use mito2::engine::MitoEngine; +use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use store_api::region_engine::{ @@ -44,7 +45,7 @@ use store_api::storage::{RegionId, ScanRequest}; use self::state::MetricEngineState; use crate::data_region::DataRegion; -use crate::error::{Result, UnsupportedRegionRequestSnafu}; +use crate::error::{self, Result, UnsupportedRegionRequestSnafu}; use crate::metadata_region::MetadataRegion; use crate::utils; @@ -144,10 +145,33 @@ impl RegionEngine for MetricEngine { .alter_region(region_id, alter, &mut extension_return_value) .await } - RegionRequest::Delete(_) - | RegionRequest::Flush(_) - | RegionRequest::Compact(_) - | RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(), + RegionRequest::Flush(_) => { + if self.inner.is_physical_region(region_id) { + 
self.inner + .mito + .handle_request(region_id, request) + .await + .context(error::MitoFlushOperationSnafu) + .map(|response| response.affected_rows) + } else { + UnsupportedRegionRequestSnafu { request }.fail() + } + } + RegionRequest::Compact(_) => { + if self.inner.is_physical_region(region_id) { + self.inner + .mito + .handle_request(region_id, request) + .await + .context(error::MitoCompactOperationSnafu) + .map(|response| response.affected_rows) + } else { + UnsupportedRegionRequestSnafu { request }.fail() + } + } + RegionRequest::Delete(_) | RegionRequest::Truncate(_) => { + UnsupportedRegionRequestSnafu { request }.fail() + } RegionRequest::Catchup(ref req) => self.inner.catchup_region(region_id, *req).await, }; diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 340f4f19bcfa..72e6d7032e0c 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -121,6 +121,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Mito flush operation fails"))] + MitoFlushOperation { + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Mito catchup operation fails"))] MitoCatchupOperation { source: BoxedError, @@ -128,6 +135,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Mito compact operation fails"))] + MitoCompactOperation { + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to collect record batch stream"))] CollectRecordBatchStream { source: common_recordbatch::error::Error, @@ -275,7 +289,9 @@ impl ErrorExt for Error { | CloseMitoRegion { source, .. } | MitoReadOperation { source, .. } | MitoWriteOperation { source, .. } - | MitoCatchupOperation { source, .. } => source.status_code(), + | MitoCatchupOperation { source, .. } + | MitoFlushOperation { source, .. } + | MitoCompactOperation { source, .. } => source.status_code(), CollectRecordBatchStream { source, .. 
} => source.status_code(), diff --git a/tests-fuzz/src/ir.rs b/tests-fuzz/src/ir.rs index a8907de76d86..01d2cd430981 100644 --- a/tests-fuzz/src/ir.rs +++ b/tests-fuzz/src/ir.rs @@ -36,6 +36,7 @@ use rand::Rng; use serde::{Deserialize, Serialize}; use self::insert_expr::{RowValue, RowValues}; +use crate::context::TableContextRef; use crate::generator::Random; use crate::impl_random; use crate::ir::create_expr::ColumnOption; @@ -442,7 +443,7 @@ pub fn generate_columns( /// Replace Value::Default with the corresponding default value in the rows for comparison. pub fn replace_default( rows: &[RowValues], - create_expr: &CreateTableExpr, + table_ctx_ref: &TableContextRef, insert_expr: &InsertIntoExpr, ) -> Vec { let index_map: HashMap = insert_expr @@ -450,7 +451,7 @@ pub fn replace_default( .iter() .enumerate() .map(|(insert_idx, insert_column)| { - let create_idx = create_expr + let create_idx = table_ctx_ref .columns .iter() .position(|create_column| create_column.name == insert_column.name) @@ -464,7 +465,7 @@ pub fn replace_default( let mut new_row = Vec::new(); for (idx, value) in row.iter().enumerate() { if let RowValue::Default = value { - let column = &create_expr.columns[index_map[&idx]]; + let column = &table_ctx_ref.columns[index_map[&idx]]; new_row.push(RowValue::Value(column.default_value().unwrap().clone())); } else { new_row.push(value.clone()); diff --git a/tests-fuzz/src/utils.rs b/tests-fuzz/src/utils.rs index 9156067b253e..7dff25b5285d 100644 --- a/tests-fuzz/src/utils.rs +++ b/tests-fuzz/src/utils.rs @@ -20,9 +20,13 @@ pub mod process; use std::env; use common_telemetry::info; +use snafu::ResultExt; use sqlx::mysql::MySqlPoolOptions; use sqlx::{MySql, Pool}; +use crate::error::{self, Result}; +use crate::ir::Ident; + /// Database connections pub struct Connections { pub mysql: Option>, @@ -83,3 +87,27 @@ pub fn load_unstable_test_env_variables() -> UnstableTestVariables { root_dir, } } + +/// Flushes memtable to SST file. 
+pub async fn flush_memtable(e: &Pool, table_name: &Ident) -> Result<()> { + let sql = format!("SELECT flush_table(\"{}\")", table_name); + let result = sqlx::query(&sql) + .execute(e) + .await + .context(error::ExecuteQuerySnafu { sql })?; + info!("Flush table: {}\n\nResult: {result:?}\n\n", table_name); + + Ok(()) +} + +/// Triggers a compaction for table +pub async fn compact_table(e: &Pool, table_name: &Ident) -> Result<()> { + let sql = format!("SELECT compact_table(\"{}\")", table_name); + let result = sqlx::query(&sql) + .execute(e) + .await + .context(error::ExecuteQuerySnafu { sql })?; + info!("Compact table: {}\n\nResult: {result:?}\n\n", table_name); + + Ok(()) +} diff --git a/tests-fuzz/targets/fuzz_insert.rs b/tests-fuzz/targets/fuzz_insert.rs index 11d02ea63d5e..73baf5a39377 100644 --- a/tests-fuzz/targets/fuzz_insert.rs +++ b/tests-fuzz/targets/fuzz_insert.rs @@ -39,7 +39,7 @@ use tests_fuzz::ir::{ use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator; use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator; use tests_fuzz::translator::DslTranslator; -use tests_fuzz::utils::{init_greptime_connections_via_env, Connections}; +use tests_fuzz::utils::{flush_memtable, init_greptime_connections_via_env, Connections}; use tests_fuzz::validator; struct FuzzContext { @@ -120,7 +120,7 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> { .context(error::ExecuteQuerySnafu { sql: &sql })?; let table_ctx = Arc::new(TableContext::from(&create_expr)); - let insert_expr = generate_insert_expr(input, &mut rng, table_ctx)?; + let insert_expr = generate_insert_expr(input, &mut rng, table_ctx.clone())?; let translator = InsertIntoExprTranslator; let sql = translator.translate(&insert_expr)?; let result = ctx @@ -141,6 +141,10 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> { } ); + if rng.gen_bool(0.5) { + flush_memtable(&ctx.greptime, &create_expr.table_name).await?; + } + // 
Validate inserted rows // The order of inserted rows are random, so we need to sort the inserted rows by primary keys and time index for comparison let primary_keys_names = create_expr @@ -178,7 +182,7 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> { column_list, create_expr.table_name, primary_keys_column_list ); let fetched_rows = validator::row::fetch_values(&ctx.greptime, select_sql.as_str()).await?; - let mut expected_rows = replace_default(&insert_expr.values_list, &create_expr, &insert_expr); + let mut expected_rows = replace_default(&insert_expr.values_list, &table_ctx, &insert_expr); expected_rows.sort_by(|a, b| { let a_keys: Vec<_> = primary_keys_idxs_in_insert_expr .iter() diff --git a/tests-fuzz/targets/fuzz_insert_logical_table.rs b/tests-fuzz/targets/fuzz_insert_logical_table.rs index 0c66bafdc01d..fe7a25c6761d 100644 --- a/tests-fuzz/targets/fuzz_insert_logical_table.rs +++ b/tests-fuzz/targets/fuzz_insert_logical_table.rs @@ -14,6 +14,7 @@ #![no_main] +use std::collections::HashMap; use std::sync::Arc; use common_telemetry::info; @@ -40,9 +41,10 @@ use tests_fuzz::ir::{ use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator; use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator; use tests_fuzz::translator::DslTranslator; -use tests_fuzz::utils::{init_greptime_connections_via_env, Connections}; +use tests_fuzz::utils::{ + compact_table, flush_memtable, init_greptime_connections_via_env, Connections, +}; use tests_fuzz::validator; - struct FuzzContext { greptime: Pool, } @@ -56,15 +58,15 @@ impl FuzzContext { #[derive(Copy, Clone, Debug)] struct FuzzInput { seed: u64, - rows: usize, + tables: usize, } impl Arbitrary<'_> for FuzzInput { fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result { let seed = u.int_in_range(u64::MIN..=u64::MAX)?; let mut rng = ChaChaRng::seed_from_u64(seed); - let rows = rng.gen_range(1..4096); - Ok(FuzzInput { rows, seed }) + let tables = 
rng.gen_range(1..256); + Ok(FuzzInput { tables, seed }) } } @@ -102,26 +104,26 @@ fn generate_create_logical_table_expr( } fn generate_insert_expr( - input: FuzzInput, + rows: usize, rng: &mut R, table_ctx: TableContextRef, ) -> Result { let insert_generator = InsertExprGeneratorBuilder::default() .omit_column_list(false) .table_ctx(table_ctx) - .rows(input.rows) + .rows(rows) .value_generator(Box::new(generate_random_value_for_mysql)) .build() .unwrap(); insert_generator.generate(rng) } -async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> { - info!("input: {input:?}"); - let mut rng = ChaChaRng::seed_from_u64(input.seed); - +async fn create_physical_table( + ctx: &FuzzContext, + rng: &mut R, +) -> Result { // Create a physical table and a logical table on top of it - let create_physical_table_expr = generate_create_physical_table_expr(&mut rng).unwrap(); + let create_physical_table_expr = generate_create_physical_table_expr(rng).unwrap(); let translator = CreateTableExprTranslator; let sql = translator.translate(&create_physical_table_expr)?; let result = sqlx::query(&sql) @@ -130,43 +132,17 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> { .context(error::ExecuteQuerySnafu { sql: &sql })?; info!("Create physical table: {sql}, result: {result:?}"); - let physical_table_ctx = Arc::new(TableContext::from(&create_physical_table_expr)); - - let create_logical_table_expr = - generate_create_logical_table_expr(physical_table_ctx, &mut rng).unwrap(); - let sql = translator.translate(&create_logical_table_expr)?; - let result = sqlx::query(&sql) - .execute(&ctx.greptime) - .await - .context(error::ExecuteQuerySnafu { sql: &sql })?; - info!("Create logical table: {sql}, result: {result:?}"); - - let logical_table_ctx = Arc::new(TableContext::from(&create_logical_table_expr)); - - let insert_expr = generate_insert_expr(input, &mut rng, logical_table_ctx)?; - let translator = InsertIntoExprTranslator; - let sql = 
translator.translate(&insert_expr)?; - let result = ctx - .greptime - // unprepared query, see - .execute(sql.as_str()) - .await - .context(error::ExecuteQuerySnafu { sql: &sql })?; - - ensure!( - result.rows_affected() == input.rows as u64, - error::AssertSnafu { - reason: format!( - "expected rows affected: {}, actual: {}", - input.rows, - result.rows_affected(), - ) - } - ); + Ok(Arc::new(TableContext::from(&create_physical_table_expr))) +} +async fn validate_values( + ctx: &FuzzContext, + logical_table_ctx: TableContextRef, + insert_expr: &InsertIntoExpr, +) -> Result<()> { // Validate inserted rows // The order of inserted rows are random, so we need to sort the inserted rows by primary keys and time index for comparison - let primary_keys_names = create_logical_table_expr + let primary_keys_names = logical_table_ctx .columns .iter() .filter(|c| c.is_primary_key() || c.is_time_index()) @@ -198,14 +174,11 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> { let select_sql = format!( "SELECT {} FROM {} ORDER BY {}", - column_list, create_logical_table_expr.table_name, primary_keys_column_list + column_list, logical_table_ctx.name, primary_keys_column_list ); let fetched_rows = validator::row::fetch_values(&ctx.greptime, select_sql.as_str()).await?; - let mut expected_rows = replace_default( - &insert_expr.values_list, - &create_logical_table_expr, - &insert_expr, - ); + let mut expected_rows = + replace_default(&insert_expr.values_list, &logical_table_ctx, insert_expr); expected_rows.sort_by(|a, b| { let a_keys: Vec<_> = primary_keys_idxs_in_insert_expr .iter() @@ -225,26 +198,95 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> { }); validator::row::assert_eq::(&insert_expr.columns, &fetched_rows, &expected_rows)?; - // Clean up logical table - let sql = format!("DROP TABLE {}", create_logical_table_expr.table_name); - let result = sqlx::query(&sql) - .execute(&ctx.greptime) + Ok(()) +} + +async fn 
insert_values( + ctx: &FuzzContext, + rng: &mut R, + logical_table_ctx: TableContextRef, +) -> Result { + let rows = rng.gen_range(1..2048); + let insert_expr = generate_insert_expr(rows, rng, logical_table_ctx.clone())?; + let translator = InsertIntoExprTranslator; + let sql = translator.translate(&insert_expr)?; + let result = ctx + .greptime + // unprepared query, see + .execute(sql.as_str()) .await .context(error::ExecuteQuerySnafu { sql: &sql })?; - info!( - "Drop table: {}, result: {result:?}", - create_logical_table_expr.table_name + + ensure!( + result.rows_affected() == rows as u64, + error::AssertSnafu { + reason: format!( + "expected rows affected: {}, actual: {}", + rows, + result.rows_affected(), + ) + } ); + Ok(insert_expr) +} + +async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> { + info!("input: {input:?}"); + let mut rng = ChaChaRng::seed_from_u64(input.seed); + let physical_table_ctx = create_physical_table(&ctx, &mut rng).await?; + + let mut tables = HashMap::with_capacity(input.tables); + + // Create logical tables + for _ in 0..input.tables { + let translator = CreateTableExprTranslator; + let create_logical_table_expr = + generate_create_logical_table_expr(physical_table_ctx.clone(), &mut rng).unwrap(); + if tables.contains_key(&create_logical_table_expr.table_name) { + // Ignores same name logical table. 
+ continue; + } + let sql = translator.translate(&create_logical_table_expr)?; + let result = sqlx::query(&sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql: &sql })?; + info!("Create logical table: {sql}, result: {result:?}"); + let logical_table_ctx = Arc::new(TableContext::from(&create_logical_table_expr)); + + let insert_expr = insert_values(&ctx, &mut rng, logical_table_ctx.clone()).await?; + validate_values(&ctx, logical_table_ctx.clone(), &insert_expr).await?; + tables.insert(logical_table_ctx.name.clone(), logical_table_ctx.clone()); + if rng.gen_bool(0.1) { + flush_memtable(&ctx.greptime, &physical_table_ctx.name).await?; + validate_values(&ctx, logical_table_ctx.clone(), &insert_expr).await?; + } + if rng.gen_bool(0.1) { + compact_table(&ctx.greptime, &physical_table_ctx.name).await?; + validate_values(&ctx, logical_table_ctx.clone(), &insert_expr).await?; + } + } + + // Clean up logical table + for (table_name, _) in tables { + let sql = format!("DROP TABLE {}", table_name); + let result = sqlx::query(&sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql: &sql })?; + info!("Drop table: {}, result: {result:?}", table_name); + } + // Clean up physical table - let sql = format!("DROP TABLE {}", create_physical_table_expr.table_name); + let sql = format!("DROP TABLE {}", physical_table_ctx.name); let result = sqlx::query(&sql) .execute(&ctx.greptime) .await .context(error::ExecuteQuerySnafu { sql })?; info!( "Drop table: {}, result: {result:?}", - create_physical_table_expr.table_name + physical_table_ctx.name ); ctx.close().await;