diff --git a/ci/scripts/deterministic-recovery-test.sh b/ci/scripts/deterministic-recovery-test.sh
index b1c8937ad8e8..30c41603c9e6 100755
--- a/ci/scripts/deterministic-recovery-test.sh
+++ b/ci/scripts/deterministic-recovery-test.sh
@@ -9,8 +9,7 @@ echo "--- Download artifacts"
 download-and-decompress-artifact risingwave_simulation .
 chmod +x ./risingwave_simulation
 
-export RUST_LOG="info,\
-risingwave_meta::barrier::recovery=debug,\
+export RUST_LOG="risingwave_meta::barrier::recovery=debug,\
 risingwave_meta::manager::catalog=debug,\
 risingwave_meta::rpc::ddl_controller=debug,\
 risingwave_meta::barrier::mod=debug,\
diff --git a/e2e_test/sink/kafka/create_sink.slt b/e2e_test/sink/kafka/create_sink.slt
index b693cd33e8d6..1cdb4771ffd4 100644
--- a/e2e_test/sink/kafka/create_sink.slt
+++ b/e2e_test/sink/kafka/create_sink.slt
@@ -1,3 +1,6 @@
+statement ok
+set rw_implicit_flush=true;
+
 statement ok
 create table t_kafka (
   id integer primary key,
diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs
index b572539ecdd8..cc66d1341b4f 100644
--- a/src/frontend/src/optimizer/mod.rs
+++ b/src/frontend/src/optimizer/mod.rs
@@ -348,7 +348,18 @@ impl PlanRoot {
 
     /// Generate optimized stream plan
     fn gen_optimized_stream_plan(&mut self, emit_on_window_close: bool) -> Result<PlanRef> {
-        self.gen_optimized_stream_plan_inner(emit_on_window_close, StreamScanType::Backfill)
+        let stream_scan_type = if self
+            .plan
+            .ctx()
+            .session_ctx()
+            .config()
+            .streaming_enable_arrangement_backfill()
+        {
+            StreamScanType::ArrangementBackfill
+        } else {
+            StreamScanType::Backfill
+        };
+        self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
     }
 
     fn gen_optimized_stream_plan_inner(
@@ -778,6 +789,14 @@ impl PlanRoot {
     ) -> Result<StreamSink> {
         let stream_scan_type = if without_backfill {
             StreamScanType::UpstreamOnly
+        } else if self
+            .plan
+            .ctx()
+            .session_ctx()
+            .config()
+            .streaming_enable_arrangement_backfill()
+        {
+            StreamScanType::ArrangementBackfill
         } else {
             StreamScanType::Backfill
         };
diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs
index b17967025402..b451dea88c63 100644
--- a/src/frontend/src/optimizer/plan_node/logical_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs
@@ -22,7 +22,6 @@ use pretty_xmlish::{Pretty, XmlNode};
 use risingwave_common::catalog::{ColumnDesc, TableDesc};
 use risingwave_common::error::Result;
 use risingwave_common::util::sort_util::ColumnOrder;
-use risingwave_pb::stream_plan::StreamScanType;
 
 use super::generic::{GenericPlanNode, GenericPlanRef};
 use super::utils::{childless_record, Distill};
@@ -521,24 +520,11 @@ impl ToBatch for LogicalScan {
 impl ToStream for LogicalScan {
     fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<PlanRef> {
         if self.predicate().always_true() {
-            if self
-                .ctx()
-                .session_ctx()
-                .config()
-                .streaming_enable_arrangement_backfill()
-            {
-                Ok(StreamTableScan::new_with_stream_scan_type(
-                    self.core.clone(),
-                    StreamScanType::ArrangementBackfill,
-                )
-                .into())
-            } else {
-                Ok(StreamTableScan::new_with_stream_scan_type(
-                    self.core.clone(),
-                    ctx.stream_scan_type(),
-                )
-                .into())
-            }
+            Ok(StreamTableScan::new_with_stream_scan_type(
+                self.core.clone(),
+                ctx.stream_scan_type(),
+            )
+            .into())
         } else {
             let (scan, predicate, project_expr) = self.predicate_pull_up();
             let mut plan = LogicalFilter::create(scan.into(), predicate);
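The two frontend hunks above move the scan-type decision from `LogicalScan::to_stream` up into `PlanRoot`, so the MV and sink paths resolve it the same way and the logical node just honors `ctx.stream_scan_type()`. A minimal sketch of that decision, using hypothetical stand-ins for the session config and the protobuf enum (the real ones live in the frontend session config and `risingwave_pb`):

```rust
// Stand-in types; the real `StreamScanType` is a protobuf enum and the
// flag is the session variable STREAMING_ENABLE_ARRANGEMENT_BACKFILL.
#[derive(Debug, Clone, Copy, PartialEq)]
enum StreamScanType {
    Backfill,
    ArrangementBackfill,
    UpstreamOnly,
}

struct SessionConfig {
    streaming_enable_arrangement_backfill: bool,
}

/// Mirrors the selection above: `without_backfill` wins, then the
/// session flag picks between the two backfill variants.
fn select_scan_type(config: &SessionConfig, without_backfill: bool) -> StreamScanType {
    if without_backfill {
        StreamScanType::UpstreamOnly
    } else if config.streaming_enable_arrangement_backfill {
        StreamScanType::ArrangementBackfill
    } else {
        StreamScanType::Backfill
    }
}

fn main() {
    let config = SessionConfig {
        streaming_enable_arrangement_backfill: true,
    };
    assert_eq!(
        select_scan_type(&config, false),
        StreamScanType::ArrangementBackfill
    );
    assert_eq!(select_scan_type(&config, true), StreamScanType::UpstreamOnly);
}
```

Centralizing the choice also lets the `StreamScanType` import disappear from `logical_scan.rs`, as the hunk above shows.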
diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
index 7dde686064be..ef224c5f96d0 100644
--- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -47,10 +47,6 @@ pub struct StreamTableScan {
 }
 
 impl StreamTableScan {
-    pub fn new(core: generic::Scan) -> Self {
-        Self::new_with_stream_scan_type(core, StreamScanType::Backfill)
-    }
-
     pub fn new_with_stream_scan_type(
         core: generic::Scan,
         stream_scan_type: StreamScanType,
diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs
index c9eccf29d473..c42c2f5a5142 100644
--- a/src/meta/src/stream/stream_graph/actor.rs
+++ b/src/meta/src/stream/stream_graph/actor.rs
@@ -28,6 +28,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
 use risingwave_pb::stream_plan::{
     DispatchStrategy, Dispatcher, DispatcherType, MergeNode, StreamActor, StreamNode,
+    StreamScanType,
 };
 
 use super::id::GlobalFragmentIdsExt;
@@ -174,11 +175,20 @@ impl ActorBuilder {
             downstream_fragment_id: self.fragment_id,
         }];
 
-        // FIXME(kwannoel): This may not hold for Arrangement Backfill.
-        // As we always use the `NoShuffle` exchange for MV on MV, there should be only one
-        // upstream.
         let upstream_actor_id = upstreams.actors.as_global_ids();
-        assert_eq!(upstream_actor_id.len(), 1);
+        let is_arrangement_backfill =
+            stream_scan.stream_scan_type == StreamScanType::ArrangementBackfill as i32;
+        if !is_arrangement_backfill {
+            assert_eq!(upstream_actor_id.len(), 1);
+        }
+
+        let upstream_dispatcher_type = if is_arrangement_backfill {
+            // FIXME(kwannoel): Should the upstream dispatcher type depend on the upstream distribution?
+            // If singleton, use `Simple` dispatcher, otherwise use `Hash` dispatcher.
+            DispatcherType::Hash as _
+        } else {
+            DispatcherType::NoShuffle as _
+        };
 
         let input = vec![
             // Fill the merge node body with correct upstream info.
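With plain `Backfill`, MV-on-MV exchanges are `NoShuffle`, so each scan actor has exactly one upstream actor and the old assertion holds; with `ArrangementBackfill` the upstream hash-shuffles into the scan, so several upstream actors may feed one merge node. A condensed sketch of the rule above, with simplified stand-in types (the real `DispatcherType` is a protobuf enum):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum DispatcherType {
    Hash,
    NoShuffle,
}

/// Mirrors the merge-node wiring above: arrangement backfill expects a
/// hash exchange from the upstream fragment; everything else keeps the
/// 1:1 `NoShuffle` pairing (hence the single-upstream assertion).
fn upstream_dispatcher_type(
    is_arrangement_backfill: bool,
    upstream_actor_count: usize,
) -> DispatcherType {
    if is_arrangement_backfill {
        DispatcherType::Hash
    } else {
        // The 1:1 pairing is what makes this assertion sound.
        assert_eq!(upstream_actor_count, 1);
        DispatcherType::NoShuffle
    }
}

fn main() {
    assert_eq!(upstream_dispatcher_type(true, 3), DispatcherType::Hash);
    assert_eq!(upstream_dispatcher_type(false, 1), DispatcherType::NoShuffle);
}
```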
@@ -186,7 +196,7 @@ impl ActorBuilder {
                 node_body: Some(NodeBody::Merge(MergeNode {
                     upstream_actor_id,
                     upstream_fragment_id: upstreams.fragment_id.as_global_id(),
-                    upstream_dispatcher_type: DispatcherType::NoShuffle as _,
+                    upstream_dispatcher_type,
                     fields: merge_node.fields.clone(),
                 })),
                 ..merge_node.clone()
diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs
index 79926a359060..4edd3743a1ce 100644
--- a/src/meta/src/stream/stream_graph/fragment.rs
+++ b/src/meta/src/stream/stream_graph/fragment.rs
@@ -35,7 +35,7 @@ use risingwave_pb::stream_plan::stream_fragment_graph::{
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::{
     DispatchStrategy, DispatcherType, FragmentTypeFlag, StreamActor,
-    StreamFragmentGraph as StreamFragmentGraphProto,
+    StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanType,
 };
 
 use crate::manager::{MetaSrvEnv, StreamingJob};
@@ -194,6 +194,28 @@ impl BuildingFragment {
 
         table_columns
     }
+
+    pub fn has_arrangement_backfill(&self) -> bool {
+        fn has_arrangement_backfill_node(stream_node: &StreamNode) -> bool {
+            let is_backfill = if let Some(node) = &stream_node.node_body
+                && let Some(node) = node.as_stream_scan()
+            {
+                node.stream_scan_type == StreamScanType::ArrangementBackfill as i32
+            } else {
+                false
+            };
+            is_backfill
+                || stream_node
+                    .get_input()
+                    .iter()
+                    .any(has_arrangement_backfill_node)
+        }
+        let stream_node = match self.inner.node.as_ref() {
+            Some(node) => node,
+            _ => return false,
+        };
+        has_arrangement_backfill_node(stream_node)
+    }
 }
 
 impl Deref for BuildingFragment {
@@ -590,6 +612,7 @@ impl CompleteStreamFragmentGraph {
         // Build the extra edges between the upstream `Materialize` and the downstream `StreamScan`
         // of the new materialized view.
         for (&id, fragment) in &mut graph.fragments {
+            let uses_arrangement_backfill = fragment.has_arrangement_backfill();
             for (&upstream_table_id, output_columns) in &fragment.upstream_table_columns {
                 let (up_fragment_id, edge) = match table_job_type.as_ref() {
                     Some(TableJobType::SharedCdcSource) => {
@@ -636,7 +659,7 @@ impl CompleteStreamFragmentGraph {
                         let mview_id = GlobalFragmentId::new(mview_fragment.fragment_id);
 
                         // Resolve the required output columns from the upstream materialized view.
-                        let output_indices = {
+                        let (dist_key_indices, output_indices) = {
                             let nodes = mview_fragment.actors[0].get_nodes().unwrap();
                             let mview_node =
                                 nodes.get_node_body().unwrap().as_materialize().unwrap();
@@ -647,8 +670,16 @@ impl CompleteStreamFragmentGraph {
                                 .iter()
                                 .map(|c| c.column_desc.as_ref().unwrap().column_id)
                                 .collect_vec();
+                            let dist_key_indices: Vec<_> = mview_node
+                                .table
+                                .as_ref()
+                                .unwrap()
+                                .distribution_key
+                                .iter()
+                                .map(|i| *i as u32)
+                                .collect();
-                            output_columns
+                            let output_indices = output_columns
                                 .iter()
                                 .map(|c| {
                                     all_column_ids
                                         .iter()
                                         .position(|&id| id == c.column_id)
                                         .map(|i| i as u32)
                                 })
                                 .collect::<Option<Vec<_>>>()
-                                .context("column not found in the upstream materialized view")?
+                                .context(
+                                    "column not found in the upstream materialized view",
+                                )?;
+                            (dist_key_indices, output_indices)
+                        };
+                        let dispatch_strategy = if uses_arrangement_backfill {
+                            if !dist_key_indices.is_empty() {
+                                DispatchStrategy {
+                                    r#type: DispatcherType::Hash as _,
+                                    dist_key_indices,
+                                    output_indices,
+                                }
+                            } else {
+                                DispatchStrategy {
+                                    r#type: DispatcherType::Simple as _,
+                                    dist_key_indices: vec![], // empty for Simple
+                                    output_indices,
+                                }
+                            }
+                        } else {
+                            DispatchStrategy {
+                                r#type: DispatcherType::NoShuffle as _,
+                                dist_key_indices: vec![], // not used for `NoShuffle`
+                                output_indices,
+                            }
                         };
 
                         let edge = StreamFragmentEdge {
                             id: EdgeId::UpstreamExternal {
                                 upstream_table_id,
                                 downstream_fragment_id: id,
                             },
-                            // We always use `NoShuffle` for the exchange between the upstream
-                            // `Materialize` and the downstream `StreamScan` of the
-                            // new materialized view.
-                            dispatch_strategy: DispatchStrategy {
-                                r#type: DispatcherType::NoShuffle as _,
-                                dist_key_indices: vec![], // not used for `NoShuffle`
-                                output_indices,
-                            },
+                            dispatch_strategy,
                         };
 
                         (mview_id, edge)
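The edge's dispatch strategy now follows the upstream `Materialize` distribution: a hash shuffle on its distribution key when one exists, a `Simple` dispatcher when the upstream is a singleton, and the old `NoShuffle` pairing whenever the fragment does not use arrangement backfill. A self-contained sketch of that three-way choice, with simplified local types in place of the protobuf messages:

```rust
#[derive(Debug, PartialEq)]
enum Strategy {
    /// Hash-shuffle on the upstream distribution key.
    Hash(Vec<u32>),
    /// Singleton upstream: a single channel.
    Simple,
    /// 1:1 pairing, used by plain backfill.
    NoShuffle,
}

fn dispatch_strategy(uses_arrangement_backfill: bool, dist_key_indices: Vec<u32>) -> Strategy {
    if uses_arrangement_backfill {
        if !dist_key_indices.is_empty() {
            Strategy::Hash(dist_key_indices)
        } else {
            Strategy::Simple
        }
    } else {
        Strategy::NoShuffle
    }
}

fn main() {
    assert_eq!(
        dispatch_strategy(true, vec![0, 2]),
        Strategy::Hash(vec![0, 2])
    );
    assert_eq!(dispatch_strategy(true, vec![]), Strategy::Simple);
    assert_eq!(dispatch_strategy(false, vec![0]), Strategy::NoShuffle);
}
```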
diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs
index a7f4385505eb..6eb0a7efdf78 100644
--- a/src/stream/src/executor/backfill/arrangement_backfill.rs
+++ b/src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -372,8 +372,8 @@ where
                     ));
                 }
 
-                // FIXME(kwannoel): Replicate
-                // upstream_table.write_chunk(chunk);
+                // Replicate
+                upstream_table.write_chunk(chunk);
             }
 
             if upstream_chunk_buffer_is_empty {
diff --git a/src/tests/simulation/src/arrangement_backfill.toml b/src/tests/simulation/src/arrangement_backfill.toml
new file mode 100644
index 000000000000..6e4beddf864e
--- /dev/null
+++ b/src/tests/simulation/src/arrangement_backfill.toml
@@ -0,0 +1,14 @@
+[server]
+telemetry_enabled = false
+metrics_level = "Disabled"
+
+[streaming.developer]
+stream_chunk_size = 1
+
+[system]
+# NOTE(kwannoel): If you can't reproduce the issue, set this to a lower number.
+# This will throttle the snapshot read.
+# barrier_interval_ms = 1
+checkpoint_frequency = 10
+barrier_interval_ms = 100
+max_concurrent_creating_streaming_jobs = 0
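Re-enabling `upstream_table.write_chunk(chunk)` is the core fix: upstream rows that arrive between checkpoints are written into the replicated state table, so subsequent snapshot reads can see them even though they are not yet checkpointed. The test config above stretches that window on purpose: with `barrier_interval_ms = 100` and `checkpoint_frequency = 10`, a checkpoint lands only about once per second, and `stream_chunk_size = 1` maximizes the number of chunks that hit the gap. A toy sketch of the replication idea, with a plain `Vec` standing in for the replicated state table (the real one is a replicated state table over storage):

```rust
/// Toy stand-in for the replicated state table kept by arrangement backfill.
#[derive(Default)]
struct ReplicatedTable {
    /// Rows visible to snapshot reads, checkpointed or not.
    rows: Vec<i32>,
}

impl ReplicatedTable {
    /// What the re-enabled `write_chunk` call achieves: buffer upstream
    /// rows locally so snapshot reads between checkpoints can see them.
    fn write_chunk(&mut self, chunk: &[i32]) {
        self.rows.extend_from_slice(chunk);
    }

    fn snapshot_read(&self) -> &[i32] {
        &self.rows
    }
}

fn main() {
    let mut table = ReplicatedTable::default();
    // Rows 101..=200 arrive after the snapshot but before a checkpoint.
    table.write_chunk(&(101..=200).collect::<Vec<_>>());
    // Without replication, the next snapshot read would miss them.
    assert_eq!(table.snapshot_read().len(), 100);
}
```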
diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs
index 3bed45d06dd0..9b3e1a7ad736 100644
--- a/src/tests/simulation/src/cluster.rs
+++ b/src/tests/simulation/src/cluster.rs
@@ -87,6 +87,25 @@ pub struct Configuration {
 
     /// Path to etcd data file.
     pub etcd_data_path: Option<PathBuf>,
+
+    /// Queries to run per session.
+    pub per_session_queries: Arc<Vec<String>>,
+}
+
+impl Default for Configuration {
+    fn default() -> Self {
+        Configuration {
+            config_path: ConfigPath::Regular("".into()),
+            frontend_nodes: 1,
+            compute_nodes: 1,
+            meta_nodes: 1,
+            compactor_nodes: 1,
+            compute_node_cores: 1,
+            etcd_timeout_rate: 0.0,
+            etcd_data_path: None,
+            per_session_queries: vec![].into(),
+        }
+    }
 }
 
 impl Configuration {
@@ -109,8 +128,9 @@ impl Configuration {
             meta_nodes: 3,
             compactor_nodes: 2,
             compute_node_cores: 2,
-            etcd_timeout_rate: 0.0,
-            etcd_data_path: None,
+            per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=false".into()]
+                .into(),
+            ..Default::default()
         }
     }
@@ -150,8 +170,9 @@ metrics_level = "Disabled"
             meta_nodes: 1,
             compactor_nodes: 1,
             compute_node_cores: 2,
-            etcd_timeout_rate: 0.0,
-            etcd_data_path: None,
+            per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=false".into()]
+                .into(),
+            ..Default::default()
         }
     }
@@ -174,8 +195,29 @@ metrics_level = "Disabled"
             meta_nodes: 1,
             compactor_nodes: 1,
             compute_node_cores: 4,
-            etcd_timeout_rate: 0.0,
-            etcd_data_path: None,
+            ..Default::default()
+        }
+    }
+
+    pub fn for_arrangement_backfill() -> Self {
+        // Embed the config file and create a temporary file at runtime. The file will be deleted
+        // automatically when it's dropped.
+        let config_path = {
+            let mut file =
+                tempfile::NamedTempFile::new().expect("failed to create temp config file");
+            file.write_all(include_bytes!("arrangement_backfill.toml"))
+                .expect("failed to write config file");
+            file.into_temp_path()
+        };
+
+        Configuration {
+            config_path: ConfigPath::Temp(config_path.into()),
+            frontend_nodes: 1,
+            compute_nodes: 3,
+            meta_nodes: 1,
+            compactor_nodes: 1,
+            compute_node_cores: 1,
+            ..Default::default()
         }
     }
@@ -202,8 +244,7 @@ metrics_level = "Disabled"
             meta_nodes: 3,
             compactor_nodes: 2,
             compute_node_cores: 2,
-            etcd_timeout_rate: 0.0,
-            etcd_data_path: None,
+            ..Default::default()
         }
     }
 }
@@ -439,14 +480,25 @@ impl Cluster {
         })
     }
 
+    #[cfg_or_panic(madsim)]
+    fn per_session_queries(&self) -> Arc<Vec<String>> {
+        self.config.per_session_queries.clone()
+    }
+
     /// Start a SQL session on the client node.
     #[cfg_or_panic(madsim)]
     pub fn start_session(&mut self) -> Session {
         let (query_tx, mut query_rx) = mpsc::channel::<SessionRequest>(0);
+        let per_session_queries = self.per_session_queries();
 
         self.client.spawn(async move {
             let mut client = RisingWave::connect("frontend".into(), "dev".into()).await?;
 
+            for sql in per_session_queries.as_ref() {
+                client.run(sql).await?;
+            }
+            drop(per_session_queries);
+
             while let Some((sql, tx)) = query_rx.next().await {
                 let result = client
                     .run(&sql)
diff --git a/src/tests/simulation/src/main.rs b/src/tests/simulation/src/main.rs
index 2d198239a735..9de1fc6d79a3 100644
--- a/src/tests/simulation/src/main.rs
+++ b/src/tests/simulation/src/main.rs
@@ -173,6 +173,7 @@ async fn main() {
         meta_nodes: args.meta_nodes,
         etcd_timeout_rate: args.etcd_timeout_rate,
         etcd_data_path: args.etcd_data,
+        ..Default::default()
     };
     let kill_opts = KillOpts {
         kill_meta: args.kill_meta || args.kill,
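`per_session_queries` gives tests one place to pin session variables: every session opened through `Cluster::start_session` runs these statements before serving queries, which is how the existing configurations keep arrangement backfill off by default. A compiling sketch of hypothetical usage, with a stand-in struct mirroring the new field so it runs on its own:

```rust
use std::sync::Arc;

// Stand-in mirroring the new `Configuration` field; the real struct
// lives in src/tests/simulation/src/cluster.rs above.
struct Configuration {
    compute_nodes: usize,
    per_session_queries: Arc<Vec<String>>,
}

fn main() {
    // Hypothetical test setup: opt every session into arrangement backfill.
    let config = Configuration {
        compute_nodes: 3,
        per_session_queries: Arc::new(vec![
            "SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=true".to_string(),
        ]),
    };
    // `Cluster::start_session` replays these before handing the session out.
    for sql in config.per_session_queries.iter() {
        println!("bootstrap ({} compute nodes): {sql}", config.compute_nodes);
    }
}
```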
diff --git a/src/tests/simulation/tests/integration_tests/backfill_tests.rs b/src/tests/simulation/tests/integration_tests/backfill_tests.rs
index 372adc49da6c..168e84a7ff5a 100644
--- a/src/tests/simulation/tests/integration_tests/backfill_tests.rs
+++ b/src/tests/simulation/tests/integration_tests/backfill_tests.rs
@@ -12,6 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::thread::sleep;
+use std::time::Duration;
+
 use anyhow::Result;
 use itertools::Itertools;
 use risingwave_simulation::cluster::{Cluster, Configuration};
@@ -102,3 +105,92 @@ async fn test_backfill_with_upstream_and_snapshot_read() -> Result<()> {
     }
     Ok(())
 }
+
+/// Tests the replication scenario:
+/// 1. Upstream yields some chunks downstream. The chunk values are in the range 301-400.
+/// 2. The backfill snapshot read is up to 300.
+/// 3. On receiving a barrier, 301-400 must be replicated; otherwise these records are discarded.
+/// 4. The next epoch is not a checkpoint epoch.
+/// 5. The next snapshot read occurs; checkpointed data is from 400-500.
+///
+/// In order to reproduce this scenario, we rely on a few things:
+/// 1. A large checkpoint period, so checkpoints take a long time to occur.
+/// 2. We insert 2 partitions of records for the snapshot,
+///    one at the lower bound, one at the upper bound.
+/// 3. We insert a chunk of records in between.
+#[tokio::test]
+async fn test_arrangement_backfill_replication() -> Result<()> {
+    // Initialize the cluster with a config that has a larger checkpoint interval,
+    // so it will rely on replication.
+    let mut cluster = Cluster::start(Configuration::for_arrangement_backfill()).await?;
+    let mut session = cluster.start_session();
+
+    // Create a table with parallelism = 1.
+    session.run("SET STREAMING_PARALLELISM=1;").await?;
+    session.run("CREATE TABLE t (v1 int primary key)").await?;
+    let parallelism_per_fragment = session
+        .run("select parallelism from rw_tables join rw_fragments on id=table_id and name='t';")
+        .await?;
+    for parallelism in parallelism_per_fragment.split('\n') {
+        assert_eq!(parallelism.parse::<usize>().unwrap(), 1);
+    }
+
+    // Ingest snapshot data
+    session
+        .run("INSERT INTO t select * from generate_series(1, 100)")
+        .await?;
+    session.run("FLUSH;").await?;
+    session
+        .run("INSERT INTO t select * from generate_series(201, 300)")
+        .await?;
+    session.run("FLUSH;").await?;
+
+    // Start the upstream update thread
+    let mut session2 = cluster.start_session();
+    let upstream_task = tokio::spawn(async move {
+        // The initial 100 records will take approx 3s.
+        // After that we start ingesting upstream records.
+        sleep(Duration::from_secs(3));
+        for i in 101..=200 {
+            session2
+                .run(format!("insert into t values ({})", i))
+                .await
+                .unwrap();
+        }
+        session2.run("FLUSH;").await.unwrap();
+    });
+
+    // Create a materialized view with parallelism = 3.
+    session.run("SET STREAMING_PARALLELISM=3").await?;
+    session
+        .run("SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=true")
+        .await?;
+    session.run("SET STREAMING_RATE_LIMIT=30").await?;
+    session
+        .run("create materialized view m1 as select * from t")
+        .await?;
+
+    upstream_task.await?;
+
+    // Verify its parallelism
+    let parallelism_per_fragment = session.run(
+        "select parallelism from rw_materialized_views join rw_fragments on id=table_id and name='m1';"
+    ).await?;
+    for parallelism in parallelism_per_fragment.split('\n') {
+        assert_eq!(parallelism.parse::<usize>().unwrap(), 3);
+    }
+
+    // Verify all data has been ingested, with no extra data in m1.
+    let result = session
+        .run("select t.v1 from t where t.v1 not in (select v1 from m1)")
+        .await?;
+    assert_eq!(result, "");
+    let result = session.run("select count(*) from m1").await?;
+    assert_eq!(result.parse::<usize>().unwrap(), 300);
+    Ok(())
+}
+
+// TODO(kwannoel): Test the case where the upstream distribution is Single. Then the downstream
+// distribution MUST also be Single, and arrangement backfill should just use Simple.
+
+// TODO(kwannoel): Test arrangement backfill background recovery.
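Note that the test's two final assertions check both directions of correctness: the anti-join proves no upstream row is missing from `m1`, and the exact count of 300 proves no row was applied twice (once via replication and once via snapshot read).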
diff --git a/src/tests/simulation/tests/integration_tests/batch/mod.rs b/src/tests/simulation/tests/integration_tests/batch/mod.rs
index 4cd66a2fd3ae..fb3658bef44d 100644
--- a/src/tests/simulation/tests/integration_tests/batch/mod.rs
+++ b/src/tests/simulation/tests/integration_tests/batch/mod.rs
@@ -77,8 +77,7 @@ metrics_level = \"Disabled\"
             meta_nodes: 3,
             compactor_nodes: 2,
             compute_node_cores: 2,
-            etcd_timeout_rate: 0.0,
-            etcd_data_path: None,
+            ..Default::default()
         }
     }
diff --git a/src/tests/simulation/tests/integration_tests/recovery/event_log.rs b/src/tests/simulation/tests/integration_tests/recovery/event_log.rs
index 65a687eef70d..51f2e8dff329 100644
--- a/src/tests/simulation/tests/integration_tests/recovery/event_log.rs
+++ b/src/tests/simulation/tests/integration_tests/recovery/event_log.rs
@@ -14,6 +14,7 @@
 
 #![cfg(madsim)]
 
+use std::default;
 use std::io::Write;
 use std::time::Duration;
 
@@ -44,8 +45,7 @@ event_log_flush_interval_ms = 10\
             meta_nodes: 1,
             compactor_nodes: 0,
             compute_node_cores: 2,
-            etcd_timeout_rate: 0.0,
-            etcd_data_path: None,
+            ..Default::default()
         }
     }
diff --git a/src/tests/simulation/tests/integration_tests/sink/utils.rs b/src/tests/simulation/tests/integration_tests/sink/utils.rs
index 7d397b8066e0..6b9ea61e708b 100644
--- a/src/tests/simulation/tests/integration_tests/sink/utils.rs
+++ b/src/tests/simulation/tests/integration_tests/sink/utils.rs
@@ -363,8 +363,7 @@ pub async fn start_sink_test_cluster() -> Result<Cluster> {
         meta_nodes: 1,
         compactor_nodes: 1,
         compute_node_cores: 2,
-        etcd_timeout_rate: 0.0,
-        etcd_data_path: None,
+        ..Default::default()
     })
     .await
 }