diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index 07ffd6eb470ac..478c748ceabfc 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -23,6 +23,9 @@ create source mysql_mytest with ( server.id = '5601' ); +statement error Should not create MATERIALIZED VIEW directly on shared CDC source. +create materialized view mv as select * from mysql_mytest; + statement error The upstream table name must contain database name prefix* create table products_test ( id INT, name STRING, diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index a9808e3a9e1e2..ee049a2248ca1 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -36,6 +36,7 @@ use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::mysql_row_to_owned_row; use crate::source::cdc::external::mock_external_table::MockExternalTableReader; use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset}; +use crate::WithPropertiesExt; #[derive(Debug)] pub enum CdcTableType { @@ -46,11 +47,8 @@ pub enum CdcTableType { } impl CdcTableType { - pub fn from_properties(with_properties: &HashMap) -> Self { - let connector = with_properties - .get("connector") - .map(|c| c.to_ascii_lowercase()) - .unwrap_or_default(); + pub fn from_properties(with_properties: &impl WithPropertiesExt) -> Self { + let connector = with_properties.get_connector().unwrap_or_default(); match connector.as_str() { "mysql-cdc" => Self::MySql, "postgres-cdc" => Self::Postgres, diff --git a/src/connector/src/with_options.rs b/src/connector/src/with_options.rs index 1ce981dd86820..b84381c35794b 100644 --- a/src/connector/src/with_options.rs +++ b/src/connector/src/with_options.rs @@ -14,6 +14,7 @@ use std::collections::{BTreeMap, HashMap}; +use crate::source::cdc::external::CdcTableType; use crate::source::iceberg::ICEBERG_CONNECTOR; use crate::source::{ GCS_CONNECTOR, KAFKA_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, UPSTREAM_SOURCE_KEY, @@ -78,7 +79,7 @@ impl Get for BTreeMap { } /// Utility methods for `WITH` properties (`HashMap` and `BTreeMap`). -pub trait WithPropertiesExt: Get { +pub trait WithPropertiesExt: Get + Sized { #[inline(always)] fn get_connector(&self) -> Option { self.get(UPSTREAM_SOURCE_KEY).map(|s| s.to_lowercase()) @@ -100,6 +101,10 @@ pub trait WithPropertiesExt: Get { connector.contains("-cdc") } + fn is_shared_cdc_source(&self) -> bool { + self.is_cdc_connector() && CdcTableType::from_properties(self).can_backfill() + } + #[inline(always)] fn is_iceberg_connector(&self) -> bool { let Some(connector) = self.get_connector() else { diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs index a459efd39f016..2dd3f0fc8952f 100644 --- a/src/frontend/src/binder/relation/table_or_source.rs +++ b/src/frontend/src/binder/relation/table_or_source.rs @@ -18,6 +18,7 @@ use itertools::Itertools; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{is_system_schema, Field}; use risingwave_common::session_config::USER_NAME_WILD_CARD; +use risingwave_connector::WithPropertiesExt; use risingwave_sqlparser::ast::{Statement, TableAlias}; use risingwave_sqlparser::parser::Parser; use thiserror_ext::AsReport; @@ -52,6 +53,12 @@ pub struct BoundSource { pub catalog: SourceCatalog, } +impl BoundSource { + pub fn is_shared_cdc_source(&self) -> bool { + self.catalog.with_properties.is_shared_cdc_source() + } +} + impl From<&SourceCatalog> for BoundSource { fn from(s: &SourceCatalog) -> Self { Self { catalog: s.clone() } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 620cd2e066872..ba308e31d1878 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -38,7 +38,6 @@ use risingwave_connector::schema::schema_registry::{ name_strategy_from_str, SchemaRegistryAuth, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME, }; use risingwave_connector::sink::iceberg::IcebergConfig; -use risingwave_connector::source::cdc::external::CdcTableType; use risingwave_connector::source::cdc::{ CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY, CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, @@ -1304,12 +1303,7 @@ pub async fn handle_create_source( ensure_table_constraints_supported(&stmt.constraints)?; let sql_pk_names = bind_sql_pk_names(&stmt.columns, &stmt.constraints)?; - // gated the feature with a session variable - let create_cdc_source_job = if with_properties.is_cdc_connector() { - CdcTableType::from_properties(&with_properties).can_backfill() - } else { - false - }; + let create_cdc_source_job = with_properties.is_shared_cdc_source(); let (columns_from_resolve_source, source_info) = if create_cdc_source_job { bind_columns_from_source_for_cdc(&session, &source_schema, &with_properties)? diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 3f64a8fde4405..f8f71d9431210 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -86,12 +86,19 @@ impl Planner { } pub(super) fn plan_source(&mut self, source: BoundSource) -> Result { - Ok(LogicalSource::with_catalog( - Rc::new(source.catalog), - SourceNodeKind::CreateMViewOrBatch, - self.ctx(), - )? - .into()) + if source.is_shared_cdc_source() { + Err(ErrorCode::InternalError( + "Should not create MATERIALIZED VIEW directly on shared CDC source. HINT: create TABLE from the source instead.".to_string(), + ) + .into()) + } else { + Ok(LogicalSource::with_catalog( + Rc::new(source.catalog), + SourceNodeKind::CreateMViewOrBatch, + self.ctx(), + )? + .into()) + } } pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result {