Skip to content

Commit

Permalink
correct handle incoming sink
Browse files Browse the repository at this point in the history
Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky committed Nov 14, 2023
1 parent be5dd6c commit d492e7e
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
6 changes: 4 additions & 2 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ pub async fn handle_create_sink(
panic!("unexpected statement type: {:?}", definition);
};

let (mut graph, table, source) = generate_stream_graph_for_table(
let (mut graph, mut table, source) = generate_stream_graph_for_table(
&session,
target_table.unwrap(),
&table_catalog,
Expand All @@ -413,6 +413,8 @@ pub async fn handle_create_sink(
)
.await?;

table.incoming_sinks = table_catalog.incoming_sinks.clone();

fn insert_merger_to_union(node: &mut StreamNode) {
if let Some(NodeBody::Union(_union_node)) = &mut node.node_body {
node.input.push(StreamNode {
Expand Down Expand Up @@ -455,7 +457,7 @@ pub async fn handle_create_sink(

target_table_replace_plan = Some(ReplaceTablePlan {
source,
table: Some(table.clone()),
table: Some(table),
fragment_graph: Some(graph),
table_col_index_mapping: Some(col_index_mapping.to_protobuf()),
});
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2441,12 +2441,13 @@ impl CatalogManager {
database_core.in_progress_creation_tracker.remove(&key);

let mut table = table.clone();
table.stream_job_status = PbStreamJobStatus::Created.into();

if let Some(incoming_sink_id) = incoming_sink_id {
table.incoming_sinks.push(incoming_sink_id);
}

table.stream_job_status = PbStreamJobStatus::Created.into();

tables.insert(table.id, table.clone());

commit_meta!(self, tables, indexes, sources)?;
Expand Down

0 comments on commit d492e7e

Please sign in to comment.