Skip to content

Commit

Permalink
fix: fix rename of target table for sink into table
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed Jan 30, 2024
1 parent 11b02eb commit e445dc4
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 1 deletion.
16 changes: 16 additions & 0 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1803,6 +1803,22 @@ impl CatalogController {
}};
}
let objs = get_referring_objects(object_id, &txn).await?;
// TODO: For sink into table. when sink into table is ready.
// if object_type == ObjectType::Table {
// let incoming_sinks: Vec<_> = Table::find_by_id(object_id)
// .select_only()
// .column(table::Column::IncomingSinks)
// .into_tuple()
// .one(&txn)
// .await?
// .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
// objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
// oid: id as _,
// obj_type: ObjectType::Sink,
// ..Default::default()
// }));
// }

for obj in objs {
match obj.obj_type {
ObjectType::Table => rename_relation_ref!(Table, table, table_id, obj.oid),
Expand Down
4 changes: 3 additions & 1 deletion src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1749,7 +1749,9 @@ impl CatalogManager {
}

for sink in database_mgr.sinks.values() {
if sink.dependent_relations.contains(&relation_id) {
if sink.dependent_relations.contains(&relation_id)
|| sink.target_table == Some(relation_id)
{
let mut sink = sink.clone();
sink.definition = alter_relation_rename_refs(&sink.definition, from, to);
to_update_sinks.push(sink);
Expand Down
19 changes: 19 additions & 0 deletions src/meta/src/manager/catalog/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> Str
stmt:
CreateSinkStatement {
sink_from: CreateSink::AsQuery(query),
into_table_name: None,
..
},
} => {
Expand All @@ -117,9 +118,27 @@ pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> Str
stmt:
CreateSinkStatement {
sink_from: CreateSink::From(table_name),
into_table_name: None,
..
},
} => replace_table_name(table_name, to),
Statement::CreateSink {
stmt: CreateSinkStatement {
sink_from,
into_table_name: Some(table_name),
..
}
} => {
let idx = table_name.0.len() - 1;
if table_name.0[idx].real_value() == from {
table_name.0[idx] = Ident::new_unchecked(to);
} else {
match sink_from {
CreateSink::From(table_name) => replace_table_name(table_name, to),
CreateSink::AsQuery(query) => QueryRewriter::rewrite_query(query, from, to),
}
}
}
_ => unreachable!(),
};
stmt.to_string()
Expand Down

0 comments on commit e445dc4

Please sign in to comment.