Skip to content

Commit

Permalink
feat: invoke flush_table and compact_table in fuzz tests (#4045)
Browse files Browse the repository at this point in the history
* feat: invoke `flush_table` and `compact_table` in fuzz tests

* feat: support to flush and compact physical metric table

* fix: avoid to create tables with the same name

* feat: validate values after flushing or compacting table
  • Loading branch information
WenyXu authored May 27, 2024
1 parent f9db5ff commit 048368f
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 73 deletions.
34 changes: 29 additions & 5 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_recordbatch::SendableRecordBatchStream;
use mito2::engine::MitoEngine;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{
Expand All @@ -44,7 +45,7 @@ use store_api::storage::{RegionId, ScanRequest};

use self::state::MetricEngineState;
use crate::data_region::DataRegion;
use crate::error::{Result, UnsupportedRegionRequestSnafu};
use crate::error::{self, Result, UnsupportedRegionRequestSnafu};
use crate::metadata_region::MetadataRegion;
use crate::utils;

Expand Down Expand Up @@ -144,10 +145,33 @@ impl RegionEngine for MetricEngine {
.alter_region(region_id, alter, &mut extension_return_value)
.await
}
RegionRequest::Delete(_)
| RegionRequest::Flush(_)
| RegionRequest::Compact(_)
| RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
RegionRequest::Flush(_) => {
if self.inner.is_physical_region(region_id) {
self.inner
.mito
.handle_request(region_id, request)
.await
.context(error::MitoFlushOperationSnafu)
.map(|response| response.affected_rows)
} else {
UnsupportedRegionRequestSnafu { request }.fail()
}
}
RegionRequest::Compact(_) => {
if self.inner.is_physical_region(region_id) {
self.inner
.mito
.handle_request(region_id, request)
.await
.context(error::MitoFlushOperationSnafu)
.map(|response| response.affected_rows)
} else {
UnsupportedRegionRequestSnafu { request }.fail()
}
}
RegionRequest::Delete(_) | RegionRequest::Truncate(_) => {
UnsupportedRegionRequestSnafu { request }.fail()
}
RegionRequest::Catchup(ref req) => self.inner.catchup_region(region_id, *req).await,
};

Expand Down
18 changes: 17 additions & 1 deletion src/metric-engine/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,27 @@ pub enum Error {
location: Location,
},

#[snafu(display("Mito flush operation fails"))]
MitoFlushOperation {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Mito catchup operation fails"))]
MitoCatchupOperation {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Mito compact operation fails"))]
MitoCompactOperation {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to collect record batch stream"))]
CollectRecordBatchStream {
source: common_recordbatch::error::Error,
Expand Down Expand Up @@ -275,7 +289,9 @@ impl ErrorExt for Error {
| CloseMitoRegion { source, .. }
| MitoReadOperation { source, .. }
| MitoWriteOperation { source, .. }
| MitoCatchupOperation { source, .. } => source.status_code(),
| MitoCatchupOperation { source, .. }
| MitoFlushOperation { source, .. }
| MitoCompactOperation { source, .. } => source.status_code(),

CollectRecordBatchStream { source, .. } => source.status_code(),

Expand Down
7 changes: 4 additions & 3 deletions tests-fuzz/src/ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use rand::Rng;
use serde::{Deserialize, Serialize};

use self::insert_expr::{RowValue, RowValues};
use crate::context::TableContextRef;
use crate::generator::Random;
use crate::impl_random;
use crate::ir::create_expr::ColumnOption;
Expand Down Expand Up @@ -442,15 +443,15 @@ pub fn generate_columns<R: Rng + 'static>(
/// Replace Value::Default with the corresponding default value in the rows for comparison.
pub fn replace_default(
rows: &[RowValues],
create_expr: &CreateTableExpr,
table_ctx_ref: &TableContextRef,
insert_expr: &InsertIntoExpr,
) -> Vec<RowValues> {
let index_map: HashMap<usize, usize> = insert_expr
.columns
.iter()
.enumerate()
.map(|(insert_idx, insert_column)| {
let create_idx = create_expr
let create_idx = table_ctx_ref
.columns
.iter()
.position(|create_column| create_column.name == insert_column.name)
Expand All @@ -464,7 +465,7 @@ pub fn replace_default(
let mut new_row = Vec::new();
for (idx, value) in row.iter().enumerate() {
if let RowValue::Default = value {
let column = &create_expr.columns[index_map[&idx]];
let column = &table_ctx_ref.columns[index_map[&idx]];
new_row.push(RowValue::Value(column.default_value().unwrap().clone()));
} else {
new_row.push(value.clone());
Expand Down
28 changes: 28 additions & 0 deletions tests-fuzz/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ pub mod process;
use std::env;

use common_telemetry::info;
use snafu::ResultExt;
use sqlx::mysql::MySqlPoolOptions;
use sqlx::{MySql, Pool};

use crate::error::{self, Result};
use crate::ir::Ident;

/// Database connections
pub struct Connections {
pub mysql: Option<Pool<MySql>>,
Expand Down Expand Up @@ -83,3 +87,27 @@ pub fn load_unstable_test_env_variables() -> UnstableTestVariables {
root_dir,
}
}

/// Flushes memtable to SST file.
pub async fn flush_memtable(e: &Pool<MySql>, table_name: &Ident) -> Result<()> {
let sql = format!("SELECT flush_table(\"{}\")", table_name);
let result = sqlx::query(&sql)
.execute(e)
.await
.context(error::ExecuteQuerySnafu { sql })?;
info!("Flush table: {}\n\nResult: {result:?}\n\n", table_name);

Ok(())
}

/// Triggers a compaction for table
pub async fn compact_table(e: &Pool<MySql>, table_name: &Ident) -> Result<()> {
let sql = format!("SELECT compact_table(\"{}\")", table_name);
let result = sqlx::query(&sql)
.execute(e)
.await
.context(error::ExecuteQuerySnafu { sql })?;
info!("Compact table: {}\n\nResult: {result:?}\n\n", table_name);

Ok(())
}
10 changes: 7 additions & 3 deletions tests-fuzz/targets/fuzz_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use tests_fuzz::ir::{
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator;
use tests_fuzz::translator::DslTranslator;
use tests_fuzz::utils::{init_greptime_connections_via_env, Connections};
use tests_fuzz::utils::{flush_memtable, init_greptime_connections_via_env, Connections};
use tests_fuzz::validator;

struct FuzzContext {
Expand Down Expand Up @@ -120,7 +120,7 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
.context(error::ExecuteQuerySnafu { sql: &sql })?;

let table_ctx = Arc::new(TableContext::from(&create_expr));
let insert_expr = generate_insert_expr(input, &mut rng, table_ctx)?;
let insert_expr = generate_insert_expr(input, &mut rng, table_ctx.clone())?;
let translator = InsertIntoExprTranslator;
let sql = translator.translate(&insert_expr)?;
let result = ctx
Expand All @@ -141,6 +141,10 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
}
);

if rng.gen_bool(0.5) {
flush_memtable(&ctx.greptime, &create_expr.table_name).await?;
}

// Validate inserted rows
// The order of inserted rows are random, so we need to sort the inserted rows by primary keys and time index for comparison
let primary_keys_names = create_expr
Expand Down Expand Up @@ -178,7 +182,7 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
column_list, create_expr.table_name, primary_keys_column_list
);
let fetched_rows = validator::row::fetch_values(&ctx.greptime, select_sql.as_str()).await?;
let mut expected_rows = replace_default(&insert_expr.values_list, &create_expr, &insert_expr);
let mut expected_rows = replace_default(&insert_expr.values_list, &table_ctx, &insert_expr);
expected_rows.sort_by(|a, b| {
let a_keys: Vec<_> = primary_keys_idxs_in_insert_expr
.iter()
Expand Down
Loading

0 comments on commit 048368f

Please sign in to comment.