diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs
index 821a40173b37..ceeee4d9940a 100644
--- a/src/meta/src/stream/stream_graph/fragment.rs
+++ b/src/meta/src/stream/stream_graph/fragment.rs
@@ -484,7 +484,7 @@ impl StreamFragmentGraph {
                 .iter()
                 .any(has_arrangement_backfill_node)
         }
-        for (_id, fragment) in &self.fragments {
+        for fragment in self.fragments.values() {
             let fragment = &**fragment;
             let node = fragment.node.as_ref().unwrap();
             if has_arrangement_backfill_node(node) {
@@ -658,7 +658,7 @@ impl CompleteStreamFragmentGraph {
                 let mview_id = GlobalFragmentId::new(mview_fragment.fragment_id);
 
                 // Resolve the required output columns from the upstream materialized view.
-                let output_indices = {
+                let (dist_key_indices, output_indices) = {
                     let nodes = mview_fragment.actors[0].get_nodes().unwrap();
                     let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap();
 
@@ -669,8 +669,16 @@ impl CompleteStreamFragmentGraph {
                         .iter()
                         .map(|c| c.column_desc.as_ref().unwrap().column_id)
                         .collect_vec();
+                    let dist_key_indices = mview_node
+                        .table
+                        .as_ref()
+                        .unwrap()
+                        .distribution_key
+                        .iter()
+                        .map(|i| *i as u32)
+                        .collect();
 
-                    output_columns
+                    let output_indices = output_columns
                         .iter()
                         .map(|c| {
                             all_column_ids
@@ -679,12 +687,15 @@ impl CompleteStreamFragmentGraph {
                                 .map(|i| i as u32)
                         })
                         .collect::<Option<Vec<_>>>()
-                        .context("column not found in the upstream materialized view")?
+                        .context(
+                            "column not found in the upstream materialized view",
+                        )?;
+                    (dist_key_indices, output_indices)
                 };
                 let dispatch_strategy = if uses_arrangement_backfill {
                     DispatchStrategy {
                         r#type: DispatcherType::Hash as _,
-                        dist_key_indices: vec![], // not used for `Exchange`
+                        dist_key_indices,
                         output_indices,
                     }
                 } else {
@@ -700,7 +711,6 @@ impl CompleteStreamFragmentGraph {
                         downstream_fragment_id: id,
                     },
                     dispatch_strategy,
-
                 };
 
                 (mview_id, edge)
diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs
index 6eb0a7efdf78..f4703cb88ffc 100644
--- a/src/stream/src/executor/backfill/arrangement_backfill.rs
+++ b/src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -373,7 +373,7 @@ where
                         }
 
                         // Replicate
-                        upstream_table.write_chunk(chunk);
+                        // upstream_table.write_chunk(chunk);
                     }
 
                     if upstream_chunk_buffer_is_empty {
diff --git a/src/tests/simulation/src/arrangement_backfill.toml b/src/tests/simulation/src/arrangement_backfill.toml
new file mode 100644
index 000000000000..6e4beddf864e
--- /dev/null
+++ b/src/tests/simulation/src/arrangement_backfill.toml
@@ -0,0 +1,14 @@
+[server]
+telemetry_enabled = false
+metrics_level = "Disabled"
+
+[streaming.developer]
+stream_chunk_size = 1
+
+[system]
+# NOTE(kwannoel): If the issue can't be reproduced, set this to a lower number.
+# This will throttle snapshot reads.
+# barrier_interval_ms = 1
+checkpoint_frequency = 10
+barrier_interval_ms = 100
+max_concurrent_creating_streaming_jobs = 0
diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs
index 3bed45d06dd0..35b004de504c 100644
--- a/src/tests/simulation/src/cluster.rs
+++ b/src/tests/simulation/src/cluster.rs
@@ -179,6 +179,29 @@ metrics_level = "Disabled"
         }
     }
 
+    pub fn for_arrangement_backfill() -> Self {
+        // Embed the config file and create a temporary file at runtime. The file will be deleted
+        // automatically when it's dropped.
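+        // `include_bytes!` resolves its path relative to this source file, so the
+        // embedded `arrangement_backfill.toml` must live alongside `cluster.rs`
+        // (it is added under `src/tests/simulation/src/` in this change).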
+        let config_path = {
+            let mut file =
+                tempfile::NamedTempFile::new().expect("failed to create temp config file");
+            file.write_all(include_bytes!("arrangement_backfill.toml"))
+                .expect("failed to write config file");
+            file.into_temp_path()
+        };
+
+        Configuration {
+            config_path: ConfigPath::Temp(config_path.into()),
+            frontend_nodes: 1,
+            compute_nodes: 3,
+            meta_nodes: 1,
+            compactor_nodes: 1,
+            compute_node_cores: 1,
+            etcd_timeout_rate: 0.0,
+            etcd_data_path: None,
+        }
+    }
+
     pub fn for_background_ddl() -> Self {
         // Embed the config file and create a temporary file at runtime. The file will be deleted
         // automatically when it's dropped.
diff --git a/src/tests/simulation/tests/integration_tests/backfill_tests.rs b/src/tests/simulation/tests/integration_tests/backfill_tests.rs
index 372adc49da6c..58ca26ffe7eb 100644
--- a/src/tests/simulation/tests/integration_tests/backfill_tests.rs
+++ b/src/tests/simulation/tests/integration_tests/backfill_tests.rs
@@ -12,6 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::thread::sleep;
+use std::time::Duration;
+
 use anyhow::Result;
 use itertools::Itertools;
 use risingwave_simulation::cluster::{Cluster, Configuration};
@@ -102,3 +105,79 @@
     }
     Ok(())
 }
+
+/// Tests the following replication scenario:
+/// 1. Upstream yields some chunk downstream. The chunk values are in the range 301-400.
+/// 2. The backfill snapshot read is until 300.
+/// 3. A barrier is received; 301-400 must have been replicated, otherwise these records are discarded.
+/// 4. The next epoch is not a checkpoint epoch.
+/// 5. The next snapshot read occurs; checkpointed data is from 400-500.
+///
+/// In order to reproduce this scenario, we rely on a few things:
+/// 1. A large checkpoint period, so the checkpoint takes a long time to occur.
+/// 2. We insert 2 partitions of records for the snapshot,
+///    one at the lower bound, one at the upper bound.
+/// 3. We insert a chunk of records in between.
+#[tokio::test]
+async fn test_arrangement_backfill_replication() -> Result<()> {
+    // Initialize the cluster with a config that has a large checkpoint interval,
+    // so the test relies on replication.
+    let mut cluster = Cluster::start(Configuration::for_arrangement_backfill()).await?;
+    let mut session = cluster.start_session();
+
+    // Create a table with parallelism = 1.
+    session.run("SET STREAMING_PARALLELISM=1;").await?;
+    session.run("CREATE TABLE t (v1 int primary key)").await?;
+    // TODO(kwannoel): test the parallelism of the table.
+
+    // Ingest the snapshot data.
+    session
+        .run("INSERT INTO t select * from generate_series(1, 100)")
+        .await?;
+    session.run("FLUSH;").await?;
+    session
+        .run("INSERT INTO t select * from generate_series(201, 300)")
+        .await?;
+    session.run("FLUSH;").await?;
+
+    // Start the task that ingests upstream updates.
+    let mut session2 = cluster.start_session();
+    let upstream_task = tokio::spawn(async move {
+        // The initial 100 records will take approx 3s.
+        // After that, we start ingesting upstream records.
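+        // NOTE: `std::thread::sleep` blocks the worker thread rather than yielding to
+        // the runtime; that is presumably acceptable under the deterministic simulation,
+        // but in ordinary async code `tokio::time::sleep(Duration::from_secs(3)).await`
+        // would be the idiomatic way to wait.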
+        sleep(Duration::from_secs(3));
+        for i in 101..=200 {
+            session2
+                .run(format!("insert into t values ({})", i))
+                .await
+                .unwrap();
+        }
+        session2.run("FLUSH;").await.unwrap();
+    });
+
+    // Create a materialized view with parallelism = 3.
+    session.run("SET STREAMING_PARALLELISM=3").await?;
+    session
+        .run("SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=true")
+        .await?;
+    session.run("SET STREAMING_RATE_LIMIT=30").await?;
+    session
+        .run("create materialized view m1 as select * from t")
+        .await?;
+
+    upstream_task.await?;
+
+    // Verify its parallelism.
+    // TODO(kwannoel): test the parallelism of the table.
+
+    // Verify all data has been ingested, with no extra data in m1.
+    let result = session
+        .run("select t.v1 from t where t.v1 not in (select v1 from m1)")
+        .await?;
+    assert_eq!(result, "");
+    let result = session.run("select count(*) from m1").await?;
+    assert_eq!(result.parse::<usize>().unwrap(), 300);
+    Ok(())
+}
+
+// TODO(kwannoel): Test arrangement backfill background recovery.
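+
+// Row accounting for the count assertion in `test_arrangement_backfill_replication`:
+// 300 = 100 snapshot rows (ids 1-100) + 100 snapshot rows (ids 201-300)
+//     + 100 rows streamed by the upstream task (ids 101-200).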