diff --git a/Cargo.lock b/Cargo.lock
index eed7aa43de3f2..de13ac0828a6a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1339,6 +1339,28 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "aws-sdk-glue"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3b6c34f6f4b9e8f76274a9b309838d670b3bb69b4be6756394de54718aa2ca0a"
+dependencies = [
+ "aws-credential-types",
+ "aws-http",
+ "aws-runtime",
+ "aws-smithy-async",
+ "aws-smithy-http",
+ "aws-smithy-json",
+ "aws-smithy-runtime",
+ "aws-smithy-runtime-api",
+ "aws-smithy-types",
+ "aws-types",
+ "bytes",
+ "http 0.2.9",
+ "regex",
+ "tracing",
+]
+
 [[package]]
 name = "aws-sdk-kinesis"
 version = "1.3.0"
@@ -10650,6 +10672,7 @@ dependencies = [
  "aws-credential-types",
  "aws-msk-iam-sasl-signer",
  "aws-sdk-dynamodb",
+ "aws-sdk-glue",
  "aws-sdk-kinesis",
  "aws-sdk-s3",
  "aws-smithy-http",
diff --git a/Cargo.toml b/Cargo.toml
index 6e45991afe10a..3112923546c99 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -87,6 +87,7 @@ aws-config = { version = "1", default-features = false, features = [
 aws-credential-types = { version = "1", default-features = false, features = [
     "hardcoded-credentials",
 ] }
+aws-sdk-glue = "1"
 aws-sdk-kinesis = { version = "1", default-features = false, features = [
     "rt-tokio",
     "rustls",
diff --git a/e2e_test/source_inline/kafka/avro/glue.slt b/e2e_test/source_inline/kafka/avro/glue.slt
new file mode 100644
index 0000000000000..13a1ae27ff3a7
--- /dev/null
+++ b/e2e_test/source_inline/kafka/avro/glue.slt
@@ -0,0 +1,121 @@
+control substitution on
+
+system ok
+rpk topic delete 'glue-sample-my-event'
+
+system ok
+rpk topic create 'glue-sample-my-event'
+
+system ok
+rpk topic produce -f '%v{hex}\n' 'glue-sample-my-event' <<EOF
[... the remaining ~110 lines of this new test (hex-encoded Glue-framed messages, the CREATE SOURCE statement, and the result checks) are not recoverable from this excerpt ...]
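The hex lines fed to `rpk topic produce` above carry the AWS Glue wire format that the parser changes below decode: header version 3, no compression, then a 16-byte schema version UUID, then the raw Avro datum. As a reader aid only, here is a minimal, hypothetical sketch of how such a payload is framed; `frame_glue_message` is not part of this PR, and the UUID is the one used by the mock registry later in this diff:

```rust
use uuid::Uuid;

/// Frame a raw Avro datum with the 18-byte AWS Glue header:
/// byte 0 = header version (3), byte 1 = compression (0 = none),
/// bytes 2..=17 = the schema version id as a 16-byte UUID.
fn frame_glue_message(schema_version_id: Uuid, avro_datum: &[u8]) -> Vec<u8> {
    let mut payload = Vec::with_capacity(18 + avro_datum.len());
    payload.push(3); // header version
    payload.push(0); // no compression
    payload.extend_from_slice(schema_version_id.as_bytes());
    payload.extend_from_slice(avro_datum);
    payload
}

fn main() {
    let id = Uuid::parse_str("4dc80ccf-2d0c-4846-9325-7e1c9e928121").unwrap();
    // `b"\x02"` stands in for some Avro-encoded record body.
    let framed = frame_glue_message(id, b"\x02");
    // Hex-encode by hand, matching the `%v{hex}` input format of `rpk`.
    let hex: String = framed.iter().map(|b| format!("{b:02x}")).collect();
    println!("{hex}");
}
```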
diff --git a/src/connector/src/parser/avro/glue_resolver.rs b/src/connector/src/parser/avro/glue_resolver.rs
new file mode 100644
--- /dev/null
+++ b/src/connector/src/parser/avro/glue_resolver.rs
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use anyhow::Context;
+use apache_avro::Schema;
+use aws_sdk_glue::types::{SchemaId, SchemaVersionNumber};
+use aws_sdk_glue::Client;
+use moka::future::Cache;
+
+use crate::connector_common::AwsAuthProps;
+use crate::error::ConnectorResult;
+
+/// Schema cache for AWS Glue Schema Registry.
+pub trait GlueSchemaCache {
+    /// Gets a specific schema by id, which is used as *writer schema*.
+    async fn get_by_id(&self, schema_version_id: uuid::Uuid) -> ConnectorResult<Arc<Schema>>;
+    /// Gets the latest schema by arn, which is used as *reader schema*.
+    async fn get_by_name(&self, schema_arn: &str) -> ConnectorResult<Arc<Schema>>;
+}
+
+#[derive(Debug)]
+pub enum GlueSchemaCacheImpl {
+    Real(RealGlueSchemaCache),
+    Mock(MockGlueSchemaCache),
+}
+
+impl GlueSchemaCacheImpl {
+    pub async fn new(
+        aws_auth_props: &AwsAuthProps,
+        mock_config: Option<&str>,
+    ) -> ConnectorResult<Self> {
+        if let Some(mock_config) = mock_config {
+            return Ok(Self::Mock(MockGlueSchemaCache::new(mock_config)));
+        }
+        Ok(Self::Real(RealGlueSchemaCache::new(aws_auth_props).await?))
+    }
+}
+
+impl GlueSchemaCache for GlueSchemaCacheImpl {
+    async fn get_by_id(&self, schema_version_id: uuid::Uuid) -> ConnectorResult<Arc<Schema>> {
+        match self {
+            Self::Real(inner) => inner.get_by_id(schema_version_id).await,
+            Self::Mock(inner) => inner.get_by_id(schema_version_id).await,
+        }
+    }
+
+    async fn get_by_name(&self, schema_arn: &str) -> ConnectorResult<Arc<Schema>> {
+        match self {
+            Self::Real(inner) => inner.get_by_name(schema_arn).await,
+            Self::Mock(inner) => inner.get_by_name(schema_arn).await,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct RealGlueSchemaCache {
+    writer_schemas: Cache<uuid::Uuid, Arc<Schema>>,
+    glue_client: Client,
+}
+
+impl RealGlueSchemaCache {
+    /// Create a new `GlueSchemaCache`
+    pub async fn new(aws_auth_props: &AwsAuthProps) -> ConnectorResult<Self> {
+        let client = Client::new(&aws_auth_props.build_config().await?);
+        Ok(Self {
+            writer_schemas: Cache::new(u64::MAX),
+            glue_client: client,
+        })
+    }
+
+    async fn parse_and_cache_schema(
+        &self,
+        schema_version_id: uuid::Uuid,
+        content: &str,
+    ) -> ConnectorResult<Arc<Schema>> {
+        let schema = Schema::parse_str(content).context("failed to parse avro schema")?;
+        let schema = Arc::new(schema);
+        self.writer_schemas
+            .insert(schema_version_id, Arc::clone(&schema))
+            .await;
+        Ok(schema)
+    }
+}
+
+impl GlueSchemaCache for RealGlueSchemaCache {
+    /// Gets a specific schema by id, which is used as *writer schema*.
+    async fn get_by_id(&self, schema_version_id: uuid::Uuid) -> ConnectorResult<Arc<Schema>> {
+        if let Some(schema) = self.writer_schemas.get(&schema_version_id).await {
+            return Ok(schema);
+        }
+        let res = self
+            .glue_client
+            .get_schema_version()
+            .schema_version_id(schema_version_id)
+            .send()
+            .await
+            .context("glue sdk error")?;
+        let definition = res
+            .schema_definition()
+            .context("glue sdk response without definition")?;
+        self.parse_and_cache_schema(schema_version_id, definition)
+            .await
+    }
+
+    /// Gets the latest schema by arn, which is used as *reader schema*.
+    async fn get_by_name(&self, schema_arn: &str) -> ConnectorResult<Arc<Schema>> {
+        let res = self
+            .glue_client
+            .get_schema_version()
+            .schema_id(SchemaId::builder().schema_arn(schema_arn).build())
+            .schema_version_number(SchemaVersionNumber::builder().latest_version(true).build())
+            .send()
+            .await
+            .context("glue sdk error")?;
+        let schema_version_id = res
+            .schema_version_id()
+            .context("glue sdk response without schema version id")?
+            .parse()
+            .context("glue sdk response invalid schema version id")?;
+        let definition = res
+            .schema_definition()
+            .context("glue sdk response without definition")?;
+        self.parse_and_cache_schema(schema_version_id, definition)
+            .await
+    }
+}
+
+#[derive(Debug)]
+pub struct MockGlueSchemaCache {
+    by_id: HashMap<uuid::Uuid, Arc<Schema>>,
+    arn_to_latest_id: HashMap<String, uuid::Uuid>,
+}
+
+impl MockGlueSchemaCache {
+    pub fn new(mock_config: &str) -> Self {
+        // The accepted `mock_config` is a JSON document that looks like:
+        // {
+        //     "by_id": {
+        //         "4dc80ccf-2d0c-4846-9325-7e1c9e928121": {
+        //             "type": "record",
+        //             "name": "MyEvent",
+        //             "fields": [...]
+        //         },
+        //         "3df022f4-b16d-4afe-bdf7-cf4baf8d01d3": {
+        //             ...
+        //         }
+        //     },
+        //     "arn_to_latest_id": {
+        //         "arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent": "3df022f4-b16d-4afe-bdf7-cf4baf8d01d3"
+        //     }
+        // }
+        //
+        // The format is not public and we can make breaking changes to it.
+        // The current format only supports avsc.
+        let parsed: serde_json::Value =
+            serde_json::from_str(mock_config).expect("mock config shall be valid json");
+        let by_id = parsed
+            .get("by_id")
+            .unwrap()
+            .as_object()
+            .unwrap()
+            .iter()
+            .map(|(schema_version_id, schema)| {
+                let schema_version_id = schema_version_id.parse().unwrap();
+                let schema = Schema::parse(schema).unwrap();
+                (schema_version_id, Arc::new(schema))
+            })
+            .collect();
+        let arn_to_latest_id = parsed
+            .get("arn_to_latest_id")
+            .unwrap()
+            .as_object()
+            .unwrap()
+            .iter()
+            .map(|(arn, latest_id)| (arn.clone(), latest_id.as_str().unwrap().parse().unwrap()))
+            .collect();
+        Self {
+            by_id,
+            arn_to_latest_id,
+        }
+    }
+}
+
+impl GlueSchemaCache for MockGlueSchemaCache {
+    async fn get_by_id(&self, schema_version_id: uuid::Uuid) -> ConnectorResult<Arc<Schema>> {
+        Ok(self
+            .by_id
+            .get(&schema_version_id)
+            .context("schema version id not found in mock registry")?
+            .clone())
+    }
+
+    async fn get_by_name(&self, schema_arn: &str) -> ConnectorResult<Arc<Schema>> {
+        let schema_version_id = self
+            .arn_to_latest_id
+            .get(schema_arn)
+            .context("schema arn not found in mock registry")?;
+        self.get_by_id(*schema_version_id).await
+    }
+}
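The mock variant exists so e2e tests can run without AWS credentials. A hypothetical test sketch (not part of the diff) of how it might be exercised; the import paths and `AwsAuthProps::default()` are assumptions about the crate layout:

```rust
use risingwave_connector::connector_common::AwsAuthProps; // path assumed
use risingwave_connector::parser::{GlueSchemaCache as _, GlueSchemaCacheImpl}; // re-export assumed

#[tokio::test]
async fn mock_glue_cache_resolves_by_arn() -> anyhow::Result<()> {
    let mock_config = r#"{
        "by_id": {
            "4dc80ccf-2d0c-4846-9325-7e1c9e928121": {
                "type": "record",
                "name": "MyEvent",
                "fields": [{"name": "id", "type": "long"}]
            }
        },
        "arn_to_latest_id": {
            "arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent": "4dc80ccf-2d0c-4846-9325-7e1c9e928121"
        }
    }"#;
    let arn = "arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent";
    // With `mock_config: Some(_)`, `new` short-circuits before building any AWS client,
    // so empty auth props suffice here (assuming a `Default` impl).
    let cache = GlueSchemaCacheImpl::new(&AwsAuthProps::default(), Some(mock_config)).await?;
    let by_arn = cache.get_by_name(arn).await?;
    let by_id = cache
        .get_by_id("4dc80ccf-2d0c-4846-9325-7e1c9e928121".parse()?)
        .await?;
    assert_eq!(by_arn, by_id); // both resolve to the same cached schema
    Ok(())
}
```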
diff --git a/src/connector/src/parser/avro/mod.rs b/src/connector/src/parser/avro/mod.rs
index 1c22b326770c4..c71f81960e183 100644
--- a/src/connector/src/parser/avro/mod.rs
+++ b/src/connector/src/parser/avro/mod.rs
@@ -12,8 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+pub mod confluent_resolver;
+mod glue_resolver;
 mod parser;
-pub mod schema_resolver;
 pub mod util;
 
+pub use confluent_resolver as schema_resolver;
+pub use glue_resolver::{GlueSchemaCache, GlueSchemaCacheImpl};
 pub use parser::*;
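The `pub use confluent_resolver as schema_resolver;` line keeps the old module path compiling while the file is renamed. A self-contained illustration of why both paths still name the same items (hypothetical module contents, not the crate's):

```rust
// A module alias re-export: `schema_resolver::X` and `confluent_resolver::X`
// are the very same type, so downstream `use` statements need no change.
mod confluent_resolver {
    pub struct ConfluentSchemaCache;
}
pub use confluent_resolver as schema_resolver;

fn main() {
    // The old path still works and unifies with the new one.
    let _old: schema_resolver::ConfluentSchemaCache = confluent_resolver::ConfluentSchemaCache;
}
```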
diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs
index 4c35cea526482..34f05d7a65c31 100644
--- a/src/connector/src/parser/avro/parser.rs
+++ b/src/connector/src/parser/avro/parser.rs
@@ -23,11 +23,14 @@ use risingwave_pb::plan_common::ColumnDesc;
 
 use super::schema_resolver::ConfluentSchemaCache;
 use super::util::{avro_schema_to_column_descs, ResolvedAvroSchema};
+use super::{GlueSchemaCache as _, GlueSchemaCacheImpl};
 use crate::error::ConnectorResult;
 use crate::parser::unified::avro::{AvroAccess, AvroParseOptions};
 use crate::parser::unified::AccessImpl;
 use crate::parser::util::bytes_from_url;
-use crate::parser::{AccessBuilder, AvroProperties, EncodingProperties, EncodingType, MapHandling};
+use crate::parser::{
+    AccessBuilder, AvroProperties, EncodingProperties, EncodingType, MapHandling, SchemaLocation,
+};
 use crate::schema::schema_registry::{
     extract_schema_id, get_subject_by_strategy, handle_sr_list, Client,
 };
@@ -37,7 +40,7 @@ use crate::schema::schema_registry::{
 pub struct AvroAccessBuilder {
     schema: Arc<ResolvedAvroSchema>,
     /// Refer to [`AvroParserConfig::writer_schema_cache`].
-    pub writer_schema_cache: Option<Arc<ConfluentSchemaCache>>,
+    writer_schema_cache: WriterSchemaCache,
     skip_fixed_header: Option<u8>,
     value: Option<Value>,
 }
@@ -77,119 +80,180 @@ impl AvroAccessBuilder {
     async fn parse_avro_value(&self, payload: &[u8]) -> ConnectorResult<Value> {
         // parse payload to avro value
         // if use confluent schema, get writer schema from confluent schema registry
-        if let Some(resolver) = &self.writer_schema_cache {
-            let (schema_id, mut raw_payload) = extract_schema_id(payload)?;
-            let writer_schema = resolver.get_by_id(schema_id).await?;
-            return Ok(from_avro_datum(
-                writer_schema.as_ref(),
-                &mut raw_payload,
-                Some(&self.schema.original_schema),
-            )?);
-        }
-        if let Some(header_len) = self.skip_fixed_header {
-            let len = header_len.into();
-            if payload.len() < len {
-                bail!("{} shorter than header to skip {}", payload.len(), len);
-            }
-            return Ok(from_avro_datum(
-                &self.schema.original_schema,
-                &mut &payload[len..],
-                None,
-            )?);
-        }
-        let mut reader = Reader::with_schema(&self.schema.original_schema, payload)?;
-        match reader.next() {
-            Some(Ok(v)) => Ok(v),
-            Some(Err(e)) => Err(e)?,
-            None => bail!("avro parse unexpected eof"),
+        match &self.writer_schema_cache {
+            WriterSchemaCache::Confluent(resolver) => {
+                let (schema_id, mut raw_payload) = extract_schema_id(payload)?;
+                let writer_schema = resolver.get_by_id(schema_id).await?;
+                Ok(from_avro_datum(
+                    writer_schema.as_ref(),
+                    &mut raw_payload,
+                    Some(&self.schema.original_schema),
+                )?)
+            }
+            WriterSchemaCache::File => {
+                if let Some(header_len) = self.skip_fixed_header {
+                    let len = header_len.into();
+                    if payload.len() < len {
+                        bail!("{} shorter than header to skip {}", payload.len(), len);
+                    }
+                    return Ok(from_avro_datum(
+                        &self.schema.original_schema,
+                        &mut &payload[len..],
+                        None,
+                    )?);
+                }
+                let mut reader = Reader::with_schema(&self.schema.original_schema, payload)?;
+                match reader.next() {
+                    Some(Ok(v)) => Ok(v),
+                    Some(Err(e)) => Err(e)?,
+                    None => bail!("avro parse unexpected eof"),
+                }
+            }
+            WriterSchemaCache::Glue(resolver) => {
+                //
+                // byte 0:      header version = 3
+                // byte 1:      compression: 0 = no compression; 5 = zlib (unsupported)
+                // byte 2..=17: 16-byte UUID as schema version id
+                // byte 18..:   raw avro payload
+                if payload.len() < 18 {
+                    bail!("payload shorter than 18-byte glue header");
+                }
+                if payload[0] != 3 {
+                    bail!(
+                        "Only support glue header version 3 but found {}",
+                        payload[0]
+                    );
+                }
+                if payload[1] != 0 {
+                    bail!("Non-zero compression {} not supported", payload[1]);
+                }
+                let schema_version_id = uuid::Uuid::from_slice(&payload[2..18]).unwrap();
+                let writer_schema = resolver.get_by_id(schema_version_id).await?;
+                let mut raw_payload = &payload[18..];
+                Ok(from_avro_datum(
+                    writer_schema.as_ref(),
+                    &mut raw_payload,
+                    Some(&self.schema.original_schema),
+                )?)
+            }
         }
     }
 }
 
 #[derive(Debug, Clone)]
 pub struct AvroParserConfig {
-    pub schema: Arc<ResolvedAvroSchema>,
-    pub key_schema: Option<Arc<ResolvedAvroSchema>>,
+    schema: Arc<ResolvedAvroSchema>,
+    key_schema: Option<Arc<ResolvedAvroSchema>>,
     /// Writer schema is the schema used to write the data. When parsing Avro data, the exact same schema
     /// must be used to decode the message, which is then converted using the reader schema.
-    pub writer_schema_cache: Option<Arc<ConfluentSchemaCache>>,
+    writer_schema_cache: WriterSchemaCache,
+
+    map_handling: Option<MapHandling>,
+    skip_fixed_header: Option<u8>,
+}
 
-    pub map_handling: Option<MapHandling>,
-    pub skip_fixed_header: Option<u8>,
+#[derive(Debug, Clone)]
+enum WriterSchemaCache {
+    Confluent(Arc<ConfluentSchemaCache>),
+    Glue(Arc<GlueSchemaCacheImpl>),
+    File,
 }
 
 impl AvroParserConfig {
     pub async fn new(encoding_properties: EncodingProperties) -> ConnectorResult<Self> {
         let AvroProperties {
-            use_schema_registry,
-            row_schema_location: schema_location,
-            client_config,
-            aws_auth_props,
-            topic,
+            schema_location,
             enable_upsert,
             record_name,
             key_record_name,
-            name_strategy,
             map_handling,
             skip_fixed_header,
         } = try_match_expand!(encoding_properties, EncodingProperties::Avro)?;
-        let url = handle_sr_list(schema_location.as_str())?;
-        if use_schema_registry {
-            let client = Client::new(url, &client_config)?;
-            let resolver = ConfluentSchemaCache::new(client);
-
-            let subject_key = if enable_upsert {
-                Some(get_subject_by_strategy(
-                    &name_strategy,
-                    topic.as_str(),
-                    key_record_name.as_deref(),
-                    true,
-                )?)
-            } else {
-                if let Some(name) = &key_record_name {
-                    bail!("unused FORMAT ENCODE option: key.message='{name}'");
-                }
-                None
-            };
-            let subject_value = get_subject_by_strategy(
-                &name_strategy,
-                topic.as_str(),
-                record_name.as_deref(),
-                false,
-            )?;
-            tracing::debug!("infer key subject {subject_key:?}, value subject {subject_value}");
-
-            Ok(Self {
-                schema: Arc::new(ResolvedAvroSchema::create(
-                    resolver.get_by_subject(&subject_value).await?,
-                )?),
-                key_schema: if let Some(subject_key) = subject_key {
-                    Some(Arc::new(ResolvedAvroSchema::create(
-                        resolver.get_by_subject(&subject_key).await?,
-                    )?))
-                } else {
-                    None
-                },
-                writer_schema_cache: Some(Arc::new(resolver)),
-                map_handling,
-                skip_fixed_header: None,
-            })
-        } else {
-            if enable_upsert {
-                bail!("avro upsert without schema registry is not supported");
-            }
-            let url = url.first().unwrap();
-            let schema_content = bytes_from_url(url, aws_auth_props.as_ref()).await?;
-            let schema = Schema::parse_reader(&mut schema_content.as_slice())
-                .context("failed to parse avro schema")?;
-            Ok(Self {
-                schema: Arc::new(ResolvedAvroSchema::create(Arc::new(schema))?),
-                key_schema: None,
-                writer_schema_cache: None,
-                map_handling,
-                skip_fixed_header,
-            })
+        match schema_location {
+            SchemaLocation::Confluent {
+                urls: schema_location,
+                client_config,
+                name_strategy,
+                topic,
+            } => {
+                let url = handle_sr_list(schema_location.as_str())?;
+                let client = Client::new(url, &client_config)?;
+                let resolver = ConfluentSchemaCache::new(client);
+
+                let subject_key = if enable_upsert {
+                    Some(get_subject_by_strategy(
+                        &name_strategy,
+                        topic.as_str(),
+                        key_record_name.as_deref(),
+                        true,
+                    )?)
+                } else {
+                    if let Some(name) = &key_record_name {
+                        bail!("unused FORMAT ENCODE option: key.message='{name}'");
+                    }
+                    None
+                };
+                let subject_value = get_subject_by_strategy(
+                    &name_strategy,
+                    topic.as_str(),
+                    record_name.as_deref(),
+                    false,
+                )?;
+                tracing::debug!("infer key subject {subject_key:?}, value subject {subject_value}");
+
+                Ok(Self {
+                    schema: Arc::new(ResolvedAvroSchema::create(
+                        resolver.get_by_subject(&subject_value).await?,
+                    )?),
+                    key_schema: if let Some(subject_key) = subject_key {
+                        Some(Arc::new(ResolvedAvroSchema::create(
+                            resolver.get_by_subject(&subject_key).await?,
+                        )?))
+                    } else {
+                        None
+                    },
+                    writer_schema_cache: WriterSchemaCache::Confluent(Arc::new(resolver)),
+                    map_handling,
+                    skip_fixed_header: None,
+                })
+            }
+            SchemaLocation::File {
+                url: schema_location,
+                aws_auth_props,
+            } => {
+                let url = handle_sr_list(schema_location.as_str())?;
+                if enable_upsert {
+                    bail!("avro upsert without schema registry is not supported");
+                }
+                let url = url.first().unwrap();
+                let schema_content = bytes_from_url(url, aws_auth_props.as_ref()).await?;
+                let schema = Schema::parse_reader(&mut schema_content.as_slice())
+                    .context("failed to parse avro schema")?;
+                Ok(Self {
+                    schema: Arc::new(ResolvedAvroSchema::create(Arc::new(schema))?),
+                    key_schema: None,
+                    writer_schema_cache: WriterSchemaCache::File,
+                    map_handling,
+                    skip_fixed_header,
+                })
+            }
+            SchemaLocation::Glue {
+                schema_arn,
+                aws_auth_props,
+                mock_config,
+            } => {
+                let resolver =
+                    GlueSchemaCacheImpl::new(&aws_auth_props, mock_config.as_deref()).await?;
+                let schema = resolver.get_by_name(&schema_arn).await?;
+                Ok(Self {
+                    schema: Arc::new(ResolvedAvroSchema::create(schema)?),
+                    key_schema: None,
+                    writer_schema_cache: WriterSchemaCache::Glue(Arc::new(resolver)),
+                    map_handling,
+                    skip_fixed_header: None,
+                })
+            }
         }
     }
 }
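To summarize the Glue branch added to `parse_avro_value` above: validate the 18-byte header, look up the writer schema by the embedded UUID, then decode the datum against the reader schema. A standalone sketch of the same steps, with the schema lookup left to the caller and error handling simplified to `anyhow` (not the PR's code):

```rust
use anyhow::{bail, Context};
use apache_avro::types::Value;
use apache_avro::{from_avro_datum, Schema};

fn decode_glue_payload(
    payload: &[u8],
    writer_schema: &Schema, // resolved via the UUID found in bytes 2..18
    reader_schema: &Schema,
) -> anyhow::Result<(uuid::Uuid, Value)> {
    if payload.len() < 18 {
        bail!("payload shorter than 18-byte glue header");
    }
    if payload[0] != 3 || payload[1] != 0 {
        bail!("unsupported glue header version/compression");
    }
    let schema_version_id = uuid::Uuid::from_slice(&payload[2..18]).context("invalid uuid")?;
    let mut datum = &payload[18..];
    // Decode with the writer schema, projecting onto the reader schema.
    let value = from_avro_datum(writer_schema, &mut datum, Some(reader_schema))?;
    Ok((schema_version_id, value))
}
```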
diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs
index 430d5072a88db..7aa82a2d4ef5f 100644
--- a/src/connector/src/parser/debezium/avro_parser.rs
+++ b/src/connector/src/parser/debezium/avro_parser.rs
@@ -18,7 +18,6 @@ use std::sync::Arc;
 use apache_avro::types::Value;
 use apache_avro::{from_avro_datum, Schema};
 use risingwave_common::try_match_expand;
-use risingwave_pb::catalog::PbSchemaRegistryNameStrategy;
 use risingwave_pb::plan_common::ColumnDesc;
 
 use crate::error::ConnectorResult;
@@ -28,7 +27,7 @@ use crate::parser::unified::avro::{
     avro_extract_field_schema, avro_schema_skip_union, AvroAccess, AvroParseOptions,
 };
 use crate::parser::unified::AccessImpl;
-use crate::parser::{AccessBuilder, EncodingProperties, EncodingType};
+use crate::parser::{AccessBuilder, EncodingProperties, EncodingType, SchemaLocation};
 use crate::schema::schema_registry::{
     extract_schema_id, get_subject_by_strategy, handle_sr_list, Client,
 };
@@ -95,14 +94,19 @@ pub struct DebeziumAvroParserConfig {
 impl DebeziumAvroParserConfig {
     pub async fn new(encoding_config: EncodingProperties) -> ConnectorResult<Self> {
         let avro_config = try_match_expand!(encoding_config, EncodingProperties::Avro)?;
-        let schema_location = &avro_config.row_schema_location;
-        let client_config = &avro_config.client_config;
-        let kafka_topic = &avro_config.topic;
+        let SchemaLocation::Confluent {
+            urls: schema_location,
+            client_config,
+            name_strategy,
+            topic: kafka_topic,
+        } = &avro_config.schema_location
+        else {
+            unreachable!()
+        };
         let url = handle_sr_list(schema_location)?;
         let client = Client::new(url, client_config)?;
         let resolver = ConfluentSchemaCache::new(client);
-        let name_strategy = &PbSchemaRegistryNameStrategy::Unspecified;
         let key_subject = get_subject_by_strategy(name_strategy, kafka_topic, None, true)?;
         let val_subject = get_subject_by_strategy(name_strategy, kafka_topic, None, false)?;
         let key_schema = resolver.get_by_subject(&key_subject).await?;
diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index 5fb3b46d02c40..4bfba035ec471 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -61,6 +61,7 @@ use crate::parser::util::{
     extreact_timestamp_from_meta,
 };
 use crate::schema::schema_registry::SchemaRegistryAuth;
+use crate::schema::AWS_GLUE_SCHEMA_ARN_KEY;
 use crate::source::monitor::GLOBAL_SOURCE_METRICS;
 use crate::source::{
     extract_source_struct, BoxSourceStream, ChunkSourceStream, SourceColumnDesc, SourceColumnType,
@@ -1068,19 +1069,49 @@ impl SpecificParserConfig {
 #[derive(Debug, Default, Clone)]
 pub struct AvroProperties {
-    pub use_schema_registry: bool,
-    pub row_schema_location: String,
-    pub client_config: SchemaRegistryAuth,
-    pub aws_auth_props: Option<AwsAuthProps>,
-    pub topic: String,
+    pub schema_location: SchemaLocation,
     pub enable_upsert: bool,
     pub record_name: Option<String>,
     pub key_record_name: Option<String>,
-    pub name_strategy: PbSchemaRegistryNameStrategy,
     pub map_handling: Option<MapHandling>,
     pub skip_fixed_header: Option<u8>,
 }
 
+/// WIP: may cover protobuf and json schema later.
+#[derive(Debug, Clone)]
+pub enum SchemaLocation {
+    /// Avsc from `https://`, `s3://` or `file://`.
+    File {
+        url: String,
+        aws_auth_props: Option<AwsAuthProps>, // for s3
+    },
+    ///
+    Confluent {
+        urls: String,
+        client_config: SchemaRegistryAuth,
+        name_strategy: PbSchemaRegistryNameStrategy,
+        topic: String,
+    },
+    ///
+    Glue {
+        schema_arn: String,
+        aws_auth_props: AwsAuthProps,
+        // When `Some(_)`, ignore AWS and load schemas from the provided config.
+        mock_config: Option<String>,
+    },
+}
+
+// TODO: `SpecificParserConfig` shall not `impl`/`derive` a `Default`
+impl Default for SchemaLocation {
+    fn default() -> Self {
+        // backward compatible but undesired
+        Self::File {
+            url: Default::default(),
+            aws_auth_props: None,
+        }
+    }
+}
+
 #[derive(Debug, Default, Clone)]
 pub struct ProtobufProperties {
     pub message_name: String,
@@ -1181,10 +1212,6 @@ impl SpecificParserConfig {
                     Some(info.proto_message_name.clone())
                 },
                 key_record_name: info.key_message_name.clone(),
-                name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
-                    .unwrap(),
-                use_schema_registry: info.use_schema_registry,
-                row_schema_location: info.row_schema_location.clone(),
                 map_handling: MapHandling::from_options(&info.format_encode_options)?,
                 skip_fixed_header: info
                     .format_encode_options
@@ -1197,17 +1224,40 @@ impl SpecificParserConfig {
             if format == SourceFormat::Upsert {
                 config.enable_upsert = true;
             }
-            if info.use_schema_registry {
-                config.topic.clone_from(get_kafka_topic(with_properties)?);
-                config.client_config = SchemaRegistryAuth::from(&info.format_encode_options);
-            } else {
-                config.aws_auth_props = Some(
-                    serde_json::from_value::<AwsAuthProps>(
+            config.schema_location = if let Some(schema_arn) =
+                info.format_encode_options.get(AWS_GLUE_SCHEMA_ARN_KEY)
+            {
+                SchemaLocation::Glue {
+                    schema_arn: schema_arn.clone(),
+                    aws_auth_props: serde_json::from_value::<AwsAuthProps>(
                         serde_json::to_value(info.format_encode_options.clone()).unwrap(),
                     )
                     .map_err(|e| anyhow::anyhow!(e))?,
-                );
-            }
+                    // The option `mock_config` is not public and we can break compatibility.
+                    mock_config: info
+                        .format_encode_options
+                        .get("aws.glue.mock_config")
+                        .cloned(),
+                }
+            } else if info.use_schema_registry {
+                SchemaLocation::Confluent {
+                    urls: info.row_schema_location.clone(),
+                    client_config: SchemaRegistryAuth::from(&info.format_encode_options),
+                    name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
+                        .unwrap(),
+                    topic: get_kafka_topic(with_properties)?.clone(),
+                }
+            } else {
+                SchemaLocation::File {
+                    url: info.row_schema_location.clone(),
+                    aws_auth_props: Some(
+                        serde_json::from_value::<AwsAuthProps>(
+                            serde_json::to_value(info.format_encode_options.clone()).unwrap(),
+                        )
+                        .map_err(|e| anyhow::anyhow!(e))?,
+                    ),
+                }
+            };
             EncodingProperties::Avro(config)
         }
         (SourceFormat::Plain, SourceEncode::Protobuf)
@@ -1247,12 +1297,14 @@ impl SpecificParserConfig {
             } else {
                 Some(info.proto_message_name.clone())
             },
-            name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
-                .unwrap(),
             key_record_name: info.key_message_name.clone(),
-            row_schema_location: info.row_schema_location.clone(),
-            topic: get_kafka_topic(with_properties).unwrap().clone(),
-            client_config: SchemaRegistryAuth::from(&info.format_encode_options),
+            schema_location: SchemaLocation::Confluent {
+                urls: info.row_schema_location.clone(),
+                client_config: SchemaRegistryAuth::from(&info.format_encode_options),
+                name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
+                    .unwrap(),
+                topic: get_kafka_topic(with_properties).unwrap().clone(),
+            },
             ..Default::default()
         })
     }
diff --git a/src/connector/src/schema/mod.rs b/src/connector/src/schema/mod.rs
index 585dd43fa8bf1..9b3757e29c094 100644
--- a/src/connector/src/schema/mod.rs
+++ b/src/connector/src/schema/mod.rs
@@ -26,6 +26,7 @@ const KEY_MESSAGE_NAME_KEY: &str = "key.message";
 const SCHEMA_LOCATION_KEY: &str = "schema.location";
 const SCHEMA_REGISTRY_KEY: &str = "schema.registry";
 const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy";
+pub const AWS_GLUE_SCHEMA_ARN_KEY: &str = "aws.glue.schema_arn";
 
 #[derive(Debug, thiserror::Error, thiserror_ext::Macro)]
 #[error("Invalid option: {message}")]
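`AWS_GLUE_SCHEMA_ARN_KEY` is the single option that routes an Avro source to the new Glue path. The precedence implemented in `SpecificParserConfig` above reduces to the following order, sketched with simplified, hypothetical types (`Options`, `Chosen`, and `choose_schema_location` are illustrative only):

```rust
use std::collections::BTreeMap;

type Options = BTreeMap<String, String>;

const AWS_GLUE_SCHEMA_ARN_KEY: &str = "aws.glue.schema_arn";

#[allow(dead_code)]
enum Chosen {
    Glue(String), // the schema arn
    Confluent,
    File,
}

// Glue wins over the confluent registry flag, which wins over a plain file.
fn choose_schema_location(options: &Options, use_schema_registry: bool) -> Chosen {
    if let Some(arn) = options.get(AWS_GLUE_SCHEMA_ARN_KEY) {
        Chosen::Glue(arn.clone())
    } else if use_schema_registry {
        Chosen::Confluent
    } else {
        Chosen::File
    }
}

fn main() {
    let mut options = Options::new();
    options.insert(AWS_GLUE_SCHEMA_ARN_KEY.into(), "arn:aws:glue:...".into());
    assert!(matches!(
        choose_schema_location(&options, false),
        Chosen::Glue(_)
    ));
}
```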
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index 9c69d02fedae7..31d58643f9a84 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -33,11 +33,13 @@ use risingwave_connector::parser::additional_columns::{
 };
 use risingwave_connector::parser::{
     fetch_json_schema_and_map_to_columns, AvroParserConfig, DebeziumAvroParserConfig,
-    ProtobufParserConfig, SpecificParserConfig, TimestamptzHandling, DEBEZIUM_IGNORE_KEY,
+    ProtobufParserConfig, SchemaLocation, SpecificParserConfig, TimestamptzHandling,
+    DEBEZIUM_IGNORE_KEY,
 };
 use risingwave_connector::schema::schema_registry::{
     name_strategy_from_str, SchemaRegistryAuth, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME,
 };
+use risingwave_connector::schema::AWS_GLUE_SCHEMA_ARN_KEY;
 use risingwave_connector::sink::iceberg::IcebergConfig;
 use risingwave_connector::source::cdc::{
     CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY,
@@ -158,7 +160,7 @@ async fn extract_avro_table_schema(
     } else {
         if let risingwave_connector::parser::EncodingProperties::Avro(avro_props) =
             &parser_config.encoding_config
-            && !avro_props.use_schema_registry
+            && matches!(avro_props.schema_location, SchemaLocation::File { .. })
             && !format_encode_options
                 .get("with_deprecated_file_header")
                 .is_some_and(|v| v == "true")
@@ -382,33 +384,43 @@ pub(crate) async fn bind_columns_from_source(
             )
         }
         (format @ (Format::Plain | Format::Upsert | Format::Debezium), Encode::Avro) => {
-            let (row_schema_location, use_schema_registry) =
-                get_schema_location(&mut format_encode_options_to_consume)?;
+            if format_encode_options_to_consume
+                .remove(AWS_GLUE_SCHEMA_ARN_KEY)
+                .is_none()
+            {
+                // Legacy logic that assumes either `schema.location` or confluent `schema.registry`.
+                // The handling of the newly added aws glue is centralized in `connector::parser`.
+                // TODO(xiangjinwu): move this option parsing to `connector::parser` as well.
 
-            if matches!(format, Format::Debezium) && !use_schema_registry {
-                return Err(RwError::from(ProtocolError(
-                    "schema location for DEBEZIUM_AVRO row format is not supported".to_string(),
-                )));
-            }
+                let (row_schema_location, use_schema_registry) =
+                    get_schema_location(&mut format_encode_options_to_consume)?;
 
-            let message_name = try_consume_string_from_options(
-                &mut format_encode_options_to_consume,
-                MESSAGE_NAME_KEY,
-            );
-            let name_strategy = get_sr_name_strategy_check(
-                &mut format_encode_options_to_consume,
-                use_schema_registry,
-            )?;
+                if matches!(format, Format::Debezium) && !use_schema_registry {
+                    return Err(RwError::from(ProtocolError(
+                        "schema location for DEBEZIUM_AVRO row format is not supported".to_string(),
+                    )));
+                }
 
-            stream_source_info.use_schema_registry = use_schema_registry;
-            stream_source_info
-                .row_schema_location
-                .clone_from(&row_schema_location.0);
-            stream_source_info.proto_message_name = message_name.unwrap_or(AstString("".into())).0;
-            stream_source_info.key_message_name =
-                get_key_message_name(&mut format_encode_options_to_consume);
-            stream_source_info.name_strategy =
-                name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32);
+                let message_name = try_consume_string_from_options(
+                    &mut format_encode_options_to_consume,
+                    MESSAGE_NAME_KEY,
+                );
+                let name_strategy = get_sr_name_strategy_check(
+                    &mut format_encode_options_to_consume,
+                    use_schema_registry,
+                )?;
+
+                stream_source_info.use_schema_registry = use_schema_registry;
+                stream_source_info
+                    .row_schema_location
+                    .clone_from(&row_schema_location.0);
+                stream_source_info.proto_message_name =
+                    message_name.unwrap_or(AstString("".into())).0;
+                stream_source_info.key_message_name =
+                    get_key_message_name(&mut format_encode_options_to_consume);
+                stream_source_info.name_strategy =
+                    name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32);
+            }
 
             Some(
                 extract_avro_table_schema(
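Putting the pieces together: a `CREATE SOURCE ... FORMAT PLAIN ENCODE AVRO (aws.glue.schema_arn = '...')` statement flows through `bind_columns_from_source` into the `SchemaLocation::Glue` variant and finally into `AvroParserConfig::new`. A hypothetical end-to-end sketch of that wiring (not part of the diff; the import paths and the assumed `Default` impls are guesses about the crate layout):

```rust
use risingwave_connector::parser::{
    AvroParserConfig, AvroProperties, EncodingProperties, SchemaLocation,
}; // re-exports assumed

async fn glue_parser_config(mock_config: String) -> anyhow::Result<AvroParserConfig> {
    let props = AvroProperties {
        schema_location: SchemaLocation::Glue {
            schema_arn:
                "arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent".into(),
            aws_auth_props: Default::default(), // unused once mock_config is Some(_)
            mock_config: Some(mock_config),
        },
        // remaining fields (enable_upsert, record_name, ...) left at their defaults
        ..Default::default()
    };
    // Resolves the latest schema by arn as the reader schema, and keeps the
    // resolver around to fetch writer schemas by UUID at parse time.
    Ok(AvroParserConfig::new(EncodingProperties::Avro(props)).await?)
}
```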