Skip to content

Commit

Permalink
refactor(simulation): make it compile without cfg(madsim) (#11960)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Aug 31, 2023
1 parent fdbba6e commit 7a097e6
Show file tree
Hide file tree
Showing 32 changed files with 146 additions and 125 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ script = """
set -e
export CARGO_TARGET_DIR=target/coverage
cargo llvm-cov nextest --html
cargo llvm-cov nextest --html --workspace --exclude risingwave_simulation
"""
description = "Run unit tests and report coverage"

Expand All @@ -682,7 +682,7 @@ script = """
#!/usr/bin/env bash
set -e
cargo nextest run "$@"
cargo nextest run --workspace --exclude risingwave_simulation "$@"
"""
description = "🌟 Run unit tests"

Expand Down
4 changes: 2 additions & 2 deletions ci/scripts/run-unit-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ cd ${REPO_ROOT}

echo "+++ Run unit tests with coverage"
# use tee to disable progress bar
NEXTEST_PROFILE=ci cargo llvm-cov nextest --lcov --output-path lcov.info --features failpoints,sync_point 2> >(tee);
NEXTEST_PROFILE=ci cargo llvm-cov nextest --lcov --output-path lcov.info --features failpoints,sync_point --workspace --exclude risingwave_simulation 2> >(tee);

echo "--- Codecov upload coverage reports"
curl -Os https://uploader.codecov.io/latest/linux/codecov && chmod +x codecov
./codecov -t "$CODECOV_TOKEN" -s . -F rust
./codecov -t "$CODECOV_TOKEN" -s . -F rust
1 change: 1 addition & 0 deletions src/tests/simulation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ normal = ["serde"]
anyhow = "1.0"
async-trait = "0.1"
aws-sdk-s3 = { version = "0.2", package = "madsim-aws-sdk-s3" }
cfg-or-panic = "0.1"
clap = { version = "4", features = ["derive"] }
console = "0.15"
etcd-client = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions src/tests/simulation/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl<'a, 'b> SetStmtsIterator<'a, 'b> {

impl SetStmts {
fn push(&mut self, sql: &str) {
let ast = Parser::parse_sql(&sql).expect("a set statement should be parsed successfully");
let ast = Parser::parse_sql(sql).expect("a set statement should be parsed successfully");
match ast
.into_iter()
.exactly_one()
Expand Down Expand Up @@ -134,7 +134,7 @@ impl RisingWave {
.simple_query("SET RW_IMPLICIT_FLUSH TO true;")
.await?;
// replay all SET statements
for stmt in SetStmtsIterator::new(&set_stmts) {
for stmt in SetStmtsIterator::new(set_stmts) {
client.simple_query(&stmt).await?;
}
Ok((client, task))
Expand Down
30 changes: 23 additions & 7 deletions src/tests/simulation/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![cfg_attr(not(madsim), allow(unused_imports))]

use std::collections::HashMap;
use std::future::Future;
use std::io::Write;
Expand All @@ -20,17 +22,20 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, bail, Result};
use cfg_or_panic::cfg_or_panic;
use clap::Parser;
use futures::channel::{mpsc, oneshot};
use futures::future::join_all;
use futures::{SinkExt, StreamExt};
use itertools::Itertools;
use madsim::net::ipvs::*;
#[cfg(madsim)]
use madsim::runtime::{Handle, NodeHandle};
use rand::seq::IteratorRandom;
use rand::Rng;
use risingwave_pb::common::WorkerNode;
use sqllogictest::AsyncDB;
#[cfg(not(madsim))]
use tokio::runtime::Handle;

use crate::client::RisingWave;

Expand Down Expand Up @@ -153,15 +158,20 @@ impl Configuration {
pub struct Cluster {
config: Configuration,
handle: Handle,
#[cfg(madsim)]
pub(crate) client: NodeHandle,
#[cfg(madsim)]
pub(crate) ctl: NodeHandle,
}

impl Cluster {
/// Start a RisingWave cluster for testing.
///
/// This function should be called exactly once in a test.
#[cfg_or_panic(madsim)]
pub async fn start(conf: Configuration) -> Result<Self> {
use madsim::net::ipvs::*;

let handle = madsim::runtime::Handle::current();
println!("seed = {}", handle.seed());
println!("{:#?}", conf);
Expand Down Expand Up @@ -361,6 +371,7 @@ impl Cluster {
}

/// Start a SQL session on the client node.
#[cfg_or_panic(madsim)]
pub fn start_session(&mut self) -> Session {
let (query_tx, mut query_rx) = mpsc::channel::<SessionRequest>(0);

Expand Down Expand Up @@ -404,6 +415,7 @@ impl Cluster {
}

/// Run a future on the client node.
#[cfg_or_panic(madsim)]
pub async fn run_on_client<F>(&self, future: F) -> F::Output
where
F: Future + Send + 'static,
Expand Down Expand Up @@ -433,7 +445,7 @@ impl Cluster {
timeout: Duration,
) -> Result<String> {
let fut = async move {
let mut interval = madsim::time::interval(interval);
let mut interval = tokio::time::interval(interval);
loop {
interval.tick().await;
let result = self.run(sql.clone()).await?;
Expand All @@ -443,7 +455,7 @@ impl Cluster {
}
};

match madsim::time::timeout(timeout, fut).await {
match tokio::time::timeout(timeout, fut).await {
Ok(r) => Ok(r?),
Err(_) => bail!("wait_until timeout"),
}
Expand All @@ -461,6 +473,7 @@ impl Cluster {
}

/// Kill some nodes and restart them in 2s + restart_delay_secs with a probability of 0.1.
#[cfg_or_panic(madsim)]
pub async fn kill_node(&self, opts: &KillOpts) {
let mut nodes = vec![];
if opts.kill_meta {
Expand Down Expand Up @@ -519,7 +532,7 @@ impl Cluster {
let t = rand::thread_rng().gen_range(Duration::from_secs(0)..Duration::from_secs(1));
tokio::time::sleep(t).await;
tracing::info!("kill {name}");
madsim::runtime::Handle::current().kill(name);
Handle::current().kill(name);

let mut t =
rand::thread_rng().gen_range(Duration::from_secs(0)..Duration::from_secs(1));
Expand All @@ -531,12 +544,13 @@ impl Cluster {
}
tokio::time::sleep(t).await;
tracing::info!("restart {name}");
madsim::runtime::Handle::current().restart(name);
Handle::current().restart(name);
}))
.await;
}

/// Create a node for kafka producer and prepare data.
#[cfg_or_panic(madsim)]
pub async fn create_kafka_producer(&self, datadir: &str) {
self.handle
.create_node()
Expand All @@ -552,6 +566,7 @@ impl Cluster {
}

/// Create a kafka topic.
#[cfg_or_panic(madsim)]
pub fn create_kafka_topics(&self, topics: HashMap<String, i32>) {
self.handle
.create_node()
Expand All @@ -570,6 +585,7 @@ impl Cluster {
}

/// Graceful shutdown all RisingWave nodes.
#[cfg_or_panic(madsim)]
pub async fn graceful_shutdown(&self) {
let mut nodes = vec![];
let mut metas = vec![];
Expand All @@ -592,12 +608,12 @@ impl Cluster {
for node in &nodes {
self.handle.send_ctrl_c(node);
}
madsim::time::sleep(waiting_time).await;
tokio::time::sleep(waiting_time).await;
// shutdown metas
for meta in &metas {
self.handle.send_ctrl_c(meta);
}
madsim::time::sleep(waiting_time).await;
tokio::time::sleep(waiting_time).await;

// check all nodes are exited
for node in nodes.iter().chain(metas.iter()) {
Expand Down
14 changes: 10 additions & 4 deletions src/tests/simulation/src/ctl_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![cfg_attr(not(madsim), expect(unused_imports))]

use std::collections::{HashMap, HashSet};
use std::fmt::Write;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use cfg_or_panic::cfg_or_panic;
use clap::Parser;
use itertools::Itertools;
use madsim::rand::thread_rng;
use rand::seq::{IteratorRandom, SliceRandom};
use rand::Rng;
use rand::{thread_rng, Rng};
use risingwave_common::hash::ParallelUnitId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::meta::get_reschedule_plan_request::PbPolicy;
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::meta::table_fragments::PbFragment;
Expand Down Expand Up @@ -219,6 +220,7 @@ impl Fragment {

impl Cluster {
/// Locate fragments that satisfy all the predicates.
#[cfg_or_panic(madsim)]
pub async fn locate_fragments(
&mut self,
predicates: impl IntoIterator<Item = BoxedPredicate>,
Expand Down Expand Up @@ -291,6 +293,7 @@ impl Cluster {
self.locate_one_fragment([predicate::id(id)]).await
}

#[cfg_or_panic(madsim)]
pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
let response = self
.ctl
Expand All @@ -305,6 +308,7 @@ impl Cluster {
}

// update node schedulability
#[cfg_or_panic(madsim)]
async fn update_worker_node_schedulability(
&self,
worker_ids: Vec<u32>,
Expand Down Expand Up @@ -345,11 +349,12 @@ impl Cluster {
self.reschedule_helper(plan, false).await
}

/// Same as reschedule, but resolve the no_shuffle upstream
/// Same as reschedule, but resolve the no-shuffle upstream
pub async fn reschedule_resolve_no_shuffle(&mut self, plan: impl Into<String>) -> Result<()> {
self.reschedule_helper(plan, true).await
}

#[cfg_or_panic(madsim)]
async fn reschedule_helper(
&mut self,
plan: impl Into<String>,
Expand Down Expand Up @@ -394,6 +399,7 @@ impl Cluster {
Ok(())
}

#[cfg_or_panic(madsim)]
pub async fn get_reschedule_plan(&self, policy: PbPolicy) -> Result<GetReschedulePlanResponse> {
let revision = self
.ctl
Expand Down
34 changes: 17 additions & 17 deletions src/tests/simulation/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
use std::collections::HashMap;
use std::time::SystemTime;

use itertools::Either;
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::producer::{BaseProducer, BaseRecord};
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use rdkafka::ClientConfig;

/// Create a kafka topic
Expand Down Expand Up @@ -82,22 +83,21 @@ pub async fn producer(broker_addr: &str, datadir: String) {
.expect("failed to create topic");

let content = std::fs::read(file.path()).unwrap();
let msgs: Box<dyn Iterator<Item = (Option<&[u8]>, &[u8])> + Send> =
if topic.ends_with("bin") {
// binary message data, a file is a message
Box::new(std::iter::once((None, content.as_slice())))
} else {
// text message data, a line is a message
Box::new(
content
.split(|&b| b == b'\n')
.filter(|line| !line.is_empty())
.map(|line| match line.iter().position(|&b| b == KEY_DELIMITER) {
Some(pos) => (Some(&line[..pos]), &line[pos + 1..]),
None => (None, line),
}),
)
};
let msgs = if topic.ends_with("bin") {
// binary message data, a file is a message
Either::Left(std::iter::once((None, content.as_slice())))
} else {
// text message data, a line is a message
Either::Right(
content
.split(|&b| b == b'\n')
.filter(|line| !line.is_empty())
.map(|line| match line.iter().position(|&b| b == KEY_DELIMITER) {
Some(pos) => (Some(&line[..pos]), &line[pos + 1..]),
None => (None, line),
}),
)
};
for (key, payload) in msgs {
loop {
let ts = SystemTime::now()
Expand Down
1 change: 0 additions & 1 deletion src/tests/simulation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![cfg(madsim)]
#![feature(trait_alias)]
#![feature(lint_reasons)]
#![feature(lazy_cell)]
Expand Down
10 changes: 3 additions & 7 deletions src/tests/simulation/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,9 @@

use std::path::PathBuf;

use cfg_or_panic::cfg_or_panic;
use clap::Parser;

#[cfg(not(madsim))]
fn main() {
println!("This binary is only available in simulation.");
}

/// Deterministic simulation end-to-end test runner.
///
/// ENVS:
Expand Down Expand Up @@ -145,8 +141,8 @@ pub struct Args {
e2e_extended_test: bool,
}

#[cfg(madsim)]
#[madsim::main]
#[tokio::main]
#[cfg_or_panic(madsim)]
async fn main() {
use std::sync::Arc;

Expand Down
6 changes: 1 addition & 5 deletions src/tests/simulation/src/slt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@ use crate::cluster::{Cluster, KillOpts};
use crate::utils::TimedExt;

fn is_create_table_as(sql: &str) -> bool {
let parts: Vec<String> = sql
.trim_start()
.split_whitespace()
.map(|s| s.to_lowercase())
.collect();
let parts: Vec<String> = sql.split_whitespace().map(|s| s.to_lowercase()).collect();

parts.len() >= 4 && parts[0] == "create" && parts[1] == "table" && parts[3] == "as"
}
Expand Down
Loading

0 comments on commit 7a097e6

Please sign in to comment.