diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto
index 6467bd6e1d7e7..b6e1486fd1e54 100644
--- a/proto/ddl_service.proto
+++ b/proto/ddl_service.proto
@@ -136,6 +136,9 @@ message CreateMaterializedViewRequest {
     SERVERLESS = 2;
   }
   BackfillType backfill = 3;
+
+  // The list of object IDs that this materialized view depends on.
+  repeated uint32 dependencies = 4;
 }
 
 message CreateMaterializedViewResponse {
diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs
index e955525ba030e..1fbabdfe57771 100644
--- a/src/common/src/catalog/mod.rs
+++ b/src/common/src/catalog/mod.rs
@@ -168,6 +168,8 @@ pub trait SysCatalogReader: Sync + Send + 'static {
 
 pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;
 
+pub type ObjectId = u32;
+
 #[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Copy)]
 #[display("{database_id}")]
 pub struct DatabaseId {
diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs
index 271d395181df8..ed3af36049521 100644
--- a/src/frontend/src/catalog/catalog_service.rs
+++ b/src/frontend/src/catalog/catalog_service.rs
@@ -12,12 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashSet;
 use std::sync::Arc;
 
 use anyhow::anyhow;
 use parking_lot::lock_api::ArcRwLockReadGuard;
 use parking_lot::{RawRwLock, RwLock};
-use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId};
+use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, ObjectId};
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_hummock_sdk::HummockVersionId;
 use risingwave_pb::catalog::{
@@ -78,6 +79,7 @@ pub trait CatalogWriter: Send + Sync {
         &self,
         table: PbTable,
         graph: StreamFragmentGraph,
+        dependencies: HashSet<ObjectId>,
     ) -> Result<()>;
 
     async fn create_table(
@@ -246,11 +248,12 @@ impl CatalogWriter for CatalogWriterImpl {
         &self,
         table: PbTable,
         graph: StreamFragmentGraph,
+        dependencies: HashSet<ObjectId>,
     ) -> Result<()> {
         let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
         let version = self
             .meta_client
-            .create_materialized_view(table, graph)
+            .create_materialized_view(table, graph, dependencies)
             .await?;
         if matches!(create_type, PbCreateType::Foreground) {
             self.wait_version(version).await?
diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs
index 3e66e3357597c..83d8759735899 100644
--- a/src/frontend/src/handler/create_mv.rs
+++ b/src/frontend/src/handler/create_mv.rs
@@ -18,7 +18,7 @@ use either::Either;
 use itertools::Itertools;
 use pgwire::pg_response::{PgResponse, StatementType};
 use risingwave_common::acl::AclMode;
-use risingwave_common::catalog::TableId;
+use risingwave_common::catalog::{FunctionId, ObjectId, TableId};
 use risingwave_pb::catalog::PbTable;
 use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
 
@@ -93,15 +93,7 @@ pub fn gen_create_mv_plan(
 ) -> Result<(PlanRef, PbTable)> {
     let mut binder = Binder::new_for_stream(session);
     let bound = binder.bind_query(query)?;
-    gen_create_mv_plan_bound(
-        session,
-        context,
-        bound,
-        binder.included_relations().clone(),
-        name,
-        columns,
-        emit_mode,
-    )
+    gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
 }
 
 /// Generate create MV plan from a bound query
@@ -109,7 +101,6 @@ pub fn gen_create_mv_plan_bound(
     session: &SessionImpl,
     context: OptimizerContextRef,
     query: BoundQuery,
-    dependent_relations: HashSet<TableId>,
     name: ObjectName,
     columns: Vec<Ident>,
     emit_mode: Option<EmitMode>,
@@ -147,17 +138,9 @@ pub fn gen_create_mv_plan_bound(
     let mut table = materialize.table().to_prost(schema_id, database_id);
 
     let plan: PlanRef = materialize.into();
 
-    let dependent_relations =
-        RelationCollectorVisitor::collect_with(dependent_relations, plan.clone());
     table.owner = session.user_id();
 
-    // record dependent relations.
-    table.dependent_relations = dependent_relations
-        .into_iter()
-        .map(|t| t.table_id)
-        .collect_vec();
-
     let ctx = plan.ctx();
     let explain_trace = ctx.is_explain_trace();
     if explain_trace {
@@ -176,10 +159,14 @@ pub async fn handle_create_mv(
     columns: Vec<Ident>,
     emit_mode: Option<EmitMode>,
 ) -> Result<RwPgResponse> {
-    let (dependent_relations, bound) = {
+    let (dependent_relations, dependent_udfs, bound) = {
         let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
         let bound = binder.bind_query(query)?;
-        (binder.included_relations().clone(), bound)
+        (
+            binder.included_relations().clone(),
+            binder.included_udfs().clone(),
+            bound,
+        )
     };
     handle_create_mv_bound(
         handler_args,
@@ -187,6 +174,7 @@ pub async fn handle_create_mv(
         name,
         bound,
         dependent_relations,
+        dependent_udfs,
         columns,
         emit_mode,
     )
@@ -199,6 +187,7 @@ pub async fn handle_create_mv_bound(
     name: ObjectName,
     query: BoundQuery,
     dependent_relations: HashSet<TableId>,
+    dependent_udfs: HashSet<FunctionId>, // TODO(rc): merge with `dependent_relations`
     columns: Vec<Ident>,
     emit_mode: Option<EmitMode>,
 ) -> Result<RwPgResponse> {
@@ -215,7 +204,7 @@ pub async fn handle_create_mv_bound(
         return Ok(resp);
     }
 
-    let (table, graph) = {
+    let (table, graph, dependencies) = {
         let context = OptimizerContext::from_handler_args(handler_args);
         if !context.with_options().is_empty() {
             // get other useful fields by `remove`, the logic here is to reject unknown options.
@@ -232,19 +221,23 @@ It only indicates the physical clustering of the data, which may improve the per
 "#.to_string());
         }
 
-        let (plan, table) = gen_create_mv_plan_bound(
-            &session,
-            context.into(),
-            query,
-            dependent_relations,
-            name,
-            columns,
-            emit_mode,
-        )?;
+        let (plan, table) =
+            gen_create_mv_plan_bound(&session, context.into(), query, name, columns, emit_mode)?;
+
+        let dependencies =
+            RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
+                .into_iter()
+                .map(|id| id.table_id() as ObjectId)
+                .chain(
+                    dependent_udfs
+                        .into_iter()
+                        .map(|id| id.function_id() as ObjectId),
+                )
+                .collect();
 
         let graph = build_graph(plan)?;
 
-        (table, graph)
+        (table, graph, dependencies)
     };
 
     // Ensure writes to `StreamJobTracker` are atomic.
@@ -262,7 +255,7 @@ It only indicates the physical clustering of the data, which may improve the per
     let session = session.clone();
     let catalog_writer = session.catalog_writer()?;
     catalog_writer
-        .create_materialized_view(table, graph)
+        .create_materialized_view(table, graph, dependencies)
         .await?;
 
     Ok(PgResponse::empty_result(
diff --git a/src/frontend/src/handler/extended_handle.rs b/src/frontend/src/handler/extended_handle.rs
index f12eaa617352b..9abd26bda7fdf 100644
--- a/src/frontend/src/handler/extended_handle.rs
+++ b/src/frontend/src/handler/extended_handle.rs
@@ -166,6 +166,7 @@ pub fn handle_bind(
                 bound,
                 param_types,
                 dependent_relations,
+                dependent_udfs,
                 ..
             } = bound_result;
 
@@ -176,6 +177,7 @@ pub fn handle_bind(
                 param_types,
                 parsed_params: Some(parsed_params),
                 dependent_relations,
+                dependent_udfs,
                 bound: new_bound,
             };
             Ok(Portal::Portal(PortalResult {
diff --git a/src/frontend/src/handler/fetch_cursor.rs b/src/frontend/src/handler/fetch_cursor.rs
index 01de56da0310a..7f0b88826fabe 100644
--- a/src/frontend/src/handler/fetch_cursor.rs
+++ b/src/frontend/src/handler/fetch_cursor.rs
@@ -124,6 +124,7 @@ pub async fn handle_parse(
         param_types: binder.export_param_types()?,
         parsed_params: None,
         dependent_relations: binder.included_relations().clone(),
+        dependent_udfs: binder.included_udfs().clone(),
     };
     let result = PreparedResult {
         statement,
diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs
index 632c05b1e9e45..16c01b9131195 100644
--- a/src/frontend/src/handler/query.rs
+++ b/src/frontend/src/handler/query.rs
@@ -22,7 +22,7 @@ use pgwire::pg_response::{PgResponse, StatementType};
 use pgwire::types::Format;
 use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
 use risingwave_common::bail_not_implemented;
-use risingwave_common::catalog::Schema;
+use risingwave_common::catalog::{FunctionId, Schema};
 use risingwave_common::session_config::QueryMode;
 use risingwave_common::types::{DataType, Datum};
 use risingwave_sqlparser::ast::{SetExpr, Statement};
@@ -109,6 +109,7 @@ pub async fn handle_execute(
     let BoundResult {
         bound,
         dependent_relations,
+        dependent_udfs,
         ..
     } = bound_result;
     let create_mv = if let BoundStatement::CreateView(create_mv) = bound {
@@ -144,6 +145,7 @@ pub async fn handle_execute(
             name,
             *query,
             dependent_relations,
+            dependent_udfs,
             columns,
             emit_mode,
         )
@@ -170,6 +172,8 @@ pub struct BoundResult {
     pub(crate) param_types: Vec<DataType>,
     pub(crate) parsed_params: Option<Vec<ScalarImpl>>,
     pub(crate) dependent_relations: HashSet<TableId>,
+    /// TODO(rc): merge with `dependent_relations`
+    pub(crate) dependent_udfs: HashSet<FunctionId>,
 }
 
 fn gen_bound(
@@ -194,6 +198,7 @@ fn gen_bound(
         param_types: binder.export_param_types()?,
         parsed_params: None,
         dependent_relations: binder.included_relations().clone(),
+        dependent_udfs: binder.included_udfs().clone(),
     })
 }
 
diff --git a/src/frontend/src/optimizer/plan_visitor/relation_collector_visitor.rs b/src/frontend/src/optimizer/plan_visitor/relation_collector_visitor.rs
index 59535ddd2b654..c26e49fada502 100644
--- a/src/frontend/src/optimizer/plan_visitor/relation_collector_visitor.rs
+++ b/src/frontend/src/optimizer/plan_visitor/relation_collector_visitor.rs
@@ -21,6 +21,7 @@ use crate::optimizer::plan_node::{BatchSource, LogicalScan, StreamSource, Strea
 use crate::optimizer::plan_visitor::PlanVisitor;
 use crate::PlanRef;
 
+/// TODO(rc): maybe we should rename this to `DependencyCollectorVisitor`.
 #[derive(Debug, Clone, Default)]
 pub struct RelationCollectorVisitor {
     relations: HashSet<TableId>,
diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs
index d94b1dd2652d6..64c667a7573df 100644
--- a/src/frontend/src/test_utils.rs
+++ b/src/frontend/src/test_utils.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::io::Write;
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use std::sync::atomic::{AtomicU32, Ordering};
@@ -25,8 +25,9 @@ use pgwire::pg_response::StatementType;
 use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
 use pgwire::types::Row;
 use risingwave_common::catalog::{
-    FunctionId, IndexId, TableId, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
-    DEFAULT_SUPER_USER_ID, NON_RESERVED_USER_ID, PG_CATALOG_SCHEMA_NAME, RW_CATALOG_SCHEMA_NAME,
+    FunctionId, IndexId, ObjectId, TableId, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME,
+    DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID, NON_RESERVED_USER_ID, PG_CATALOG_SCHEMA_NAME,
+    RW_CATALOG_SCHEMA_NAME,
 };
 use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
 use risingwave_common::session_config::SessionConfig;
@@ -279,6 +280,7 @@ impl CatalogWriter for MockCatalogWriter {
         &self,
         mut table: PbTable,
         _graph: StreamFragmentGraph,
+        _dependencies: HashSet<ObjectId>,
     ) -> Result<()> {
         table.id = self.gen_id();
         table.stream_job_status = PbStreamJobStatus::Created as _;
@@ -309,7 +311,8 @@ impl CatalogWriter for MockCatalogWriter {
             table.optional_associated_source_id =
                 Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
         }
-        self.create_materialized_view(table, graph).await?;
+        self.create_materialized_view(table, graph, HashSet::new())
+            .await?;
         Ok(())
     }
 
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs
index be733e8d4ec1d..8ee60d251f8b7 100644
--- a/src/rpc_client/src/meta_client.rs
+++ b/src/rpc_client/src/meta_client.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::fmt::{Debug, Display};
 use std::sync::atomic::AtomicBool;
 use std::sync::atomic::Ordering::Relaxed;
@@ -26,7 +26,7 @@ use cluster_limit_service_client::ClusterLimitServiceClient;
 use either::Either;
 use futures::stream::BoxStream;
 use lru::LruCache;
-use risingwave_common::catalog::{FunctionId, IndexId, SecretId, TableId};
+use risingwave_common::catalog::{FunctionId, IndexId, ObjectId, SecretId, TableId};
 use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE};
 use risingwave_common::hash::WorkerSlotMapping;
 use risingwave_common::monitor::EndpointExt;
@@ -391,11 +391,13 @@ impl MetaClient {
         &self,
         table: PbTable,
         graph: StreamFragmentGraph,
+        dependencies: HashSet<ObjectId>,
     ) -> Result<CatalogVersion> {
         let request = CreateMaterializedViewRequest {
             materialized_view: Some(table),
             fragment_graph: Some(graph),
             backfill: PbBackfillType::Regular as _,
+            dependencies: dependencies.into_iter().collect(),
         };
         let resp = self.inner.create_materialized_view(request).await?;
         // TODO: handle error in `resp.status` here
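
Note: the sketch below is not code from this patch; it is a minimal, self-contained illustration of the plumbing the diff introduces. The `TableId` / `FunctionId` stand-ins and the `collect_dependencies` helper are invented for the example; only `ObjectId = u32` matches the actual change. The idea is that table IDs and UDF IDs are folded into a single `HashSet<ObjectId>`, which `CatalogWriter::create_materialized_view` forwards to the meta client and ultimately into the new `repeated uint32 dependencies` field of `CreateMaterializedViewRequest`.

use std::collections::HashSet;

/// Unified catalog object ID, mirroring `ObjectId = u32` added in `risingwave_common::catalog`.
type ObjectId = u32;

// Simplified stand-ins for the real `TableId` / `FunctionId` newtypes (hypothetical, for illustration only).
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TableId(u32);
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct FunctionId(u32);

/// Fold dependent relations and dependent UDFs into one ID set, in the same
/// shape as the `dependencies` computation added to `handle_create_mv_bound`.
fn collect_dependencies(
    dependent_relations: HashSet<TableId>,
    dependent_udfs: HashSet<FunctionId>,
) -> HashSet<ObjectId> {
    dependent_relations
        .into_iter()
        .map(|TableId(id)| id as ObjectId)
        .chain(dependent_udfs.into_iter().map(|FunctionId(id)| id as ObjectId))
        .collect()
}

fn main() {
    let relations = HashSet::from([TableId(1001), TableId(1002)]);
    let udfs = HashSet::from([FunctionId(2001)]);
    let dependencies = collect_dependencies(relations, udfs);

    // This set is what the meta client serializes into the new
    // `repeated uint32 dependencies` field of `CreateMaterializedViewRequest`.
    let proto_field: Vec<u32> = dependencies.into_iter().collect();
    assert_eq!(proto_field.len(), 3);
    println!("{proto_field:?}");
}

Using a plain `u32` object-ID space lets relations and UDFs share one `dependencies` list sent explicitly in the RPC, instead of the old table-only `dependent_relations` field recorded on `PbTable`.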