From d2f1bafd33918418c8e68727351a153ad5dde45f Mon Sep 17 00:00:00 2001
From: Noel Kwan <47273164+kwannoel@users.noreply.github.com>
Date: Mon, 8 Jan 2024 14:28:14 +0800
Subject: [PATCH] feat(sqlsmith): refactor test runners (#14405)
---
src/tests/simulation/src/main.rs | 15 +-
src/tests/sqlsmith/src/bin/main.rs | 2 +-
src/tests/sqlsmith/src/lib.rs | 2 +-
src/tests/sqlsmith/src/runner.rs | 681 ------------------
src/tests/sqlsmith/src/test_runners/README.md | 0
src/tests/sqlsmith/src/test_runners/diff.rs | 206 ++++++
.../sqlsmith/src/test_runners/fuzzing.rs | 180 +++++
src/tests/sqlsmith/src/test_runners/mod.rs | 26 +
src/tests/sqlsmith/src/test_runners/utils.rs | 353 +++++++++
9 files changed, 777 insertions(+), 688 deletions(-)
delete mode 100644 src/tests/sqlsmith/src/runner.rs
create mode 100644 src/tests/sqlsmith/src/test_runners/README.md
create mode 100644 src/tests/sqlsmith/src/test_runners/diff.rs
create mode 100644 src/tests/sqlsmith/src/test_runners/fuzzing.rs
create mode 100644 src/tests/sqlsmith/src/test_runners/mod.rs
create mode 100644 src/tests/sqlsmith/src/test_runners/utils.rs
diff --git a/src/tests/simulation/src/main.rs b/src/tests/simulation/src/main.rs
index 4c2c1da7fa34..2d198239a735 100644
--- a/src/tests/simulation/src/main.rs
+++ b/src/tests/simulation/src/main.rs
@@ -201,7 +201,7 @@ async fn main() {
.await
.unwrap();
if let Some(outdir) = args.generate_sqlsmith_queries {
- risingwave_sqlsmith::runner::generate(
+ risingwave_sqlsmith::test_runners::generate(
rw.pg_client(),
&args.files,
count,
@@ -212,7 +212,7 @@ async fn main() {
return;
}
if args.run_differential_tests {
- risingwave_sqlsmith::runner::run_differential_testing(
+ risingwave_sqlsmith::test_runners::run_differential_testing(
rw.pg_client(),
&args.files,
count,
@@ -223,8 +223,13 @@ async fn main() {
return;
}
- risingwave_sqlsmith::runner::run(rw.pg_client(), &args.files, count, Some(seed))
- .await;
+ risingwave_sqlsmith::test_runners::run(
+ rw.pg_client(),
+ &args.files,
+ count,
+ Some(seed),
+ )
+ .await;
})
.await;
return;
@@ -237,7 +242,7 @@ async fn main() {
let rw = RisingWave::connect("frontend".into(), "dev".into())
.await
.unwrap();
- risingwave_sqlsmith::runner::run_pre_generated(rw.pg_client(), &outdir).await;
+ risingwave_sqlsmith::test_runners::run_pre_generated(rw.pg_client(), &outdir).await;
})
.await;
return;
diff --git a/src/tests/sqlsmith/src/bin/main.rs b/src/tests/sqlsmith/src/bin/main.rs
index 79df7f6932a3..6aaa1f60f150 100644
--- a/src/tests/sqlsmith/src/bin/main.rs
+++ b/src/tests/sqlsmith/src/bin/main.rs
@@ -21,7 +21,7 @@ use std::time::Duration;
use clap::Parser as ClapParser;
use risingwave_sqlsmith::print_function_table;
-use risingwave_sqlsmith::runner::{generate, run, run_differential_testing};
+use risingwave_sqlsmith::test_runners::{generate, run, run_differential_testing};
use tokio_postgres::NoTls;
#[derive(ClapParser, Debug, Clone)]
diff --git a/src/tests/sqlsmith/src/lib.rs b/src/tests/sqlsmith/src/lib.rs
index 23f6454bc9da..2d8c23a52b74 100644
--- a/src/tests/sqlsmith/src/lib.rs
+++ b/src/tests/sqlsmith/src/lib.rs
@@ -37,8 +37,8 @@ use risingwave_sqlparser::parser::Parser;
use crate::sql_gen::SqlGenerator;
pub mod reducer;
-pub mod runner;
mod sql_gen;
+pub mod test_runners;
mod utils;
pub mod validation;
pub use validation::is_permissible_error;
diff --git a/src/tests/sqlsmith/src/runner.rs b/src/tests/sqlsmith/src/runner.rs
deleted file mode 100644
index b095cf4e3e96..000000000000
--- a/src/tests/sqlsmith/src/runner.rs
+++ /dev/null
@@ -1,681 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-//! Provides E2E Test runner functionality.
-
-use anyhow::{anyhow, bail};
-use itertools::Itertools;
-use rand::rngs::SmallRng;
-use rand::{Rng, SeedableRng};
-#[cfg(madsim)]
-use rand_chacha::ChaChaRng;
-use risingwave_sqlparser::ast::Statement;
-use similar::{ChangeTag, TextDiff};
-use tokio::time::{sleep, timeout, Duration};
-use tokio_postgres::error::Error as PgError;
-use tokio_postgres::{Client, SimpleQueryMessage};
-
-use crate::utils::read_file_contents;
-use crate::validation::{is_permissible_error, is_recovery_in_progress_error};
-use crate::{
- differential_sql_gen, generate_update_statements, insert_sql_gen, mview_sql_gen,
- parse_create_table_statements, parse_sql, session_sql_gen, sql_gen, Table,
-};
-
-type PgResult<A> = std::result::Result<A, PgError>;
-type Result<A> = anyhow::Result<A>;
-
-/// e2e test runner for pre-generated queries from sqlsmith
-pub async fn run_pre_generated(client: &Client, outdir: &str) {
- let timeout_duration = 12; // allow for some variance.
- let queries_path = format!("{}/queries.sql", outdir);
- let queries = read_file_contents(queries_path).unwrap();
- for statement in parse_sql(&queries) {
- let sql = statement.to_string();
- tracing::info!("[EXECUTING STATEMENT]: {}", sql);
- run_query(timeout_duration, client, &sql).await.unwrap();
- }
- tracing::info!("[EXECUTION SUCCESS]");
-}
-
-/// Query Generator
-/// If we encounter an expected error, just skip.
-/// If we encounter an unexpected error,
-/// Sqlsmith should stop execution, but write out the DDL and queries generated so far.
-/// If a query takes too long -> cancel it, **mark it as an error**.
-/// NOTE(noel): It will still fail if DDL creation fails.
-pub async fn generate(
- client: &Client,
- testdata: &str,
- count: usize,
- _outdir: &str,
- seed: Option<u64>,
-) {
- let timeout_duration = 5;
-
- set_variable(client, "RW_IMPLICIT_FLUSH", "TRUE").await;
- set_variable(client, "QUERY_MODE", "DISTRIBUTED").await;
- tracing::info!("Set session variables");
-
- let mut rng = generate_rng(seed);
- let base_tables = create_base_tables(testdata, client).await.unwrap();
-
- let rows_per_table = 50;
- let max_rows_inserted = rows_per_table * base_tables.len();
- let inserts = populate_tables(client, &mut rng, base_tables.clone(), rows_per_table).await;
- tracing::info!("Populated base tables");
-
- let (tables, mviews) = create_mviews(&mut rng, base_tables.clone(), client)
- .await
- .unwrap();
-
- // Generate an update for some inserts, on the corresponding table.
- update_base_tables(client, &mut rng, &base_tables, &inserts).await;
-
- test_sqlsmith(
- client,
- &mut rng,
- tables.clone(),
- base_tables.clone(),
- max_rows_inserted,
- )
- .await;
- tracing::info!("Passed sqlsmith tests");
-
- tracing::info!("Ran updates");
-
- let mut generated_queries = 0;
- for _ in 0..count {
- test_session_variable(client, &mut rng).await;
- let sql = sql_gen(&mut rng, tables.clone());
- tracing::info!("[EXECUTING TEST_BATCH]: {}", sql);
- let result = run_query(timeout_duration, client, sql.as_str()).await;
- match result {
- Err(_e) => {
- generated_queries += 1;
- tracing::info!("Generated {} batch queries", generated_queries);
- tracing::error!("Unrecoverable error encountered.");
- return;
- }
- Ok(0) => {
- generated_queries += 1;
- }
- _ => {}
- }
- }
- tracing::info!("Generated {} batch queries", generated_queries);
-
- let mut generated_queries = 0;
- for _ in 0..count {
- test_session_variable(client, &mut rng).await;
- let (sql, table) = mview_sql_gen(&mut rng, tables.clone(), "stream_query");
- tracing::info!("[EXECUTING TEST_STREAM]: {}", sql);
- let result = run_query(timeout_duration, client, sql.as_str()).await;
- match result {
- Err(_e) => {
- generated_queries += 1;
- tracing::info!("Generated {} stream queries", generated_queries);
- tracing::error!("Unrecoverable error encountered.");
- return;
- }
- Ok(0) => {
- generated_queries += 1;
- }
- _ => {}
- }
- tracing::info!("[EXECUTING DROP MVIEW]: {}", &format_drop_mview(&table));
- drop_mview_table(&table, client).await;
- }
- tracing::info!("Generated {} stream queries", generated_queries);
-
- drop_tables(&mviews, testdata, client).await;
-}
-
-/// e2e test runner for sqlsmith
-pub async fn run(client: &Client, testdata: &str, count: usize, seed: Option<u64>) {
- let mut rng = generate_rng(seed);
-
- set_variable(client, "RW_IMPLICIT_FLUSH", "TRUE").await;
- set_variable(client, "QUERY_MODE", "DISTRIBUTED").await;
- tracing::info!("Set session variables");
-
- let base_tables = create_base_tables(testdata, client).await.unwrap();
-
- let rows_per_table = 50;
- let inserts = populate_tables(client, &mut rng, base_tables.clone(), rows_per_table).await;
- tracing::info!("Populated base tables");
-
- let (tables, mviews) = create_mviews(&mut rng, base_tables.clone(), client)
- .await
- .unwrap();
- tracing::info!("Created tables");
-
- // Generate an update for some inserts, on the corresponding table.
- update_base_tables(client, &mut rng, &base_tables, &inserts).await;
- tracing::info!("Ran updates");
-
- let max_rows_inserted = rows_per_table * base_tables.len();
- test_sqlsmith(
- client,
- &mut rng,
- tables.clone(),
- base_tables.clone(),
- max_rows_inserted,
- )
- .await;
- tracing::info!("Passed sqlsmith tests");
-
- test_batch_queries(client, &mut rng, tables.clone(), count)
- .await
- .unwrap();
- tracing::info!("Passed batch queries");
- test_stream_queries(client, &mut rng, tables.clone(), count)
- .await
- .unwrap();
- tracing::info!("Passed stream queries");
-
- drop_tables(&mviews, testdata, client).await;
- tracing::info!("[EXECUTION SUCCESS]");
-}
-
-/// Differential testing for batch and stream
-pub async fn run_differential_testing(
- client: &Client,
- testdata: &str,
- count: usize,
- seed: Option<u64>,
-) -> Result<()> {
- let mut rng = generate_rng(seed);
-
- set_variable(client, "RW_IMPLICIT_FLUSH", "TRUE").await;
- set_variable(client, "QUERY_MODE", "DISTRIBUTED").await;
- tracing::info!("Set session variables");
-
- let base_tables = create_base_tables(testdata, client).await.unwrap();
-
- let rows_per_table = 50;
- let inserts = populate_tables(client, &mut rng, base_tables.clone(), rows_per_table).await;
- tracing::info!("Populated base tables");
-
- let (tables, mviews) = create_mviews(&mut rng, base_tables.clone(), client)
- .await
- .unwrap();
- tracing::info!("Created tables");
-
- // Generate an update for some inserts, on the corresponding table.
- update_base_tables(client, &mut rng, &base_tables, &inserts).await;
- tracing::info!("Ran updates");
-
- for i in 0..count {
- diff_stream_and_batch(&mut rng, tables.clone(), client, i).await?
- }
-
- drop_tables(&mviews, testdata, client).await;
- tracing::info!("[EXECUTION SUCCESS]");
- Ok(())
-}
-
-fn generate_rng(seed: Option<u64>) -> impl Rng {
- #[cfg(madsim)]
- if let Some(seed) = seed {
- ChaChaRng::seed_from_u64(seed)
- } else {
- ChaChaRng::from_rng(SmallRng::from_entropy()).unwrap()
- }
- #[cfg(not(madsim))]
- if let Some(seed) = seed {
- SmallRng::seed_from_u64(seed)
- } else {
- SmallRng::from_entropy()
- }
-}
-
-async fn update_base_tables<R: Rng>(
- client: &Client,
- rng: &mut R,
- base_tables: &[Table],
- inserts: &[Statement],
-) {
- let update_statements = generate_update_statements(rng, base_tables, inserts).unwrap();
- for update_statement in update_statements {
- let sql = update_statement.to_string();
- tracing::info!("[EXECUTING UPDATES]: {}", &sql);
- client.simple_query(&sql).await.unwrap();
- }
-}
-
-async fn populate_tables<R: Rng>(
- client: &Client,
- rng: &mut R,
- base_tables: Vec<Table>,
- row_count: usize,
-) -> Vec<Statement> {
- let inserts = insert_sql_gen(rng, base_tables, row_count);
- for insert in &inserts {
- tracing::info!("[EXECUTING INSERT]: {}", insert);
- client.simple_query(insert).await.unwrap();
- }
- inserts
- .iter()
- .map(|s| parse_sql(s).into_iter().next().unwrap())
- .collect_vec()
-}
-
-/// Sanity checks for sqlsmith
-async fn test_sqlsmith<R: Rng>(
- client: &Client,
- rng: &mut R,
- tables: Vec<Table>,
- base_tables: Vec<Table>,
- row_count: usize,
-) {
- // Test inserted rows should be at least 50% population count,
- // otherwise we don't have sufficient data in our system.
- // ENABLE: https://github.com/risingwavelabs/risingwave/issues/3844
- test_population_count(client, base_tables, row_count).await;
- tracing::info!("passed population count test");
-
- let threshold = 0.50; // permit at most 50% of queries to be skipped.
- let sample_size = 20;
-
- let skipped_percentage = test_batch_queries(client, rng, tables.clone(), sample_size)
- .await
- .unwrap();
- tracing::info!(
- "percentage of skipped batch queries = {}, threshold: {}",
- skipped_percentage,
- threshold
- );
- if skipped_percentage > threshold {
- panic!("skipped batch queries exceeded threshold.");
- }
-
- let skipped_percentage = test_stream_queries(client, rng, tables.clone(), sample_size)
- .await
- .unwrap();
- tracing::info!(
- "percentage of skipped stream queries = {}, threshold: {}",
- skipped_percentage,
- threshold
- );
- if skipped_percentage > threshold {
- panic!("skipped stream queries exceeded threshold.");
- }
-}
-
-async fn set_variable(client: &Client, variable: &str, value: &str) -> String {
- let s = format!("SET {variable} TO {value}");
- tracing::info!("[EXECUTING SET_VAR]: {}", s);
- client.simple_query(&s).await.unwrap();
- s
-}
-
-async fn test_session_variable<R: Rng>(client: &Client, rng: &mut R) -> String {
- let session_sql = session_sql_gen(rng);
- tracing::info!("[EXECUTING TEST SESSION_VAR]: {}", session_sql);
- client.simple_query(session_sql.as_str()).await.unwrap();
- session_sql
-}
-
-/// Expects at least 50% of inserted rows included.
-async fn test_population_count(client: &Client, base_tables: Vec<Table>, expected_count: usize) {
- let mut actual_count = 0;
- for t in base_tables {
- let q = format!("select * from {};", t.name);
- let rows = client.simple_query(&q).await.unwrap();
- actual_count += rows.len();
- }
- if actual_count < expected_count / 2 {
- panic!(
- "expected at least 50% rows included.\
- Total {} rows, only had {} rows",
- expected_count, actual_count,
- )
- }
-}
-
-/// Test batch queries, returns skipped query statistics
-/// Runs in distributed mode, since queries can be complex and cause overflow in local execution
-/// mode.
-async fn test_batch_queries<R: Rng>(
- client: &Client,
- rng: &mut R,
- tables: Vec<Table>,
- sample_size: usize,
-) -> Result<f64> {
- let mut skipped = 0;
- for _ in 0..sample_size {
- test_session_variable(client, rng).await;
- let sql = sql_gen(rng, tables.clone());
- tracing::info!("[TEST BATCH]: {}", sql);
- skipped += run_query(30, client, &sql).await?;
- }
- Ok(skipped as f64 / sample_size as f64)
-}
-
-/// Test stream queries, returns skipped query statistics
-async fn test_stream_queries<R: Rng>(
- client: &Client,
- rng: &mut R,
- tables: Vec<Table>,
- sample_size: usize,
-) -> Result<f64> {
- let mut skipped = 0;
-
- for _ in 0..sample_size {
- test_session_variable(client, rng).await;
- let (sql, table) = mview_sql_gen(rng, tables.clone(), "stream_query");
- tracing::info!("[TEST STREAM]: {}", sql);
- skipped += run_query(12, client, &sql).await?;
- tracing::info!("[TEST DROP MVIEW]: {}", &format_drop_mview(&table));
- drop_mview_table(&table, client).await;
- }
- Ok(skipped as f64 / sample_size as f64)
-}
-
-fn get_seed_table_sql(testdata: &str) -> String {
- let seed_files = ["tpch.sql", "nexmark.sql", "alltypes.sql"];
- seed_files
- .iter()
- .map(|filename| read_file_contents(format!("{}/{}", testdata, filename)).unwrap())
- .collect::<String>()
-}
-
-/// Create the tables defined in testdata, along with some mviews.
-/// TODO: Generate indexes and sinks.
-async fn create_base_tables(testdata: &str, client: &Client) -> Result<Vec<Table>> {
- tracing::info!("Preparing tables...");
-
- let sql = get_seed_table_sql(testdata);
- let (base_tables, statements) = parse_create_table_statements(sql);
- let mut mvs_and_base_tables = vec![];
- mvs_and_base_tables.extend_from_slice(&base_tables);
-
- for stmt in &statements {
- let create_sql = stmt.to_string();
- tracing::info!("[EXECUTING CREATE TABLE]: {}", &create_sql);
- client.simple_query(&create_sql).await.unwrap();
- }
-
- Ok(base_tables)
-}
-
-/// Create the tables defined in testdata, along with some mviews.
-/// TODO: Generate indexes and sinks.
-async fn create_mviews(
- rng: &mut impl Rng,
- mvs_and_base_tables: Vec<Table>,
- client: &Client,
-) -> Result<(Vec<Table>, Vec<Table>)> {
- let mut mvs_and_base_tables = mvs_and_base_tables;
- let mut mviews = vec![];
- // Generate some mviews
- for i in 0..20 {
- let (create_sql, table) =
- mview_sql_gen(rng, mvs_and_base_tables.clone(), &format!("m{}", i));
- tracing::info!("[EXECUTING CREATE MVIEW]: {}", &create_sql);
- let skip_count = run_query(6, client, &create_sql).await?;
- if skip_count == 0 {
- mvs_and_base_tables.push(table.clone());
- mviews.push(table);
- }
- }
- Ok((mvs_and_base_tables, mviews))
-}
-
-fn format_drop_mview(mview: &Table) -> String {
- format!("DROP MATERIALIZED VIEW IF EXISTS {}", mview.name)
-}
-
-/// Drops mview tables.
-async fn drop_mview_table(mview: &Table, client: &Client) {
- client
- .simple_query(&format_drop_mview(mview))
- .await
- .unwrap();
-}
-
-/// Drops mview tables and seed tables
-async fn drop_tables(mviews: &[Table], testdata: &str, client: &Client) {
- tracing::info!("Cleaning tables...");
-
- for mview in mviews.iter().rev() {
- drop_mview_table(mview, client).await;
- }
-
- let seed_files = ["drop_tpch.sql", "drop_nexmark.sql", "drop_alltypes.sql"];
- let sql = seed_files
- .iter()
- .map(|filename| read_file_contents(format!("{}/{}", testdata, filename)).unwrap())
- .collect::<String>();
-
- for stmt in sql.lines() {
- client.simple_query(stmt).await.unwrap();
- }
-}
-
-/// Validate client responses, returning a count of skipped queries, number of result rows.
-fn validate_response(
- response: PgResult<Vec<SimpleQueryMessage>>,
-) -> Result<(i64, Vec<SimpleQueryMessage>)> {
- match response {
- Ok(rows) => Ok((0, rows)),
- Err(e) => {
- // Permit runtime errors conservatively.
- if let Some(e) = e.as_db_error()
- && is_permissible_error(&e.to_string())
- {
- tracing::info!("[SKIPPED ERROR]: {:#?}", e);
- return Ok((1, vec![]));
- }
- // consolidate error reason for deterministic test
- tracing::info!("[UNEXPECTED ERROR]: {:#?}", e);
- Err(anyhow!("Encountered unexpected error: {e}"))
- }
- }
-}
-
-async fn run_query(timeout_duration: u64, client: &Client, query: &str) -> Result<i64> {
- let (skipped_count, _) = run_query_inner(timeout_duration, client, query).await?;
- Ok(skipped_count)
-}
-/// Run query, handle permissible errors
-/// For recovery error, just do bounded retry.
-/// For other errors, validate them accordingly, skipping if they are permitted.
-/// Otherwise just return success.
-/// If it takes too long, return the query which timed out + execution time + timeout error.
-/// Returns: Number of skipped queries, number of rows returned.
-async fn run_query_inner(
- timeout_duration: u64,
- client: &Client,
- query: &str,
-) -> Result<(i64, Vec<SimpleQueryMessage>)> {
- let query_task = client.simple_query(query);
- let result = timeout(Duration::from_secs(timeout_duration), query_task).await;
- let response = match result {
- Ok(r) => r,
- Err(_) => bail!(
- "[UNEXPECTED ERROR] Query timeout after {timeout_duration}s:\n{:?}",
- query
- ),
- };
- if let Err(e) = &response
- && let Some(e) = e.as_db_error()
- {
- if is_recovery_in_progress_error(&e.to_string()) {
- let tries = 5;
- let interval = 1;
- for _ in 0..tries {
- // retry 5 times
- sleep(Duration::from_secs(interval)).await;
- let query_task = client.simple_query(query);
- let response = timeout(Duration::from_secs(timeout_duration), query_task).await;
- match response {
- Ok(Ok(r)) => {
- return Ok((0, r));
- }
- Err(_) => bail!(
- "[UNEXPECTED ERROR] Query timeout after {timeout_duration}s:\n{:?}",
- query
- ),
- _ => {}
- }
- }
- bail!("[UNEXPECTED ERROR] Failed to recover after {tries} tries with interval {interval}s")
- } else {
- return validate_response(response);
- }
- }
- let rows = response?;
- Ok((0, rows))
-}
-
-/// Create the tables defined in testdata, along with some mviews.
-/// Just test number of rows for now.
-/// TODO(kwannoel): Test row contents as well. That requires us to run a batch query
-/// with `select * ORDER BY `.
-async fn diff_stream_and_batch(
- rng: &mut impl Rng,
- mvs_and_base_tables: Vec<Table>,
- client: &Client,
- i: usize,
-) -> Result<()> {
- // Generate some mviews
- let mview_name = format!("stream_{}", i);
- let (batch, stream, table) = differential_sql_gen(rng, mvs_and_base_tables, &mview_name)?;
- diff_stream_and_batch_with_sqls(client, i, &batch, &stream, &mview_name, &table).await
-}
-
-async fn diff_stream_and_batch_with_sqls(
- client: &Client,
- i: usize,
- batch: &str,
- stream: &str,
- mview_name: &str,
- table: &Table,
-) -> Result<()> {
- tracing::info!("[RUN CREATE MVIEW id={}]: {}", i, stream);
- let skip_count = run_query(12, client, stream).await?;
- if skip_count > 0 {
- tracing::info!("[RUN DROP MVIEW id={}]: {}", i, &format_drop_mview(table));
- drop_mview_table(table, client).await;
- return Ok(());
- }
-
- let select = format!("SELECT * FROM {}", &mview_name);
- tracing::info!("[RUN SELECT * FROM MVIEW id={}]: {}", i, select);
- let (skip_count, stream_result) = run_query_inner(12, client, &select).await?;
- if skip_count > 0 {
- bail!("SQL should not fail: {:?}", select)
- }
-
- tracing::info!("[RUN - BATCH QUERY id={}]: {}", i, &batch);
- let (skip_count, batch_result) = run_query_inner(12, client, batch).await?;
- if skip_count > 0 {
- tracing::info!(
- "[DIFF - DROP MVIEW id={}]: {}",
- i,
- &format_drop_mview(table)
- );
- drop_mview_table(table, client).await;
- return Ok(());
- }
- let n_stream_rows = stream_result.len();
- let n_batch_rows = batch_result.len();
- let formatted_stream_rows = format_rows(&stream_result);
- let formatted_batch_rows = format_rows(&batch_result);
- tracing::debug!(
- "[COMPARE - STREAM_FORMATTED_ROW id={}]: {formatted_stream_rows}",
- i,
- );
- tracing::debug!(
- "[COMPARE - BATCH_FORMATTED_ROW id={}]: {formatted_batch_rows}",
- i,
- );
-
- let diff = TextDiff::from_lines(&formatted_batch_rows, &formatted_stream_rows);
-
- let diff: String = diff
- .iter_all_changes()
- .filter_map(|change| match change.tag() {
- ChangeTag::Delete => Some(format!("-{}", change)),
- ChangeTag::Insert => Some(format!("+{}", change)),
- ChangeTag::Equal => None,
- })
- .collect();
-
- if diff.is_empty() {
- tracing::info!("[RUN DROP MVIEW id={}]: {}", i, format_drop_mview(table));
- tracing::info!("[PASSED DIFF id={}, rows_compared={n_stream_rows}]", i);
-
- drop_mview_table(table, client).await;
- Ok(())
- } else {
- bail!(
- "
-Different results for batch and stream:
-
-BATCH SQL:
-{batch}
-
-STREAM SQL:
-{stream}
-
-SELECT FROM STREAM SQL:
-{select}
-
-BATCH_ROW_LEN:
-{n_batch_rows}
-
-STREAM_ROW_LEN:
-{n_stream_rows}
-
-BATCH_ROWS:
-{formatted_batch_rows}
-
-STREAM_ROWS:
-{formatted_stream_rows}
-
-ROW DIFF (+/-):
-{diff}
-",
- )
- }
-}
-
-/// Format + sort rows so they can be diffed.
-fn format_rows(rows: &[SimpleQueryMessage]) -> String {
- rows.iter()
- .filter_map(|r| match r {
- SimpleQueryMessage::Row(row) => {
- let n_cols = row.columns().len();
- let formatted_row: String = (0..n_cols)
- .map(|i| {
- format!(
- "{:#?}",
- match row.get(i) {
- Some(s) => s,
- _ => "NULL",
- }
- )
- })
- .join(", ");
- Some(formatted_row)
- }
- SimpleQueryMessage::CommandComplete(_n_rows) => None,
- _ => unreachable!(),
- })
- .sorted()
- .join("\n")
-}
diff --git a/src/tests/sqlsmith/src/test_runners/README.md b/src/tests/sqlsmith/src/test_runners/README.md
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/src/tests/sqlsmith/src/test_runners/diff.rs b/src/tests/sqlsmith/src/test_runners/diff.rs
new file mode 100644
index 000000000000..f05c9f3521d7
--- /dev/null
+++ b/src/tests/sqlsmith/src/test_runners/diff.rs
@@ -0,0 +1,206 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Provides E2E Test runner functionality.
+
+use anyhow::bail;
+use itertools::Itertools;
+use rand::Rng;
+#[cfg(madsim)]
+use rand_chacha::ChaChaRng;
+use similar::{ChangeTag, TextDiff};
+use tokio_postgres::{Client, SimpleQueryMessage};
+
+use crate::test_runners::utils::{
+ create_base_tables, create_mviews, drop_mview_table, drop_tables, format_drop_mview,
+ generate_rng, populate_tables, run_query, run_query_inner, set_variable, update_base_tables,
+ Result,
+};
+use crate::{differential_sql_gen, Table};
+
+/// Differential testing for batch and stream
+pub async fn run_differential_testing(
+ client: &Client,
+ testdata: &str,
+ count: usize,
+ seed: Option<u64>,
+) -> Result<()> {
+ let mut rng = generate_rng(seed);
+
+ set_variable(client, "RW_IMPLICIT_FLUSH", "TRUE").await;
+ set_variable(client, "QUERY_MODE", "DISTRIBUTED").await;
+ tracing::info!("Set session variables");
+
+ let base_tables = create_base_tables(testdata, client).await.unwrap();
+
+ let rows_per_table = 50;
+ let inserts = populate_tables(client, &mut rng, base_tables.clone(), rows_per_table).await;
+ tracing::info!("Populated base tables");
+
+ let (tables, mviews) = create_mviews(&mut rng, base_tables.clone(), client)
+ .await
+ .unwrap();
+ tracing::info!("Created tables");
+
+ // Generate an update for some inserts, on the corresponding table.
+ update_base_tables(client, &mut rng, &base_tables, &inserts).await;
+ tracing::info!("Ran updates");
+
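+ // Each iteration generates a (batch, stream) pair of queries over the same
+ // tables and checks that their results agree.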
+ for i in 0..count {
+ diff_stream_and_batch(&mut rng, tables.clone(), client, i).await?
+ }
+
+ drop_tables(&mviews, testdata, client).await;
+ tracing::info!("[EXECUTION SUCCESS]");
+ Ok(())
+}
+
+/// Create the tables defined in testdata, along with some mviews.
+/// Just test number of rows for now.
+/// TODO(kwannoel): Test row contents as well. That requires us to run a batch query
+/// with `select * ORDER BY `.
+async fn diff_stream_and_batch(
+ rng: &mut impl Rng,
+ mvs_and_base_tables: Vec<Table>,
+ client: &Client,
+ i: usize,
+) -> Result<()> {
+ // Generate some mviews
+ let mview_name = format!("stream_{}", i);
+ let (batch, stream, table) = differential_sql_gen(rng, mvs_and_base_tables, &mview_name)?;
+ diff_stream_and_batch_with_sqls(client, i, &batch, &stream, &mview_name, &table).await
+}
+
+async fn diff_stream_and_batch_with_sqls(
+ client: &Client,
+ i: usize,
+ batch: &str,
+ stream: &str,
+ mview_name: &str,
+ table: &Table,
+) -> Result<()> {
+ tracing::info!("[RUN CREATE MVIEW id={}]: {}", i, stream);
+ let skip_count = run_query(12, client, stream).await?;
+ if skip_count > 0 {
+ tracing::info!("[RUN DROP MVIEW id={}]: {}", i, &format_drop_mview(table));
+ drop_mview_table(table, client).await;
+ return Ok(());
+ }
+
+ let select = format!("SELECT * FROM {}", &mview_name);
+ tracing::info!("[RUN SELECT * FROM MVIEW id={}]: {}", i, select);
+ let (skip_count, stream_result) = run_query_inner(12, client, &select).await?;
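+ // Selecting from an mview we just created should never hit a permissible
+ // error, so a non-zero skip count here is a bug.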
+ if skip_count > 0 {
+ bail!("SQL should not fail: {:?}", select)
+ }
+
+ tracing::info!("[RUN - BATCH QUERY id={}]: {}", i, &batch);
+ let (skip_count, batch_result) = run_query_inner(12, client, batch).await?;
+ if skip_count > 0 {
+ tracing::info!(
+ "[DIFF - DROP MVIEW id={}]: {}",
+ i,
+ &format_drop_mview(table)
+ );
+ drop_mview_table(table, client).await;
+ return Ok(());
+ }
+ let n_stream_rows = stream_result.len();
+ let n_batch_rows = batch_result.len();
+ let formatted_stream_rows = format_rows(&stream_result);
+ let formatted_batch_rows = format_rows(&batch_result);
+ tracing::debug!(
+ "[COMPARE - STREAM_FORMATTED_ROW id={}]: {formatted_stream_rows}",
+ i,
+ );
+ tracing::debug!(
+ "[COMPARE - BATCH_FORMATTED_ROW id={}]: {formatted_batch_rows}",
+ i,
+ );
+
+ let diff = TextDiff::from_lines(&formatted_batch_rows, &formatted_stream_rows);
+
+ let diff: String = diff
+ .iter_all_changes()
+ .filter_map(|change| match change.tag() {
+ ChangeTag::Delete => Some(format!("-{}", change)),
+ ChangeTag::Insert => Some(format!("+{}", change)),
+ ChangeTag::Equal => None,
+ })
+ .collect();
+
+ if diff.is_empty() {
+ tracing::info!("[RUN DROP MVIEW id={}]: {}", i, format_drop_mview(table));
+ tracing::info!("[PASSED DIFF id={}, rows_compared={n_stream_rows}]", i);
+
+ drop_mview_table(table, client).await;
+ Ok(())
+ } else {
+ bail!(
+ "
+Different results for batch and stream:
+
+BATCH SQL:
+{batch}
+
+STREAM SQL:
+{stream}
+
+SELECT FROM STREAM SQL:
+{select}
+
+BATCH_ROW_LEN:
+{n_batch_rows}
+
+STREAM_ROW_LEN:
+{n_stream_rows}
+
+BATCH_ROWS:
+{formatted_batch_rows}
+
+STREAM_ROWS:
+{formatted_stream_rows}
+
+ROW DIFF (+/-):
+{diff}
+",
+ )
+ }
+}
+
+/// Format + sort rows so they can be diffed.
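+/// Sorting is required since stream and batch results come back in no guaranteed order.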
+fn format_rows(rows: &[SimpleQueryMessage]) -> String {
+ rows.iter()
+ .filter_map(|r| match r {
+ SimpleQueryMessage::Row(row) => {
+ let n_cols = row.columns().len();
+ let formatted_row: String = (0..n_cols)
+ .map(|i| {
+ format!(
+ "{:#?}",
+ match row.get(i) {
+ Some(s) => s,
+ _ => "NULL",
+ }
+ )
+ })
+ .join(", ");
+ Some(formatted_row)
+ }
+ SimpleQueryMessage::CommandComplete(_n_rows) => None,
+ _ => unreachable!(),
+ })
+ .sorted()
+ .join("\n")
+}
diff --git a/src/tests/sqlsmith/src/test_runners/fuzzing.rs b/src/tests/sqlsmith/src/test_runners/fuzzing.rs
new file mode 100644
index 000000000000..b29a78b4b07d
--- /dev/null
+++ b/src/tests/sqlsmith/src/test_runners/fuzzing.rs
@@ -0,0 +1,180 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Provides E2E Test runner functionality.
+
+#[cfg(madsim)]
+use rand_chacha::ChaChaRng;
+use tokio_postgres::Client;
+
+use crate::test_runners::utils::{
+ create_base_tables, create_mviews, drop_mview_table, drop_tables, format_drop_mview,
+ generate_rng, populate_tables, run_query, set_variable, test_batch_queries,
+ test_session_variable, test_sqlsmith, test_stream_queries, update_base_tables,
+};
+use crate::utils::read_file_contents;
+use crate::{mview_sql_gen, parse_sql, sql_gen};
+
+/// e2e test runner for pre-generated queries from sqlsmith
+pub async fn run_pre_generated(client: &Client, outdir: &str) {
+ let timeout_duration = 12; // allow for some variance.
+ let queries_path = format!("{}/queries.sql", outdir);
+ let queries = read_file_contents(queries_path).unwrap();
+ for statement in parse_sql(&queries) {
+ let sql = statement.to_string();
+ tracing::info!("[EXECUTING STATEMENT]: {}", sql);
+ run_query(timeout_duration, client, &sql).await.unwrap();
+ }
+ tracing::info!("[EXECUTION SUCCESS]");
+}
+
+/// Query Generator
+/// If we encounter an expected error, just skip.
+/// If we encounter an unexpected error,
+/// Sqlsmith should stop execution, but write out the DDL and queries generated so far.
+/// If a query takes too long -> cancel it, **mark it as an error**.
+/// NOTE(noel): It will still fail if DDL creation fails.
+pub async fn generate(
+ client: &Client,
+ testdata: &str,
+ count: usize,
+ _outdir: &str,
+ seed: Option<u64>,
+) {
+ let timeout_duration = 5;
+
+ set_variable(client, "RW_IMPLICIT_FLUSH", "TRUE").await;
+ set_variable(client, "QUERY_MODE", "DISTRIBUTED").await;
+ tracing::info!("Set session variables");
+
+ let mut rng = generate_rng(seed);
+ let base_tables = create_base_tables(testdata, client).await.unwrap();
+
+ let rows_per_table = 50;
+ let max_rows_inserted = rows_per_table * base_tables.len();
+ let inserts = populate_tables(client, &mut rng, base_tables.clone(), rows_per_table).await;
+ tracing::info!("Populated base tables");
+
+ let (tables, mviews) = create_mviews(&mut rng, base_tables.clone(), client)
+ .await
+ .unwrap();
+
+ // Generate an update for some inserts, on the corresponding table.
+ update_base_tables(client, &mut rng, &base_tables, &inserts).await;
+
+ test_sqlsmith(
+ client,
+ &mut rng,
+ tables.clone(),
+ base_tables.clone(),
+ max_rows_inserted,
+ )
+ .await;
+ tracing::info!("Passed sqlsmith tests");
+
+ tracing::info!("Ran updates");
+
+ let mut generated_queries = 0;
+ for _ in 0..count {
+ test_session_variable(client, &mut rng).await;
+ let sql = sql_gen(&mut rng, tables.clone());
+ tracing::info!("[EXECUTING TEST_BATCH]: {}", sql);
+ let result = run_query(timeout_duration, client, sql.as_str()).await;
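+ // Ok(0) means the query ran cleanly; Ok(n > 0) means it was skipped due to a
+ // permissible error; Err means an unrecoverable error, so stop generating.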
+ match result {
+ Err(_e) => {
+ generated_queries += 1;
+ tracing::info!("Generated {} batch queries", generated_queries);
+ tracing::error!("Unrecoverable error encountered.");
+ return;
+ }
+ Ok(0) => {
+ generated_queries += 1;
+ }
+ _ => {}
+ }
+ }
+ tracing::info!("Generated {} batch queries", generated_queries);
+
+ let mut generated_queries = 0;
+ for _ in 0..count {
+ test_session_variable(client, &mut rng).await;
+ let (sql, table) = mview_sql_gen(&mut rng, tables.clone(), "stream_query");
+ tracing::info!("[EXECUTING TEST_STREAM]: {}", sql);
+ let result = run_query(timeout_duration, client, sql.as_str()).await;
+ match result {
+ Err(_e) => {
+ generated_queries += 1;
+ tracing::info!("Generated {} stream queries", generated_queries);
+ tracing::error!("Unrecoverable error encountered.");
+ return;
+ }
+ Ok(0) => {
+ generated_queries += 1;
+ }
+ _ => {}
+ }
+ tracing::info!("[EXECUTING DROP MVIEW]: {}", &format_drop_mview(&table));
+ drop_mview_table(&table, client).await;
+ }
+ tracing::info!("Generated {} stream queries", generated_queries);
+
+ drop_tables(&mviews, testdata, client).await;
+}
+
+/// e2e test runner for sqlsmith
+pub async fn run(client: &Client, testdata: &str, count: usize, seed: Option<u64>) {
+ let mut rng = generate_rng(seed);
+
+ set_variable(client, "RW_IMPLICIT_FLUSH", "TRUE").await;
+ set_variable(client, "QUERY_MODE", "DISTRIBUTED").await;
+ tracing::info!("Set session variables");
+
+ let base_tables = create_base_tables(testdata, client).await.unwrap();
+
+ let rows_per_table = 50;
+ let inserts = populate_tables(client, &mut rng, base_tables.clone(), rows_per_table).await;
+ tracing::info!("Populated base tables");
+
+ let (tables, mviews) = create_mviews(&mut rng, base_tables.clone(), client)
+ .await
+ .unwrap();
+ tracing::info!("Created tables");
+
+ // Generate an update for some inserts, on the corresponding table.
+ update_base_tables(client, &mut rng, &base_tables, &inserts).await;
+ tracing::info!("Ran updates");
+
+ let max_rows_inserted = rows_per_table * base_tables.len();
+ test_sqlsmith(
+ client,
+ &mut rng,
+ tables.clone(),
+ base_tables.clone(),
+ max_rows_inserted,
+ )
+ .await;
+ tracing::info!("Passed sqlsmith tests");
+
+ test_batch_queries(client, &mut rng, tables.clone(), count)
+ .await
+ .unwrap();
+ tracing::info!("Passed batch queries");
+ test_stream_queries(client, &mut rng, tables.clone(), count)
+ .await
+ .unwrap();
+ tracing::info!("Passed stream queries");
+
+ drop_tables(&mviews, testdata, client).await;
+ tracing::info!("[EXECUTION SUCCESS]");
+}
diff --git a/src/tests/sqlsmith/src/test_runners/mod.rs b/src/tests/sqlsmith/src/test_runners/mod.rs
new file mode 100644
index 000000000000..9347f070b58e
--- /dev/null
+++ b/src/tests/sqlsmith/src/test_runners/mod.rs
@@ -0,0 +1,26 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Contains test runners:
+//! - fuzzing: For crash-testing the database with generated batch, stream queries.
+//! - differential testing: For testing the database with generated batch,
+//! stream queries and comparing their results.
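+//!
+//! The public entry points are `run` and `generate` (fuzzing), `run_pre_generated`
+//! (replaying saved queries) and `run_differential_testing`.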
+
+mod diff;
+mod fuzzing;
+
+mod utils;
+
+pub use diff::run_differential_testing;
+pub use fuzzing::{generate, run, run_pre_generated};
diff --git a/src/tests/sqlsmith/src/test_runners/utils.rs b/src/tests/sqlsmith/src/test_runners/utils.rs
new file mode 100644
index 000000000000..98f29df49044
--- /dev/null
+++ b/src/tests/sqlsmith/src/test_runners/utils.rs
@@ -0,0 +1,353 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use anyhow::{anyhow, bail};
+use itertools::Itertools;
+use rand::rngs::SmallRng;
+use rand::{Rng, SeedableRng};
+#[cfg(madsim)]
+use rand_chacha::ChaChaRng;
+use risingwave_sqlparser::ast::Statement;
+use tokio::time::{sleep, timeout, Duration};
+use tokio_postgres::error::Error as PgError;
+use tokio_postgres::{Client, SimpleQueryMessage};
+
+use crate::utils::read_file_contents;
+use crate::validation::{is_permissible_error, is_recovery_in_progress_error};
+use crate::{
+ generate_update_statements, insert_sql_gen, mview_sql_gen, parse_create_table_statements,
+ parse_sql, session_sql_gen, sql_gen, Table,
+};
+
+pub(super) type PgResult<A> = std::result::Result<A, PgError>;
+pub(super) type Result<A> = anyhow::Result<A>;
+
+pub(super) async fn update_base_tables<R: Rng>(
+ client: &Client,
+ rng: &mut R,
+ base_tables: &[Table],
+ inserts: &[Statement],
+) {
+ let update_statements = generate_update_statements(rng, base_tables, inserts).unwrap();
+ for update_statement in update_statements {
+ let sql = update_statement.to_string();
+ tracing::info!("[EXECUTING UPDATES]: {}", &sql);
+ client.simple_query(&sql).await.unwrap();
+ }
+}
+
+pub(super) async fn populate_tables<R: Rng>(
+ client: &Client,
+ rng: &mut R,
+ base_tables: Vec<Table>,
+ row_count: usize,
+) -> Vec<Statement> {
+ let inserts = insert_sql_gen(rng, base_tables, row_count);
+ for insert in &inserts {
+ tracing::info!("[EXECUTING INSERT]: {}", insert);
+ client.simple_query(insert).await.unwrap();
+ }
+ inserts
+ .iter()
+ .map(|s| parse_sql(s).into_iter().next().unwrap())
+ .collect_vec()
+}
+
+pub(super) async fn set_variable(client: &Client, variable: &str, value: &str) -> String {
+ let s = format!("SET {variable} TO {value}");
+ tracing::info!("[EXECUTING SET_VAR]: {}", s);
+ client.simple_query(&s).await.unwrap();
+ s
+}
+
+/// Sanity checks for sqlsmith
+pub(super) async fn test_sqlsmith<R: Rng>(
+ client: &Client,
+ rng: &mut R,
+ tables: Vec<Table>,
+ base_tables: Vec<Table>,
+ row_count: usize,
+) {
+ // Test inserted rows should be at least 50% population count,
+ // otherwise we don't have sufficient data in our system.
+ // ENABLE: https://github.com/risingwavelabs/risingwave/issues/3844
+ test_population_count(client, base_tables, row_count).await;
+ tracing::info!("passed population count test");
+
+ let threshold = 0.50; // permit at most 50% of queries to be skipped.
+ let sample_size = 20;
+
+ let skipped_percentage = test_batch_queries(client, rng, tables.clone(), sample_size)
+ .await
+ .unwrap();
+ tracing::info!(
+ "percentage of skipped batch queries = {}, threshold: {}",
+ skipped_percentage,
+ threshold
+ );
+ if skipped_percentage > threshold {
+ panic!("skipped batch queries exceeded threshold.");
+ }
+
+ let skipped_percentage = test_stream_queries(client, rng, tables.clone(), sample_size)
+ .await
+ .unwrap();
+ tracing::info!(
+ "percentage of skipped stream queries = {}, threshold: {}",
+ skipped_percentage,
+ threshold
+ );
+ if skipped_percentage > threshold {
+ panic!("skipped stream queries exceeded threshold.");
+ }
+}
+
+pub(super) async fn test_session_variable<R: Rng>(client: &Client, rng: &mut R) -> String {
+ let session_sql = session_sql_gen(rng);
+ tracing::info!("[EXECUTING TEST SESSION_VAR]: {}", session_sql);
+ client.simple_query(session_sql.as_str()).await.unwrap();
+ session_sql
+}
+
+/// Expects at least 50% of inserted rows included.
+pub(super) async fn test_population_count(
+ client: &Client,
+ base_tables: Vec<Table>,
+ expected_count: usize,
+) {
+ let mut actual_count = 0;
+ for t in base_tables {
+ let q = format!("select * from {};", t.name);
+ let rows = client.simple_query(&q).await.unwrap();
+ actual_count += rows.len();
+ }
+ if actual_count < expected_count / 2 {
+ panic!(
+ "expected at least 50% rows included.\
+ Total {} rows, only had {} rows",
+ expected_count, actual_count,
+ )
+ }
+}
+
+/// Test batch queries, returns skipped query statistics
+/// Runs in distributed mode, since queries can be complex and cause overflow in local execution
+/// mode.
+pub(super) async fn test_batch_queries<R: Rng>(
+ client: &Client,
+ rng: &mut R,
+ tables: Vec<Table>,
+ sample_size: usize,
+) -> Result<f64> {
+ let mut skipped = 0;
+ for _ in 0..sample_size {
+ test_session_variable(client, rng).await;
+ let sql = sql_gen(rng, tables.clone());
+ tracing::info!("[TEST BATCH]: {}", sql);
+ skipped += run_query(30, client, &sql).await?;
+ }
+ Ok(skipped as f64 / sample_size as f64)
+}
+
+/// Test stream queries, returns skipped query statistics
+pub(super) async fn test_stream_queries<R: Rng>(
+ client: &Client,
+ rng: &mut R,
+ tables: Vec<Table>,
+ sample_size: usize,
+) -> Result<f64> {
+ let mut skipped = 0;
+
+ for _ in 0..sample_size {
+ test_session_variable(client, rng).await;
+ let (sql, table) = mview_sql_gen(rng, tables.clone(), "stream_query");
+ tracing::info!("[TEST STREAM]: {}", sql);
+ skipped += run_query(12, client, &sql).await?;
+ tracing::info!("[TEST DROP MVIEW]: {}", &format_drop_mview(&table));
+ drop_mview_table(&table, client).await;
+ }
+ Ok(skipped as f64 / sample_size as f64)
+}
+
+pub(super) fn get_seed_table_sql(testdata: &str) -> String {
+ let seed_files = ["tpch.sql", "nexmark.sql", "alltypes.sql"];
+ seed_files
+ .iter()
+ .map(|filename| read_file_contents(format!("{}/{}", testdata, filename)).unwrap())
+ .collect::<String>()
+}
+
+/// Create the tables defined in testdata, along with some mviews.
+/// TODO: Generate indexes and sinks.
+pub(super) async fn create_base_tables(testdata: &str, client: &Client) -> Result<Vec<Table>> {
+ tracing::info!("Preparing tables...");
+
+ let sql = get_seed_table_sql(testdata);
+ let (base_tables, statements) = parse_create_table_statements(sql);
+ let mut mvs_and_base_tables = vec![];
+ mvs_and_base_tables.extend_from_slice(&base_tables);
+
+ for stmt in &statements {
+ let create_sql = stmt.to_string();
+ tracing::info!("[EXECUTING CREATE TABLE]: {}", &create_sql);
+ client.simple_query(&create_sql).await.unwrap();
+ }
+
+ Ok(base_tables)
+}
+
+/// Create the tables defined in testdata, along with some mviews.
+/// TODO: Generate indexes and sinks.
+pub(super) async fn create_mviews(
+ rng: &mut impl Rng,
+ mvs_and_base_tables: Vec<Table>,
+ client: &Client,
+) -> Result<(Vec<Table>, Vec<Table>)> {
+ let mut mvs_and_base_tables = mvs_and_base_tables;
+ let mut mviews = vec![];
+ // Generate some mviews
+ for i in 0..20 {
+ let (create_sql, table) =
+ mview_sql_gen(rng, mvs_and_base_tables.clone(), &format!("m{}", i));
+ tracing::info!("[EXECUTING CREATE MVIEW]: {}", &create_sql);
+ let skip_count = run_query(6, client, &create_sql).await?;
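+ // Only mviews whose creation succeeded join the pool of tables that later
+ // queries may reference.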
+ if skip_count == 0 {
+ mvs_and_base_tables.push(table.clone());
+ mviews.push(table);
+ }
+ }
+ Ok((mvs_and_base_tables, mviews))
+}
+
+pub(super) fn format_drop_mview(mview: &Table) -> String {
+ format!("DROP MATERIALIZED VIEW IF EXISTS {}", mview.name)
+}
+
+/// Drops mview tables.
+pub(super) async fn drop_mview_table(mview: &Table, client: &Client) {
+ client
+ .simple_query(&format_drop_mview(mview))
+ .await
+ .unwrap();
+}
+
+/// Drops mview tables and seed tables
+pub(super) async fn drop_tables(mviews: &[Table], testdata: &str, client: &Client) {
+ tracing::info!("Cleaning tables...");
+
+ for mview in mviews.iter().rev() {
+ drop_mview_table(mview, client).await;
+ }
+
+ let seed_files = ["drop_tpch.sql", "drop_nexmark.sql", "drop_alltypes.sql"];
+ let sql = seed_files
+ .iter()
+ .map(|filename| read_file_contents(format!("{}/{}", testdata, filename)).unwrap())
+ .collect::<String>();
+
+ for stmt in sql.lines() {
+ client.simple_query(stmt).await.unwrap();
+ }
+}
+
+/// Validate client responses, returning a count of skipped queries, number of result rows.
+pub(super) fn validate_response(
+ response: PgResult<Vec<SimpleQueryMessage>>,
+) -> Result<(i64, Vec<SimpleQueryMessage>)> {
+ match response {
+ Ok(rows) => Ok((0, rows)),
+ Err(e) => {
+ // Permit runtime errors conservatively.
+ if let Some(e) = e.as_db_error()
+ && is_permissible_error(&e.to_string())
+ {
+ tracing::info!("[SKIPPED ERROR]: {:#?}", e);
+ return Ok((1, vec![]));
+ }
+ // consolidate error reason for deterministic test
+ tracing::info!("[UNEXPECTED ERROR]: {:#?}", e);
+ Err(anyhow!("Encountered unexpected error: {e}"))
+ }
+ }
+}
+
+pub(super) async fn run_query(timeout_duration: u64, client: &Client, query: &str) -> Result<i64> {
+ let (skipped_count, _) = run_query_inner(timeout_duration, client, query).await?;
+ Ok(skipped_count)
+}
+/// Run query, handle permissible errors
+/// For recovery error, just do bounded retry.
+/// For other errors, validate them accordingly, skipping if they are permitted.
+/// Otherwise just return success.
+/// If it takes too long, return the query which timed out + execution time + timeout error.
+/// Returns: Number of skipped queries, number of rows returned.
+pub(super) async fn run_query_inner(
+ timeout_duration: u64,
+ client: &Client,
+ query: &str,
+) -> Result<(i64, Vec<SimpleQueryMessage>)> {
+ let query_task = client.simple_query(query);
+ let result = timeout(Duration::from_secs(timeout_duration), query_task).await;
+ let response = match result {
+ Ok(r) => r,
+ Err(_) => bail!(
+ "[UNEXPECTED ERROR] Query timeout after {timeout_duration}s:\n{:?}",
+ query
+ ),
+ };
+ if let Err(e) = &response
+ && let Some(e) = e.as_db_error()
+ {
+ if is_recovery_in_progress_error(&e.to_string()) {
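+ // Recovery errors are transient: the cluster is recovering, so wait and
+ // re-issue the query with a bounded retry loop instead of failing outright.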
+ let tries = 5;
+ let interval = 1;
+ for _ in 0..tries {
+ // retry 5 times
+ sleep(Duration::from_secs(interval)).await;
+ let query_task = client.simple_query(query);
+ let response = timeout(Duration::from_secs(timeout_duration), query_task).await;
+ match response {
+ Ok(Ok(r)) => {
+ return Ok((0, r));
+ }
+ Err(_) => bail!(
+ "[UNEXPECTED ERROR] Query timeout after {timeout_duration}s:\n{:?}",
+ query
+ ),
+ _ => {}
+ }
+ }
+ bail!("[UNEXPECTED ERROR] Failed to recover after {tries} tries with interval {interval}s")
+ } else {
+ return validate_response(response);
+ }
+ }
+ let rows = response?;
+ Ok((0, rows))
+}
+
+pub(super) fn generate_rng(seed: Option<u64>) -> impl Rng {
+ #[cfg(madsim)]
+ if let Some(seed) = seed {
+ ChaChaRng::seed_from_u64(seed)
+ } else {
+ ChaChaRng::from_rng(SmallRng::from_entropy()).unwrap()
+ }
+ #[cfg(not(madsim))]
+ if let Some(seed) = seed {
+ SmallRng::seed_from_u64(seed)
+ } else {
+ SmallRng::from_entropy()
+ }
+}