diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto
index b6e1486fd1e5..72af9d7d2d79 100644
--- a/proto/ddl_service.proto
+++ b/proto/ddl_service.proto
@@ -86,6 +86,8 @@ message CreateSinkRequest {
   stream_plan.StreamFragmentGraph fragment_graph = 2;
   // It is used to provide a replace plan for the downstream table in `create sink into table` requests.
   optional ReplaceTablePlan affected_table_change = 3;
+  // The list of object IDs that this sink depends on.
+  repeated uint32 dependencies = 4;
 }
 
 message CreateSinkResponse {
diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs
index 5f097eafa33c..b5c11aa4d4aa 100644
--- a/src/connector/src/sink/catalog/desc.rs
+++ b/src/connector/src/sink/catalog/desc.rs
@@ -91,7 +91,6 @@ impl SinkDesc {
         database_id: DatabaseId,
         owner: UserId,
         connection_id: Option<ConnectionId>,
-        dependent_relations: Vec<TableId>,
     ) -> SinkCatalog {
         SinkCatalog {
             id: self.id,
@@ -104,7 +103,7 @@ impl SinkDesc {
             downstream_pk: self.downstream_pk,
             distribution_key: self.distribution_key,
             owner,
-            dependent_relations,
+            dependent_relations: vec![], // TODO(rc): to be deprecated
             properties: self.properties,
             secret_refs: self.secret_refs,
             sink_type: self.sink_type,
diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs
index ed3af3604952..cc7cac86570c 100644
--- a/src/frontend/src/catalog/catalog_service.rs
+++ b/src/frontend/src/catalog/catalog_service.rs
@@ -117,6 +117,7 @@ pub trait CatalogWriter: Send + Sync {
         sink: PbSink,
         graph: StreamFragmentGraph,
         affected_table_change: Option<ReplaceTablePlan>,
+        dependencies: HashSet<ObjectId>,
     ) -> Result<()>;
 
     async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
@@ -319,10 +320,11 @@ impl CatalogWriter for CatalogWriterImpl {
         sink: PbSink,
         graph: StreamFragmentGraph,
         affected_table_change: Option<ReplaceTablePlan>,
+        dependencies: HashSet<ObjectId>,
     ) -> Result<()> {
         let version = self
             .meta_client
-            .create_sink(sink, graph, affected_table_change)
+            .create_sink(sink, graph, affected_table_change, dependencies)
             .await?;
         self.wait_version(version).await
     }
diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs
index a226d4f8b08e..573673950a0c 100644
--- a/src/frontend/src/handler/create_sink.rs
+++ b/src/frontend/src/handler/create_sink.rs
@@ -22,7 +22,9 @@ use maplit::{convert_args, hashmap};
 use pgwire::pg_response::{PgResponse, StatementType};
 use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
 use risingwave_common::array::arrow::IcebergArrowConvert;
-use risingwave_common::catalog::{ColumnCatalog, DatabaseId, Schema, SchemaId, TableId, UserId};
+use risingwave_common::catalog::{
+    ColumnCatalog, DatabaseId, ObjectId, Schema, SchemaId, TableId, UserId,
+};
 use risingwave_common::secret::LocalSecretManager;
 use risingwave_common::types::DataType;
 use risingwave_common::{bail, catalog};
@@ -75,6 +77,7 @@ pub struct SinkPlanContext {
     pub sink_plan: PlanRef,
     pub sink_catalog: SinkCatalog,
     pub target_table_catalog: Option<Arc<TableCatalog>>,
+    pub dependencies: HashSet<ObjectId>,
 }
 
 pub async fn gen_sink_plan(
@@ -125,10 +128,14 @@ pub async fn gen_sink_plan(
     let (sink_database_id, sink_schema_id) =
         session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;
 
-    let (dependent_relations, bound) = {
+    let (dependent_relations, dependent_udfs, bound) = {
         let mut binder = Binder::new_for_stream(session);
         let bound = binder.bind_query(*query.clone())?;
-        (binder.included_relations().clone(), bound)
+        (
+            binder.included_relations().clone(),
+            binder.included_udfs().clone(),
+            bound,
+        )
     };
 
     let check_items = resolve_query_privileges(&bound);
@@ -253,15 +260,22 @@ pub async fn gen_sink_plan(
         ctx.trace(sink_plan.explain_to_string());
     }
 
-    let dependent_relations =
-        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone());
+    let dependencies =
+        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
+            .into_iter()
+            .map(|id| id.table_id() as ObjectId)
+            .chain(
+                dependent_udfs
+                    .into_iter()
+                    .map(|id| id.function_id() as ObjectId),
+            )
+            .collect();
 
     let sink_catalog = sink_desc.into_catalog(
         SchemaId::new(sink_schema_id),
         DatabaseId::new(sink_database_id),
         UserId::new(session.user_id()),
         None, // deprecated: private link connection id
-        dependent_relations.into_iter().collect_vec(),
     );
 
     if let Some(table_catalog) = &target_table_catalog {
@@ -314,6 +328,7 @@ pub async fn gen_sink_plan(
         sink_plan,
         sink_catalog,
         target_table_catalog,
+        dependencies,
     })
 }
 
@@ -428,12 +443,13 @@ pub async fn handle_create_sink(
         return Ok(resp);
     }
 
-    let (mut sink, graph, target_table_catalog) = {
+    let (mut sink, graph, target_table_catalog, dependencies) = {
         let SinkPlanContext {
             query,
             sink_plan: plan,
             sink_catalog: sink,
             target_table_catalog,
+            dependencies,
         } = gen_sink_plan(handle_args, stmt, None).await?;
 
         let has_order_by = !query.order_by.is_empty();
@@ -446,7 +462,7 @@ pub async fn handle_create_sink(
 
         let graph = build_graph(plan)?;
 
-        (sink, graph, target_table_catalog)
+        (sink, graph, target_table_catalog, dependencies)
     };
 
     let mut target_table_replace_plan = None;
@@ -506,7 +522,12 @@ pub async fn handle_create_sink(
 
     let catalog_writer = session.catalog_writer()?;
     catalog_writer
-        .create_sink(sink.to_proto(), graph, target_table_replace_plan)
+        .create_sink(
+            sink.to_proto(),
+            graph,
+            target_table_replace_plan,
+            dependencies,
+        )
         .await?;
 
     Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs
index 64c667a7573d..fac591e15b38 100644
--- a/src/frontend/src/test_utils.rs
+++ b/src/frontend/src/test_utils.rs
@@ -343,6 +343,7 @@ impl CatalogWriter for MockCatalogWriter {
         sink: PbSink,
         graph: StreamFragmentGraph,
         _affected_table_change: Option<ReplaceTablePlan>,
+        _dependencies: HashSet<ObjectId>,
     ) -> Result<()> {
         self.create_sink_inner(sink, graph)
     }
diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs
index 354d1cc46963..ad6f5ba38942 100644
--- a/src/meta/service/src/ddl_service.rs
+++ b/src/meta/service/src/ddl_service.rs
@@ -24,6 +24,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_connector::sink::catalog::SinkId;
 use risingwave_meta::manager::{EventLogManagerRef, MetadataManager};
 use risingwave_meta::rpc::metrics::MetaMetrics;
+use risingwave_meta_model::ObjectId;
 use risingwave_pb::catalog::{Comment, CreateType, Secret, Table};
 use risingwave_pb::common::worker_node::State;
 use risingwave_pb::common::WorkerType;
@@ -281,6 +282,11 @@ impl DdlService for DdlServiceImpl {
         let sink = req.get_sink()?.clone();
         let fragment_graph = req.get_fragment_graph()?.clone();
         let affected_table_change = req.get_affected_table_change().cloned().ok();
+        let dependencies = req
+            .get_dependencies()
+            .iter()
+            .map(|id| *id as ObjectId)
+            .collect();
 
         let stream_job = match &affected_table_change {
             None => StreamingJob::Sink(sink, None),
@@ -296,7 +302,7 @@ impl DdlService for DdlServiceImpl {
             fragment_graph,
             CreateType::Foreground,
             affected_table_change.map(Self::extract_replace_table_info),
-            HashSet::new(), // TODO(rc): pass dependencies through this field instead of `PbSink`
+            dependencies,
         );
 
         let version = self.ddl_controller.run_command(command).await?;
@@ -382,7 +388,11 @@ impl DdlService for DdlServiceImpl {
         let mview = req.get_materialized_view()?.clone();
         let create_type = mview.get_create_type().unwrap_or(CreateType::Foreground);
         let fragment_graph = req.get_fragment_graph()?.clone();
-        let dependencies = req.get_dependencies().iter().map(|id| *id as i32).collect();
+        let dependencies = req
+            .get_dependencies()
+            .iter()
+            .map(|id| *id as ObjectId)
+            .collect();
 
         let stream_job = StreamingJob::MaterializedView(mview);
         let version = self
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs
index 8ee60d251f8b..ddb3fea6e0ed 100644
--- a/src/rpc_client/src/meta_client.rs
+++ b/src/rpc_client/src/meta_client.rs
@@ -443,11 +443,13 @@ impl MetaClient {
         sink: PbSink,
         graph: StreamFragmentGraph,
         affected_table_change: Option<ReplaceTablePlan>,
+        dependencies: HashSet<ObjectId>,
     ) -> Result<CatalogVersion> {
         let request = CreateSinkRequest {
             sink: Some(sink),
             fragment_graph: Some(graph),
             affected_table_change,
+            dependencies: dependencies.into_iter().collect(),
         };
 
         let resp = self.inner.create_sink(request).await?;
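Note on the dependency-collection step above: the frontend now folds the relation IDs and UDF IDs referenced by the sink's query into one set of object IDs, which reaches the meta service through the new `dependencies` field of `CreateSinkRequest` rather than the sink catalog's `dependent_relations`. The following is a minimal, self-contained sketch of that folding step only; `ObjectId`, `TableId`, and `FunctionId` here are simplified stand-ins for illustration, not the actual RisingWave catalog types.

use std::collections::HashSet;

// Simplified stand-ins; the real types live in `risingwave_common::catalog`
// and carry more structure.
type ObjectId = u32;

#[derive(Clone, Copy)]
struct TableId(u32);
impl TableId {
    fn table_id(self) -> u32 {
        self.0
    }
}

#[derive(Clone, Copy)]
struct FunctionId(u32);
impl FunctionId {
    fn function_id(self) -> u32 {
        self.0
    }
}

/// Fold dependent relations and UDFs into a single set of object IDs,
/// mirroring the shape of the `repeated uint32 dependencies` field added
/// to `CreateSinkRequest`.
fn collect_dependencies(
    dependent_relations: impl IntoIterator<Item = TableId>,
    dependent_udfs: impl IntoIterator<Item = FunctionId>,
) -> HashSet<ObjectId> {
    dependent_relations
        .into_iter()
        .map(|id| id.table_id() as ObjectId)
        .chain(
            dependent_udfs
                .into_iter()
                .map(|id| id.function_id() as ObjectId),
        )
        .collect()
}

fn main() {
    // Two upstream tables and one UDF referenced by the sink's query.
    let deps = collect_dependencies([TableId(1), TableId(2)], [FunctionId(100)]);
    assert_eq!(deps.len(), 3);
    // The resulting set is what gets serialized into `CreateSinkRequest.dependencies`.
    println!("{deps:?}");
}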