Skip to content

Commit

Permalink
get_by_schema_version_id
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Jun 26, 2024
1 parent e08f826 commit a1120b0
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 16 deletions.
51 changes: 45 additions & 6 deletions src/connector/src/parser/avro/glue_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,55 @@

use std::sync::Arc;

// use anyhow::Context;
use anyhow::Context;
use apache_avro::Schema;
use aws_sdk_glue::Client;
use moka::future::Cache;

// use crate::error::ConnectorResult;
use crate::error::ConnectorResult;

#[derive(Debug)]
pub struct ConfluentSchemaResolver {
writer_schemas: Cache<i32, Arc<Schema>>,
// confluent_client: Client,
pub struct GlueSchemaResolver {
writer_schemas: Cache<uuid::Uuid, Arc<Schema>>,
glue_client: Client,
}

impl ConfluentSchemaResolver {}
impl GlueSchemaResolver {
/// Create a new `GlueSchemaResolver`
pub fn new(client: Client) -> Self {
Self {
writer_schemas: Cache::new(u64::MAX),
glue_client: client,
}
}

// get the writer schema by id
pub async fn get(&self, schema_version_id: uuid::Uuid) -> ConnectorResult<Arc<Schema>> {
if let Some(schema) = self.writer_schemas.get(&schema_version_id).await {
Ok(schema)
} else {
let r = self
.glue_client
.get_schema_version()
.schema_version_id(schema_version_id)
.send()
.await
.context("glue sdk error")?;
self.parse_and_cache_schema(schema_version_id, r.schema_definition().unwrap())
.await
}
}

async fn parse_and_cache_schema(
&self,
schema_version_id: uuid::Uuid,
content: &str,
) -> ConnectorResult<Arc<Schema>> {
let schema = Schema::parse_str(content).context("failed to parse avro schema")?;
let schema = Arc::new(schema);
self.writer_schemas
.insert(schema_version_id, Arc::clone(&schema))
.await;
Ok(schema)
}
}
37 changes: 35 additions & 2 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use risingwave_connector_codec::decoder::avro::{
};
use risingwave_pb::plan_common::ColumnDesc;

use super::glue_resolver::GlueSchemaResolver;
use super::ConfluentSchemaCache;
use crate::error::ConnectorResult;
use crate::parser::unified::AccessImpl;
Expand Down Expand Up @@ -95,6 +96,22 @@ impl AvroAccessBuilder {
None => bail!("avro parse unexpected eof"),
}
}
AvroHeader::Glue(resolver) => {
assert!(payload.len() >= 18);
// let header_version = payload[0];
// let compression = payload[1];
let schema_version_id = uuid::Uuid::from_slice(&payload[2..18]).unwrap();
// eprintln!(">>>> {schema_version_id}");
let writer_schema = resolver.get(schema_version_id).await?;
let mut raw_payload = &payload[18..];
let rr = from_avro_datum(
writer_schema.as_ref(),
&mut raw_payload,
Some(&self.schema.original_schema),
)?;
// eprintln!("{rr:#?}");
Ok(rr)
}
}
}
}
Expand All @@ -113,7 +130,7 @@ pub struct AvroParserConfig {
#[derive(Debug, Clone)]
enum AvroHeader {
Confluent(Arc<ConfluentSchemaCache>),
// Glue(...)
Glue(Arc<GlueSchemaResolver>),
File,
// SingleObject,
// Fixed & None,
Expand Down Expand Up @@ -190,7 +207,23 @@ impl AvroParserConfig {
map_handling,
})
}
AvroHeaderProps::Glue { aws_auth_props: _ } => unreachable!(),
AvroHeaderProps::Glue { aws_auth_props } => {
let client = aws_sdk_glue::Client::new(&aws_auth_props.build_config().await?);
let resolver = GlueSchemaResolver::new(client);
if enable_upsert {
bail!("avro upsert without schema registry is not supported");
}
let url = url.first().unwrap();
let schema_content = bytes_from_url(url, None).await?;
let schema = Schema::parse_reader(&mut schema_content.as_slice())
.context("failed to parse avro schema")?;
Ok(Self {
schema: Arc::new(ResolvedAvroSchema::create(Arc::new(schema))?),
key_schema: None,
writer_schema_cache: AvroHeader::Glue(Arc::new(resolver)),
map_handling,
})
}
}
}

Expand Down
14 changes: 6 additions & 8 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ pub enum AvroHeaderProps {
name_strategy: PbSchemaRegistryNameStrategy,
},
Glue {
aws_auth_props: Option<AwsAuthProps>,
aws_auth_props: AwsAuthProps,
},
}
impl Default for AvroHeaderProps {
Expand Down Expand Up @@ -1213,13 +1213,11 @@ impl SpecificParserConfig {
.unwrap(),
};
} else {
config.header_props = AvroHeaderProps::File {
aws_auth_props: Some(
serde_json::from_value::<AwsAuthProps>(
serde_json::to_value(info.format_encode_options.clone()).unwrap(),
)
.map_err(|e| anyhow::anyhow!(e))?,
),
config.header_props = AvroHeaderProps::Glue {
aws_auth_props: serde_json::from_value::<AwsAuthProps>(
serde_json::to_value(info.format_encode_options.clone()).unwrap(),
)
.map_err(|e| anyhow::anyhow!(e))?,
};
}
EncodingProperties::Avro(config)
Expand Down

0 comments on commit a1120b0

Please sign in to comment.