Skip to content

Commit

Permalink
support scaling source backfill
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>

add notes

Signed-off-by: xxchan <[email protected]>

Merge remote-tracking branch 'origin/main' into xxchan/scale-source

add sim test

discard minor changes

Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Sep 2, 2024
1 parent 8f0f545 commit f430814
Show file tree
Hide file tree
Showing 16 changed files with 567 additions and 69 deletions.
1 change: 1 addition & 0 deletions .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ extend-exclude = [
# We don't want to fix "fals" here, but may want in other places.
# Ideally, we should just ignore that line: https://github.com/crate-ci/typos/issues/316
"src/common/src/cast/mod.rs",
"src/tests/simulation/tests/integration_tests/scale/shared_source.rs",
]
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ echo "> inserted new rows into postgres"

# start cluster w/o clean-data
unset RISINGWAVE_CI
export RUST_LOG="risingwave_stream=debug,risingwave_batch=info,risingwave_storage=info" \
export RUST_LOG="risingwave_stream=debug,risingwave_batch=info,risingwave_storage=info"

risedev dev ci-1cn-1fe-with-recovery
echo "> wait for cluster recovery finish"
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/commands/risectl
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/env bash

# Thin wrapper that runs the risectl binary at .risingwave/bin/risingwave/risectl
# (a path relative to the current working directory) and forwards every
# argument unchanged via "$@".
#
# RUST_LOG is set to "error" for this single invocation only.
# NOTE(review): presumably this keeps lower-severity log lines out of the
# command output so that tests comparing the output stay stable — confirm.
RUST_LOG="error" .risingwave/bin/risingwave/risectl "$@"
97 changes: 89 additions & 8 deletions e2e_test/source_inline/kafka/shared_source.slt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ create source s0 (v1 int, v2 varchar) with (
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

query I
query ?
select count(*) from rw_internal_tables where name like '%s0%';
----
1
Expand Down Expand Up @@ -67,23 +67,23 @@ create materialized view mv_2 as select * from s0;

sleep 2s

query IT rowsort
query ?? rowsort
select v1, v2 from s0;
----
1 a
2 b
3 c
4 d

query IT rowsort
query ?? rowsort
select v1, v2 from mv_1;
----
1 a
2 b
3 c
4 d

query IT rowsort
query ?? rowsort
select v1, v2 from mv_2;
----
1 a
Expand Down Expand Up @@ -111,7 +111,7 @@ internal_table.mjs --name s0 --type source
3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


query IT rowsort
query ?? rowsort
select v1, v2 from s0;
----
1 a
Expand All @@ -123,7 +123,7 @@ select v1, v2 from s0;
4 d
4 dd

query IT rowsort
query ?? rowsort
select v1, v2 from mv_1;
----
1 a
Expand Down Expand Up @@ -173,15 +173,15 @@ done

sleep 3s

query IT rowsort
query ?? rowsort
select v1, count(*) from s0 group by v1;
----
1 12
2 12
3 12
4 12

query IT rowsort
query ?? rowsort
select v1, count(*) from mv_1 group by v1;
----
1 12
Expand Down Expand Up @@ -210,5 +210,86 @@ internal_table.mjs --name mv_1 --type sourcebackfill
3,"""Finished"""


# # Note: the parallelism depends on the risedev profile.
# # So scale tests below are commented out.

# query ???
# select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;
# ----
# mv_1 {MVIEW,SOURCE_SCAN} 5
# mv_2 {MVIEW,SOURCE_SCAN} 5
# s0 {SOURCE} 5


# system ok
# risectl meta source-split-info --ignore-id
# ----
# Table
# Fragment (Source)
# Actor (1 splits): [0]
# Actor (1 splits): [2]
# Actor (1 splits): [3]
# Actor (1 splits): [1]
# Actor (0 splits): []
# Table
# Fragment (SourceScan)
# Actor (1 splits): [0] <- Upstream Actor #1055: [0]
# Actor (1 splits): [2] <- Upstream Actor #1056: [2]
# Actor (1 splits): [3] <- Upstream Actor #1057: [3]
# Actor (1 splits): [1] <- Upstream Actor #1058: [1]
# Actor (0 splits): [] <- Upstream Actor #1059: []
# Table
# Fragment (SourceScan)
# Actor (1 splits): [0] <- Upstream Actor #1055: [0]
# Actor (1 splits): [2] <- Upstream Actor #1056: [2]
# Actor (1 splits): [3] <- Upstream Actor #1057: [3]
# Actor (1 splits): [1] <- Upstream Actor #1058: [1]
# Actor (0 splits): [] <- Upstream Actor #1059: []


# # scale down
# statement ok
# ALTER MATERIALIZED VIEW mv_1 SET PARALLELISM TO 2;

# # should have no effect, because of NoShuffle
# # TODO: support ALTER SOURCE SET PARALLELISM, then we can scale the source itself and re-enable this check.
# query ???
# select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;
# ----
# mv_1 {MVIEW,SOURCE_SCAN} 5
# mv_2 {MVIEW,SOURCE_SCAN} 5
# s0 {SOURCE} 5

# system ok
# risectl meta source-split-info --ignore-id
# ----
# Table
# Fragment (Source)
# Actor (1 splits): [0]
# Actor (1 splits): [2]
# Actor (1 splits): [3]
# Actor (1 splits): [1]
# Actor (0 splits): []
# Table
# Fragment (SourceScan)
# Actor (1 splits): [0] <- Upstream Actor #1055: [0]
# Actor (1 splits): [2] <- Upstream Actor #1056: [2]
# Actor (1 splits): [3] <- Upstream Actor #1057: [3]
# Actor (1 splits): [1] <- Upstream Actor #1058: [1]
# Actor (0 splits): [] <- Upstream Actor #1059: []
# Table
# Fragment (SourceScan)
# Actor (1 splits): [0] <- Upstream Actor #1055: [0]
# Actor (1 splits): [2] <- Upstream Actor #1056: [2]
# Actor (1 splits): [3] <- Upstream Actor #1057: [3]
# Actor (1 splits): [1] <- Upstream Actor #1058: [1]
# Actor (0 splits): [] <- Upstream Actor #1059: []


# # Manual test: change the parallelism of the compute node, kill and restart, and check
# # risedev ctl meta source-split-info --ignore-id
# # risedev psql -c "select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;"


statement ok
drop source s0 cascade;
94 changes: 85 additions & 9 deletions src/ctl/src/cmd_impl/meta/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub async fn get_cluster_info(context: &CtlContext) -> anyhow::Result<GetCluster
Ok(response)
}

pub async fn source_split_info(context: &CtlContext) -> anyhow::Result<()> {
pub async fn source_split_info(context: &CtlContext, ignore_id: bool) -> anyhow::Result<()> {
let GetClusterInfoResponse {
worker_nodes: _,
source_infos: _,
Expand All @@ -40,37 +40,113 @@ pub async fn source_split_info(context: &CtlContext) -> anyhow::Result<()> {
revision: _,
} = get_cluster_info(context).await?;

let mut actor_splits_map: BTreeMap<u32, String> = BTreeMap::new();

// build actor_splits_map
for table_fragment in &table_fragments {
if table_fragment.actor_splits.is_empty() {
continue;
}

println!("Table #{}", table_fragment.table_id);

for fragment in table_fragment.fragments.values() {
let fragment_type_mask = fragment.fragment_type_mask;
if fragment_type_mask & FragmentTypeFlag::Source as u32 == 0
|| fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0
&& fragment_type_mask & FragmentTypeFlag::SourceScan as u32 == 0
{
// no source or source backfill
continue;
}
if fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0 {
// skip dummy source for dml fragment
continue;
}

println!("\tFragment #{}", fragment.fragment_id);
for actor in &fragment.actors {
if let Some(ConnectorSplits { splits }) = actor_splits.remove(&actor.actor_id) {
let splits = splits
.iter()
.map(|split| SplitImpl::try_from(split).unwrap())
.map(|split| split.id())
.collect_vec();
.collect_vec()
.join(",");
actor_splits_map.insert(actor.actor_id, splits);
}
}
}
}

// Print in a second pass: an actor's upstream splits are looked up in `actor_splits_map`,
// so the map must be fully built (first loop above) before anything is printed.
for table_fragment in &table_fragments {
if table_fragment.actor_splits.is_empty() {
continue;
}
if ignore_id {
println!("Table");
} else {
println!("Table #{}", table_fragment.table_id);
}
for fragment in table_fragment.fragments.values() {
let fragment_type_mask = fragment.fragment_type_mask;
if fragment_type_mask & FragmentTypeFlag::Source as u32 == 0
&& fragment_type_mask & FragmentTypeFlag::SourceScan as u32 == 0
{
// no source or source backfill
continue;
}
if fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0 {
// skip dummy source for dml fragment
continue;
}

println!(
"\tFragment{} ({})",
if ignore_id {
"".to_string()
} else {
format!(" #{}", fragment.fragment_id)
},
if fragment_type_mask == FragmentTypeFlag::Source as u32 {
"Source"
} else {
"SourceScan"
}
);
for actor in &fragment.actors {
if let Some(splits) = actor_splits_map.get(&actor.actor_id) {
println!(
"\t\tActor #{:<3} ({}): [{}]",
actor.actor_id,
"\t\tActor{} ({} splits): [{}]{}",
if ignore_id {
"".to_string()
} else {
format!(" #{:<3}", actor.actor_id,)
},
splits.len(),
splits.join(",")
splits,
if !actor.upstream_actor_id.is_empty() {
assert!(
actor.upstream_actor_id.len() == 1,
"should have only one upstream actor, got {actor:?}"
);
let upstream_splits =
actor_splits_map.get(&actor.upstream_actor_id[0]).unwrap();
format!(
" <- Upstream Actor{}: [{}]",
if ignore_id {
"".to_string()
} else {
format!(" #{}", actor.upstream_actor_id[0])
},
upstream_splits
)
} else {
"".to_string()
}
);
} else {
println!(
"\t\tError: Actor #{:<3} (not found in actor_splits)",
actor.actor_id,
)
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,10 @@ enum MetaCommands {
/// get cluster info
ClusterInfo,
/// get source split info
SourceSplitInfo,
SourceSplitInfo {
#[clap(long)]
ignore_id: bool,
},
/// Reschedule the actors in the stream graph
///
/// The format is `fragment_id-[worker_id:count]+[worker_id:count]`
Expand Down Expand Up @@ -808,8 +811,8 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
Commands::Meta(MetaCommands::SourceSplitInfo) => {
cmd_impl::meta::source_split_info(context).await?
Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
cmd_impl::meta::source_split_info(context, ignore_id).await?
}
Commands::Meta(MetaCommands::Reschedule {
from,
Expand Down
1 change: 1 addition & 0 deletions src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ clap = { workspace = true }
comfy-table = "7"
crepe = "0.1"
easy-ext = "1"
educe = "0.6"
either = "1"
enum-as-inner = "0.6"
etcd-client = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,12 +368,14 @@ impl DdlController {
}
}

#[tracing::instrument(skip(self), level = "debug")]
pub async fn alter_parallelism(
&self,
table_id: u32,
parallelism: PbTableParallelism,
mut deferred: bool,
) -> MetaResult<()> {
tracing::info!("alter parallelism");
if self.barrier_manager.check_status_running().is_err() {
tracing::info!(
"alter parallelism is set to deferred mode because the system is in recovery state"
Expand Down
Loading

0 comments on commit f430814

Please sign in to comment.