diff --git a/proto/catalog.proto b/proto/catalog.proto index 5894294290c21..3c3ee374a9e6e 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -339,6 +339,7 @@ message Table { string name = 4; repeated plan_common.ColumnCatalog columns = 5; repeated common.ColumnOrder pk = 6; + // For cdc table created from a cdc source, here records the source id. repeated uint32 dependent_relations = 8; oneof optional_associated_source_id { uint32 associated_source_id = 9; diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 26212d659206a..f51f0e1b3a428 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -456,6 +456,7 @@ message TableSchemaChange { TableChangeType change_type = 1; string cdc_table_id = 2; repeated plan_common.ColumnCatalog columns = 3; + string upstream_ddl = 4; } message SchemaChangeEnvelope { diff --git a/proto/meta.proto b/proto/meta.proto index ef00c1ef663e8..d69b8fd349893 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -731,6 +731,11 @@ message EventLog { common.HostAddress host_addr = 3; string panic_info = 4; } + message EventAutoSchemaChangeFail { + uint32 table_id = 1; + string cdc_table_id = 2; + string upstream_ddl = 3; + } // Event logs identifier, which should be populated by event log service. optional string unique_id = 1; // Processing time, which should be populated by event log service. 
@@ -743,6 +748,7 @@ message EventLog { EventInjectBarrierFail inject_barrier_fail = 7; EventCollectBarrierFail collect_barrier_fail = 8; EventLog.EventWorkerNodePanic worker_node_panic = 9; + EventLog.EventAutoSchemaChangeFail auto_schema_change_fail = 10; } } diff --git a/src/connector/src/parser/debezium/schema_change.rs b/src/connector/src/parser/debezium/schema_change.rs index c586257ae696b..dafdb89642e5c 100644 --- a/src/connector/src/parser/debezium/schema_change.rs +++ b/src/connector/src/parser/debezium/schema_change.rs @@ -68,6 +68,7 @@ pub struct TableSchemaChange { pub(crate) cdc_table_id: String, pub(crate) columns: Vec, pub(crate) change_type: TableChangeType, + pub(crate) upstream_ddl: String, } impl SchemaChangeEnvelope { @@ -89,6 +90,7 @@ impl SchemaChangeEnvelope { change_type: table_change.change_type.to_proto() as _, cdc_table_id: table_change.cdc_table_id.clone(), columns, + upstream_ddl: table_change.upstream_ddl.clone(), } }) .collect(); @@ -96,7 +98,7 @@ impl SchemaChangeEnvelope { PbSchemaChangeEnvelope { table_changes } } - pub fn table_names(&self) -> Vec { + pub fn table_ids(&self) -> Vec { self.table_changes .iter() .map(|table_change| table_change.cdc_table_id.clone()) diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index d820eb0a26dba..f1ac65d79a654 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -125,7 +125,11 @@ impl PlainParser { .generate_accessor(data) .await?; - return match parse_schema_change(&accessor, &self.source_ctx.connector_props) { + return match parse_schema_change( + &accessor, + self.source_ctx.source_id.into(), + &self.source_ctx.connector_props, + ) { Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)), Err(err) => Err(err)?, }; @@ -482,7 +486,7 @@ mod tests { .unwrap(); let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); - let msg = r#"{"schema":null,"payload": { "databaseName": 
"mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_1", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#; + let msg = r#"{"schema":null,"payload": { "databaseName": "mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_0", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { 
"autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#; let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new( "mydb.test".to_string(), 0, @@ -508,7 +512,7 @@ mod tests { SchemaChangeEnvelope { table_changes: [ TableSchemaChange { - cdc_table_id: "mydb.test", + cdc_table_id: "0.mydb.test", columns: [ ColumnCatalog { column_desc: ColumnDesc { @@ -560,6 +564,7 @@ mod tests { }, ], change_type: Alter, + upstream_ddl: "ALTER TABLE test add column v2 varchar(32)", }, ], }, diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index a623aeb70bb61..9af7ef359e250 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -16,6 +16,7 @@ use itertools::Itertools; use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; use risingwave_common::types::{ DataType, Datum, DatumCow, Scalar, ScalarImpl, ScalarRefImpl, Timestamptz, ToDatumRef, + ToOwnedDatum, }; use 
risingwave_connector_codec::decoder::AccessExt; use risingwave_pb::plan_common::additional_column::ColumnType; @@ -25,6 +26,7 @@ use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation use crate::parser::debezium::schema_change::{SchemaChangeEnvelope, TableSchemaChange}; use crate::parser::schema_change::TableChangeType; use crate::parser::TransactionControl; +use crate::source::cdc::build_cdc_table_id; use crate::source::cdc::external::mysql::{mysql_type_to_rw_type, type_name_to_mysql_type}; use crate::source::{ConnectorProperties, SourceColumnDesc}; @@ -70,6 +72,7 @@ pub struct DebeziumChangeEvent { const BEFORE: &str = "before"; const AFTER: &str = "after"; +const UPSTREAM_DDL: &str = "ddl"; const SOURCE: &str = "source"; const SOURCE_TS_MS: &str = "ts_ms"; const SOURCE_DB: &str = "db"; @@ -152,12 +155,23 @@ macro_rules! jsonb_access_field { }; } +/// Parse the schema change message from Debezium. +/// The layout of MySQL schema change message can refer to +/// pub fn parse_schema_change( accessor: &impl Access, + source_id: u32, connector_props: &ConnectorProperties, ) -> AccessResult { let mut schema_changes = vec![]; + let upstream_ddl = accessor + .access(&[UPSTREAM_DDL], &DataType::Varchar)? + .to_owned_datum() + .unwrap() + .as_utf8() + .to_string(); + if let Some(ScalarRefImpl::List(table_changes)) = accessor .access(&[TABLE_CHANGES], &DataType::List(Box::new(DataType::Jsonb)))? 
.to_datum_ref() @@ -210,8 +224,11 @@ pub fn parse_schema_change( column_descs.push(ColumnDesc::named(name, ColumnId::placeholder(), data_type)); } } + + // concatenate the source_id to the cdc_table_id + let cdc_table_id = build_cdc_table_id(source_id, id.replace('"', "").as_str()); schema_changes.push(TableSchemaChange { - cdc_table_id: id.replace('"', ""), // remove the double quotes + cdc_table_id, columns: column_descs .into_iter() .map(|column_desc| ColumnCatalog { @@ -220,6 +237,7 @@ pub fn parse_schema_change( }) .collect_vec(), change_type: ty.as_str().into(), + upstream_ddl: upstream_ddl.clone(), }); } diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index c56749de9b9b8..3f3626449153a 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -53,6 +53,11 @@ pub const CITUS_CDC_CONNECTOR: &str = Citus::CDC_CONNECTOR_NAME; pub const MONGODB_CDC_CONNECTOR: &str = Mongodb::CDC_CONNECTOR_NAME; pub const SQL_SERVER_CDC_CONNECTOR: &str = SqlServer::CDC_CONNECTOR_NAME; +/// Build a unique CDC table identifier from a source ID and external table name +pub fn build_cdc_table_id(source_id: u32, external_table_name: &str) -> String { + format!("{}.{}", source_id, external_table_name) +} + pub trait CdcSourceTypeTrait: Send + Sync + Clone + 'static { const CDC_CONNECTOR_NAME: &'static str; fn source_type() -> CdcSourceType; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs index bc83a5220757f..1b0acfece2d01 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs @@ -62,6 +62,7 @@ fn event_type(e: &Event) -> String { Event::InjectBarrierFail(_) => "INJECT_BARRIER_FAIL", Event::CollectBarrierFail(_) => "COLLECT_BARRIER_FAIL", Event::WorkerNodePanic(_) => "WORKER_NODE_PANIC", + 
Event::AutoSchemaChangeFail(_) => "AUTO_SCHEMA_CHANGE_FAIL", } .into() } diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 3fcfca881508d..380212e1a92dd 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -29,6 +29,7 @@ use risingwave_common::catalog::{ use risingwave_common::license::Feature; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_common::util::value_encoding::DatumToProtoExt; +use risingwave_connector::source::cdc::build_cdc_table_id; use risingwave_connector::source::cdc::external::{ ExternalTableConfig, ExternalTableImpl, DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY, }; @@ -834,6 +835,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( vec![], ); + let cdc_table_id = build_cdc_table_id(source.id, &external_table_name); let materialize = plan_root.gen_table_plan( context, resolved_table_name, @@ -848,7 +850,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( Some(col_id_gen.into_version()), true, None, - Some(external_table_name), + Some(cdc_table_id), )?; let mut table = materialize.table().to_prost(schema_id, database_id); diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index e263cc36b6a99..de5c3deaf0d6b 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -637,7 +637,7 @@ impl PlanRoot { version: Option, with_external_source: bool, retention_seconds: Option, - cdc_table_name: Option, + cdc_table_id: Option, ) -> Result { assert_eq!(self.phase, PlanPhase::Logical); assert_eq!(self.plan.convention(), Convention::Logical); @@ -868,7 +868,7 @@ impl PlanRoot { row_id_index, version, retention_seconds, - cdc_table_name, + cdc_table_id, ) } diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 01474532ebb5c..57d4454ac254a 100644 --- 
a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -133,7 +133,7 @@ impl StreamMaterialize { row_id_index: Option, version: Option, retention_seconds: Option, - cdc_table_name: Option, + cdc_table_id: Option, ) -> Result { let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?; @@ -154,7 +154,7 @@ impl StreamMaterialize { CreateType::Foreground, )?; - table.cdc_table_id = cdc_table_name; + table.cdc_table_id = cdc_table_id; Ok(Self::new(input, table)) } diff --git a/src/meta/model_v2/src/object_dependency.rs b/src/meta/model_v2/src/object_dependency.rs index d5ca89215a93d..b96943f83ffdc 100644 --- a/src/meta/model_v2/src/object_dependency.rs +++ b/src/meta/model_v2/src/object_dependency.rs @@ -44,6 +44,16 @@ pub enum Relation { on_delete = "Cascade" )] Object1, + + // To join source on the oid column + #[sea_orm( + belongs_to = "super::source::Entity", + from = "Column::Oid", + to = "super::source::Column::SourceId", + on_update = "NoAction", + on_delete = "Cascade" + )] + Source, } impl ActiveModelBehavior for ActiveModel {} diff --git a/src/meta/model_v2/src/table.rs b/src/meta/model_v2/src/table.rs index 3722658e7294f..44cebfa6f70ba 100644 --- a/src/meta/model_v2/src/table.rs +++ b/src/meta/model_v2/src/table.rs @@ -177,6 +177,16 @@ pub enum Relation { on_delete = "NoAction" )] Source, + + // To join object_dependency on the used_by column + #[sea_orm( + belongs_to = "super::object_dependency::Entity", + from = "Column::TableId", + to = "super::object_dependency::Column::UsedBy", + on_update = "NoAction", + on_delete = "Cascade" + )] + ObjectDependency, } impl Related for Entity { diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 5d1b3570e1ce3..75bcfa7953cbe 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -430,7 +430,7 @@ pub async fn start_service_as_election_leader( .await .unwrap(), ); - let 
catalog_controller = Arc::new(CatalogController::new(env.clone())); + let catalog_controller = Arc::new(CatalogController::new(env.clone()).await?); MetadataManager::new_v2(cluster_controller, catalog_controller) } }; diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index bb9e041d716b2..1cd6168dd0984 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -20,7 +20,7 @@ use rand::seq::SliceRandom; use rand::thread_rng; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_connector::sink::catalog::SinkId; -use risingwave_meta::manager::MetadataManager; +use risingwave_meta::manager::{EventLogManagerRef, MetadataManager}; use risingwave_meta::rpc::ddl_controller::fill_table_stream_graph_info; use risingwave_pb::catalog::connection::private_link_service::{ PbPrivateLinkProvider, PrivateLinkProvider, @@ -34,6 +34,8 @@ use risingwave_pb::ddl_service::ddl_service_server::DdlService; use risingwave_pb::ddl_service::drop_table_request::PbSourceId; use risingwave_pb::ddl_service::*; use risingwave_pb::frontend_service::GetTableReplacePlanRequest; +use risingwave_pb::meta::event_log; +use thiserror_ext::AsReport; use tonic::{Request, Response, Status}; use crate::barrier::BarrierManagerRef; @@ -953,17 +955,12 @@ impl DdlService for DdlServiceImpl { }; for table_change in schema_change.table_changes { - let cdc_table_id = table_change.cdc_table_id.clone(); // get the table catalog corresponding to the cdc table let tables: Vec = self .metadata_manager - .get_table_catalog_by_cdc_table_id(cdc_table_id) + .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id) .await?; - tracing::info!( - "(auto schema change) Table jobs to replace: {:?}", - tables.iter().map(|t| t.id).collect::>() - ); for table in tables { // send a request to the frontend to get the ReplaceTablePlan // will retry with exponential backoff if the request fails @@ -974,21 +971,71 @@ impl DdlService 
for DdlServiceImpl { table_name: table.name, table_change: Some(table_change.clone()), }) - .await? - .into_inner(); - - if let Some(plan) = resp.replace_plan { - plan.table - .as_ref() - .inspect(|t| tracing::info!("Table job to replace: {}", t.id)); - // start the schema change procedure - self.ddl_controller - .run_command(DdlCommand::ReplaceTable(Self::extract_replace_table_info( - plan, - ))) - .await?; - tracing::info!("(auto schema change) Table {} replaced success", table.id); - } + .await; + + match resp { + Ok(resp) => { + let resp = resp.into_inner(); + if let Some(plan) = resp.replace_plan { + plan.table.as_ref().inspect(|t| { + tracing::info!( + target: "auto_schema_change", + table_id = t.id, + cdc_table_id = t.cdc_table_id, + upstream_ddl = table_change.upstream_ddl, + "Start the replace config change") + }); + // start the schema change procedure + let replace_res = self + .ddl_controller + .run_command(DdlCommand::ReplaceTable( + Self::extract_replace_table_info(plan), + )) + .await; + + match replace_res { + Ok(_) => { + tracing::info!( + target: "auto_schema_change", + table_id = table.id, + cdc_table_id = table.cdc_table_id, + "Table replaced success"); + } + Err(e) => { + tracing::error!( + target: "auto_schema_change", + error = %e.as_report(), + table_id = table.id, + cdc_table_id = table.cdc_table_id, + upstream_ddl = table_change.upstream_ddl, + "failed to replace the table", + ); + add_auto_schema_change_fail_event_log( + table.id, + table_change.cdc_table_id.clone(), + table_change.upstream_ddl.clone(), + &self.env.event_log_manager_ref(), + ); + } + }; + } + } + Err(e) => { + tracing::error!( + target: "auto_schema_change", + error = %e.as_report(), + table_id = table.id, + cdc_table_id = table.cdc_table_id, + "failed to get replace table plan", + ); + add_auto_schema_change_fail_event_log( + table.id, + table_change.cdc_table_id.clone(), + table_change.upstream_ddl.clone(), + &self.env.event_log_manager_ref(), + ); + } + }; } } @@
-1029,3 +1076,17 @@ impl DdlServiceImpl { Ok(()) } } + +fn add_auto_schema_change_fail_event_log( + table_id: u32, + cdc_table_id: String, + upstream_ddl: String, + event_log_manager: &EventLogManagerRef, +) { + let event = event_log::EventAutoSchemaChangeFail { + table_id, + cdc_table_id, + upstream_ddl, + }; + event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]); +} diff --git a/src/meta/service/src/event_log_service.rs b/src/meta/service/src/event_log_service.rs index b47814847cdb6..c7b148e28a58f 100644 --- a/src/meta/service/src/event_log_service.rs +++ b/src/meta/service/src/event_log_service.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_meta::manager::event_log::EventLogMangerRef; +use risingwave_meta::manager::event_log::EventLogManagerRef; use risingwave_pb::meta::event_log_service_server::EventLogService; use risingwave_pb::meta::{ AddEventLogRequest, AddEventLogResponse, ListEventLogRequest, ListEventLogResponse, @@ -20,11 +20,11 @@ use risingwave_pb::meta::{ use tonic::{Request, Response, Status}; pub struct EventLogServiceImpl { - event_log_manager: EventLogMangerRef, + event_log_manager: EventLogManagerRef, } impl EventLogServiceImpl { - pub fn new(event_log_manager: EventLogMangerRef) -> Self { + pub fn new(event_log_manager: EventLogManagerRef) -> Self { Self { event_log_manager } } } diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 5698a96fb3a58..ffab5160b5d9f 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -23,6 +23,7 @@ use risingwave_common::catalog::{TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMA use risingwave_common::secret::LocalSecretManager; use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut; use risingwave_common::{bail, current_cluster_version}; +use 
risingwave_connector::source::cdc::build_cdc_table_id; use risingwave_connector::source::UPSTREAM_SOURCE_KEY; use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::*; @@ -60,6 +61,7 @@ use sea_orm::{ }; use tokio::sync::oneshot::Sender; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use tracing::info; use super::utils::{check_subscription_name_duplicate, get_fragment_ids_by_jobs}; use crate::controller::rename::{alter_relation_rename, alter_relation_rename_refs}; @@ -67,8 +69,9 @@ use crate::controller::utils::{ build_relation_group, check_connection_name_duplicate, check_database_name_duplicate, check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate, check_secret_name_duplicate, ensure_object_id, ensure_object_not_refer, ensure_schema_empty, - ensure_user_id, get_referring_objects, get_referring_objects_cascade, get_user_privilege, - list_user_info_by_ids, resolve_source_register_info_for_jobs, PartialObject, + ensure_user_id, extract_external_table_name_from_definition, get_referring_objects, + get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids, + resolve_source_register_info_for_jobs, PartialObject, }; use crate::controller::ObjectModel; use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION}; @@ -104,15 +107,89 @@ pub struct ReleaseContext { } impl CatalogController { - pub fn new(env: MetaSrvEnv) -> Self { + pub async fn new(env: MetaSrvEnv) -> MetaResult { let meta_store = env.meta_store().as_sql().clone(); - Self { + let catalog_controller = Self { env, inner: RwLock::new(CatalogControllerInner { db: meta_store.conn, creating_table_finish_notifier: HashMap::new(), }), + }; + + catalog_controller.init().await?; + Ok(catalog_controller) + } + + async fn init(&self) -> MetaResult<()> { + self.table_catalog_cdc_table_id_update().await?; + Ok(()) + } + + /// Fill in the `cdc_table_id` field for Table with empty 
`cdc_table_id` and parent Source job. + /// NOTES: We assume Table with a parent Source job is a CDC table + async fn table_catalog_cdc_table_id_update(&self) -> MetaResult<()> { + let inner = self.inner.read().await; + let txn = inner.db.begin().await?; + + // select Tables which cdc_table_id is empty and has a parent Source job + let table_and_source_id: Vec<(TableId, String, SourceId)> = Table::find() + .join(JoinType::InnerJoin, table::Relation::ObjectDependency.def()) + .join( + JoinType::InnerJoin, + object_dependency::Relation::Source.def(), + ) + .select_only() + .columns([table::Column::TableId, table::Column::Definition]) + .columns([source::Column::SourceId]) + .filter( + table::Column::TableType.eq(TableType::Table).and( + table::Column::CdcTableId + .is_null() + .or(table::Column::CdcTableId.eq("")), + ), + ) + .into_tuple() + .all(&txn) + .await?; + + // return directly if the result set is empty. + if table_and_source_id.is_empty() { + return Ok(()); + } + + info!(table_and_source_id = ?table_and_source_id, "cdc table with empty cdc_table_id"); + + let mut cdc_table_ids = HashMap::new(); + for (table_id, definition, source_id) in table_and_source_id { + match extract_external_table_name_from_definition(&definition) { + None => { + tracing::warn!( + table_id = table_id, + definition = definition, + "failed to extract cdc table name from table definition.", + ) + } + Some(external_table_name) => { + cdc_table_ids.insert( + table_id, + build_cdc_table_id(source_id as u32, &external_table_name), + ); + } + } } + + for (table_id, cdc_table_id) in cdc_table_ids { + table::ActiveModel { + table_id: Set(table_id as _), + cdc_table_id: Set(Some(cdc_table_id)), + ..Default::default() + } + .update(&txn) + .await?; + } + txn.commit().await?; + Ok(()) } /// Used in `NotificationService::subscribe`. 
@@ -2879,7 +2956,7 @@ impl CatalogController { pub async fn get_table_by_cdc_table_id( &self, - cdc_table_id: String, + cdc_table_id: &String, ) -> MetaResult> { let inner = self.inner.read().await; let table_objs = Table::find() @@ -3250,7 +3327,7 @@ mod tests { #[tokio::test] async fn test_database_func() -> MetaResult<()> { - let mgr = CatalogController::new(MetaSrvEnv::for_test_with_sql_meta_store().await); + let mgr = CatalogController::new(MetaSrvEnv::for_test_with_sql_meta_store().await).await?; let pb_database = PbDatabase { name: "db1".to_string(), owner: TEST_OWNER_ID as _, @@ -3282,7 +3359,7 @@ mod tests { #[tokio::test] async fn test_schema_func() -> MetaResult<()> { - let mgr = CatalogController::new(MetaSrvEnv::for_test_with_sql_meta_store().await); + let mgr = CatalogController::new(MetaSrvEnv::for_test_with_sql_meta_store().await).await?; let pb_schema = PbSchema { database_id: TEST_DATABASE_ID as _, name: "schema1".to_string(), @@ -3315,7 +3392,7 @@ mod tests { #[tokio::test] async fn test_create_view() -> MetaResult<()> { - let mgr = CatalogController::new(MetaSrvEnv::for_test_with_sql_meta_store().await); + let mgr = CatalogController::new(MetaSrvEnv::for_test_with_sql_meta_store().await).await?; let pb_view = PbView { schema_id: TEST_SCHEMA_ID as _, database_id: TEST_DATABASE_ID as _, @@ -3340,7 +3417,7 @@ mod tests { #[tokio::test] async fn test_create_function() -> MetaResult<()> { - let mgr = CatalogController::new(MetaSrvEnv::for_test_with_sql_meta_store().await); + let mgr = CatalogController::new(MetaSrvEnv::for_test_with_sql_meta_store().await).await?; let test_data_type = risingwave_pb::data::DataType { type_name: risingwave_pb::data::data_type::TypeName::Int32 as _, ..Default::default() @@ -3388,7 +3465,7 @@ mod tests { #[tokio::test] async fn test_alter_relation_rename() -> MetaResult<()> { - let mgr = CatalogController::new(MetaSrvEnv::for_test_with_sql_meta_store().await); + let mgr = 
CatalogController::new(MetaSrvEnv::for_test_with_sql_meta_store().await).await?; let pb_source = PbSource { schema_id: TEST_SCHEMA_ID as _, database_id: TEST_DATABASE_ID as _, diff --git a/src/meta/src/controller/user.rs b/src/meta/src/controller/user.rs index f6afedf875f58..5c7abac7a88fc 100644 --- a/src/meta/src/controller/user.rs +++ b/src/meta/src/controller/user.rs @@ -506,7 +506,7 @@ mod tests { #[tokio::test] async fn test_user_and_privilege() -> MetaResult<()> { - let mgr = CatalogController::new(MetaSrvEnv::for_test_with_sql_meta_store().await); + let mgr = CatalogController::new(MetaSrvEnv::for_test_with_sql_meta_store().await).await?; mgr.create_user(make_test_user("test_user_1")).await?; mgr.create_user(make_test_user("test_user_2")).await?; let user_1 = mgr.get_user_by_name("test_user_1").await?; diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 088796b4e9713..2d517272a3d00 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -14,7 +14,7 @@ use std::collections::{BTreeSet, HashMap, HashSet}; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use itertools::Itertools; use risingwave_common::bitmap::Bitmap; use risingwave_common::hash; @@ -42,6 +42,8 @@ use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{PbFragmentTypeFlag, PbStreamNode, StreamSource}; use risingwave_pb::user::grant_privilege::{PbAction, PbActionWithGrantOption, PbObject}; use risingwave_pb::user::{PbGrantPrivilege, PbUserInfo}; +use risingwave_sqlparser::ast::Statement as SqlStatement; +use risingwave_sqlparser::parser::Parser; use sea_orm::sea_query::{ Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType, WithClause, @@ -50,6 +52,7 @@ use sea_orm::{ ColumnTrait, ConnectionTrait, DerivePartialModel, EntityTrait, FromQueryResult, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, Statement, }; +use 
thiserror_ext::AsReport; use crate::{MetaError, MetaResult}; /// This function will construct a query using recursive cte to find all objects[(id, `obj_type`)] that are used by the given object. @@ -1137,3 +1140,24 @@ pub(crate) fn build_relation_group(relation_objects: Vec) -> Noti } NotificationInfo::RelationGroup(PbRelationGroup { relations }) } + +pub fn extract_external_table_name_from_definition(table_definition: &str) -> Option { + let [mut definition]: [_; 1] = Parser::parse_sql(table_definition) + .context("unable to parse table definition") + .inspect_err(|e| { + tracing::error!( + target: "auto_schema_change", + error = %e.as_report(), + "failed to parse table definition") + }) + .unwrap() + .try_into() + .unwrap(); + if let SqlStatement::CreateTable { cdc_table_info, .. } = &mut definition { + cdc_table_info + .clone() + .map(|cdc_table_info| cdc_table_info.external_table_name) + } else { + None + } +} diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index e39b41cbaff0d..d6523060e967d 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -248,11 +248,11 @@ impl DatabaseManager { .collect() } - pub fn get_table_by_cdc_table_id(&self, cdc_table_id: String) -> Vec
{ + pub fn get_table_by_cdc_table_id(&self, cdc_table_id: &String) -> Vec
{ let cdc_table_id = Some(cdc_table_id); self.tables .values() - .filter(|t| t.cdc_table_id == cdc_table_id) + .filter(|t| t.cdc_table_id.as_ref() == cdc_table_id) .cloned() .collect() } diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 29d7cd754dd4f..8939e2f9802d9 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -33,6 +33,7 @@ use risingwave_common::catalog::{ }; use risingwave_common::secret::LocalSecretManager; use risingwave_common::{bail, current_cluster_version, ensure}; +use risingwave_connector::source::cdc::build_cdc_table_id; use risingwave_connector::source::{should_copy_to_format_encode_options, UPSTREAM_SOURCE_KEY}; use risingwave_pb::catalog::subscription::PbSubscriptionState; use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, TableType}; @@ -139,6 +140,7 @@ use self::utils::{ use crate::controller::rename::{ alter_relation_rename, alter_relation_rename_refs, ReplaceTableExprRewriter, }; +use crate::controller::utils::extract_external_table_name_from_definition; use crate::manager::catalog::utils::{refcnt_dec_connection, refcnt_inc_connection}; use crate::rpc::ddl_controller::DropMode; use crate::telemetry::MetaTelemetryJobDesc; @@ -268,6 +270,7 @@ impl CatalogManager { self.init_database().await?; self.source_backward_compat_check().await?; self.table_sink_catalog_update().await?; + self.table_catalog_cdc_table_id_update().await?; Ok(()) } @@ -355,6 +358,47 @@ impl CatalogManager { Ok(()) } + + // Fill in the `cdc_table_id` that wasn't written in the previous version for the table. 
+ async fn table_catalog_cdc_table_id_update(&self) -> MetaResult<()> { + let core = &mut *self.core.lock().await; + let sources = BTreeMapTransaction::new(&mut core.database.sources); + let mut tables = BTreeMapTransaction::new(&mut core.database.tables); + let legacy_tables = tables + .tree_ref() + .iter() + .filter(|(_, table)| { + if let Some(rel_id) = table.dependent_relations.first() + && sources.contains_key(rel_id) + && table.table_type == TableType::Table as i32 + && table.cdc_table_id.is_none() + { + true + } else { + false + } + }) + .map(|(_, table)| table.clone()) + .collect_vec(); + for mut table in legacy_tables { + let source_id = table.dependent_relations[0]; + match extract_external_table_name_from_definition(&table.definition) { + None => { + tracing::warn!( + table_id = table.id, + definition = table.definition, + "failed to extract cdc table name from table definition.", + ) + } + Some(external_table_name) => { + table.cdc_table_id = Some(build_cdc_table_id(source_id, &external_table_name)); + } + } + tables.insert(table.id, table); + } + commit_meta!(self, tables)?; + Ok(()) + } } // Database catalog related methods @@ -4119,7 +4163,7 @@ impl CatalogManager { .get_table_name_and_type_mapping() } - pub async fn get_table_by_cdc_table_id(&self, cdc_table_id: String) -> Vec
{ + pub async fn get_table_by_cdc_table_id(&self, cdc_table_id: &String) -> Vec
{ self.core .lock() .await @@ -4888,3 +4932,22 @@ impl CatalogManager { Ok(()) } } + +#[cfg(test)] +mod tests { + use crate::manager::catalog::extract_external_table_name_from_definition; + + #[test] + fn test_extract_cdc_table_name() { + let ddl1 = "CREATE TABLE t1 () FROM pg_source TABLE 'public.t1'"; + let ddl2 = "CREATE TABLE t2 (v1 int) FROM pg_source TABLE 'mydb.t2'"; + assert_eq!( + extract_external_table_name_from_definition(ddl1), + Some("public.t1".into()) + ); + assert_eq!( + extract_external_table_name_from_definition(ddl2), + Some("mydb.t2".into()) + ); + } +} diff --git a/src/meta/src/manager/diagnose.rs b/src/meta/src/manager/diagnose.rs index 190bc51f41a8e..e50242a9d44e6 100644 --- a/src/meta/src/manager/diagnose.rs +++ b/src/meta/src/manager/diagnose.rs @@ -34,7 +34,7 @@ use serde_json::json; use thiserror_ext::AsReport; use crate::hummock::HummockManagerRef; -use crate::manager::event_log::EventLogMangerRef; +use crate::manager::event_log::EventLogManagerRef; use crate::manager::{MetadataManager, MetadataManagerV2}; use crate::MetaResult; @@ -43,7 +43,7 @@ pub type DiagnoseCommandRef = Arc; pub struct DiagnoseCommand { metadata_manager: MetadataManager, hummock_manger: HummockManagerRef, - event_log_manager: EventLogMangerRef, + event_log_manager: EventLogManagerRef, prometheus_client: Option, prometheus_selector: String, } @@ -52,7 +52,7 @@ impl DiagnoseCommand { pub fn new( metadata_manager: MetadataManager, hummock_manger: HummockManagerRef, - event_log_manager: EventLogMangerRef, + event_log_manager: EventLogManagerRef, prometheus_client: Option, prometheus_selector: String, ) -> Self { diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 73a514519c3aa..22f88bd9c0a75 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -38,7 +38,7 @@ use crate::controller::session_params::{SessionParamsController, SessionParamsCo use crate::controller::system_param::{SystemParamsController, 
SystemParamsControllerRef}; use crate::controller::SqlMetaStore; use crate::hummock::sequence::SequenceGenerator; -use crate::manager::event_log::{start_event_log_manager, EventLogMangerRef}; +use crate::manager::event_log::{start_event_log_manager, EventLogManagerRef}; use crate::manager::{ IdGeneratorManager, IdGeneratorManagerRef, IdleManager, IdleManagerRef, NotificationManager, NotificationManagerRef, @@ -131,7 +131,7 @@ pub struct MetaSrvEnv { /// idle status manager. idle_manager: IdleManagerRef, - event_log_manager: EventLogMangerRef, + event_log_manager: EventLogManagerRef, /// Unique identifier of the cluster. cluster_id: ClusterId, @@ -575,7 +575,7 @@ impl MetaSrvEnv { &self.cluster_id } - pub fn event_log_manager_ref(&self) -> EventLogMangerRef { + pub fn event_log_manager_ref(&self) -> EventLogManagerRef { self.event_log_manager.clone() } } diff --git a/src/meta/src/manager/event_log.rs b/src/meta/src/manager/event_log.rs index 92175593c3c59..a1fdc356502d9 100644 --- a/src/meta/src/manager/event_log.rs +++ b/src/meta/src/manager/event_log.rs @@ -21,7 +21,7 @@ use risingwave_pb::meta::event_log::{Event as PbEvent, Event}; use risingwave_pb::meta::EventLog as PbEventLog; use tokio::task::JoinHandle; -pub type EventLogMangerRef = Arc; +pub type EventLogManagerRef = Arc; type EventLogSender = tokio::sync::mpsc::Sender; type ShutdownSender = tokio::sync::oneshot::Sender<()>; @@ -171,6 +171,7 @@ impl From<&EventLog> for ChannelId { Event::InjectBarrierFail(_) => 5, Event::CollectBarrierFail(_) => 6, Event::WorkerNodePanic(_) => 7, + Event::AutoSchemaChangeFail(_) => 8, } } } diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 3622e5e7c8504..c1b02b0e72444 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -567,7 +567,7 @@ impl MetadataManager { pub async fn get_table_catalog_by_cdc_table_id( &self, - cdc_table_id: String, + cdc_table_id: &String, ) -> MetaResult> { match &self { 
MetadataManager::V1(mgr) => Ok(mgr diff --git a/src/meta/src/manager/mod.rs b/src/meta/src/manager/mod.rs index a7368c72c0333..eac1b15c8e699 100644 --- a/src/meta/src/manager/mod.rs +++ b/src/meta/src/manager/mod.rs @@ -30,6 +30,7 @@ mod system_param; pub use catalog::*; pub use cluster::{WorkerKey, *}; pub use env::{MetaSrvEnv, *}; +pub use event_log::EventLogManagerRef; pub use id::*; pub use idle::*; pub use metadata::*; diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 6c951a8a28110..1531dca93b909 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -131,8 +131,10 @@ impl SourceExecutor { // spawn a task to handle schema change event from source parser let _join_handle = tokio::task::spawn(async move { while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await { - let table_names = schema_change.table_names(); - tracing::info!("recv a schema change event for tables: {:?}", table_names); + let table_ids = schema_change.table_ids(); + tracing::info!( + target: "auto_schema_change", + "recv a schema change event for tables: {:?}", table_ids); // TODO: retry on rpc error if let Some(ref meta_client) = meta_client { match meta_client @@ -141,13 +143,14 @@ impl SourceExecutor { { Ok(_) => { tracing::info!( - "schema change success for tables: {:?}", - table_names - ); + target: "auto_schema_change", + "schema change success for tables: {:?}", table_ids); finish_tx.send(()).unwrap(); } Err(e) => { - tracing::error!(error = ?e.as_report(), "schema change error"); + tracing::error!( + target: "auto_schema_change", + error = ?e.as_report(), "schema change error"); finish_tx.send(()).unwrap(); } } diff --git a/src/tests/simulation/tests/integration_tests/sink/scale.rs b/src/tests/simulation/tests/integration_tests/sink/scale.rs index c89489bf559a8..99c3b7e9ebc5c 100644 --- 
a/src/tests/simulation/tests/integration_tests/sink/scale.rs +++ b/src/tests/simulation/tests/integration_tests/sink/scale.rs @@ -166,5 +166,6 @@ async fn test_sink_scale() -> Result<()> { #[tokio::test] async fn test_sink_decouple_scale() -> Result<()> { + init_logger(); scale_test_inner(true).await } diff --git a/src/utils/runtime/src/logger.rs b/src/utils/runtime/src/logger.rs index ee4bbd88d6f87..8530a7c7a585c 100644 --- a/src/utils/runtime/src/logger.rs +++ b/src/utils/runtime/src/logger.rs @@ -210,6 +210,7 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { // Configure levels for some RisingWave crates. // Other RisingWave crates like `stream` and `storage` will follow the default level. filter = filter + .with_target("auto_schema_change", Level::INFO) .with_target("risingwave_sqlparser", Level::INFO) .with_target("risingwave_connector_node", Level::INFO) .with_target("pgwire", Level::INFO)