Skip to content

Commit

Permalink
Refactor catalog.rs to handle table sinks with PartialObject
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Feb 20, 2024
1 parent 8839317 commit 1796209
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 20 deletions.
41 changes: 24 additions & 17 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use risingwave_meta_model_v2::{
actor, connection, database, fragment, function, index, object, object_dependency, schema,
sink, source, streaming_job, table, user_privilege, view, ActorId, ActorUpstreamActors,
ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, FunctionId, I32Array,
IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId, SinkId, SourceId, StreamSourceInfo,
IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId, SourceId, StreamSourceInfo,
StreamingParallelism, TableId, UserId,
};
use risingwave_pb::catalog::table::PbTableType;
Expand Down Expand Up @@ -2084,22 +2084,28 @@ impl CatalogController {
});
}};
}
let objs = get_referring_objects(object_id, &txn).await?;
// TODO: For sink into table. when sink into table is ready.
// if object_type == ObjectType::Table {
// let incoming_sinks: Vec<_> = Table::find_by_id(object_id)
// .select_only()
// .column(table::Column::IncomingSinks)
// .into_tuple()
// .one(&txn)
// .await?
// .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
// objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
// oid: id as _,
// obj_type: ObjectType::Sink,
// ..Default::default()
// }));
// }
let mut objs = get_referring_objects(object_id, &txn).await?;
if object_type == ObjectType::Table {
let incoming_sinks: I32Array = Table::find_by_id(object_id)
.select_only()
.column(table::Column::IncomingSinks)
.into_tuple()
.one(&txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;

objs.extend(
incoming_sinks
.into_inner()
.into_iter()
.map(|id| PartialObject {
oid: id,
obj_type: ObjectType::Sink,
schema_id: None,
database_id: None,
}),
);
}

for obj in objs {
match obj.obj_type {
Expand Down Expand Up @@ -2287,6 +2293,7 @@ impl CatalogController {
.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
.collect())
}

pub async fn find_creating_streaming_job_ids(
&self,
infos: Vec<PbCreatingJobInfo>,
Expand Down
19 changes: 19 additions & 0 deletions src/meta/src/controller/rename.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> Str
stmt:
CreateSinkStatement {
sink_from: CreateSink::AsQuery(query),
into_table_name: None,
..
},
} => {
Expand All @@ -89,9 +90,27 @@ pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> Str
stmt:
CreateSinkStatement {
sink_from: CreateSink::From(table_name),
into_table_name: None,
..
},
} => replace_table_name(table_name, to),
Statement::CreateSink {
stmt: CreateSinkStatement {
sink_from,
into_table_name: Some(table_name),
..
}
} => {
let idx = table_name.0.len() - 1;
if table_name.0[idx].real_value() == from {
table_name.0[idx] = Ident::new_unchecked(to);
} else {
match sink_from {
CreateSink::From(table_name) => replace_table_name(table_name, to),
CreateSink::AsQuery(query) => QueryRewriter::rewrite_query(query, from, to),
}
}
}
_ => unreachable!(),
};
stmt.to_string()
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl CatalogController {
let table: table::ActiveModel = table.clone().into();
Table::insert(table).exec(&txn).await?;
}
StreamingJob::Sink(sink, s) => {
StreamingJob::Sink(sink, _) => {
if let Some(target_table_id) = sink.target_table {
if Self::check_cycle_for_table_sink(target_table_id, sink, &txn).await? {
bail!("Creating such a sink will result in circular dependency.");
Expand Down
3 changes: 1 addition & 2 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -38,7 +38,6 @@ use risingwave_connector::source::{
};
use risingwave_meta_model_v2::object::ObjectType;
use risingwave_meta_model_v2::ObjectId;
use risingwave_pb::catalog;
use risingwave_pb::catalog::connection::private_link_service::PbPrivateLinkProvider;
use risingwave_pb::catalog::connection::PrivateLinkService;
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
Expand Down

0 comments on commit 1796209

Please sign in to comment.