diff --git a/src/connector/src/parser/avro/glue_resolver.rs b/src/connector/src/parser/avro/glue_resolver.rs index 85d8205f89ca6..15ec6a2ff42ea 100644 --- a/src/connector/src/parser/avro/glue_resolver.rs +++ b/src/connector/src/parser/avro/glue_resolver.rs @@ -14,16 +14,55 @@ use std::sync::Arc; -// use anyhow::Context; +use anyhow::Context; use apache_avro::Schema; +use aws_sdk_glue::Client; use moka::future::Cache; -// use crate::error::ConnectorResult; +use crate::error::ConnectorResult; #[derive(Debug)] -pub struct ConfluentSchemaResolver { - writer_schemas: Cache>, - // confluent_client: Client, +pub struct GlueSchemaResolver { + writer_schemas: Cache>, + glue_client: Client, } -impl ConfluentSchemaResolver {} +impl GlueSchemaResolver { + /// Create a new `GlueSchemaResolver` + pub fn new(client: Client) -> Self { + Self { + writer_schemas: Cache::new(u64::MAX), + glue_client: client, + } + } + + // get the writer schema by id + pub async fn get(&self, schema_version_id: uuid::Uuid) -> ConnectorResult> { + if let Some(schema) = self.writer_schemas.get(&schema_version_id).await { + Ok(schema) + } else { + let r = self + .glue_client + .get_schema_version() + .schema_version_id(schema_version_id) + .send() + .await + .context("glue sdk error")?; + self.parse_and_cache_schema(schema_version_id, r.schema_definition().unwrap()) + .await + } + } + + async fn parse_and_cache_schema( + &self, + schema_version_id: uuid::Uuid, + content: &str, + ) -> ConnectorResult> { + let schema = Schema::parse_str(content).context("failed to parse avro schema")?; + let schema = Arc::new(schema); + self.writer_schemas + .insert(schema_version_id, Arc::clone(&schema)) + .await; + Ok(schema) + } +} diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index 4d6d7016b47ad..c05f5315e18f1 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -24,6 +24,7 @@ use risingwave_connector_codec::decoder::avro::{ }; use risingwave_pb::plan_common::ColumnDesc; +use super::glue_resolver::GlueSchemaResolver; use super::ConfluentSchemaCache; use crate::error::ConnectorResult; use crate::parser::unified::AccessImpl; @@ -95,6 +96,22 @@ impl AvroAccessBuilder { None => bail!("avro parse unexpected eof"), } } + AvroHeader::Glue(resolver) => { + assert!(payload.len() >= 18); + // let header_version = payload[0]; + // let compression = payload[1]; + let schema_version_id = uuid::Uuid::from_slice(&payload[2..18]).unwrap(); + // eprintln!(">>>> {schema_version_id}"); + let writer_schema = resolver.get(schema_version_id).await?; + let mut raw_payload = &payload[18..]; + let rr = from_avro_datum( + writer_schema.as_ref(), + &mut raw_payload, + Some(&self.schema.original_schema), + )?; + // eprintln!("{rr:#?}"); + Ok(rr) + } } } } @@ -113,7 +130,7 @@ pub struct AvroParserConfig { #[derive(Debug, Clone)] enum AvroHeader { Confluent(Arc), - // Glue(...) + Glue(Arc), File, // SingleObject, // Fixed & None, @@ -190,7 +207,23 @@ impl AvroParserConfig { map_handling, }) } - AvroHeaderProps::Glue { aws_auth_props: _ } => unreachable!(), + AvroHeaderProps::Glue { aws_auth_props } => { + let client = aws_sdk_glue::Client::new(&aws_auth_props.build_config().await?); + let resolver = GlueSchemaResolver::new(client); + if enable_upsert { + bail!("avro upsert without schema registry is not supported"); + } + let url = url.first().unwrap(); + let schema_content = bytes_from_url(url, None).await?; + let schema = Schema::parse_reader(&mut schema_content.as_slice()) + .context("failed to parse avro schema")?; + Ok(Self { + schema: Arc::new(ResolvedAvroSchema::create(Arc::new(schema))?), + key_schema: None, + writer_schema_cache: AvroHeader::Glue(Arc::new(resolver)), + map_handling, + }) + } } } diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 11b17b814b4ec..8ffc492830b69 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -1086,7 +1086,7 @@ pub enum AvroHeaderProps { name_strategy: PbSchemaRegistryNameStrategy, }, Glue { - aws_auth_props: Option, + aws_auth_props: AwsAuthProps, }, } impl Default for AvroHeaderProps { @@ -1213,13 +1213,11 @@ impl SpecificParserConfig { .unwrap(), }; } else { - config.header_props = AvroHeaderProps::File { - aws_auth_props: Some( - serde_json::from_value::( - serde_json::to_value(info.format_encode_options.clone()).unwrap(), - ) - .map_err(|e| anyhow::anyhow!(e))?, - ), + config.header_props = AvroHeaderProps::Glue { + aws_auth_props: serde_json::from_value::( + serde_json::to_value(info.format_encode_options.clone()).unwrap(), + ) + .map_err(|e| anyhow::anyhow!(e))?, }; } EncodingProperties::Avro(config)