diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs index b8ca322d767a8..c8e1f42656e47 100644 --- a/.git-blame-ignore-revs +++ b/.git-blame-ignore-revs @@ -42,3 +42,6 @@ c583e2c6c054764249acf484438c7bf7197765f4 # chore: replace all ProstXxx with PbXxx (#8621) 6fd8821f2e053957b183d648bea9c95b6703941f + +# chore: cleanup v2 naming for sql metastore (#18941) +9a6a7f9052d5679165ff57cc01417c742c95351c diff --git a/proto/catalog.proto b/proto/catalog.proto index 169347c199eb9..5383104e9c0f2 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -95,6 +95,8 @@ message StreamSourceInfo { } message Source { + // For shared source, this is the same as the job id. + // For non-shared source and table with connector, this is a different oid. uint32 id = 1; uint32 schema_id = 2; uint32 database_id = 3; diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 1241553aff04a..ce90fa94253f3 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -35,7 +35,7 @@ use risingwave_sqlparser::ast::{ use risingwave_sqlparser::parser::Parser; use super::create_source::get_json_schema_location; -use super::create_table::{generate_stream_graph_for_table, ColumnIdGenerator}; +use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator}; use super::util::SourceSchemaCompatExt; use super::{HandlerArgs, RwPgResponse}; use crate::catalog::root_catalog::SchemaPath; @@ -192,7 +192,7 @@ pub async fn get_replace_table_plan( panic!("unexpected statement type: {:?}", definition); }; - let (mut graph, table, source, job_type) = generate_stream_graph_for_table( + let (mut graph, table, source, job_type) = generate_stream_graph_for_replace_table( session, table_name, original_catalog, diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index ea9f9e98f9b71..24a7e96fe15d7 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -54,7 +54,7 @@ use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{rewrite_now_to_proctime, ExprImpl, InputRef}; use crate::handler::alter_table_column::fetch_table_catalog_for_alter; use crate::handler::create_mv::parse_column_names; -use crate::handler::create_table::{generate_stream_graph_for_table, ColumnIdGenerator}; +use crate::handler::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator}; use crate::handler::privilege::resolve_query_privileges; use crate::handler::util::SourceSchemaCompatExt; use crate::handler::HandlerArgs; @@ -672,7 +672,7 @@ pub(crate) async fn reparse_table_for_sink( panic!("unexpected statement type: {:?}", definition); }; - let (graph, table, source, _) = generate_stream_graph_for_table( + let (graph, table, source, _) = generate_stream_graph_for_replace_table( session, table_name, table_catalog, diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index eab38a44c4ff4..1e5dc489c1a0c 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -35,6 +35,7 @@ use risingwave_connector::source::cdc::external::{ ExternalTableConfig, ExternalTableImpl, DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY, }; use risingwave_connector::{source, WithOptionsSecResolved}; +use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::{PbSource, PbTable, Table, WatermarkDesc}; use risingwave_pb::ddl_service::TableJobType; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; @@ -1322,7 +1323,7 @@ pub fn check_create_table_with_source( } #[allow(clippy::too_many_arguments)] -pub async fn generate_stream_graph_for_table( +pub async fn generate_stream_graph_for_replace_table( _session: &Arc, table_name: ObjectName, original_catalog: &Arc, @@ -1341,7 +1342,7 @@ pub async fn generate_stream_graph_for_table( ) -> Result<(StreamFragmentGraph, Table, Option, TableJobType)> { use risingwave_pb::catalog::table::OptionalAssociatedSourceId; - let ((plan, source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) { + let ((plan, mut source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) { (Some(format_encode), None) => ( gen_create_table_plan_with_source( handler_args, @@ -1441,13 +1442,18 @@ pub async fn generate_stream_graph_for_table( let graph = build_graph(plan)?; // Fill the original table ID. - let table = Table { + let mut table = Table { id: original_catalog.id().table_id(), - optional_associated_source_id: original_catalog - .associated_source_id() - .map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())), ..table }; + if let Some(source_id) = original_catalog.associated_source_id() { + table.optional_associated_source_id = Some(OptionalAssociatedSourceId::AssociatedSourceId( + source_id.table_id, + )); + source.as_mut().unwrap().id = source_id.table_id; + source.as_mut().unwrap().optional_associated_table_id = + Some(OptionalAssociatedTableId::AssociatedTableId(table.id)) + } Ok((graph, table, source, job_type)) } diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 1d0cc36fff3f2..e59c4a4100141 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -23,9 +23,7 @@ use risingwave_common::types::DataType; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_connector::sink::catalog::SinkId; use risingwave_meta::manager::{EventLogManagerRef, MetadataManager}; -use risingwave_meta::rpc::ddl_controller::fill_table_stream_graph_info; use risingwave_meta::rpc::metrics::MetaMetrics; -use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{Comment, CreateType, Secret, Table}; use risingwave_pb::common::worker_node::State; use risingwave_pb::common::WorkerType; @@ -84,27 +82,28 @@ impl DdlServiceImpl { } } - fn extract_replace_table_info(change: ReplaceTablePlan) -> ReplaceTableInfo { - let job_type = change.get_job_type().unwrap_or_default(); - let mut source = change.source; - let mut fragment_graph = change.fragment_graph.unwrap(); - let mut table = change.table.unwrap(); - if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) = - table.optional_associated_source_id - { - source.as_mut().unwrap().id = source_id; - fill_table_stream_graph_info(&mut source, &mut table, job_type, &mut fragment_graph); - } - let table_col_index_mapping = change - .table_col_index_mapping + fn extract_replace_table_info( + ReplaceTablePlan { + table, + fragment_graph, + table_col_index_mapping, + source, + job_type, + }: ReplaceTablePlan, + ) -> ReplaceTableInfo { + let table = table.unwrap(); + let col_index_mapping = table_col_index_mapping .as_ref() .map(ColIndexMapping::from_protobuf); - let stream_job = StreamingJob::Table(source, table, job_type); ReplaceTableInfo { - streaming_job: stream_job, - fragment_graph, - col_index_mapping: table_col_index_mapping, + streaming_job: StreamingJob::Table( + source, + table, + TableJobType::try_from(job_type).unwrap(), + ), + fragment_graph: fragment_graph.unwrap(), + col_index_mapping, } } } diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 5257b2c1aa24a..9d5b135095fb1 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -328,8 +328,4 @@ impl StreamingJob { StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()), } } - - pub fn is_source_job(&self) -> bool { - matches!(self, StreamingJob::Source(_)) - } } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index f0cbd1d5a477f..fd336a1d0a5b4 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -27,7 +27,7 @@ use risingwave_common::secret::SecretEncryption; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::{ - visit_fragment, visit_stream_node, visit_stream_node_cont_mut, + visit_stream_node, visit_stream_node_cont_mut, }; use risingwave_common::{bail, hash, must_match}; use risingwave_connector::error::ConnectorError; @@ -40,11 +40,9 @@ use risingwave_meta_model::{ ConnectionId, DatabaseId, FunctionId, IndexId, ObjectId, SchemaId, SecretId, SinkId, SourceId, SubscriptionId, TableId, UserId, ViewId, }; -use risingwave_pb::catalog::source::OptionalAssociatedTableId; -use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ - Comment, Connection, CreateType, Database, Function, PbSink, PbSource, PbTable, Schema, Secret, - Sink, Source, Subscription, Table, View, + Comment, Connection, CreateType, Database, Function, PbSink, Schema, Secret, Sink, Source, + Subscription, Table, View, }; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ @@ -906,7 +904,7 @@ impl DdlController { pub async fn create_streaming_job( &self, mut streaming_job: StreamingJob, - mut fragment_graph: StreamFragmentGraphProto, + fragment_graph: StreamFragmentGraphProto, affected_table_replace_info: Option, ) -> MetaResult { let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); @@ -921,24 +919,6 @@ impl DdlController { .await?; let job_id = streaming_job.id(); - match &mut streaming_job { - StreamingJob::Table(src, table, job_type) => { - // If we're creating a table with connector, we should additionally fill its ID first. - fill_table_stream_graph_info(src, table, *job_type, &mut fragment_graph); - } - StreamingJob::Source(src) => { - // set the inner source id of source node. - for fragment in fragment_graph.fragments.values_mut() { - visit_fragment(fragment, |node_body| { - if let NodeBody::Source(source_node) = node_body { - source_node.source_inner.as_mut().unwrap().source_id = src.id; - } - }); - } - } - _ => {} - } - tracing::debug!( id = job_id, definition = streaming_job.definition(), @@ -2003,51 +1983,3 @@ impl DdlController { .await } } - -/// Fill in necessary information for `Table` stream graph. -/// e.g., fill source id for table with connector, fill external table id for CDC table. -pub fn fill_table_stream_graph_info( - source: &mut Option, - table: &mut PbTable, - table_job_type: TableJobType, - fragment_graph: &mut PbStreamFragmentGraph, -) { - let mut source_count = 0; - for fragment in fragment_graph.fragments.values_mut() { - visit_fragment(fragment, |node_body| { - if let NodeBody::Source(source_node) = node_body { - if source_node.source_inner.is_none() { - // skip empty source for dml node - return; - } - - // If we're creating a table with connector, we should additionally fill its ID first. - if let Some(source) = source { - source_node.source_inner.as_mut().unwrap().source_id = source.id; - source_count += 1; - - assert_eq!( - source_count, 1, - "require exactly 1 external stream source when creating table with a connector" - ); - - // Fill in the correct table id for source. - source.optional_associated_table_id = - Some(OptionalAssociatedTableId::AssociatedTableId(table.id)); - // Fill in the correct source id for mview. - table.optional_associated_source_id = - Some(OptionalAssociatedSourceId::AssociatedSourceId(source.id)); - } - } - - // fill table id for cdc backfill - if let NodeBody::StreamCdcScan(node) = node_body - && table_job_type == TableJobType::SharedCdcSource - { - if let Some(table_desc) = node.cdc_table_desc.as_mut() { - table_desc.table_id = table.id; - } - } - }); - } -} diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index b13acb68ac39b..ef8c1298264b9 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -1002,6 +1002,7 @@ impl SourceManager { /// create and register connector worker for source. pub async fn register_source(&self, source: &Source) -> MetaResult<()> { + tracing::debug!("register_source: {}", source.get_id()); let mut core = self.core.lock().await; if let Entry::Vacant(e) = core.managed_sources.entry(source.get_id() as _) { let handle = create_source_worker_handle(source, self.metrics.clone()) diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 86a2197a9d5bc..c2ccd4300ccf1 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -56,7 +56,7 @@ pub(super) struct BuildingFragment { inner: StreamFragment, /// The ID of the job if it contains the streaming job node. - table_id: Option, + job_id: Option, /// The required column IDs of each upstream table. /// Will be converted to indices when building the edge connected to the upstream. @@ -82,12 +82,12 @@ impl BuildingFragment { // Fill the information of the internal tables in the fragment. Self::fill_internal_tables(&mut fragment, job, table_id_gen); - let table_id = Self::fill_job(&mut fragment, job).then(|| job.id()); + let job_id = Self::fill_job(&mut fragment, job).then(|| job.id()); let upstream_table_columns = Self::extract_upstream_table_columns(&mut fragment); Self { inner: fragment, - table_id, + job_id, upstream_table_columns, } } @@ -126,17 +126,17 @@ impl BuildingFragment { /// Fill the information with the job in the fragment. fn fill_job(fragment: &mut StreamFragment, job: &StreamingJob) -> bool { - let table_id = job.id(); + let job_id = job.id(); let fragment_id = fragment.fragment_id; - let mut has_table = false; + let mut has_job = false; stream_graph_visitor::visit_fragment(fragment, |node_body| match node_body { NodeBody::Materialize(materialize_node) => { - materialize_node.table_id = table_id; + materialize_node.table_id = job_id; // Fill the ID of the `Table`. let table = materialize_node.table.as_mut().unwrap(); - table.id = table_id; + table.id = job_id; table.database_id = job.database_id(); table.schema_id = job.schema_id(); table.fragment_id = fragment_id; @@ -145,27 +145,49 @@ impl BuildingFragment { table.definition = job.name(); } - has_table = true; + has_job = true; } NodeBody::Sink(sink_node) => { - sink_node.sink_desc.as_mut().unwrap().id = table_id; + sink_node.sink_desc.as_mut().unwrap().id = job_id; - has_table = true; + has_job = true; } NodeBody::Dml(dml_node) => { - dml_node.table_id = table_id; + dml_node.table_id = job_id; dml_node.table_version_id = job.table_version_id().unwrap(); } - NodeBody::Source(_) => { - // Notice: Table job has a dumb Source node, we should be careful that `has_table` should not be overwrite to `false` - if !has_table { - has_table = job.is_source_job(); + NodeBody::Source(source_node) => { + match job { + // Note: For table without connector, it has a dummy Source node. + // Note: For table with connector, it's source node has a source id different with the table id (job id), assigned in create_job_catalog. + StreamingJob::Table(source, _table, _table_job_type) => { + if let Some(source_inner) = source_node.source_inner.as_mut() { + if let Some(source) = source { + debug_assert_ne!(source.id, job_id); + source_inner.source_id = source.id; + } + } + } + StreamingJob::Source(source) => { + has_job = true; + if let Some(source_inner) = source_node.source_inner.as_mut() { + debug_assert_eq!(source.id, job_id); + source_inner.source_id = source.id; + } + } + // For other job types, no need to fill the source id, since it refers to an existing source. + _ => {} + } + } + NodeBody::StreamCdcScan(node) => { + if let Some(table_desc) = node.cdc_table_desc.as_mut() { + table_desc.table_id = job_id; } } _ => {} }); - has_table + has_job } /// Extract the required columns (in IDs) of each upstream table. @@ -499,7 +521,7 @@ impl StreamFragmentGraph { pub fn table_fragment_id(&self) -> FragmentId { self.fragments .values() - .filter(|b| b.table_id.is_some()) + .filter(|b| b.job_id.is_some()) .map(|b| b.fragment_id) .exactly_one() .expect("require exactly 1 materialize/sink/cdc source node when creating the streaming job") @@ -1095,7 +1117,7 @@ impl CompleteStreamFragmentGraph { let internal_tables = building_fragment.extract_internal_tables(); let BuildingFragment { inner, - table_id, + job_id, upstream_table_columns: _, } = building_fragment; @@ -1104,7 +1126,7 @@ impl CompleteStreamFragmentGraph { let materialized_fragment_id = if inner.fragment_type_mask & FragmentTypeFlag::Mview as u32 != 0 { - table_id + job_id } else { None };