finish check backfill type + add test
kwannoel committed Jan 5, 2024
1 parent a5079d6 commit b9539fa
Showing 5 changed files with 133 additions and 7 deletions.
22 changes: 16 additions & 6 deletions src/meta/src/stream/stream_graph/fragment.rs
@@ -484,7 +484,7 @@ impl StreamFragmentGraph {
                 .iter()
                 .any(has_arrangement_backfill_node)
         }
-        for (_id, fragment) in &self.fragments {
+        for fragment in self.fragments.values() {
            let fragment = &**fragment;
            let node = fragment.node.as_ref().unwrap();
            if has_arrangement_backfill_node(node) {
@@ -658,7 +658,7 @@ impl CompleteStreamFragmentGraph {
                 let mview_id = GlobalFragmentId::new(mview_fragment.fragment_id);

                 // Resolve the required output columns from the upstream materialized view.
-                let output_indices = {
+                let (dist_key_indices, output_indices) = {
                     let nodes = mview_fragment.actors[0].get_nodes().unwrap();
                     let mview_node =
                         nodes.get_node_body().unwrap().as_materialize().unwrap();
@@ -669,8 +669,16 @@
                         .iter()
                         .map(|c| c.column_desc.as_ref().unwrap().column_id)
                         .collect_vec();
+                    let dist_key_indices = mview_node
+                        .table
+                        .as_ref()
+                        .unwrap()
+                        .distribution_key
+                        .iter()
+                        .map(|i| *i as u32)
+                        .collect();

-                    output_columns
+                    let output_indices = output_columns
                         .iter()
                         .map(|c| {
                             all_column_ids
@@ -679,12 +687,15 @@
                                 .map(|i| i as u32)
                         })
                         .collect::<Option<Vec<_>>>()
-                        .context("column not found in the upstream materialized view")?
+                        .context(
+                            "column not found in the upstream materialized view",
+                        )?;
+                    (dist_key_indices, output_indices)
                 };
                 let dispatch_strategy = if uses_arrangement_backfill {
                     DispatchStrategy {
                         r#type: DispatcherType::Hash as _,
-                        dist_key_indices: vec![], // not used for `Exchange`
+                        dist_key_indices,
                         output_indices,
                     }
                 } else {
@@ -700,7 +711,6 @@
                         downstream_fragment_id: id,
                     },
                     dispatch_strategy,
-
                 };

                 (mview_id, edge)
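In effect, the edge from the upstream materialized view now carries that MV's distribution key when arrangement backfill is used, so the exchange hash-partitions rows the same way the upstream MV does. A minimal sketch of the branch above, with assumptions flagged: the `else` arm is not shown in this hunk, so the `NoShuffle` fallback below is an assumption about the pre-existing code path, and the proto import path is the one conventionally used in the meta crate.

```rust
// Hedged sketch (not the exact code in the diff): how the dispatch strategy
// for the mview -> backfill edge is chosen after this change.
// Assumption: `DispatchStrategy`/`DispatcherType` live in `stream_plan` proto.
use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType};

fn dispatch_for_mv_on_mv(
    uses_arrangement_backfill: bool,
    dist_key_indices: Vec<u32>,
    output_indices: Vec<u32>,
) -> DispatchStrategy {
    if uses_arrangement_backfill {
        // Arrangement backfill is hash-distributed, so the edge shuffles rows
        // by the upstream MV's distribution key (read from the mview node's
        // table catalog in the hunk above).
        DispatchStrategy {
            r#type: DispatcherType::Hash as _,
            dist_key_indices,
            output_indices,
        }
    } else {
        // Assumption: plain backfill keeps a 1:1 upstream-actor mapping, so
        // the edge needs no distribution key.
        DispatchStrategy {
            r#type: DispatcherType::NoShuffle as _,
            dist_key_indices: vec![],
            output_indices,
        }
    }
}
```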
2 changes: 1 addition & 1 deletion src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -373,7 +373,7 @@ where
                     }

                     // Replicate
-                    upstream_table.write_chunk(chunk);
+                    // upstream_table.write_chunk(chunk);
                 }

                 if upstream_chunk_buffer_is_empty {
14 changes: 14 additions & 0 deletions src/tests/simulation/src/arrangement_backfill.toml
@@ -0,0 +1,14 @@
+[server]
+telemetry_enabled = false
+metrics_level = "Disabled"
+
+[streaming.developer]
+stream_chunk_size = 1
+
+[system]
+# NOTE(kwannoel): If you can't reproduce it, set this to a lower number.
+# This will throttle snapshot reads.
+# barrier_interval_ms = 1
+checkpoint_frequency = 10
+barrier_interval_ms = 100
+max_concurrent_creating_streaming_jobs = 0
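A hedged reading of the timing this config implies (the constants below merely restate the values above; this is not a RisingWave API): a barrier every 100 ms with a checkpoint every 10 barriers gives roughly one checkpoint per second, leaving many non-checkpoint epochs during which un-checkpointed upstream data must be served from replication.

```rust
// Back-of-the-envelope timing implied by arrangement_backfill.toml.
// Assumption: `checkpoint_frequency` counts barriers between checkpoints.
const BARRIER_INTERVAL_MS: u64 = 100;
const CHECKPOINT_FREQUENCY: u64 = 10;

fn main() {
    let checkpoint_period_ms = BARRIER_INTERVAL_MS * CHECKPOINT_FREQUENCY;
    // ~1000 ms between checkpoints; with stream_chunk_size = 1 the backfill
    // handles one row per chunk, widening the replication window the test needs.
    println!("checkpoint roughly every {checkpoint_period_ms} ms");
}
```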
23 changes: 23 additions & 0 deletions src/tests/simulation/src/cluster.rs
@@ -179,6 +179,29 @@ metrics_level = "Disabled"
         }
     }

+    pub fn for_arrangement_backfill() -> Self {
+        // Embed the config file and create a temporary file at runtime. The file will be deleted
+        // automatically when it's dropped.
+        let config_path = {
+            let mut file =
+                tempfile::NamedTempFile::new().expect("failed to create temp config file");
+            file.write_all(include_bytes!("arrangement_backfill.toml"))
+                .expect("failed to write config file");
+            file.into_temp_path()
+        };
+
+        Configuration {
+            config_path: ConfigPath::Temp(config_path.into()),
+            frontend_nodes: 1,
+            compute_nodes: 3,
+            meta_nodes: 1,
+            compactor_nodes: 1,
+            compute_node_cores: 1,
+            etcd_timeout_rate: 0.0,
+            etcd_data_path: None,
+        }
+    }
+
     pub fn for_background_ddl() -> Self {
         // Embed the config file and create a temporary file at runtime. The file will be deleted
         // automatically when it's dropped.
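For reference, a minimal usage sketch of the new constructor; it mirrors the opening lines of the test added below, so all names come from this diff except the hypothetical test name.

```rust
use anyhow::Result;
use risingwave_simulation::cluster::{Cluster, Configuration};

// Hypothetical smoke test: boot a simulated cluster with the embedded
// arrangement_backfill.toml and run a trivial query.
#[tokio::test]
async fn smoke_arrangement_backfill_cluster() -> Result<()> {
    let mut cluster = Cluster::start(Configuration::for_arrangement_backfill()).await?;
    let mut session = cluster.start_session();
    session.run("SELECT 1;").await?;
    Ok(())
}
```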
79 changes: 79 additions & 0 deletions src/tests/simulation/tests/integration_tests/backfill_tests.rs
@@ -12,6 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::thread::sleep;
+use std::time::Duration;
+
 use anyhow::Result;
 use itertools::Itertools;
 use risingwave_simulation::cluster::{Cluster, Configuration};
@@ -102,3 +105,79 @@ async fn test_backfill_with_upstream_and_snapshot_read() -> Result<()> {
     }
     Ok(())
 }
+
+/// The following replication scenario is tested:
+/// 1. Upstream yields some chunks downstream. The chunk values are in the range 301-400.
+/// 2. The backfill snapshot read is up to 300.
+/// 3. A barrier is received; 301-400 must be replicated, otherwise these records are discarded.
+/// 4. The next epoch is not a checkpoint epoch.
+/// 5. The next snapshot read occurs; the checkpointed data is from 400-500.
+///
+/// In order to reproduce this scenario, we rely on a few things:
+/// 1. A large checkpoint period, so the checkpoint takes a long time to occur.
+/// 2. We insert 2 partitions of records for the snapshot,
+///    one at the lower bound, one at the upper bound.
+/// 3. We insert a chunk of records in between.
+#[tokio::test]
+async fn test_arrangement_backfill_replication() -> Result<()> {
+    // Initialize the cluster with a config that has a larger checkpoint interval,
+    // so it will rely on replication.
+    let mut cluster = Cluster::start(Configuration::for_arrangement_backfill()).await?;
+    let mut session = cluster.start_session();
+
+    // Create a table with parallelism = 1.
+    session.run("SET STREAMING_PARALLELISM=1;").await?;
+    session.run("CREATE TABLE t (v1 int primary key)").await?;
+    // TODO(kwannoel): test the parallelism of the table.
+
+    // Ingest snapshot data.
+    session
+        .run("INSERT INTO t select * from generate_series(1, 100)")
+        .await?;
+    session.run("FLUSH;").await?;
+    session
+        .run("INSERT INTO t select * from generate_series(201, 300)")
+        .await?;
+    session.run("FLUSH;").await?;
+
+    // Start the upstream update task.
+    let mut session2 = cluster.start_session();
+    let upstream_task = tokio::spawn(async move {
+        // The initial 100 records will take approx. 3s.
+        // After that we start ingesting upstream records.
+        sleep(Duration::from_secs(3));
+        for i in 101..=200 {
+            session2
+                .run(format!("insert into t values ({})", i))
+                .await
+                .unwrap();
+        }
+        session2.run("FLUSH;").await.unwrap();
+    });
+
+    // Create a materialized view with parallelism = 3.
+    session.run("SET STREAMING_PARALLELISM=3").await?;
+    session
+        .run("SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=true")
+        .await?;
+    session.run("SET STREAMING_RATE_LIMIT=30").await?;
+    session
+        .run("create materialized view m1 as select * from t")
+        .await?;
+
+    upstream_task.await?;
+
+    // Verify its parallelism.
+    // TODO(kwannoel): test the parallelism of the table.
+
+    // Verify all data has been ingested, with no extra data in m1.
+    let result = session
+        .run("select t.v1 from t where t.v1 not in (select v1 from m1)")
+        .await?;
+    assert_eq!(result, "");
+    let result = session.run("select count(*) from m1").await?;
+    assert_eq!(result.parse::<usize>().unwrap(), 300);
+    Ok(())
+}
+
+// TODO(kwannoel): Test arrangement backfill background recovery.