Skip to content

Commit

Permalink
Implement unique identities for sinks across SinkCatalog, handlers,…
Browse files Browse the repository at this point in the history
… `DdlController`, and `catalog::Sink`.
  • Loading branch information
shanicky committed Jul 31, 2024
1 parent d23ab4b commit 66b79d7
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 31 deletions.
5 changes: 5 additions & 0 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,11 @@ impl SinkCatalog {
pub fn downstream_pk_indices(&self) -> Vec<usize> {
self.downstream_pk.clone()
}

pub fn unique_identity(&self) -> String {
// We need to align with meta here, so we've utilized the proto method.
self.to_proto().unique_identity()
}
}

impl From<PbSink> for SinkCatalog {
Expand Down
17 changes: 8 additions & 9 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,12 @@ pub async fn replace_table_with_definition(
.collect_vec();

for sink in fetch_incoming_sinks(session, &incoming_sink_ids)? {
hijack_merger_for_target_table(&mut graph, &target_columns, &sink)?;
hijack_merger_for_target_table(
&mut graph,
&target_columns,
&sink,
Some(&sink.unique_identity()),
)?;
}

table.incoming_sinks = incoming_sink_ids.iter().copied().collect();
Expand All @@ -124,6 +129,7 @@ pub(crate) fn hijack_merger_for_target_table(
graph: &mut StreamFragmentGraph,
target_columns: &[ColumnCatalog],
sink: &SinkCatalog,
uniq_identify: Option<&str>,
) -> Result<()> {
let mut sink_columns = sink.original_target_columns.clone();
if sink_columns.is_empty() {
Expand Down Expand Up @@ -164,14 +170,7 @@ pub(crate) fn hijack_merger_for_target_table(

for fragment in graph.fragments.values_mut() {
if let Some(node) = &mut fragment.node {
insert_merger_to_union_with_project(
node,
&pb_project,
&format!(
"{}.{}.{}",
sink.database_id.database_id, sink.schema_id.schema_id, sink.name
),
);
insert_merger_to_union_with_project(node, &pb_project, uniq_identify);
}
}

Expand Down
23 changes: 15 additions & 8 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,8 @@ pub async fn handle_create_sink(

let mut target_table_replace_plan = None;
if let Some(table_catalog) = target_table_catalog {
use crate::handler::alter_table_column::hijack_merger_for_target_table;

check_cycle_for_sink(session.as_ref(), sink.clone(), table_catalog.id())?;

let (mut graph, mut table, source) =
Expand All @@ -470,16 +472,20 @@ pub async fn handle_create_sink(
.clone_from(&table_catalog.incoming_sinks);

let incoming_sink_ids: HashSet<_> = table_catalog.incoming_sinks.iter().copied().collect();
let mut incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?;
incoming_sinks.push(Arc::new(sink.clone()));
for sink in incoming_sinks {
crate::handler::alter_table_column::hijack_merger_for_target_table(
let incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?;

for existing_sink in incoming_sinks {
hijack_merger_for_target_table(
&mut graph,
table_catalog.columns(),
&sink,
&existing_sink,
Some(&existing_sink.unique_identity()),
)?;
}

// for new creating sink, we don't have a unique identity because the sink id is not generated yet.
hijack_merger_for_target_table(&mut graph, table_catalog.columns(), &sink, None)?;

target_table_replace_plan = Some(ReplaceTablePlan {
source,
table: Some(table),
Expand Down Expand Up @@ -696,17 +702,18 @@ pub(crate) async fn reparse_table_for_sink(
pub(crate) fn insert_merger_to_union_with_project(
node: &mut StreamNode,
project_node: &PbNodeBody,
uniq_name: &str,
uniq_identity: Option<&str>,
) {
if let Some(NodeBody::Union(_union_node)) = &mut node.node_body {
// TODO: MergeNode is used as a placeholder, see issue #17658
node.input.push(StreamNode {
input: vec![StreamNode {
node_body: Some(NodeBody::Merge(MergeNode {
..Default::default()
})),
..Default::default()
}],
identity: uniq_name.to_string(),
identity: uniq_identity.unwrap_or("").to_string(),
fields: node.fields.clone(),
node_body: Some(project_node.clone()),
..Default::default()
Expand All @@ -716,7 +723,7 @@ pub(crate) fn insert_merger_to_union_with_project(
}

for input in &mut node.input {
insert_merger_to_union_with_project(input, project_node, uniq_name);
insert_merger_to_union_with_project(input, project_node, uniq_identity);
}
}

Expand Down
7 changes: 6 additions & 1 deletion src/frontend/src/handler/drop_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,12 @@ pub async fn handle_drop_sink(
assert!(incoming_sink_ids.remove(&sink_id.sink_id));

for sink in fetch_incoming_sinks(&session, &incoming_sink_ids)? {
hijack_merger_for_target_table(&mut graph, table_catalog.columns(), &sink)?;
hijack_merger_for_target_table(
&mut graph,
table_catalog.columns(),
&sink,
Some(&sink.unique_identity()),
)?;
}

affected_table_change = Some(ReplaceTablePlan {
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ impl CatalogManager {
Ok(())
}

// Fill in the original_target_columns that wasn't written in the previous version for the table sink.
async fn table_sink_catalog_update(&self) -> MetaResult<()> {
let core = &mut *self.core.lock().await;
let mut sinks = BTreeMapTransaction::new(&mut core.database.sinks);
Expand Down
18 changes: 8 additions & 10 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1092,15 +1092,14 @@ impl DdlController {
if let Some(creating_sink_table_fragments) = creating_sink_table_fragments {
let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap();
let sink = sink.expect("sink not found");
let uniq_name = &format!("{}.{}.{}", sink.database_id, sink.schema_id, sink.name);
Self::inject_replace_table_plan_for_sink(
Some(sink.id),
&sink_fragment,
target_table,
&mut replace_table_ctx,
&mut table_fragments,
target_fragment_id,
uniq_name,
None,
);
}

Expand All @@ -1126,8 +1125,6 @@ impl DdlController {
continue;
};

let uniq_name = &format!("{}.{}.{}", sink.database_id, sink.schema_id, sink.name);

let sink_table_fragments = mgr
.get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(sink_id))
.await?;
Expand All @@ -1141,7 +1138,7 @@ impl DdlController {
&mut replace_table_ctx,
&mut table_fragments,
target_fragment_id,
uniq_name,
Some(&sink.unique_identity()),
);
}
}
Expand Down Expand Up @@ -1169,7 +1166,7 @@ impl DdlController {
replace_table_ctx: &mut ReplaceTableContext,
table_fragments: &mut TableFragments,
target_fragment_id: FragmentId,
uniq_name: &str,
unique_identity: Option<&str>,
) {
let sink_actor_ids = sink_fragment
.actors
Expand Down Expand Up @@ -1254,7 +1251,10 @@ impl DdlController {
let merge_stream_node =
input_project_node.input.iter_mut().exactly_one().unwrap();

if input_project_node.identity.as_str() != uniq_name {
// we need to align nodes here
if input_project_node.identity.as_str()
!= unique_identity.unwrap_or("")
{
continue;
}

Expand Down Expand Up @@ -1897,8 +1897,6 @@ impl DdlController {
for sink in catalogs {
let sink_id = &sink.id;

let uniq_name = &format!("{}.{}.{}", sink.database_id, sink.schema_id, sink.name);

let sink_table_fragments = self
.metadata_manager
.get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(*sink_id))
Expand All @@ -1913,7 +1911,7 @@ impl DdlController {
&mut ctx,
&mut table_fragments,
target_fragment_id,
uniq_name,
Some(&sink.unique_identity()),
);

if sink.original_target_columns.is_empty() {
Expand Down
4 changes: 1 addition & 3 deletions src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,6 @@ impl DdlController {
for sink in catalogs {
let sink_id = &sink.id;

let uniq_name = &format!("{}.{}.{}", sink.database_id, sink.schema_id, sink.name);

let sink_table_fragments = self
.metadata_manager
.get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(*sink_id))
Expand All @@ -503,7 +501,7 @@ impl DdlController {
&mut ctx,
&mut table_fragments,
target_fragment_id,
uniq_name,
Some(&sink.unique_identity()),
);

if sink.original_target_columns.is_empty() {
Expand Down
8 changes: 8 additions & 0 deletions src/prost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,14 @@ impl catalog::StreamSourceInfo {
}
}

impl catalog::Sink {
pub fn unique_identity(&self) -> String {
// TODO: use a more unique name
// We don't use sink.id because it's generated by the meta node .
format!("{}.{}.{}", self.database_id, self.schema_id, self.name)
}
}

#[cfg(test)]
mod tests {
use crate::data::{data_type, DataType};
Expand Down

0 comments on commit 66b79d7

Please sign in to comment.