Skip to content

Commit

Permalink
Code changes: Imports, function rename, argument modifications, varia…
Browse files Browse the repository at this point in the history
…ble rename, new arguments.
  • Loading branch information
shanicky committed Nov 27, 2023
1 parent 0de805a commit dfe4e5e
Showing 1 changed file with 32 additions and 12 deletions.
44 changes: 32 additions & 12 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ use std::sync::Arc;
use std::time::Duration;

use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog;
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::{ParallelUnitMapping, VirtualNode};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::catalog;
use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_cont};
use risingwave_common::{bail, catalog};
use risingwave_pb::catalog::connection::private_link_service::PbPrivateLinkProvider;
use risingwave_pb::catalog::{
connection, Comment, Connection, CreateType, Database, Function, Schema, Sink, Source, Table,
Expand Down Expand Up @@ -573,11 +571,7 @@ impl DdlController {
}
}

async fn check_cycle_for_sink(
&self,
sink: &catalog::Sink,
table_id: TableId,
) -> MetaResult<bool> {
async fn check_cycle_for_sink(&self, sink: &Sink, table_id: TableId) -> MetaResult<bool> {
let reader = self.catalog_manager.get_catalog_core_guard().await;

let mut q: VecDeque<RelationIdEnum> = VecDeque::new();
Expand Down Expand Up @@ -625,7 +619,7 @@ impl DdlController {
// Here we modify the union node of the downstream table by the TableFragments of the to-be-created sink upstream.
// The merge in the union has already been set up in the frontend and will be filled with specific upstream actors in this function.
// Meanwhile, the Dispatcher corresponding to the upstream of the merge will also be added to the replace table context here.
async fn inject_replace_table_job(
async fn inject_replace_table_job_for_table_sink(
&self,
env: StreamEnvironment,
sink: Option<&Sink>,
Expand Down Expand Up @@ -686,6 +680,7 @@ impl DdlController {
let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap();

Self::inject_replace_table_plan_for_sink(
sink.map(|sink| sink.id),
&sink_fragment,
table,
&mut replace_table_ctx,
Expand Down Expand Up @@ -719,6 +714,7 @@ impl DdlController {
let sink_fragment = sink_table_fragments.sink_fragment().unwrap();

Self::inject_replace_table_plan_for_sink(
Some(*sink_id),
&sink_fragment,
table,
&mut replace_table_ctx,
Expand All @@ -728,6 +724,19 @@ impl DdlController {
}
}

// check if the union fragment is fully assigned.
for fragment in table_fragments.fragments.values_mut() {
for actor in &mut fragment.actors {
if let Some(node) = &mut actor.nodes {
visit_stream_node(node, |node| {
if let NodeBody::Merge(merge_node) = node {
assert!(!merge_node.upstream_actor_id.is_empty(), "All the mergers for the union should have been fully assigned beforehand.");
}
});
}
}
}

Ok(ReplaceTableJob {
streaming_job,
context: Some(replace_table_ctx),
Expand All @@ -737,6 +746,7 @@ impl DdlController {
}

fn inject_replace_table_plan_for_sink(
sink_id: Option<u32>,
sink_fragment: &PbFragment,
table: &Table,
replace_table_ctx: &mut ReplaceTableContext,
Expand Down Expand Up @@ -808,6 +818,10 @@ impl DdlController {
if let Some(NodeBody::Union(_)) = &mut node.node_body {
for input in &mut node.input {
if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body && merge_node.upstream_actor_id.is_empty() {
if let Some(sink_id) = sink_id {
input.identity = format!("MergeExecutor(from sink {})", sink_id);
}

*merge_node = MergeNode {
upstream_actor_id: sink_actor_ids.clone(),
upstream_fragment_id,
Expand Down Expand Up @@ -959,7 +973,13 @@ impl DdlController {
table_fragments,
col_index_mapping,
} = self
.inject_replace_table_job(env, None, None, Some(sink_id), replace_table_info)
.inject_replace_table_job_for_table_sink(
env,
None,
None,
Some(sink_id),
replace_table_info,
)
.await?;

self.stream_manager
Expand Down Expand Up @@ -1147,9 +1167,9 @@ impl DdlController {
};

Some(
self.inject_replace_table_job(
self.inject_replace_table_job_for_table_sink(
env,
sink,
Some(sink),
Some(&table_fragments),
None,
replace_table_info,
Expand Down

0 comments on commit dfe4e5e

Please sign in to comment.