diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 3986593920770..fe917de7d3696 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -15,6 +15,8 @@ use std::collections::BTreeMap; use risingwave_common::bail; +use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; +use risingwave_common::types::DataType; use super::simd_json_parser::DebeziumJsonAccessBuilder; use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig}; @@ -28,6 +30,20 @@ use crate::parser::{ }; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; +/// Note: these columns are added in `SourceStreamChunkRowWriter::do_action`. +/// See also the usage of `SourceColumnType`. +pub fn debezium_cdc_source_schema() -> Vec<ColumnCatalog> { + let columns = vec![ + ColumnCatalog { + column_desc: ColumnDesc::named("payload", ColumnId::placeholder(), DataType::Jsonb), + is_hidden: false, + }, + ColumnCatalog::offset_column(), + ColumnCatalog::cdc_table_name_column(), + ]; + columns +} + #[derive(Debug)] pub struct DebeziumParser { key_builder: AccessBuilderImpl, @@ -192,7 +208,7 @@ mod tests { use std::ops::Deref; use std::sync::Arc; - use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; + use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId, CDC_SOURCE_COLUMN_NUM}; use risingwave_common::row::Row; use risingwave_common::types::Timestamptz; use risingwave_pb::plan_common::{ @@ -327,4 +343,11 @@ mod tests { _ => panic!("unexpected parse result: {:?}", res), } } + + #[tokio::test] + async fn test_cdc_source_job_schema() { + let columns = debezium_cdc_source_schema(); + // make sure it isn't broken by future PRs + assert_eq!(CDC_SOURCE_COLUMN_NUM, columns.len() as u32); + } } diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index 41dcc43b0f8f6..05548351492b9 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -30,8 +30,9 @@ use risingwave_sqlparser::ast::{ }; use risingwave_sqlparser::parser::Parser; -use super::alter_table_column::schema_has_schema_registry; -use super::create_source::{generate_stream_graph_for_source, validate_compatibility}; +use super::create_source::{ + generate_stream_graph_for_source, schema_has_schema_registry, validate_compatibility, +}; use super::util::SourceSchemaCompatExt; use super::{HandlerArgs, RwPgResponse}; use crate::catalog::root_catalog::SchemaPath; diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 15554e919c77a..0342004f6b1b5 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -29,12 +29,12 @@ use risingwave_pb::ddl_service::TableJobType; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph}; use risingwave_sqlparser::ast::{ - AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, Encode, - FormatEncodeOptions, Ident, ObjectName, Statement, StructField, TableConstraint, + AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, Ident, ObjectName, + Statement, StructField, TableConstraint, }; use risingwave_sqlparser::parser::Parser; -use super::create_source::get_json_schema_location; +use 
super::create_source::schema_has_schema_registry; use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator}; use super::util::SourceSchemaCompatExt; use super::{HandlerArgs, RwPgResponse}; @@ -45,7 +45,7 @@ use crate::expr::{Expr, ExprImpl, InputRef, Literal}; use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project}; use crate::handler::create_table::bind_table_constraints; use crate::session::SessionImpl; -use crate::{Binder, TableCatalog, WithOptions}; +use crate::{Binder, TableCatalog}; /// Used in auto schema change process pub async fn get_new_table_definition_for_cdc_table( @@ -475,17 +475,6 @@ pub async fn handle_alter_table_column( Ok(PgResponse::empty_result(StatementType::ALTER_TABLE)) } -pub fn schema_has_schema_registry(schema: &FormatEncodeOptions) -> bool { - match schema.row_encode { - Encode::Avro | Encode::Protobuf => true, - Encode::Json => { - let mut options = WithOptions::try_from(schema.row_options()).unwrap(); - matches!(get_json_schema_location(options.inner_mut()), Ok(Some(_))) - } - _ => false, - } -} - pub fn fetch_table_catalog_for_alter( session: &SessionImpl, table_name: &ObjectName, diff --git a/src/frontend/src/handler/alter_table_with_sr.rs b/src/frontend/src/handler/alter_table_with_sr.rs index d0daa6f5a82a5..7d6bdab5bf1b3 100644 --- a/src/frontend/src/handler/alter_table_with_sr.rs +++ b/src/frontend/src/handler/alter_table_with_sr.rs @@ -21,7 +21,8 @@ use risingwave_sqlparser::parser::Parser; use thiserror_ext::AsReport; use super::alter_source_with_sr::alter_definition_format_encode; -use super::alter_table_column::{fetch_table_catalog_for_alter, schema_has_schema_registry}; +use super::alter_table_column::fetch_table_catalog_for_alter; +use super::create_source::schema_has_schema_registry; use super::util::SourceSchemaCompatExt; use super::{get_replace_table_plan, HandlerArgs, RwPgResponse}; use crate::error::{ErrorCode, Result}; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index a6e9ca9b1d93d..2981a96423edf 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -18,6 +18,9 @@ use std::sync::LazyLock; use anyhow::{anyhow, Context}; use either::Either; +use external_schema::debezium::extract_debezium_avro_table_pk_columns; +use external_schema::iceberg::check_iceberg_source; +use external_schema::nexmark::check_nexmark_schema; use itertools::Itertools; use maplit::{convert_args, hashmap, hashset}; use pgwire::pg_response::{PgResponse, StatementType}; @@ -99,138 +102,16 @@ use crate::utils::{ }; use crate::{bind_data_type, build_graph, OptimizerContext, WithOptions, WithOptionsSecResolved}; -/// Map a JSON schema to a relational schema -async fn extract_json_table_schema( - schema_config: &Option<(AstString, bool)>, - with_properties: &BTreeMap, - format_encode_options: &mut BTreeMap, -) -> Result>> { - match schema_config { - None => Ok(None), - Some((schema_location, use_schema_registry)) => { - let schema_registry_auth = use_schema_registry.then(|| { - let auth = SchemaRegistryAuth::from(&*format_encode_options); - try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME); - try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD); - auth - }); - Ok(Some( - fetch_json_schema_and_map_to_columns( - &schema_location.0, - schema_registry_auth, - with_properties, - ) - .await? 
- .into_iter() - .map(|col| ColumnCatalog { - column_desc: col.into(), - is_hidden: false, - }) - .collect_vec(), - )) - } - } -} - -/// Note: these columns are added in `SourceStreamChunkRowWriter::do_action`. -/// May also look for the usage of `SourceColumnType`. -pub fn debezium_cdc_source_schema() -> Vec { - let columns = vec![ - ColumnCatalog { - column_desc: ColumnDesc::named("payload", ColumnId::placeholder(), DataType::Jsonb), - is_hidden: false, - }, - ColumnCatalog::offset_column(), - ColumnCatalog::cdc_table_name_column(), - ]; - columns -} - -fn json_schema_infer_use_schema_registry(schema_config: &Option<(AstString, bool)>) -> bool { - match schema_config { - None => false, - Some((_, use_registry)) => *use_registry, - } -} - -/// Map an Avro schema to a relational schema. -async fn extract_avro_table_schema( - info: &StreamSourceInfo, - with_properties: &WithOptionsSecResolved, - format_encode_options: &mut BTreeMap, - is_debezium: bool, -) -> Result> { - let parser_config = SpecificParserConfig::new(info, with_properties)?; - try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME); - try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD); - consume_aws_config_from_options(format_encode_options); - - let vec_column_desc = if is_debezium { - let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?; - conf.map_to_columns()? - } else { - if let risingwave_connector::parser::EncodingProperties::Avro(avro_props) = - &parser_config.encoding_config - && matches!(avro_props.schema_location, SchemaLocation::File { .. }) - && format_encode_options - .get("with_deprecated_file_header") - .is_none_or(|v| v != "true") - { - bail_not_implemented!(issue = 12871, "avro without schema registry"); - } - let conf = AvroParserConfig::new(parser_config.encoding_config).await?; - conf.map_to_columns()? - }; - Ok(vec_column_desc - .into_iter() - .map(|col| ColumnCatalog { - column_desc: col.into(), - is_hidden: false, - }) - .collect_vec()) -} - -async fn extract_debezium_avro_table_pk_columns( - info: &StreamSourceInfo, - with_properties: &WithOptionsSecResolved, -) -> Result> { - let parser_config = SpecificParserConfig::new(info, with_properties)?; - let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?; - Ok(conf.extract_pks()?.drain(..).map(|c| c.name).collect()) -} - -/// Map a protobuf schema to a relational schema. 
-async fn extract_protobuf_table_schema( - schema: &ProtobufSchema, - with_properties: &WithOptionsSecResolved, - format_encode_options: &mut BTreeMap, -) -> Result> { - let info = StreamSourceInfo { - proto_message_name: schema.message_name.0.clone(), - row_schema_location: schema.row_schema_location.0.clone(), - use_schema_registry: schema.use_schema_registry, - format: FormatType::Plain.into(), - row_encode: EncodeType::Protobuf.into(), - format_encode_options: format_encode_options.clone(), - ..Default::default() - }; - let parser_config = SpecificParserConfig::new(&info, with_properties)?; - try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME); - try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD); - consume_aws_config_from_options(format_encode_options); - - let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?; - - let column_descs = conf.map_to_columns()?; - - Ok(column_descs - .into_iter() - .map(|col| ColumnCatalog { - column_desc: col.into(), - is_hidden: false, - }) - .collect_vec()) -} +mod external_schema; +pub use external_schema::{ + bind_columns_from_source, get_schema_location, schema_has_schema_registry, +}; +mod validate; +pub use validate::validate_compatibility; +use validate::{ALLOWED_CONNECTION_CONNECTOR, ALLOWED_CONNECTION_SCHEMA_REGISTRY}; +mod additional_column; +use additional_column::check_and_add_timestamp_column; +pub use additional_column::handle_addition_columns; fn non_generated_sql_columns(columns: &[ColumnDef]) -> Vec { columns @@ -260,48 +141,6 @@ fn consume_aws_config_from_options(format_encode_options: &mut BTreeMap, -) -> Result> { - let schema_location = try_consume_string_from_options(format_encode_options, "schema.location"); - let schema_registry = try_consume_string_from_options(format_encode_options, "schema.registry"); - match (schema_location, schema_registry) { - (None, None) => Ok(None), - (None, Some(schema_registry)) => Ok(Some((schema_registry, true))), - (Some(schema_location), None) => Ok(Some((schema_location, false))), - (Some(_), Some(_)) => Err(RwError::from(ProtocolError( - "only need either the schema location or the schema registry".to_owned(), - ))), - } -} - -fn get_schema_location( - format_encode_options: &mut BTreeMap, -) -> Result<(AstString, bool)> { - let schema_location = try_consume_string_from_options(format_encode_options, "schema.location"); - let schema_registry = try_consume_string_from_options(format_encode_options, "schema.registry"); - match (schema_location, schema_registry) { - (None, None) => Err(RwError::from(ProtocolError( - "missing either a schema location or a schema registry".to_owned(), - ))), - (None, Some(schema_registry)) => Ok((schema_registry, true)), - (Some(schema_location), None) => Ok((schema_location, false)), - (Some(_), Some(_)) => Err(RwError::from(ProtocolError( - "only need either the schema location or the schema registry".to_owned(), - ))), - } -} - -#[inline] -fn get_name_strategy_or_default(name_strategy: Option) -> Result> { - match name_strategy { - None => Ok(None), - Some(name) => Ok(Some(name_strategy_from_str(name.0.as_str()) - .ok_or_else(|| RwError::from(ProtocolError(format!("\ - expect strategy name in topic_name_strategy, record_name_strategy and topic_record_name_strategy, but got {}", name))))? 
as i32)), - } -} - #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum CreateSourceType { SharedCdc, @@ -341,435 +180,6 @@ impl CreateSourceType { } } -/// Resolves the schema of the source from external schema file. -/// See for more information. -/// -/// Note: the returned schema strictly corresponds to the schema. -/// Other special columns like additional columns (`INCLUDE`), and `row_id` column are not included. -pub async fn bind_columns_from_source( - session: &SessionImpl, - format_encode: &FormatEncodeOptions, - with_properties: Either<&WithOptions, &WithOptionsSecResolved>, - create_source_type: CreateSourceType, -) -> Result<(Option>, StreamSourceInfo)> { - let (columns_from_resolve_source, mut source_info) = - if create_source_type == CreateSourceType::SharedCdc { - bind_columns_from_source_for_cdc(session, format_encode)? - } else { - bind_columns_from_source_for_non_cdc(session, format_encode, with_properties).await? - }; - if create_source_type.is_shared() { - // Note: this field should be called is_shared. Check field doc for more details. - source_info.cdc_source_job = true; - source_info.is_distributed = create_source_type == CreateSourceType::SharedNonCdc; - } - Ok((columns_from_resolve_source, source_info)) -} - -async fn bind_columns_from_source_for_non_cdc( - session: &SessionImpl, - format_encode: &FormatEncodeOptions, - with_properties: Either<&WithOptions, &WithOptionsSecResolved>, -) -> Result<(Option>, StreamSourceInfo)> { - const MESSAGE_NAME_KEY: &str = "message"; - const KEY_MESSAGE_NAME_KEY: &str = "key.message"; - const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy"; - - let options_with_secret = match with_properties { - Either::Left(options) => { - let (sec_resolve_props, connection_type, _) = resolve_connection_ref_and_secret_ref( - options.clone(), - session, - TelemetryDatabaseObject::Source, - )?; - if !ALLOWED_CONNECTION_CONNECTOR.contains(&connection_type) { - return Err(RwError::from(ProtocolError(format!( - "connection type {:?} is not allowed, allowed types: {:?}", - connection_type, ALLOWED_CONNECTION_CONNECTOR - )))); - } - - sec_resolve_props - } - Either::Right(options_with_secret) => options_with_secret.clone(), - }; - - let is_kafka: bool = options_with_secret.is_kafka_connector(); - - // todo: need to resolve connection ref for schema registry - let (sec_resolve_props, connection_type, schema_registry_conn_ref) = - resolve_connection_ref_and_secret_ref( - WithOptions::try_from(format_encode.row_options())?, - session, - TelemetryDatabaseObject::Source, - )?; - ensure_connection_type_allowed(connection_type, &ALLOWED_CONNECTION_SCHEMA_REGISTRY)?; - - let (format_encode_options, format_encode_secret_refs) = sec_resolve_props.into_parts(); - // Need real secret to access the schema registry - let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets( - format_encode_options.clone(), - format_encode_secret_refs.clone(), - )?; - - fn get_key_message_name(options: &mut BTreeMap) -> Option { - consume_string_from_options(options, KEY_MESSAGE_NAME_KEY) - .map(|ele| Some(ele.0)) - .unwrap_or(None) - } - fn get_sr_name_strategy_check( - options: &mut BTreeMap, - use_sr: bool, - ) -> Result> { - let name_strategy = get_name_strategy_or_default(try_consume_string_from_options( - options, - NAME_STRATEGY_KEY, - ))?; - if !use_sr && name_strategy.is_some() { - return Err(RwError::from(ProtocolError( - "schema registry name strategy only works with schema registry enabled".to_owned(), - ))); - } - Ok(name_strategy) 
- } - - let mut stream_source_info = StreamSourceInfo { - format: format_to_prost(&format_encode.format) as i32, - row_encode: row_encode_to_prost(&format_encode.row_encode) as i32, - format_encode_options, - format_encode_secret_refs, - connection_id: schema_registry_conn_ref, - ..Default::default() - }; - - if format_encode.format == Format::Debezium { - try_consume_string_from_options(&mut format_encode_options_to_consume, DEBEZIUM_IGNORE_KEY); - } - - let columns = match (&format_encode.format, &format_encode.row_encode) { - (Format::Native, Encode::Native) - | (Format::Plain, Encode::Bytes) - | (Format::DebeziumMongo, Encode::Json) => None, - (Format::Plain, Encode::Protobuf) | (Format::Upsert, Encode::Protobuf) => { - let (row_schema_location, use_schema_registry) = - get_schema_location(&mut format_encode_options_to_consume)?; - let protobuf_schema = ProtobufSchema { - message_name: consume_string_from_options( - &mut format_encode_options_to_consume, - MESSAGE_NAME_KEY, - )?, - row_schema_location, - use_schema_registry, - }; - let name_strategy = get_sr_name_strategy_check( - &mut format_encode_options_to_consume, - protobuf_schema.use_schema_registry, - )?; - - stream_source_info.use_schema_registry = protobuf_schema.use_schema_registry; - stream_source_info - .row_schema_location - .clone_from(&protobuf_schema.row_schema_location.0); - stream_source_info - .proto_message_name - .clone_from(&protobuf_schema.message_name.0); - stream_source_info.key_message_name = - get_key_message_name(&mut format_encode_options_to_consume); - stream_source_info.name_strategy = - name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32); - - Some( - extract_protobuf_table_schema( - &protobuf_schema, - &options_with_secret, - &mut format_encode_options_to_consume, - ) - .await?, - ) - } - (format @ (Format::Plain | Format::Upsert | Format::Debezium), Encode::Avro) => { - if format_encode_options_to_consume - .remove(AWS_GLUE_SCHEMA_ARN_KEY) - .is_none() - { - // Legacy logic that assumes either `schema.location` or confluent `schema.registry`. - // The handling of newly added aws glue is centralized in `connector::parser`. - // TODO(xiangjinwu): move these option parsing to `connector::parser` as well. 
- - let (row_schema_location, use_schema_registry) = - get_schema_location(&mut format_encode_options_to_consume)?; - - if matches!(format, Format::Debezium) && !use_schema_registry { - return Err(RwError::from(ProtocolError( - "schema location for DEBEZIUM_AVRO row format is not supported".to_owned(), - ))); - } - - let message_name = try_consume_string_from_options( - &mut format_encode_options_to_consume, - MESSAGE_NAME_KEY, - ); - let name_strategy = get_sr_name_strategy_check( - &mut format_encode_options_to_consume, - use_schema_registry, - )?; - - stream_source_info.use_schema_registry = use_schema_registry; - stream_source_info - .row_schema_location - .clone_from(&row_schema_location.0); - stream_source_info.proto_message_name = - message_name.unwrap_or(AstString("".into())).0; - stream_source_info.key_message_name = - get_key_message_name(&mut format_encode_options_to_consume); - stream_source_info.name_strategy = - name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32); - } - - Some( - extract_avro_table_schema( - &stream_source_info, - &options_with_secret, - &mut format_encode_options_to_consume, - matches!(format, Format::Debezium), - ) - .await?, - ) - } - (Format::Plain, Encode::Csv) => { - let chars = - consume_string_from_options(&mut format_encode_options_to_consume, "delimiter")?.0; - let delimiter = get_delimiter(chars.as_str()).context("failed to parse delimiter")?; - let has_header = try_consume_string_from_options( - &mut format_encode_options_to_consume, - "without_header", - ) - .map(|s| s.0 == "false") - .unwrap_or(true); - - if is_kafka && has_header { - return Err(RwError::from(ProtocolError( - "CSV HEADER is not supported when creating table with Kafka connector" - .to_owned(), - ))); - } - - stream_source_info.csv_delimiter = delimiter as i32; - stream_source_info.csv_has_header = has_header; - - None - } - // For parquet format, this step is implemented in parquet parser. - (Format::Plain, Encode::Parquet) => None, - ( - Format::Plain | Format::Upsert | Format::Maxwell | Format::Canal | Format::Debezium, - Encode::Json, - ) => { - if matches!( - format_encode.format, - Format::Plain | Format::Upsert | Format::Debezium - ) { - // Parse the value but throw it away. - // It would be too late to report error in `SpecificParserConfig::new`, - // which leads to recovery loop. - // TODO: rely on SpecificParserConfig::new to validate, like Avro - TimestamptzHandling::from_options(&format_encode_options_to_consume) - .map_err(|err| InvalidInputSyntax(err.message))?; - try_consume_string_from_options( - &mut format_encode_options_to_consume, - TimestamptzHandling::OPTION_KEY, - ); - } - - let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?; - stream_source_info.use_schema_registry = - json_schema_infer_use_schema_registry(&schema_config); - - extract_json_table_schema( - &schema_config, - &options_with_secret, - &mut format_encode_options_to_consume, - ) - .await? 
- } - (Format::None, Encode::None) => { - if options_with_secret.is_iceberg_connector() { - Some( - extract_iceberg_columns(&options_with_secret) - .await - .map_err(|err| ProtocolError(err.to_report_string()))?, - ) - } else { - None - } - } - (format, encoding) => { - return Err(RwError::from(ProtocolError(format!( - "Unknown combination {:?} {:?}", - format, encoding - )))); - } - }; - - if !format_encode_options_to_consume.is_empty() { - let err_string = format!( - "Get unknown format_encode_options for {:?} {:?}: {}", - format_encode.format, - format_encode.row_encode, - format_encode_options_to_consume - .keys() - .map(|k| k.to_string()) - .collect::>() - .join(","), - ); - session.notice_to_user(err_string); - } - Ok((columns, stream_source_info)) -} - -fn bind_columns_from_source_for_cdc( - session: &SessionImpl, - format_encode: &FormatEncodeOptions, -) -> Result<(Option>, StreamSourceInfo)> { - let with_options = WithOptions::try_from(format_encode.row_options())?; - if !with_options.connection_ref().is_empty() { - return Err(RwError::from(NotSupported( - "CDC connector does not support connection ref yet".to_owned(), - "Explicitly specify the connection in WITH clause".to_owned(), - ))); - } - let (format_encode_options, format_encode_secret_refs) = - resolve_secret_ref_in_with_options(with_options, session)?.into_parts(); - - // Need real secret to access the schema registry - let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets( - format_encode_options.clone(), - format_encode_secret_refs.clone(), - )?; - - match (&format_encode.format, &format_encode.row_encode) { - (Format::Plain, Encode::Json) => (), - (format, encoding) => { - // Note: parser will also check this. Just be extra safe here - return Err(RwError::from(ProtocolError(format!( - "Row format for CDC connectors should be either omitted or set to `FORMAT PLAIN ENCODE JSON`, got: {:?} {:?}", - format, encoding - )))); - } - }; - - let columns = debezium_cdc_source_schema(); - let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?; - - let stream_source_info = StreamSourceInfo { - format: format_to_prost(&format_encode.format) as i32, - row_encode: row_encode_to_prost(&format_encode.row_encode) as i32, - format_encode_options, - use_schema_registry: json_schema_infer_use_schema_registry(&schema_config), - cdc_source_job: true, - is_distributed: false, - format_encode_secret_refs, - ..Default::default() - }; - if !format_encode_options_to_consume.is_empty() { - let err_string = format!( - "Get unknown format_encode_options for {:?} {:?}: {}", - format_encode.format, - format_encode.row_encode, - format_encode_options_to_consume - .keys() - .map(|k| k.to_string()) - .collect::>() - .join(","), - ); - session.notice_to_user(err_string); - } - Ok((Some(columns), stream_source_info)) -} - -// check the additional column compatibility with the format and encode -fn check_additional_column_compatibility( - column_def: &IncludeOptionItem, - format_encode: Option<&FormatEncodeOptions>, -) -> Result<()> { - // only allow header column have inner field - if column_def.inner_field.is_some() - && !column_def - .column_type - .real_value() - .eq_ignore_ascii_case("header") - { - return Err(RwError::from(ProtocolError(format!( - "Only header column can have inner field, but got {:?}", - column_def.column_type.real_value(), - )))); - } - - // Payload column only allowed when encode is JSON - if let Some(schema) = format_encode - && column_def - .column_type - .real_value() 
- .eq_ignore_ascii_case("payload") - && !matches!(schema.row_encode, Encode::Json) - { - return Err(RwError::from(ProtocolError(format!( - "INCLUDE payload is only allowed when using ENCODE JSON, but got ENCODE {:?}", - schema.row_encode - )))); - } - Ok(()) -} - -/// add connector-spec columns to the end of column catalog -pub fn handle_addition_columns( - format_encode: Option<&FormatEncodeOptions>, - with_properties: &BTreeMap, - mut additional_columns: IncludeOption, - columns: &mut Vec, - is_cdc_backfill_table: bool, -) -> Result<()> { - let connector_name = with_properties.get_connector().unwrap(); // there must be a connector in source - - if get_supported_additional_columns(connector_name.as_str(), is_cdc_backfill_table).is_none() - && !additional_columns.is_empty() - { - return Err(RwError::from(ProtocolError(format!( - "Connector {} accepts no additional column but got {:?}", - connector_name, additional_columns - )))); - } - - while let Some(item) = additional_columns.pop() { - check_additional_column_compatibility(&item, format_encode)?; - - let data_type = item - .header_inner_expect_type - .map(|dt| bind_data_type(&dt)) - .transpose()?; - if let Some(dt) = &data_type - && !matches!(dt, DataType::Bytea | DataType::Varchar) - { - return Err( - ErrorCode::BindError(format!("invalid additional column data type: {dt}")).into(), - ); - } - let col = build_additional_column_desc( - ColumnId::placeholder(), - connector_name.as_str(), - item.column_type.real_value().as_str(), - item.column_alias.map(|alias| alias.real_value()), - item.inner_field.as_deref(), - data_type.as_ref(), - true, - is_cdc_backfill_table, - )?; - columns.push(ColumnCatalog::visible(col)); - } - - Ok(()) -} - /// Bind columns from both source and sql defined. pub(crate) fn bind_all_columns( format_encode: &FormatEncodeOptions, @@ -1076,35 +486,6 @@ pub(crate) async fn bind_source_pk( Ok(res) } -// Add a hidden column `_rw_kafka_timestamp` to each message from Kafka source. -fn check_and_add_timestamp_column(with_properties: &WithOptions, columns: &mut Vec) { - if with_properties.is_kafka_connector() { - if columns.iter().any(|col| { - matches!( - col.column_desc.additional_column.column_type, - Some(AdditionalColumnType::Timestamp(_)) - ) - }) { - // already has timestamp column, no need to add a new one - return; - } - - // add a hidden column `_rw_kafka_timestamp` to each message from Kafka source - let col = build_additional_column_desc( - ColumnId::placeholder(), - KAFKA_CONNECTOR, - "timestamp", - Some(KAFKA_TIMESTAMP_COLUMN_NAME.to_owned()), - None, - None, - true, - false, - ) - .unwrap(); - columns.push(ColumnCatalog::hidden(col)); - } -} - pub(super) fn bind_source_watermark( session: &SessionImpl, name: String, @@ -1139,236 +520,6 @@ pub(super) fn bind_source_watermark( Ok(watermark_descs) } -static ALLOWED_CONNECTION_CONNECTOR: LazyLock> = LazyLock::new(|| { - hashset! { - PbConnectionType::Unspecified, - PbConnectionType::Kafka, - PbConnectionType::Iceberg, - } -}); - -static ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock> = - LazyLock::new(|| { - hashset! 
{ - PbConnectionType::Unspecified, - PbConnectionType::SchemaRegistry, - } - }); - -// TODO: Better design if we want to support ENCODE KEY where we will have 4 dimensional array -static CONNECTORS_COMPATIBLE_FORMATS: LazyLock>>> = - LazyLock::new(|| { - convert_args!(hashmap!( - KAFKA_CONNECTOR => hashmap!( - Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv], - Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf], - Format::Debezium => vec![Encode::Json, Encode::Avro], - Format::Maxwell => vec![Encode::Json], - Format::Canal => vec![Encode::Json], - Format::DebeziumMongo => vec![Encode::Json], - ), - PULSAR_CONNECTOR => hashmap!( - Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes], - Format::Upsert => vec![Encode::Json, Encode::Avro], - Format::Debezium => vec![Encode::Json], - Format::Maxwell => vec![Encode::Json], - Format::Canal => vec![Encode::Json], - ), - KINESIS_CONNECTOR => hashmap!( - Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes], - Format::Upsert => vec![Encode::Json, Encode::Avro], - Format::Debezium => vec![Encode::Json], - Format::Maxwell => vec![Encode::Json], - Format::Canal => vec![Encode::Json], - ), - GOOGLE_PUBSUB_CONNECTOR => hashmap!( - Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes], - Format::Debezium => vec![Encode::Json], - Format::Maxwell => vec![Encode::Json], - Format::Canal => vec![Encode::Json], - ), - NEXMARK_CONNECTOR => hashmap!( - Format::Native => vec![Encode::Native], - Format::Plain => vec![Encode::Bytes], - ), - DATAGEN_CONNECTOR => hashmap!( - Format::Native => vec![Encode::Native], - Format::Plain => vec![Encode::Bytes, Encode::Json], - ), - S3_CONNECTOR => hashmap!( - Format::Plain => vec![Encode::Csv, Encode::Json], - ), - OPENDAL_S3_CONNECTOR => hashmap!( - Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet], - ), - GCS_CONNECTOR => hashmap!( - Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet], - ), - AZBLOB_CONNECTOR => hashmap!( - Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet], - ), - POSIX_FS_CONNECTOR => hashmap!( - Format::Plain => vec![Encode::Csv], - ), - MYSQL_CDC_CONNECTOR => hashmap!( - Format::Debezium => vec![Encode::Json], - // support source stream job - Format::Plain => vec![Encode::Json], - ), - POSTGRES_CDC_CONNECTOR => hashmap!( - Format::Debezium => vec![Encode::Json], - // support source stream job - Format::Plain => vec![Encode::Json], - ), - CITUS_CDC_CONNECTOR => hashmap!( - Format::Debezium => vec![Encode::Json], - ), - MONGODB_CDC_CONNECTOR => hashmap!( - Format::DebeziumMongo => vec![Encode::Json], - ), - NATS_CONNECTOR => hashmap!( - Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Bytes], - ), - MQTT_CONNECTOR => hashmap!( - Format::Plain => vec![Encode::Json, Encode::Bytes], - ), - TEST_CONNECTOR => hashmap!( - Format::Plain => vec![Encode::Json], - ), - ICEBERG_CONNECTOR => hashmap!( - Format::None => vec![Encode::None], - ), - SQL_SERVER_CDC_CONNECTOR => hashmap!( - Format::Debezium => vec![Encode::Json], - // support source stream job - Format::Plain => vec![Encode::Json], - ), - )) - }); - -pub fn validate_license(connector: &str) -> Result<()> { - if connector == SQL_SERVER_CDC_CONNECTOR { - Feature::SqlServerCdcSource - .check_available() - .map_err(|e| anyhow::anyhow!(e))?; - } - Ok(()) -} - -pub fn validate_compatibility( - format_encode: &FormatEncodeOptions, - props: &mut BTreeMap, 
-) -> Result<()> { - let mut connector = props - .get_connector() - .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_owned())))?; - - if connector == OPENDAL_S3_CONNECTOR { - // reject s3_v2 creation - return Err(RwError::from(Deprecated( - OPENDAL_S3_CONNECTOR.to_owned(), - S3_CONNECTOR.to_owned(), - ))); - } - if connector == S3_CONNECTOR { - // S3 connector is deprecated, use OPENDAL_S3_CONNECTOR instead - // do s3 -> s3_v2 migration - let entry = props.get_mut(UPSTREAM_SOURCE_KEY).unwrap(); - *entry = OPENDAL_S3_CONNECTOR.to_owned(); - connector = OPENDAL_S3_CONNECTOR.to_owned(); - } - - let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS - .get(&connector) - .ok_or_else(|| { - RwError::from(ProtocolError(format!( - "connector {:?} is not supported, accept {:?}", - connector, - CONNECTORS_COMPATIBLE_FORMATS.keys() - ))) - })?; - - validate_license(&connector)?; - if connector != KAFKA_CONNECTOR { - let res = match (&format_encode.format, &format_encode.row_encode) { - (Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => { - let mut options = WithOptions::try_from(format_encode.row_options())?; - let (_, use_schema_registry) = get_schema_location(options.inner_mut())?; - use_schema_registry - } - (Format::Debezium, Encode::Avro) => true, - (_, _) => false, - }; - if res { - return Err(RwError::from(ProtocolError(format!( - "The {} must be kafka when schema registry is used", - UPSTREAM_SOURCE_KEY - )))); - } - } - - let compatible_encodes = compatible_formats - .get(&format_encode.format) - .ok_or_else(|| { - RwError::from(ProtocolError(format!( - "connector {} does not support format {:?}", - connector, format_encode.format - ))) - })?; - if !compatible_encodes.contains(&format_encode.row_encode) { - return Err(RwError::from(ProtocolError(format!( - "connector {} does not support format {:?} with encode {:?}", - connector, format_encode.format, format_encode.row_encode - )))); - } - - if connector == POSTGRES_CDC_CONNECTOR || connector == CITUS_CDC_CONNECTOR { - match props.get("slot.name") { - None => { - // Build a random slot name with UUID - // e.g. "rw_cdc_f9a3567e6dd54bf5900444c8b1c03815" - let uuid = uuid::Uuid::new_v4(); - props.insert("slot.name".into(), format!("rw_cdc_{}", uuid.simple())); - } - Some(slot_name) => { - // please refer to - // - https://github.com/debezium/debezium/blob/97956ce25b7612e3413d363658661896b7d2e0a2/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java#L1179 - // - https://doxygen.postgresql.org/slot_8c.html#afac399f07320b9adfd2c599cf822aaa3 - if !slot_name - .chars() - .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') - || slot_name.len() > 63 - { - return Err(RwError::from(ProtocolError(format!( - "Invalid replication slot name: {:?}. 
Valid replication slot name must contain only digits, lowercase characters and underscores with length <= 63", - slot_name - )))); - } - } - } - - if !props.contains_key("schema.name") { - // Default schema name is "public" - props.insert("schema.name".into(), "public".into()); - } - if !props.contains_key("publication.name") { - // Default publication name is "rw_publication" - props.insert("publication.name".into(), "rw_publication".into()); - } - if !props.contains_key("publication.create.enable") { - // Default auto create publication if doesn't exist - props.insert("publication.create.enable".into(), "true".into()); - } - } - - if connector == SQL_SERVER_CDC_CONNECTOR && !props.contains_key("schema.name") { - // Default schema name is "dbo" - props.insert("schema.name".into(), "dbo".into()); - } - - Ok(()) -} - /// Performs early stage checking in frontend to see if the schema of the given `columns` is /// compatible with the connector extracted from the properties. /// @@ -1394,145 +545,6 @@ pub(super) async fn check_format_encode( } } -pub(super) fn check_nexmark_schema( - props: &WithOptionsSecResolved, - row_id_index: Option, - columns: &[ColumnCatalog], -) -> Result<()> { - let table_type = props - .get("nexmark.table.type") - .map(|t| t.to_ascii_lowercase()); - - let event_type = match table_type.as_deref() { - None => None, - Some("bid") => Some(EventType::Bid), - Some("auction") => Some(EventType::Auction), - Some("person") => Some(EventType::Person), - Some(t) => { - return Err(RwError::from(ProtocolError(format!( - "unsupported table type for nexmark source: {}", - t - )))) - } - }; - - // Ignore the generated columns and map the index of row_id column. - let user_defined_columns = columns.iter().filter(|c| !c.is_generated()); - let row_id_index = if let Some(index) = row_id_index { - let col_id = columns[index].column_id(); - user_defined_columns - .clone() - .position(|c| c.column_id() == col_id) - .unwrap() - .into() - } else { - None - }; - - let expected = get_event_data_types_with_names(event_type, row_id_index); - let user_defined = user_defined_columns - .map(|c| { - ( - c.column_desc.name.to_ascii_lowercase(), - c.column_desc.data_type.to_owned(), - ) - }) - .collect_vec(); - - if expected != user_defined { - let cmp = pretty_assertions::Comparison::new(&expected, &user_defined); - return Err(RwError::from(ProtocolError(format!( - "The schema of the nexmark source must specify all columns in order:\n{cmp}", - )))); - } - Ok(()) -} - -pub async fn extract_iceberg_columns( - with_properties: &WithOptionsSecResolved, -) -> anyhow::Result> { - let props = ConnectorProperties::extract(with_properties.clone(), true)?; - if let ConnectorProperties::Iceberg(properties) = props { - let table = properties.load_table_v2().await?; - let iceberg_schema: arrow_schema_iceberg::Schema = - iceberg::arrow::schema_to_arrow_schema(table.metadata().current_schema())?; - - let mut columns: Vec = iceberg_schema - .fields() - .iter() - .enumerate() - .map(|(i, field)| { - let column_desc = ColumnDesc::named( - field.name(), - ColumnId::new((i + 1).try_into().unwrap()), - IcebergArrowConvert.type_from_field(field).unwrap(), - ); - ColumnCatalog { - column_desc, - // hide the _row_id column for iceberg engine table - // This column is auto generated when users define a table without primary key - is_hidden: field.name() == ROWID_PREFIX, - } - }) - .collect(); - columns.push(ColumnCatalog::iceberg_sequence_num_column()); - - Ok(columns) - } else { - Err(anyhow!(format!( - "Invalid properties 
for iceberg source: {:?}", - props - ))) - } -} - -pub async fn check_iceberg_source( - props: &WithOptionsSecResolved, - columns: &[ColumnCatalog], -) -> anyhow::Result<()> { - let props = ConnectorProperties::extract(props.clone(), true)?; - let ConnectorProperties::Iceberg(properties) = props else { - return Err(anyhow!(format!( - "Invalid properties for iceberg source: {:?}", - props - ))); - }; - - let schema = Schema { - fields: columns - .iter() - .filter(|&c| c.column_desc.name != ICEBERG_SEQUENCE_NUM_COLUMN_NAME) - .cloned() - .map(|c| c.column_desc.into()) - .collect(), - }; - - let table = properties.load_table_v2().await?; - - let iceberg_schema = iceberg::arrow::schema_to_arrow_schema(table.metadata().current_schema())?; - - for f1 in schema.fields() { - if !iceberg_schema.fields.iter().any(|f2| f2.name() == &f1.name) { - return Err(anyhow::anyhow!(format!( - "Column {} not found in iceberg table", - f1.name - ))); - } - } - - let new_iceberg_field = iceberg_schema - .fields - .iter() - .filter(|f1| schema.fields.iter().any(|f2| f1.name() == &f2.name)) - .cloned() - .collect::>(); - let new_iceberg_schema = arrow_schema_iceberg::Schema::new(new_iceberg_field); - - risingwave_connector::sink::iceberg::try_matches_arrow_schema(&schema, &new_iceberg_schema)?; - - Ok(()) -} - pub fn bind_connector_props( handler_args: &HandlerArgs, format_encode: &FormatEncodeOptions, @@ -1875,46 +887,16 @@ pub(super) fn generate_stream_graph_for_source( Ok(graph) } -fn format_to_prost(format: &Format) -> FormatType { - match format { - Format::Native => FormatType::Native, - Format::Plain => FormatType::Plain, - Format::Upsert => FormatType::Upsert, - Format::Debezium => FormatType::Debezium, - Format::DebeziumMongo => FormatType::DebeziumMongo, - Format::Maxwell => FormatType::Maxwell, - Format::Canal => FormatType::Canal, - Format::None => FormatType::None, - } -} -fn row_encode_to_prost(row_encode: &Encode) -> EncodeType { - match row_encode { - Encode::Native => EncodeType::Native, - Encode::Json => EncodeType::Json, - Encode::Avro => EncodeType::Avro, - Encode::Protobuf => EncodeType::Protobuf, - Encode::Csv => EncodeType::Csv, - Encode::Bytes => EncodeType::Bytes, - Encode::Template => EncodeType::Template, - Encode::Parquet => EncodeType::Parquet, - Encode::None => EncodeType::None, - Encode::Text => EncodeType::Text, - } -} - #[cfg(test)] pub mod tests { use std::collections::HashMap; use std::sync::Arc; - use risingwave_common::catalog::{ - CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROWID_PREFIX, - }; + use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROWID_PREFIX}; use risingwave_common::types::{DataType, StructType}; use crate::catalog::root_catalog::SchemaPath; use crate::catalog::source_catalog::SourceCatalog; - use crate::handler::create_source::debezium_cdc_source_schema; use crate::test_utils::{create_proto_file, LocalFrontend, PROTO_FILE_DATA}; const GET_COLUMN_FROM_CATALOG: fn(&Arc) -> HashMap<&str, DataType> = @@ -2087,13 +1069,6 @@ pub mod tests { .assert_debug_eq(&columns); } - #[tokio::test] - async fn test_cdc_source_job_schema() { - let columns = debezium_cdc_source_schema(); - // make sure it doesn't broken by future PRs - assert_eq!(CDC_SOURCE_COLUMN_NUM, columns.len() as u32); - } - #[tokio::test] async fn test_source_addition_columns() { // test derive include column for format plain diff --git a/src/frontend/src/handler/create_source/additional_column.rs b/src/frontend/src/handler/create_source/additional_column.rs 
new file mode 100644 index 0000000000000..7af5c4519be06 --- /dev/null +++ b/src/frontend/src/handler/create_source/additional_column.rs @@ -0,0 +1,130 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::*; + +// check the additional column compatibility with the format and encode +fn check_additional_column_compatibility( + column_def: &IncludeOptionItem, + format_encode: Option<&FormatEncodeOptions>, +) -> Result<()> { + // only allow header column have inner field + if column_def.inner_field.is_some() + && !column_def + .column_type + .real_value() + .eq_ignore_ascii_case("header") + { + return Err(RwError::from(ProtocolError(format!( + "Only header column can have inner field, but got {:?}", + column_def.column_type.real_value(), + )))); + } + + // Payload column only allowed when encode is JSON + if let Some(schema) = format_encode + && column_def + .column_type + .real_value() + .eq_ignore_ascii_case("payload") + && !matches!(schema.row_encode, Encode::Json) + { + return Err(RwError::from(ProtocolError(format!( + "INCLUDE payload is only allowed when using ENCODE JSON, but got ENCODE {:?}", + schema.row_encode + )))); + } + Ok(()) +} + +/// add connector-spec columns to the end of column catalog +pub fn handle_addition_columns( + format_encode: Option<&FormatEncodeOptions>, + with_properties: &BTreeMap, + mut additional_columns: IncludeOption, + columns: &mut Vec, + is_cdc_backfill_table: bool, +) -> Result<()> { + let connector_name = with_properties.get_connector().unwrap(); // there must be a connector in source + + if get_supported_additional_columns(connector_name.as_str(), is_cdc_backfill_table).is_none() + && !additional_columns.is_empty() + { + return Err(RwError::from(ProtocolError(format!( + "Connector {} accepts no additional column but got {:?}", + connector_name, additional_columns + )))); + } + + while let Some(item) = additional_columns.pop() { + check_additional_column_compatibility(&item, format_encode)?; + + let data_type = item + .header_inner_expect_type + .map(|dt| bind_data_type(&dt)) + .transpose()?; + if let Some(dt) = &data_type + && !matches!(dt, DataType::Bytea | DataType::Varchar) + { + return Err( + ErrorCode::BindError(format!("invalid additional column data type: {dt}")).into(), + ); + } + let col = build_additional_column_desc( + ColumnId::placeholder(), + connector_name.as_str(), + item.column_type.real_value().as_str(), + item.column_alias.map(|alias| alias.real_value()), + item.inner_field.as_deref(), + data_type.as_ref(), + true, + is_cdc_backfill_table, + )?; + columns.push(ColumnCatalog::visible(col)); + } + + Ok(()) +} + +// Add a hidden column `_rw_kafka_timestamp` to each message from Kafka source. 
+pub fn check_and_add_timestamp_column( + with_properties: &WithOptions, + columns: &mut Vec, +) { + if with_properties.is_kafka_connector() { + if columns.iter().any(|col| { + matches!( + col.column_desc.additional_column.column_type, + Some(AdditionalColumnType::Timestamp(_)) + ) + }) { + // already has timestamp column, no need to add a new one + return; + } + + // add a hidden column `_rw_kafka_timestamp` to each message from Kafka source + let col = build_additional_column_desc( + ColumnId::placeholder(), + KAFKA_CONNECTOR, + "timestamp", + Some(KAFKA_TIMESTAMP_COLUMN_NAME.to_owned()), + None, + None, + true, + false, + ) + .unwrap(); + columns.push(ColumnCatalog::hidden(col)); + } +} diff --git a/src/frontend/src/handler/create_source/external_schema.rs b/src/frontend/src/handler/create_source/external_schema.rs new file mode 100644 index 0000000000000..8d8288eec3a1c --- /dev/null +++ b/src/frontend/src/handler/create_source/external_schema.rs @@ -0,0 +1,441 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! bind columns from external schema + +use risingwave_connector::parser::debezium_cdc_source_schema; + +use super::*; + +mod json; +use json::*; +mod avro; +use avro::extract_avro_table_schema; +pub mod debezium; +pub mod iceberg; +use iceberg::extract_iceberg_columns; +mod protobuf; +use protobuf::extract_protobuf_table_schema; +pub mod nexmark; + +/// Resolves the schema of the source from external schema file. +/// See for more information. +/// +/// Note: the returned schema strictly corresponds to the schema. +/// Other special columns like additional columns (`INCLUDE`), and `row_id` column are not included. +pub async fn bind_columns_from_source( + session: &SessionImpl, + format_encode: &FormatEncodeOptions, + with_properties: Either<&WithOptions, &WithOptionsSecResolved>, + create_source_type: CreateSourceType, +) -> Result<(Option>, StreamSourceInfo)> { + let (columns_from_resolve_source, mut source_info) = + if create_source_type == CreateSourceType::SharedCdc { + bind_columns_from_source_for_cdc(session, format_encode)? + } else { + bind_columns_from_source_for_non_cdc(session, format_encode, with_properties).await? + }; + if create_source_type.is_shared() { + // Note: this field should be called is_shared. Check field doc for more details. 
+ source_info.cdc_source_job = true; + source_info.is_distributed = create_source_type == CreateSourceType::SharedNonCdc; + } + Ok((columns_from_resolve_source, source_info)) +} + +async fn bind_columns_from_source_for_non_cdc( + session: &SessionImpl, + format_encode: &FormatEncodeOptions, + with_properties: Either<&WithOptions, &WithOptionsSecResolved>, +) -> Result<(Option>, StreamSourceInfo)> { + const MESSAGE_NAME_KEY: &str = "message"; + const KEY_MESSAGE_NAME_KEY: &str = "key.message"; + const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy"; + + let options_with_secret = match with_properties { + Either::Left(options) => { + let (sec_resolve_props, connection_type, _) = resolve_connection_ref_and_secret_ref( + options.clone(), + session, + TelemetryDatabaseObject::Source, + )?; + if !ALLOWED_CONNECTION_CONNECTOR.contains(&connection_type) { + return Err(RwError::from(ProtocolError(format!( + "connection type {:?} is not allowed, allowed types: {:?}", + connection_type, ALLOWED_CONNECTION_CONNECTOR + )))); + } + + sec_resolve_props + } + Either::Right(options_with_secret) => options_with_secret.clone(), + }; + + let is_kafka: bool = options_with_secret.is_kafka_connector(); + + // todo: need to resolve connection ref for schema registry + let (sec_resolve_props, connection_type, schema_registry_conn_ref) = + resolve_connection_ref_and_secret_ref( + WithOptions::try_from(format_encode.row_options())?, + session, + TelemetryDatabaseObject::Source, + )?; + ensure_connection_type_allowed(connection_type, &ALLOWED_CONNECTION_SCHEMA_REGISTRY)?; + + let (format_encode_options, format_encode_secret_refs) = sec_resolve_props.into_parts(); + // Need real secret to access the schema registry + let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets( + format_encode_options.clone(), + format_encode_secret_refs.clone(), + )?; + + fn get_key_message_name(options: &mut BTreeMap) -> Option { + consume_string_from_options(options, KEY_MESSAGE_NAME_KEY) + .map(|ele| Some(ele.0)) + .unwrap_or(None) + } + fn get_sr_name_strategy_check( + options: &mut BTreeMap, + use_sr: bool, + ) -> Result> { + let name_strategy = get_name_strategy_or_default(try_consume_string_from_options( + options, + NAME_STRATEGY_KEY, + ))?; + if !use_sr && name_strategy.is_some() { + return Err(RwError::from(ProtocolError( + "schema registry name strategy only works with schema registry enabled".to_owned(), + ))); + } + Ok(name_strategy) + } + + let mut stream_source_info = StreamSourceInfo { + format: format_to_prost(&format_encode.format) as i32, + row_encode: row_encode_to_prost(&format_encode.row_encode) as i32, + format_encode_options, + format_encode_secret_refs, + connection_id: schema_registry_conn_ref, + ..Default::default() + }; + + if format_encode.format == Format::Debezium { + try_consume_string_from_options(&mut format_encode_options_to_consume, DEBEZIUM_IGNORE_KEY); + } + + let columns = match (&format_encode.format, &format_encode.row_encode) { + (Format::Native, Encode::Native) + | (Format::Plain, Encode::Bytes) + | (Format::DebeziumMongo, Encode::Json) => None, + (Format::Plain, Encode::Protobuf) | (Format::Upsert, Encode::Protobuf) => { + let (row_schema_location, use_schema_registry) = + get_schema_location(&mut format_encode_options_to_consume)?; + let protobuf_schema = ProtobufSchema { + message_name: consume_string_from_options( + &mut format_encode_options_to_consume, + MESSAGE_NAME_KEY, + )?, + row_schema_location, + use_schema_registry, + }; + let name_strategy = 
get_sr_name_strategy_check( + &mut format_encode_options_to_consume, + protobuf_schema.use_schema_registry, + )?; + + stream_source_info.use_schema_registry = protobuf_schema.use_schema_registry; + stream_source_info + .row_schema_location + .clone_from(&protobuf_schema.row_schema_location.0); + stream_source_info + .proto_message_name + .clone_from(&protobuf_schema.message_name.0); + stream_source_info.key_message_name = + get_key_message_name(&mut format_encode_options_to_consume); + stream_source_info.name_strategy = + name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32); + + Some( + extract_protobuf_table_schema( + &protobuf_schema, + &options_with_secret, + &mut format_encode_options_to_consume, + ) + .await?, + ) + } + (format @ (Format::Plain | Format::Upsert | Format::Debezium), Encode::Avro) => { + if format_encode_options_to_consume + .remove(AWS_GLUE_SCHEMA_ARN_KEY) + .is_none() + { + // Legacy logic that assumes either `schema.location` or confluent `schema.registry`. + // The handling of newly added aws glue is centralized in `connector::parser`. + // TODO(xiangjinwu): move these option parsing to `connector::parser` as well. + + let (row_schema_location, use_schema_registry) = + get_schema_location(&mut format_encode_options_to_consume)?; + + if matches!(format, Format::Debezium) && !use_schema_registry { + return Err(RwError::from(ProtocolError( + "schema location for DEBEZIUM_AVRO row format is not supported".to_owned(), + ))); + } + + let message_name = try_consume_string_from_options( + &mut format_encode_options_to_consume, + MESSAGE_NAME_KEY, + ); + let name_strategy = get_sr_name_strategy_check( + &mut format_encode_options_to_consume, + use_schema_registry, + )?; + + stream_source_info.use_schema_registry = use_schema_registry; + stream_source_info + .row_schema_location + .clone_from(&row_schema_location.0); + stream_source_info.proto_message_name = + message_name.unwrap_or(AstString("".into())).0; + stream_source_info.key_message_name = + get_key_message_name(&mut format_encode_options_to_consume); + stream_source_info.name_strategy = + name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32); + } + + Some( + extract_avro_table_schema( + &stream_source_info, + &options_with_secret, + &mut format_encode_options_to_consume, + matches!(format, Format::Debezium), + ) + .await?, + ) + } + (Format::Plain, Encode::Csv) => { + let chars = + consume_string_from_options(&mut format_encode_options_to_consume, "delimiter")?.0; + let delimiter = get_delimiter(chars.as_str()).context("failed to parse delimiter")?; + let has_header = try_consume_string_from_options( + &mut format_encode_options_to_consume, + "without_header", + ) + .map(|s| s.0 == "false") + .unwrap_or(true); + + if is_kafka && has_header { + return Err(RwError::from(ProtocolError( + "CSV HEADER is not supported when creating table with Kafka connector" + .to_owned(), + ))); + } + + stream_source_info.csv_delimiter = delimiter as i32; + stream_source_info.csv_has_header = has_header; + + None + } + // For parquet format, this step is implemented in parquet parser. + (Format::Plain, Encode::Parquet) => None, + ( + Format::Plain | Format::Upsert | Format::Maxwell | Format::Canal | Format::Debezium, + Encode::Json, + ) => { + if matches!( + format_encode.format, + Format::Plain | Format::Upsert | Format::Debezium + ) { + // Parse the value but throw it away. + // It would be too late to report error in `SpecificParserConfig::new`, + // which leads to recovery loop. 
+ // TODO: rely on SpecificParserConfig::new to validate, like Avro + TimestamptzHandling::from_options(&format_encode_options_to_consume) + .map_err(|err| InvalidInputSyntax(err.message))?; + try_consume_string_from_options( + &mut format_encode_options_to_consume, + TimestamptzHandling::OPTION_KEY, + ); + } + + let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?; + stream_source_info.use_schema_registry = + json_schema_infer_use_schema_registry(&schema_config); + + extract_json_table_schema( + &schema_config, + &options_with_secret, + &mut format_encode_options_to_consume, + ) + .await? + } + (Format::None, Encode::None) => { + if options_with_secret.is_iceberg_connector() { + Some( + extract_iceberg_columns(&options_with_secret) + .await + .map_err(|err| ProtocolError(err.to_report_string()))?, + ) + } else { + None + } + } + (format, encoding) => { + return Err(RwError::from(ProtocolError(format!( + "Unknown combination {:?} {:?}", + format, encoding + )))); + } + }; + + if !format_encode_options_to_consume.is_empty() { + let err_string = format!( + "Get unknown format_encode_options for {:?} {:?}: {}", + format_encode.format, + format_encode.row_encode, + format_encode_options_to_consume + .keys() + .map(|k| k.to_string()) + .collect::>() + .join(","), + ); + session.notice_to_user(err_string); + } + Ok((columns, stream_source_info)) +} + +fn bind_columns_from_source_for_cdc( + session: &SessionImpl, + format_encode: &FormatEncodeOptions, +) -> Result<(Option>, StreamSourceInfo)> { + let with_options = WithOptions::try_from(format_encode.row_options())?; + if !with_options.connection_ref().is_empty() { + return Err(RwError::from(NotSupported( + "CDC connector does not support connection ref yet".to_owned(), + "Explicitly specify the connection in WITH clause".to_owned(), + ))); + } + let (format_encode_options, format_encode_secret_refs) = + resolve_secret_ref_in_with_options(with_options, session)?.into_parts(); + + // Need real secret to access the schema registry + let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets( + format_encode_options.clone(), + format_encode_secret_refs.clone(), + )?; + + match (&format_encode.format, &format_encode.row_encode) { + (Format::Plain, Encode::Json) => (), + (format, encoding) => { + // Note: parser will also check this. 
Just be extra safe here + return Err(RwError::from(ProtocolError(format!( + "Row format for CDC connectors should be either omitted or set to `FORMAT PLAIN ENCODE JSON`, got: {:?} {:?}", + format, encoding + )))); + } + }; + + let columns = debezium_cdc_source_schema(); + let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?; + + let stream_source_info = StreamSourceInfo { + format: format_to_prost(&format_encode.format) as i32, + row_encode: row_encode_to_prost(&format_encode.row_encode) as i32, + format_encode_options, + use_schema_registry: json_schema_infer_use_schema_registry(&schema_config), + cdc_source_job: true, + is_distributed: false, + format_encode_secret_refs, + ..Default::default() + }; + if !format_encode_options_to_consume.is_empty() { + let err_string = format!( + "Get unknown format_encode_options for {:?} {:?}: {}", + format_encode.format, + format_encode.row_encode, + format_encode_options_to_consume + .keys() + .map(|k| k.to_string()) + .collect::>() + .join(","), + ); + session.notice_to_user(err_string); + } + Ok((Some(columns), stream_source_info)) +} + +fn format_to_prost(format: &Format) -> FormatType { + match format { + Format::Native => FormatType::Native, + Format::Plain => FormatType::Plain, + Format::Upsert => FormatType::Upsert, + Format::Debezium => FormatType::Debezium, + Format::DebeziumMongo => FormatType::DebeziumMongo, + Format::Maxwell => FormatType::Maxwell, + Format::Canal => FormatType::Canal, + Format::None => FormatType::None, + } +} +fn row_encode_to_prost(row_encode: &Encode) -> EncodeType { + match row_encode { + Encode::Native => EncodeType::Native, + Encode::Json => EncodeType::Json, + Encode::Avro => EncodeType::Avro, + Encode::Protobuf => EncodeType::Protobuf, + Encode::Csv => EncodeType::Csv, + Encode::Bytes => EncodeType::Bytes, + Encode::Template => EncodeType::Template, + Encode::Parquet => EncodeType::Parquet, + Encode::None => EncodeType::None, + Encode::Text => EncodeType::Text, + } +} + +pub fn get_schema_location( + format_encode_options: &mut BTreeMap, +) -> Result<(AstString, bool)> { + let schema_location = try_consume_string_from_options(format_encode_options, "schema.location"); + let schema_registry = try_consume_string_from_options(format_encode_options, "schema.registry"); + match (schema_location, schema_registry) { + (None, None) => Err(RwError::from(ProtocolError( + "missing either a schema location or a schema registry".to_owned(), + ))), + (None, Some(schema_registry)) => Ok((schema_registry, true)), + (Some(schema_location), None) => Ok((schema_location, false)), + (Some(_), Some(_)) => Err(RwError::from(ProtocolError( + "only need either the schema location or the schema registry".to_owned(), + ))), + } +} + +pub fn schema_has_schema_registry(schema: &FormatEncodeOptions) -> bool { + match schema.row_encode { + Encode::Avro | Encode::Protobuf => true, + Encode::Json => { + let mut options = WithOptions::try_from(schema.row_options()).unwrap(); + matches!(get_json_schema_location(options.inner_mut()), Ok(Some(_))) + } + _ => false, + } +} + +#[inline] +fn get_name_strategy_or_default(name_strategy: Option) -> Result> { + match name_strategy { + None => Ok(None), + Some(name) => Ok(Some(name_strategy_from_str(name.0.as_str()) + .ok_or_else(|| RwError::from(ProtocolError(format!("\ + expect strategy name in topic_name_strategy, record_name_strategy and topic_record_name_strategy, but got {}", name))))? 
as i32)), + } +} diff --git a/src/frontend/src/handler/create_source/external_schema/avro.rs b/src/frontend/src/handler/create_source/external_schema/avro.rs new file mode 100644 index 0000000000000..e6d3e0e7cd3a5 --- /dev/null +++ b/src/frontend/src/handler/create_source/external_schema/avro.rs @@ -0,0 +1,52 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::*; + +/// Map an Avro schema to a relational schema. +pub async fn extract_avro_table_schema( + info: &StreamSourceInfo, + with_properties: &WithOptionsSecResolved, + format_encode_options: &mut BTreeMap, + is_debezium: bool, +) -> Result> { + let parser_config = SpecificParserConfig::new(info, with_properties)?; + try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME); + try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD); + consume_aws_config_from_options(format_encode_options); + + let vec_column_desc = if is_debezium { + let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?; + conf.map_to_columns()? + } else { + if let risingwave_connector::parser::EncodingProperties::Avro(avro_props) = + &parser_config.encoding_config + && matches!(avro_props.schema_location, SchemaLocation::File { .. }) + && format_encode_options + .get("with_deprecated_file_header") + .is_none_or(|v| v != "true") + { + bail_not_implemented!(issue = 12871, "avro without schema registry"); + } + let conf = AvroParserConfig::new(parser_config.encoding_config).await?; + conf.map_to_columns()? + }; + Ok(vec_column_desc + .into_iter() + .map(|col| ColumnCatalog { + column_desc: col.into(), + is_hidden: false, + }) + .collect_vec()) +} diff --git a/src/frontend/src/handler/create_source/external_schema/debezium.rs b/src/frontend/src/handler/create_source/external_schema/debezium.rs new file mode 100644 index 0000000000000..d02667016092f --- /dev/null +++ b/src/frontend/src/handler/create_source/external_schema/debezium.rs @@ -0,0 +1,24 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
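Aside: the new `external_schema/debezium.rs` module that follows only returns the primary-key column names recovered from the Debezium Avro key schema; matching those names back onto the bound columns is left to the caller. A rough, self-contained sketch of that matching step is given below for orientation; `SimpleColumn` and `mark_pk_columns` are invented for this example and are not part of the patch.

    // Sketch only: mark the columns whose names appear in the primary-key
    // name list extracted from a Debezium Avro key schema. The types here are
    // simplified stand-ins, not the catalog types used in the patch.
    #[derive(Debug)]
    struct SimpleColumn {
        name: String,
        is_pk: bool,
    }

    fn mark_pk_columns(columns: &mut [SimpleColumn], pk_names: &[String]) -> Result<(), String> {
        for pk in pk_names {
            match columns.iter_mut().find(|c| &c.name == pk) {
                Some(col) => col.is_pk = true,
                // A key-schema column missing from the value columns is an error.
                None => return Err(format!("primary key column {pk} not found in the value schema")),
            }
        }
        Ok(())
    }

    fn main() {
        let mut cols = vec![
            SimpleColumn { name: "id".into(), is_pk: false },
            SimpleColumn { name: "v".into(), is_pk: false },
        ];
        // Pretend these names came from the key-schema extraction helper.
        mark_pk_columns(&mut cols, &["id".to_owned()]).unwrap();
        assert!(cols[0].is_pk && !cols[1].is_pk);
    }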
+ +use super::*; + +pub async fn extract_debezium_avro_table_pk_columns( + info: &StreamSourceInfo, + with_properties: &WithOptionsSecResolved, +) -> Result> { + let parser_config = SpecificParserConfig::new(info, with_properties)?; + let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?; + Ok(conf.extract_pks()?.drain(..).map(|c| c.name).collect()) +} diff --git a/src/frontend/src/handler/create_source/external_schema/iceberg.rs b/src/frontend/src/handler/create_source/external_schema/iceberg.rs new file mode 100644 index 0000000000000..5ec0998a2c70e --- /dev/null +++ b/src/frontend/src/handler/create_source/external_schema/iceberg.rs @@ -0,0 +1,101 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::*; + +pub async fn extract_iceberg_columns( + with_properties: &WithOptionsSecResolved, +) -> anyhow::Result> { + let props = ConnectorProperties::extract(with_properties.clone(), true)?; + if let ConnectorProperties::Iceberg(properties) = props { + let table = properties.load_table_v2().await?; + let iceberg_schema: arrow_schema_iceberg::Schema = + ::iceberg::arrow::schema_to_arrow_schema(table.metadata().current_schema())?; + + let mut columns: Vec = iceberg_schema + .fields() + .iter() + .enumerate() + .map(|(i, field)| { + let column_desc = ColumnDesc::named( + field.name(), + ColumnId::new((i + 1).try_into().unwrap()), + IcebergArrowConvert.type_from_field(field).unwrap(), + ); + ColumnCatalog { + column_desc, + // hide the _row_id column for iceberg engine table + // This column is auto generated when users define a table without primary key + is_hidden: field.name() == ROWID_PREFIX, + } + }) + .collect(); + columns.push(ColumnCatalog::iceberg_sequence_num_column()); + + Ok(columns) + } else { + Err(anyhow!(format!( + "Invalid properties for iceberg source: {:?}", + props + ))) + } +} + +pub async fn check_iceberg_source( + props: &WithOptionsSecResolved, + columns: &[ColumnCatalog], +) -> anyhow::Result<()> { + let props = ConnectorProperties::extract(props.clone(), true)?; + let ConnectorProperties::Iceberg(properties) = props else { + return Err(anyhow!(format!( + "Invalid properties for iceberg source: {:?}", + props + ))); + }; + + let schema = Schema { + fields: columns + .iter() + .filter(|&c| c.column_desc.name != ICEBERG_SEQUENCE_NUM_COLUMN_NAME) + .cloned() + .map(|c| c.column_desc.into()) + .collect(), + }; + + let table = properties.load_table_v2().await?; + + let iceberg_schema = + ::iceberg::arrow::schema_to_arrow_schema(table.metadata().current_schema())?; + + for f1 in schema.fields() { + if !iceberg_schema.fields.iter().any(|f2| f2.name() == &f1.name) { + return Err(anyhow::anyhow!(format!( + "Column {} not found in iceberg table", + f1.name + ))); + } + } + + let new_iceberg_field = iceberg_schema + .fields + .iter() + .filter(|f1| schema.fields.iter().any(|f2| f1.name() == &f2.name)) + .cloned() + .collect::>(); + let new_iceberg_schema = 
arrow_schema_iceberg::Schema::new(new_iceberg_field); + + risingwave_connector::sink::iceberg::try_matches_arrow_schema(&schema, &new_iceberg_schema)?; + + Ok(()) +} diff --git a/src/frontend/src/handler/create_source/external_schema/json.rs b/src/frontend/src/handler/create_source/external_schema/json.rs new file mode 100644 index 0000000000000..1dc64f669b2cf --- /dev/null +++ b/src/frontend/src/handler/create_source/external_schema/json.rs @@ -0,0 +1,70 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::*; + +pub fn json_schema_infer_use_schema_registry(schema_config: &Option<(AstString, bool)>) -> bool { + match schema_config { + None => false, + Some((_, use_registry)) => *use_registry, + } +} + +/// Map a JSON schema to a relational schema +pub async fn extract_json_table_schema( + schema_config: &Option<(AstString, bool)>, + with_properties: &BTreeMap, + format_encode_options: &mut BTreeMap, +) -> Result>> { + match schema_config { + None => Ok(None), + Some((schema_location, use_schema_registry)) => { + let schema_registry_auth = use_schema_registry.then(|| { + let auth = SchemaRegistryAuth::from(&*format_encode_options); + try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME); + try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD); + auth + }); + Ok(Some( + fetch_json_schema_and_map_to_columns( + &schema_location.0, + schema_registry_auth, + with_properties, + ) + .await? + .into_iter() + .map(|col| ColumnCatalog { + column_desc: col.into(), + is_hidden: false, + }) + .collect_vec(), + )) + } + } +} + +pub fn get_json_schema_location( + format_encode_options: &mut BTreeMap, +) -> Result> { + let schema_location = try_consume_string_from_options(format_encode_options, "schema.location"); + let schema_registry = try_consume_string_from_options(format_encode_options, "schema.registry"); + match (schema_location, schema_registry) { + (None, None) => Ok(None), + (None, Some(schema_registry)) => Ok(Some((schema_registry, true))), + (Some(schema_location), None) => Ok(Some((schema_location, false))), + (Some(_), Some(_)) => Err(RwError::from(ProtocolError( + "only need either the schema location or the schema registry".to_owned(), + ))), + } +} diff --git a/src/frontend/src/handler/create_source/external_schema/nexmark.rs b/src/frontend/src/handler/create_source/external_schema/nexmark.rs new file mode 100644 index 0000000000000..0d08424f5fa7f --- /dev/null +++ b/src/frontend/src/handler/create_source/external_schema/nexmark.rs @@ -0,0 +1,69 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::*; + +pub fn check_nexmark_schema( + props: &WithOptionsSecResolved, + row_id_index: Option, + columns: &[ColumnCatalog], +) -> Result<()> { + let table_type = props + .get("nexmark.table.type") + .map(|t| t.to_ascii_lowercase()); + + let event_type = match table_type.as_deref() { + None => None, + Some("bid") => Some(EventType::Bid), + Some("auction") => Some(EventType::Auction), + Some("person") => Some(EventType::Person), + Some(t) => { + return Err(RwError::from(ProtocolError(format!( + "unsupported table type for nexmark source: {}", + t + )))) + } + }; + + // Ignore the generated columns and map the index of row_id column. + let user_defined_columns = columns.iter().filter(|c| !c.is_generated()); + let row_id_index = if let Some(index) = row_id_index { + let col_id = columns[index].column_id(); + user_defined_columns + .clone() + .position(|c| c.column_id() == col_id) + .unwrap() + .into() + } else { + None + }; + + let expected = get_event_data_types_with_names(event_type, row_id_index); + let user_defined = user_defined_columns + .map(|c| { + ( + c.column_desc.name.to_ascii_lowercase(), + c.column_desc.data_type.to_owned(), + ) + }) + .collect_vec(); + + if expected != user_defined { + let cmp = pretty_assertions::Comparison::new(&expected, &user_defined); + return Err(RwError::from(ProtocolError(format!( + "The schema of the nexmark source must specify all columns in order:\n{cmp}", + )))); + } + Ok(()) +} diff --git a/src/frontend/src/handler/create_source/external_schema/protobuf.rs b/src/frontend/src/handler/create_source/external_schema/protobuf.rs new file mode 100644 index 0000000000000..7d3f8d2692c27 --- /dev/null +++ b/src/frontend/src/handler/create_source/external_schema/protobuf.rs @@ -0,0 +1,48 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::*; + +/// Map a protobuf schema to a relational schema. 
+pub async fn extract_protobuf_table_schema( + schema: &ProtobufSchema, + with_properties: &WithOptionsSecResolved, + format_encode_options: &mut BTreeMap, +) -> Result> { + let info = StreamSourceInfo { + proto_message_name: schema.message_name.0.clone(), + row_schema_location: schema.row_schema_location.0.clone(), + use_schema_registry: schema.use_schema_registry, + format: FormatType::Plain.into(), + row_encode: EncodeType::Protobuf.into(), + format_encode_options: format_encode_options.clone(), + ..Default::default() + }; + let parser_config = SpecificParserConfig::new(&info, with_properties)?; + try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME); + try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD); + consume_aws_config_from_options(format_encode_options); + + let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?; + + let column_descs = conf.map_to_columns()?; + + Ok(column_descs + .into_iter() + .map(|col| ColumnCatalog { + column_desc: col.into(), + is_hidden: false, + }) + .collect_vec()) +} diff --git a/src/frontend/src/handler/create_source/validate.rs b/src/frontend/src/handler/create_source/validate.rs new file mode 100644 index 0000000000000..f4bd50c342466 --- /dev/null +++ b/src/frontend/src/handler/create_source/validate.rs @@ -0,0 +1,246 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::*; + +pub static ALLOWED_CONNECTION_CONNECTOR: LazyLock> = + LazyLock::new(|| { + hashset! { + PbConnectionType::Unspecified, + PbConnectionType::Kafka, + PbConnectionType::Iceberg, + } + }); + +pub static ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock> = + LazyLock::new(|| { + hashset! 
{ + PbConnectionType::Unspecified, + PbConnectionType::SchemaRegistry, + } + }); + +// TODO: Better design if we want to support ENCODE KEY where we will have 4 dimensional array +static CONNECTORS_COMPATIBLE_FORMATS: LazyLock>>> = + LazyLock::new(|| { + convert_args!(hashmap!( + KAFKA_CONNECTOR => hashmap!( + Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv], + Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf], + Format::Debezium => vec![Encode::Json, Encode::Avro], + Format::Maxwell => vec![Encode::Json], + Format::Canal => vec![Encode::Json], + Format::DebeziumMongo => vec![Encode::Json], + ), + PULSAR_CONNECTOR => hashmap!( + Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes], + Format::Upsert => vec![Encode::Json, Encode::Avro], + Format::Debezium => vec![Encode::Json], + Format::Maxwell => vec![Encode::Json], + Format::Canal => vec![Encode::Json], + ), + KINESIS_CONNECTOR => hashmap!( + Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes], + Format::Upsert => vec![Encode::Json, Encode::Avro], + Format::Debezium => vec![Encode::Json], + Format::Maxwell => vec![Encode::Json], + Format::Canal => vec![Encode::Json], + ), + GOOGLE_PUBSUB_CONNECTOR => hashmap!( + Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes], + Format::Debezium => vec![Encode::Json], + Format::Maxwell => vec![Encode::Json], + Format::Canal => vec![Encode::Json], + ), + NEXMARK_CONNECTOR => hashmap!( + Format::Native => vec![Encode::Native], + Format::Plain => vec![Encode::Bytes], + ), + DATAGEN_CONNECTOR => hashmap!( + Format::Native => vec![Encode::Native], + Format::Plain => vec![Encode::Bytes, Encode::Json], + ), + S3_CONNECTOR => hashmap!( + Format::Plain => vec![Encode::Csv, Encode::Json], + ), + OPENDAL_S3_CONNECTOR => hashmap!( + Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet], + ), + GCS_CONNECTOR => hashmap!( + Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet], + ), + AZBLOB_CONNECTOR => hashmap!( + Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet], + ), + POSIX_FS_CONNECTOR => hashmap!( + Format::Plain => vec![Encode::Csv], + ), + MYSQL_CDC_CONNECTOR => hashmap!( + Format::Debezium => vec![Encode::Json], + // support source stream job + Format::Plain => vec![Encode::Json], + ), + POSTGRES_CDC_CONNECTOR => hashmap!( + Format::Debezium => vec![Encode::Json], + // support source stream job + Format::Plain => vec![Encode::Json], + ), + CITUS_CDC_CONNECTOR => hashmap!( + Format::Debezium => vec![Encode::Json], + ), + MONGODB_CDC_CONNECTOR => hashmap!( + Format::DebeziumMongo => vec![Encode::Json], + ), + NATS_CONNECTOR => hashmap!( + Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Bytes], + ), + MQTT_CONNECTOR => hashmap!( + Format::Plain => vec![Encode::Json, Encode::Bytes], + ), + TEST_CONNECTOR => hashmap!( + Format::Plain => vec![Encode::Json], + ), + ICEBERG_CONNECTOR => hashmap!( + Format::None => vec![Encode::None], + ), + SQL_SERVER_CDC_CONNECTOR => hashmap!( + Format::Debezium => vec![Encode::Json], + // support source stream job + Format::Plain => vec![Encode::Json], + ), + )) + }); + +fn validate_license(connector: &str) -> Result<()> { + if connector == SQL_SERVER_CDC_CONNECTOR { + Feature::SqlServerCdcSource + .check_available() + .map_err(|e| anyhow::anyhow!(e))?; + } + Ok(()) +} + +pub fn validate_compatibility( + format_encode: &FormatEncodeOptions, + props: &mut BTreeMap, +) -> 
Result<()> {
+    let mut connector = props
+        .get_connector()
+        .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_owned())))?;
+
+    if connector == OPENDAL_S3_CONNECTOR {
+        // reject s3_v2 creation
+        return Err(RwError::from(Deprecated(
+            OPENDAL_S3_CONNECTOR.to_owned(),
+            S3_CONNECTOR.to_owned(),
+        )));
+    }
+    if connector == S3_CONNECTOR {
+        // S3 connector is deprecated, use OPENDAL_S3_CONNECTOR instead
+        // do s3 -> s3_v2 migration
+        let entry = props.get_mut(UPSTREAM_SOURCE_KEY).unwrap();
+        *entry = OPENDAL_S3_CONNECTOR.to_owned();
+        connector = OPENDAL_S3_CONNECTOR.to_owned();
+    }
+
+    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
+        .get(&connector)
+        .ok_or_else(|| {
+            RwError::from(ProtocolError(format!(
+                "connector {:?} is not supported, accept {:?}",
+                connector,
+                CONNECTORS_COMPATIBLE_FORMATS.keys()
+            )))
+        })?;
+
+    validate_license(&connector)?;
+    if connector != KAFKA_CONNECTOR {
+        let res = match (&format_encode.format, &format_encode.row_encode) {
+            (Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => {
+                let mut options = WithOptions::try_from(format_encode.row_options())?;
+                let (_, use_schema_registry) = get_schema_location(options.inner_mut())?;
+                use_schema_registry
+            }
+            (Format::Debezium, Encode::Avro) => true,
+            (_, _) => false,
+        };
+        if res {
+            return Err(RwError::from(ProtocolError(format!(
+                "The {} must be kafka when schema registry is used",
+                UPSTREAM_SOURCE_KEY
+            ))));
+        }
+    }
+
+    let compatible_encodes = compatible_formats
+        .get(&format_encode.format)
+        .ok_or_else(|| {
+            RwError::from(ProtocolError(format!(
+                "connector {} does not support format {:?}",
+                connector, format_encode.format
+            )))
+        })?;
+    if !compatible_encodes.contains(&format_encode.row_encode) {
+        return Err(RwError::from(ProtocolError(format!(
+            "connector {} does not support format {:?} with encode {:?}",
+            connector, format_encode.format, format_encode.row_encode
+        ))));
+    }
+
+    if connector == POSTGRES_CDC_CONNECTOR || connector == CITUS_CDC_CONNECTOR {
+        match props.get("slot.name") {
+            None => {
+                // Build a random slot name with UUID
+                // e.g. "rw_cdc_f9a3567e6dd54bf5900444c8b1c03815"
+                let uuid = uuid::Uuid::new_v4();
+                props.insert("slot.name".into(), format!("rw_cdc_{}", uuid.simple()));
+            }
+            Some(slot_name) => {
+                // please refer to
+                // - https://github.com/debezium/debezium/blob/97956ce25b7612e3413d363658661896b7d2e0a2/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java#L1179
+                // - https://doxygen.postgresql.org/slot_8c.html#afac399f07320b9adfd2c599cf822aaa3
+                if !slot_name
+                    .chars()
+                    .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
+                    || slot_name.len() > 63
+                {
+                    return Err(RwError::from(ProtocolError(format!(
+                        "Invalid replication slot name: {:?}. Valid replication slot name must contain only digits, lowercase characters and underscores with length <= 63",
+                        slot_name
+                    ))));
+                }
+            }
+        }
+
+        if !props.contains_key("schema.name") {
+            // Default schema name is "public"
+            props.insert("schema.name".into(), "public".into());
+        }
+        if !props.contains_key("publication.name") {
+            // Default publication name is "rw_publication"
+            props.insert("publication.name".into(), "rw_publication".into());
+        }
+        if !props.contains_key("publication.create.enable") {
+            // Create the publication automatically by default if it doesn't exist
+            props.insert("publication.create.enable".into(), "true".into());
+        }
+    }
+
+    if connector == SQL_SERVER_CDC_CONNECTOR && !props.contains_key("schema.name") {
+        // Default schema name is "dbo"
+        props.insert("schema.name".into(), "dbo".into());
+    }
+
+    Ok(())
+}
diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
index bce2825f39645..7ab2912bd1118 100644
--- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
@@ -17,6 +17,7 @@ use pretty_xmlish::{Pretty, XmlNode};
 use risingwave_common::catalog::Field;
 use risingwave_common::types::DataType;
 use risingwave_common::util::sort_util::OrderType;
+use risingwave_connector::parser::debezium_cdc_source_schema;
 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
 use risingwave_pb::stream_plan::PbStreamNode;
 
@@ -25,7 +26,6 @@ use super::utils::{childless_record, Distill};
 use super::{generic, ExprRewritable, PlanBase, PlanRef, StreamNode};
 use crate::catalog::ColumnId;
 use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef};
-use crate::handler::create_source::debezium_cdc_source_schema;
 use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
 use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
 use crate::optimizer::property::{Distribution, DistributionDisplay};
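For reference, the replication-slot check in `validate_compatibility` above reduces to a small pure predicate: only ASCII lowercase letters, digits, and underscores, and at most 63 characters. The standalone sketch below restates that rule; `is_valid_slot_name` is invented for illustration and is not part of the patch.

    // Sketch only: the slot-name rule applied to a user-provided `slot.name`.
    fn is_valid_slot_name(slot_name: &str) -> bool {
        slot_name.len() <= 63
            && slot_name
                .chars()
                .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
    }

    fn main() {
        // Auto-generated names of the form "rw_cdc_<uuid>" always pass.
        assert!(is_valid_slot_name("rw_cdc_f9a3567e6dd54bf5900444c8b1c03815"));
        // Uppercase letters, dashes, or names longer than 63 bytes are rejected.
        assert!(!is_valid_slot_name("Has-Uppercase"));
        assert!(!is_valid_slot_name(&"a".repeat(64)));
    }

Keeping the rule in a single predicate like this would also make it easy to unit-test independently of the connector-property plumbing.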