Skip to content

Commit

Permalink
add sim test
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Aug 5, 2024
1 parent 1c9c6e5 commit a681dc0
Show file tree
Hide file tree
Showing 8 changed files with 324 additions and 26 deletions.
1 change: 1 addition & 0 deletions .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ extend-exclude = [
# We don't want to fix "fals" here, but may want in other places.
# Ideally, we should just ignore that line: https://github.com/crate-ci/typos/issues/316
"src/common/src/cast/mod.rs",
"src/tests/simulation/tests/integration_tests/scale/shared_source.rs",
]
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions src/stream/src/executor/source/source_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -801,8 +801,10 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
BackfillState::Finished => {}
_ => {
return Err(anyhow::anyhow!(
"Unexpected backfill state: {:?}",
backfill_state
"Unexpected backfill state in update_state_if_changed_forward_stage: {:?}, target_splits: {:?}, current_splits: {:?}",
backfill_state,
target_splits,
current_splits
)
.into());
}
Expand Down
1 change: 1 addition & 0 deletions src/tests/simulation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ glob = "0.3"
itertools = { workspace = true }
lru = { workspace = true }
madsim = "0.2.27"
maplit = "1"
paste = "1"
pin-project = "1.1"
pretty_assertions = "1"
Expand Down
29 changes: 9 additions & 20 deletions src/tests/simulation/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,27 +153,16 @@ impl Configuration {
/// Provides a configuration for scale test which ensures that the arrangement backfill is disabled,
/// so table scan will use `no_shuffle`.
pub fn for_scale_no_shuffle() -> Self {
// Embed the config file and create a temporary file at runtime. The file will be deleted
// automatically when it's dropped.
let config_path = {
let mut file =
tempfile::NamedTempFile::new().expect("failed to create temp config file");
file.write_all(include_bytes!("risingwave-scale.toml"))
.expect("failed to write config file");
file.into_temp_path()
};
let mut conf = Self::for_scale();
conf.per_session_queries =
vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = false;".into()].into();
conf
}

Configuration {
config_path: ConfigPath::Temp(config_path.into()),
frontend_nodes: 2,
compute_nodes: 3,
meta_nodes: 3,
compactor_nodes: 2,
compute_node_cores: 2,
per_session_queries: vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = false;".into()]
.into(),
..Default::default()
}
pub fn for_scale_shared_source() -> Self {
let mut conf = Self::for_scale();
conf.per_session_queries = vec!["SET RW_ENABLE_SHARED_SOURCE = true;".into()].into();
conf
}

pub fn for_auto_parallelism(
Expand Down
25 changes: 21 additions & 4 deletions src/tests/simulation/src/ctl_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#![cfg_attr(not(madsim), expect(unused_imports))]

use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::ffi::OsString;
use std::fmt::Write;
use std::sync::Arc;
Expand All @@ -23,17 +23,17 @@ use anyhow::{anyhow, Result};
use cfg_or_panic::cfg_or_panic;
use clap::Parser;
use itertools::Itertools;
use rand::seq::{IteratorRandom, SliceRandom};
use rand::seq::IteratorRandom;
use rand::{thread_rng, Rng};
use risingwave_common::catalog::TableId;
use risingwave_common::hash::WorkerSlotId;
use risingwave_connector::source::{SplitImpl, SplitMetaData};
use risingwave_hummock_sdk::{CompactionGroupId, HummockSstableId};
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::meta::table_fragments::PbFragment;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::GetClusterInfoResponse;
use risingwave_pb::stream_plan::StreamNode;
use serde::de::IntoDeserializer;

use self::predicate::BoxedPredicate;
use crate::cluster::Cluster;
Expand Down Expand Up @@ -76,7 +76,7 @@ pub mod predicate {
Box::new(p)
}

/// There exists operators whose identity contains `s` in the fragment.
/// There exists operators whose identity contains `s` in the fragment (case insensitive).
pub fn identity_contains(s: impl Into<String>) -> BoxedPredicate {
let s: String = s.into();
let p = move |f: &PbFragment| {
Expand Down Expand Up @@ -363,6 +363,23 @@ impl Cluster {
Ok(response)
}

#[cfg_or_panic(madsim)]
pub async fn list_source_splits(&self) -> Result<String> {
let info = self.get_cluster_info().await?;
let mut res = BTreeMap::new();
for (actor_id, splits) in info.actor_splits {
let splits = splits
.splits
.iter()
.map(|split| SplitImpl::try_from(split).unwrap())
.map(|split| split.id())
.collect_vec()
.join(",");
res.insert(actor_id, splits);
}
Ok(format!("{res:?}"))
}

// update node schedulability
#[cfg_or_panic(madsim)]
async fn update_worker_node_schedulability(
Expand Down
1 change: 1 addition & 0 deletions src/tests/simulation/tests/integration_tests/scale/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod nexmark_q4;
mod nexmark_source;
mod no_shuffle;
mod schedulability;
mod shared_source;
mod singleton_migration;
mod sink;
mod streaming_parallelism;
Expand Down
Loading

0 comments on commit a681dc0

Please sign in to comment.