From a1e2562420b844f148029547bfc120edb299ebfe Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 9 Jul 2024 18:19:58 +0800 Subject: [PATCH] update --- risedev.yml | 18 ++++++++++++++++++ src/meta/src/rpc/ddl_controller.rs | 13 +++++++------ src/meta/src/stream/stream_graph/fragment.rs | 7 ++++--- 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/risedev.yml b/risedev.yml index 984d92ab33d88..1c3206d7514b6 100644 --- a/risedev.yml +++ b/risedev.yml @@ -131,6 +131,24 @@ profile: - use: grafana - use: kafka persist-data: true + wtf: + steps: + - use: meta-node + - use: compute-node + - use: frontend + - use: postgres + port: 8432 + user-managed: true + application: "connector" + user: myuser + password: "123456" + database: "mydb" + - use: mysql + port: 8306 + user-managed: true + user: root + password: "123456" + database: "mydb" standalone-minio-etcd: steps: diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index c2941453ea122..b4f8f13c3fd55 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1955,7 +1955,7 @@ impl DdlController { }; // Map the column indices in the dispatchers with the given mapping. - let (downstream_fragments, downstream_actor_locations) = self + let (downstream_fragments, downstream_actor_location) = self .metadata_manager .get_downstream_chain_fragments(id) .await?; @@ -1981,25 +1981,26 @@ impl DdlController { fragment_graph, original_table_fragment.fragment_id, downstream_fragments, - downstream_actor_locations, + downstream_actor_location, ddl_type, )?, TableJobType::SharedCdcSource => { // get the upstream fragment which should be the cdc source - let (upstream_root_fragments, existing_actor_location) = self + let (upstream_root_fragments, upstream_actor_location) = self .metadata_manager .get_upstream_root_fragments(fragment_graph.dependent_table_ids()) .await?; - println!("downstream {:?}", downstream_actor_locations); - println!("existing {:?}", existing_actor_location); + println!("downstream {:?}", downstream_actor_location); + println!("existing {:?}", upstream_actor_location); CompleteStreamFragmentGraph::with_upstreams_and_downstreams( fragment_graph, upstream_root_fragments, + upstream_actor_location, original_table_fragment.fragment_id, downstream_fragments, - existing_actor_location, + downstream_actor_location, ddl_type, )? } diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 6cace1bb022c2..bd7d16eddec75 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -620,21 +620,22 @@ impl CompleteStreamFragmentGraph { pub fn with_upstreams_and_downstreams( graph: StreamFragmentGraph, upstream_root_fragments: HashMap, + upstream_actor_location: HashMap, original_table_fragment_id: FragmentId, downstream_fragments: Vec<(DispatchStrategy, Fragment)>, - existing_actor_location: HashMap, + downstream_actor_location: HashMap, ddl_type: DdlType, ) -> MetaResult { Self::build_helper( graph, Some(FragmentGraphUpstreamContext { upstream_root_fragments, - upstream_actor_location: existing_actor_location.clone(), + upstream_actor_location, }), Some(FragmentGraphDownstreamContext { original_table_fragment_id, downstream_fragments, - downstream_actor_location: existing_actor_location, + downstream_actor_location, }), ddl_type, )