diff --git a/e2e_test/source_inline/kafka/avro/glue.slt b/e2e_test/source_inline/kafka/avro/glue.slt new file mode 100644 index 0000000000000..13a1ae27ff3a7 --- /dev/null +++ b/e2e_test/source_inline/kafka/avro/glue.slt @@ -0,0 +1,121 @@ +control substitution on + +system ok +rpk topic delete 'glue-sample-my-event' + +system ok +rpk topic create 'glue-sample-my-event' + +system ok +rpk topic produce -f '%v{hex}\n' 'glue-sample-my-event' < ConnectorResult>; + /// Gets the latest schema by arn, which is used as *reader schema*. + async fn get_by_name(&self, schema_arn: &str) -> ConnectorResult>; +} + +#[derive(Debug)] +pub enum GlueSchemaCacheImpl { + Real(RealGlueSchemaCache), + Mock(MockGlueSchemaCache), +} + +impl GlueSchemaCacheImpl { + pub async fn new( + aws_auth_props: &AwsAuthProps, + mock_config: Option<&str>, + ) -> ConnectorResult { + if let Some(mock_config) = mock_config { + return Ok(Self::Mock(MockGlueSchemaCache::new(mock_config))); + } + Ok(Self::Real(RealGlueSchemaCache::new(aws_auth_props).await?)) + } +} + +impl GlueSchemaCache for GlueSchemaCacheImpl { + async fn get_by_id(&self, schema_version_id: uuid::Uuid) -> ConnectorResult> { + match self { + Self::Real(inner) => inner.get_by_id(schema_version_id).await, + Self::Mock(inner) => inner.get_by_id(schema_version_id).await, + } + } + + async fn get_by_name(&self, schema_arn: &str) -> ConnectorResult> { + match self { + Self::Real(inner) => inner.get_by_name(schema_arn).await, + Self::Mock(inner) => inner.get_by_name(schema_arn).await, + } + } +} + #[derive(Debug)] -pub struct GlueSchemaCache { +pub struct RealGlueSchemaCache { writer_schemas: Cache>, glue_client: Client, } -impl GlueSchemaCache { +impl RealGlueSchemaCache { /// Create a new `GlueSchemaCache` - pub fn new(client: Client) -> Self { - Self { + pub async fn new(aws_auth_props: &AwsAuthProps) -> ConnectorResult { + let client = Client::new(&aws_auth_props.build_config().await?); + Ok(Self { writer_schemas: Cache::new(u64::MAX), glue_client: client, - } + }) + } + + async fn parse_and_cache_schema( + &self, + schema_version_id: uuid::Uuid, + content: &str, + ) -> ConnectorResult> { + let schema = Schema::parse_str(content).context("failed to parse avro schema")?; + let schema = Arc::new(schema); + self.writer_schemas + .insert(schema_version_id, Arc::clone(&schema)) + .await; + Ok(schema) } +} +impl GlueSchemaCache for RealGlueSchemaCache { /// Gets the a specific schema by id, which is used as *writer schema*. - pub async fn get_by_id(&self, schema_version_id: uuid::Uuid) -> ConnectorResult> { + async fn get_by_id(&self, schema_version_id: uuid::Uuid) -> ConnectorResult> { if let Some(schema) = self.writer_schemas.get(&schema_version_id).await { return Ok(schema); } @@ -64,7 +123,7 @@ impl GlueSchemaCache { } /// Gets the latest schema by arn, which is used as *reader schema*. - pub async fn get_by_name(&self, schema_arn: &str) -> ConnectorResult> { + async fn get_by_name(&self, schema_arn: &str) -> ConnectorResult> { let res = self .glue_client .get_schema_version() @@ -84,17 +143,78 @@ impl GlueSchemaCache { self.parse_and_cache_schema(schema_version_id, definition) .await } +} - async fn parse_and_cache_schema( - &self, - schema_version_id: uuid::Uuid, - content: &str, - ) -> ConnectorResult> { - let schema = Schema::parse_str(content).context("failed to parse avro schema")?; - let schema = Arc::new(schema); - self.writer_schemas - .insert(schema_version_id, Arc::clone(&schema)) - .await; - Ok(schema) +#[derive(Debug)] +pub struct MockGlueSchemaCache { + by_id: HashMap>, + arn_to_latest_id: HashMap, +} + +impl MockGlueSchemaCache { + pub fn new(mock_config: &str) -> Self { + // The `mock_config` accepted is a JSON that looks like: + // { + // "by_id": { + // "4dc80ccf-2d0c-4846-9325-7e1c9e928121": { + // "type": "record", + // "name": "MyEvent", + // "fields": [...] + // }, + // "3df022f4-b16d-4afe-bdf7-cf4baf8d01d3": { + // ... + // } + // }, + // "arn_to_latest_id": { + // "arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent": "3df022f4-b16d-4afe-bdf7-cf4baf8d01d3" + // } + // } + // + // The format is not public and we can make breaking changes to it. + // Current format only supports avsc. + let parsed: serde_json::Value = + serde_json::from_str(mock_config).expect("mock config shall be valid json"); + let by_id = parsed + .get("by_id") + .unwrap() + .as_object() + .unwrap() + .iter() + .map(|(schema_version_id, schema)| { + let schema_version_id = schema_version_id.parse().unwrap(); + let schema = Schema::parse(schema).unwrap(); + (schema_version_id, Arc::new(schema)) + }) + .collect(); + let arn_to_latest_id = parsed + .get("arn_to_latest_id") + .unwrap() + .as_object() + .unwrap() + .iter() + .map(|(arn, latest_id)| (arn.clone(), latest_id.as_str().unwrap().parse().unwrap())) + .collect(); + Self { + by_id, + arn_to_latest_id, + } + } +} + +impl GlueSchemaCache for MockGlueSchemaCache { + async fn get_by_id(&self, schema_version_id: uuid::Uuid) -> ConnectorResult> { + Ok(self + .by_id + .get(&schema_version_id) + .context("schema version id not found in mock registry")? + .clone()) + } + + async fn get_by_name(&self, schema_arn: &str) -> ConnectorResult> { + let schema_version_id = self + .arn_to_latest_id + .get(schema_arn) + .context("schema arn not found in mock registry")?; + self.get_by_id(*schema_version_id).await } } diff --git a/src/connector/src/parser/avro/mod.rs b/src/connector/src/parser/avro/mod.rs index 24b6be3b8aed8..536c700efef8e 100644 --- a/src/connector/src/parser/avro/mod.rs +++ b/src/connector/src/parser/avro/mod.rs @@ -17,5 +17,5 @@ mod glue_resolver; mod parser; pub use confluent_resolver::ConfluentSchemaCache; -pub use glue_resolver::GlueSchemaCache; +pub use glue_resolver::{GlueSchemaCache, GlueSchemaCacheImpl}; pub use parser::{AvroAccessBuilder, AvroParserConfig}; diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index d6b18bf398a75..68ac7d446846e 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -24,7 +24,7 @@ use risingwave_connector_codec::decoder::avro::{ }; use risingwave_pb::plan_common::ColumnDesc; -use super::{ConfluentSchemaCache, GlueSchemaCache}; +use super::{ConfluentSchemaCache, GlueSchemaCache as _, GlueSchemaCacheImpl}; use crate::error::ConnectorResult; use crate::parser::unified::AccessImpl; use crate::parser::util::bytes_from_url; @@ -159,7 +159,7 @@ pub struct AvroParserConfig { #[derive(Debug, Clone)] enum WriterSchemaCache { Confluent(Arc), - Glue(Arc), + Glue(Arc), File, } @@ -241,9 +241,10 @@ impl AvroParserConfig { SchemaLocation::Glue { schema_arn, aws_auth_props, + mock_config, } => { - let client = aws_sdk_glue::Client::new(&aws_auth_props.build_config().await?); - let resolver = GlueSchemaCache::new(client); + let resolver = + GlueSchemaCacheImpl::new(&aws_auth_props, mock_config.as_deref()).await?; let schema = resolver.get_by_name(&schema_arn).await?; Ok(Self { schema: Arc::new(ResolvedAvroSchema::create(schema)?), diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index b52e5ed713c52..aeb8ce93c601d 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -1094,6 +1094,8 @@ pub enum SchemaLocation { Glue { schema_arn: String, aws_auth_props: AwsAuthProps, + // When `Some(_)`, ignore AWS and load schemas from provided config + mock_config: Option, }, } @@ -1223,6 +1225,11 @@ impl SpecificParserConfig { serde_json::to_value(info.format_encode_options.clone()).unwrap(), ) .map_err(|e| anyhow::anyhow!(e))?, + // The option `mock_config` is not public and we can break compatibility. + mock_config: info + .format_encode_options + .get("aws.glue.mock_config") + .cloned(), } } else if info.use_schema_registry { SchemaLocation::Confluent {