feat(stream): enable replication and independent parallelism for arrangement backfill (#14148)

Co-authored-by: August <[email protected]>
kwannoel and yezizp2012 authored Jan 12, 2024
1 parent 44019bc commit fd63597
Showing 15 changed files with 277 additions and 59 deletions.
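In summary: the planner now selects `StreamScanType::ArrangementBackfill` when the session config enables it, the meta node schedules the backfill fragment with a `Hash` (or `Simple`) exchange instead of `NoShuffle` so its parallelism is independent of the upstream fragment, and the backfill executor replicates upstream chunks into its replicated state table.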
3 changes: 1 addition & 2 deletions ci/scripts/deterministic-recovery-test.sh
@@ -9,8 +9,7 @@ echo "--- Download artifacts"
download-and-decompress-artifact risingwave_simulation .
chmod +x ./risingwave_simulation

export RUST_LOG="info,\
risingwave_meta::barrier::recovery=debug,\
export RUST_LOG="risingwave_meta::barrier::recovery=debug,\
risingwave_meta::manager::catalog=debug,\
risingwave_meta::rpc::ddl_controller=debug,\
risingwave_meta::barrier::mod=debug,\
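With the blanket `info` level removed, only the listed meta modules log at `debug` during the deterministic recovery test, keeping simulation output focused on recovery behavior.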
3 changes: 3 additions & 0 deletions e2e_test/sink/kafka/create_sink.slt
@@ -1,3 +1,6 @@
statement ok
set rw_implicit_flush=true;

statement ok
create table t_kafka (
id integer primary key,
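The added `rw_implicit_flush=true` makes each DML statement flush implicitly before returning, so later statements in the test observe prior writes without explicit `flush;` statements.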
21 changes: 20 additions & 1 deletion src/frontend/src/optimizer/mod.rs
@@ -348,7 +348,18 @@ impl PlanRoot {

/// Generate optimized stream plan
fn gen_optimized_stream_plan(&mut self, emit_on_window_close: bool) -> Result<PlanRef> {
self.gen_optimized_stream_plan_inner(emit_on_window_close, StreamScanType::Backfill)
let stream_scan_type = if self
.plan
.ctx()
.session_ctx()
.config()
.streaming_enable_arrangement_backfill()
{
StreamScanType::ArrangementBackfill
} else {
StreamScanType::Backfill
};
self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
}

fn gen_optimized_stream_plan_inner(
@@ -778,6 +789,14 @@ impl PlanRoot {
) -> Result<StreamSink> {
let stream_scan_type = if without_backfill {
StreamScanType::UpstreamOnly
} else if self
.plan
.ctx()
.session_ctx()
.config()
.streaming_enable_arrangement_backfill()
{
StreamScanType::ArrangementBackfill
} else {
StreamScanType::Backfill
};
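Both the materialized-view path and the sink path now consult the session config, so enabling the feature is a session-level switch (e.g. `SET streaming_enable_arrangement_backfill = true;`). A minimal self-contained sketch of the selection logic, with `StreamScanType` standing in for the proto enum used in the real code:

enum StreamScanType { Backfill, ArrangementBackfill, UpstreamOnly }

fn select_scan_type(without_backfill: bool, arrangement_backfill_enabled: bool) -> StreamScanType {
    if without_backfill {
        StreamScanType::UpstreamOnly        // sink path only: skip backfill entirely
    } else if arrangement_backfill_enabled {
        StreamScanType::ArrangementBackfill // new session-gated behavior
    } else {
        StreamScanType::Backfill            // legacy no-shuffle backfill
    }
}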
24 changes: 5 additions & 19 deletions src/frontend/src/optimizer/plan_node/logical_scan.rs
@@ -22,7 +22,6 @@ use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnDesc, TableDesc};
use risingwave_common::error::Result;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::stream_plan::StreamScanType;

use super::generic::{GenericPlanNode, GenericPlanRef};
use super::utils::{childless_record, Distill};
@@ -521,24 +520,11 @@ impl ToBatch for LogicalScan {
impl ToStream for LogicalScan {
fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<PlanRef> {
if self.predicate().always_true() {
if self
.ctx()
.session_ctx()
.config()
.streaming_enable_arrangement_backfill()
{
Ok(StreamTableScan::new_with_stream_scan_type(
self.core.clone(),
StreamScanType::ArrangementBackfill,
)
.into())
} else {
Ok(StreamTableScan::new_with_stream_scan_type(
self.core.clone(),
ctx.stream_scan_type(),
)
.into())
}
Ok(StreamTableScan::new_with_stream_scan_type(
self.core.clone(),
ctx.stream_scan_type(),
)
.into())
} else {
let (scan, predicate, project_expr) = self.predicate_pull_up();
let mut plan = LogicalFilter::create(scan.into(), predicate);
4 changes: 0 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -47,10 +47,6 @@ pub struct StreamTableScan {
}

impl StreamTableScan {
pub fn new(core: generic::Scan) -> Self {
Self::new_with_stream_scan_type(core, StreamScanType::Backfill)
}

pub fn new_with_stream_scan_type(
core: generic::Scan,
stream_scan_type: StreamScanType,
20 changes: 15 additions & 5 deletions src/meta/src/stream/stream_graph/actor.rs
@@ -28,6 +28,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
use risingwave_pb::stream_plan::{
DispatchStrategy, Dispatcher, DispatcherType, MergeNode, StreamActor, StreamNode,
StreamScanType,
};

use super::id::GlobalFragmentIdsExt;
@@ -174,19 +175,28 @@ impl ActorBuilder {
downstream_fragment_id: self.fragment_id,
}];

// FIXME(kwannoel): This may not hold for Arrangement Backfill.
// As we always use the `NoShuffle` exchange for MV on MV, there should be only one
// upstream.
let upstream_actor_id = upstreams.actors.as_global_ids();
assert_eq!(upstream_actor_id.len(), 1);
let is_arrangement_backfill =
stream_scan.stream_scan_type == StreamScanType::ArrangementBackfill as i32;
if !is_arrangement_backfill {
assert_eq!(upstream_actor_id.len(), 1);
}

let upstream_dispatcher_type = if is_arrangement_backfill {
// FIXME(kwannoel): Should the upstream dispatcher type depend on the upstream distribution?
// If singleton, use `Simple` dispatcher, otherwise use `Hash` dispatcher.
DispatcherType::Hash as _
} else {
DispatcherType::NoShuffle as _
};

let input = vec![
// Fill the merge node body with correct upstream info.
StreamNode {
node_body: Some(NodeBody::Merge(MergeNode {
upstream_actor_id,
upstream_fragment_id: upstreams.fragment_id.as_global_id(),
upstream_dispatcher_type: DispatcherType::NoShuffle as _,
upstream_dispatcher_type,
fields: merge_node.fields.clone(),
})),
..merge_node.clone()
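The relaxed assertion encodes an invariant worth spelling out: `NoShuffle` pairs actors 1:1, so exactly one upstream actor is expected, while arrangement backfill exchanges by `Hash` and may fan in from any number of upstream actors. A sketch with stand-in types (not the meta node's own):

#[derive(PartialEq)]
enum DispatcherType { NoShuffle, Hash }

fn check_upstream_count(dispatcher: &DispatcherType, upstream_actor_ids: &[u32]) {
    // NoShuffle is a 1:1 actor pairing; Hash fans in from any number of actors.
    if *dispatcher == DispatcherType::NoShuffle {
        assert_eq!(upstream_actor_ids.len(), 1);
    }
}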
72 changes: 60 additions & 12 deletions src/meta/src/stream/stream_graph/fragment.rs
@@ -35,7 +35,7 @@ use risingwave_pb::stream_plan::stream_fragment_graph::{
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
DispatchStrategy, DispatcherType, FragmentTypeFlag, StreamActor,
StreamFragmentGraph as StreamFragmentGraphProto,
StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanType,
};

use crate::manager::{MetaSrvEnv, StreamingJob};
@@ -194,6 +194,28 @@ impl BuildingFragment {

table_columns
}

pub fn has_arrangement_backfill(&self) -> bool {
fn has_arrangement_backfill_node(stream_node: &StreamNode) -> bool {
let is_backfill = if let Some(node) = &stream_node.node_body
&& let Some(node) = node.as_stream_scan()
{
node.stream_scan_type == StreamScanType::ArrangementBackfill as i32
} else {
false
};
is_backfill
|| stream_node
.get_input()
.iter()
.any(has_arrangement_backfill_node)
}
let stream_node = match self.inner.node.as_ref() {
Some(node) => node,
_ => return false,
};
has_arrangement_backfill_node(stream_node)
}
}

impl Deref for BuildingFragment {
@@ -590,6 +612,7 @@ impl CompleteStreamFragmentGraph {
// Build the extra edges between the upstream `Materialize` and the downstream `StreamScan`
// of the new materialized view.
for (&id, fragment) in &mut graph.fragments {
let uses_arrangement_backfill = fragment.has_arrangement_backfill();
for (&upstream_table_id, output_columns) in &fragment.upstream_table_columns {
let (up_fragment_id, edge) = match table_job_type.as_ref() {
Some(TableJobType::SharedCdcSource) => {
@@ -636,7 +659,7 @@
let mview_id = GlobalFragmentId::new(mview_fragment.fragment_id);

// Resolve the required output columns from the upstream materialized view.
let output_indices = {
let (dist_key_indices, output_indices) = {
let nodes = mview_fragment.actors[0].get_nodes().unwrap();
let mview_node =
nodes.get_node_body().unwrap().as_materialize().unwrap();
@@ -647,8 +670,16 @@
.iter()
.map(|c| c.column_desc.as_ref().unwrap().column_id)
.collect_vec();
let dist_key_indices: Vec<_> = mview_node
.table
.as_ref()
.unwrap()
.distribution_key
.iter()
.map(|i| *i as u32)
.collect();

output_columns
let output_indices = output_columns
.iter()
.map(|c| {
all_column_ids
@@ -657,21 +688,38 @@
.map(|i| i as u32)
})
.collect::<Option<Vec<_>>>()
.context("column not found in the upstream materialized view")?
.context(
"column not found in the upstream materialized view",
)?;
(dist_key_indices, output_indices)
};
let dispatch_strategy = if uses_arrangement_backfill {
if !dist_key_indices.is_empty() {
DispatchStrategy {
r#type: DispatcherType::Hash as _,
dist_key_indices,
output_indices,
}
} else {
DispatchStrategy {
r#type: DispatcherType::Simple as _,
dist_key_indices: vec![], // empty for Simple
output_indices,
}
}
} else {
DispatchStrategy {
r#type: DispatcherType::NoShuffle as _,
dist_key_indices: vec![], // not used for `NoShuffle`
output_indices,
}
};
let edge = StreamFragmentEdge {
id: EdgeId::UpstreamExternal {
upstream_table_id,
downstream_fragment_id: id,
},
// We always use `NoShuffle` for the exchange between the upstream
// `Materialize` and the downstream `StreamScan` of the
// new materialized view.
dispatch_strategy: DispatchStrategy {
r#type: DispatcherType::NoShuffle as _,
dist_key_indices: vec![], // not used for `NoShuffle`
output_indices,
},
dispatch_strategy,
};

(mview_id, edge)
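The edge-dispatch decision above reduces to a three-way choice driven by the scan type and the upstream `Materialize` table's distribution key. A condensed sketch with stand-in types (`dist_key_indices` comes from the upstream table's `distribution_key`):

enum DispatcherType { Hash, Simple, NoShuffle }

fn choose_edge_dispatcher(uses_arrangement_backfill: bool, dist_key_indices: &[u32]) -> DispatcherType {
    if !uses_arrangement_backfill {
        DispatcherType::NoShuffle // legacy backfill: 1:1 exchange with upstream
    } else if dist_key_indices.is_empty() {
        DispatcherType::Simple    // singleton upstream distribution
    } else {
        DispatcherType::Hash      // shard rows by the upstream distribution key
    }
}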
4 changes: 2 additions & 2 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -372,8 +372,8 @@ where
));
}

// FIXME(kwannoel): Replicate
// upstream_table.write_chunk(chunk);
// Replicate the upstream chunk into the executor's replicated state table.
upstream_table.write_chunk(chunk);
}

if upstream_chunk_buffer_is_empty {
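This flips replication on: each buffered upstream chunk is now written into the executor's replicated copy of the upstream table (previously commented out behind a FIXME), so snapshot reads during backfill stay consistent with the upstream state the executor has already observed.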
14 changes: 14 additions & 0 deletions src/tests/simulation/src/arrangement_backfill.toml
@@ -0,0 +1,14 @@
[server]
telemetry_enabled = false
metrics_level = "Disabled"

[streaming.developer]
stream_chunk_size = 1

[system]
# NOTE(kwannoel): If you can't reproduce it, set this to a lower number.
# This will throttle snapshot read.
# barrier_interval_ms = 1
checkpoint_frequency = 10
barrier_interval_ms = 100
max_concurrent_creating_streaming_jobs = 0
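This simulation config is tuned to reproduce backfill races: `stream_chunk_size = 1` forces one-row chunks so snapshot reads interleave with upstream updates as often as possible, and a checkpoint every 10 barriers at a 100 ms barrier interval keeps state commits frequent. The commented-out `barrier_interval_ms = 1` is left as a knob for throttling snapshot reads further.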
(6 more changed files not shown.)
