Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Jul 9, 2024
1 parent 5ca85b9 commit a1e2562
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 9 deletions.
18 changes: 18 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,24 @@ profile:
- use: grafana
- use: kafka
persist-data: true
wtf:
steps:
- use: meta-node
- use: compute-node
- use: frontend
- use: postgres
port: 8432
user-managed: true
application: "connector"
user: myuser
password: "123456"
database: "mydb"
- use: mysql
port: 8306
user-managed: true
user: root
password: "123456"
database: "mydb"

standalone-minio-etcd:
steps:
Expand Down
13 changes: 7 additions & 6 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1955,7 +1955,7 @@ impl DdlController {
};

// Map the column indices in the dispatchers with the given mapping.
let (downstream_fragments, downstream_actor_locations) = self
let (downstream_fragments, downstream_actor_location) = self
.metadata_manager
.get_downstream_chain_fragments(id)
.await?;
Expand All @@ -1981,25 +1981,26 @@ impl DdlController {
fragment_graph,
original_table_fragment.fragment_id,
downstream_fragments,
downstream_actor_locations,
downstream_actor_location,
ddl_type,
)?,

TableJobType::SharedCdcSource => {
// get the upstream fragment which should be the cdc source
let (upstream_root_fragments, existing_actor_location) = self
let (upstream_root_fragments, upstream_actor_location) = self
.metadata_manager
.get_upstream_root_fragments(fragment_graph.dependent_table_ids())
.await?;

println!("downstream {:?}", downstream_actor_locations);
println!("existing {:?}", existing_actor_location);
println!("downstream {:?}", downstream_actor_location);
println!("existing {:?}", upstream_actor_location);
CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
fragment_graph,
upstream_root_fragments,
upstream_actor_location,
original_table_fragment.fragment_id,
downstream_fragments,
existing_actor_location,
downstream_actor_location,
ddl_type,
)?
}
Expand Down
7 changes: 4 additions & 3 deletions src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,21 +620,22 @@ impl CompleteStreamFragmentGraph {
pub fn with_upstreams_and_downstreams(
graph: StreamFragmentGraph,
upstream_root_fragments: HashMap<TableId, Fragment>,
upstream_actor_location: HashMap<ActorId, u32>,
original_table_fragment_id: FragmentId,
downstream_fragments: Vec<(DispatchStrategy, Fragment)>,
existing_actor_location: HashMap<ActorId, u32>,
downstream_actor_location: HashMap<ActorId, u32>,
ddl_type: DdlType,
) -> MetaResult<Self> {
Self::build_helper(
graph,
Some(FragmentGraphUpstreamContext {
upstream_root_fragments,
upstream_actor_location: existing_actor_location.clone(),
upstream_actor_location,
}),
Some(FragmentGraphDownstreamContext {
original_table_fragment_id,
downstream_fragments,
downstream_actor_location: existing_actor_location,
downstream_actor_location,
}),
ddl_type,
)
Expand Down

0 comments on commit a1e2562

Please sign in to comment.