Skip to content

Commit

Permalink
feat(sink): Avro with AWS Glue Schema Registry (#20181)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored and xiangjinwu committed Jan 17, 2025
1 parent 062d33d commit f465c23
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 31 deletions.
70 changes: 70 additions & 0 deletions e2e_test/source_inline/kafka/avro/glue.slt
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,76 @@ select * from t order by 2;
foo 2006-01-02 22:04:05.123456+00:00 NULL
AB 2022-04-08 00:00:00.123456+00:00 \xdeadbeef

statement ok
create sink sk as select
'bar' as f1,
to_timestamp(1735689600) as f2,
'a0A'::bytea as f3
with (
connector = 'kafka',
properties.bootstrap.server='${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}',
topic = 'glue-sample-my-event')
format plain encode avro (
aws.glue.schema_arn = 'arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent',
aws.glue.mock_config = '{
"by_id":{
"5af405ef-11b5-4442-81a2-e0563e5a7346": {
"type": "record",
"name": "MyEvent",
"fields": [
{
"name": "f1",
"type": "string"
},
{
"name": "f2",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
}
}
]
},
"4516411b-b1e7-4e67-839f-3ef1b8c29280": {
"type": "record",
"name": "MyEvent",
"fields": [
{
"name": "f1",
"type": "string"
},
{
"name": "f2",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
}
},
{
"name": "f3",
"type": ["null", "bytes"],
"default": null
}
]
}
},
"arn_to_latest_id":{
"arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent": "4516411b-b1e7-4e67-839f-3ef1b8c29280"
}
}');

sleep 1s

query TTT
select * from t order by 2;
----
foo 2006-01-02 22:04:05.123456+00:00 NULL
AB 2022-04-08 00:00:00.123456+00:00 \xdeadbeef
bar 2025-01-01 00:00:00+00:00 \x613041

statement ok
drop sink sk;

statement ok
drop source t;

Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/parser/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::parser::unified::AccessImpl;
use crate::parser::util::bytes_from_url;
use crate::parser::{AccessBuilder, EncodingProperties};
use crate::schema::schema_registry::{extract_schema_id, handle_sr_list, Client, WireFormatError};
use crate::schema::SchemaLoader;
use crate::schema::{ConfluentSchemaLoader, SchemaLoader};

#[derive(Debug)]
pub struct ProtobufAccessBuilder {
Expand Down Expand Up @@ -81,13 +81,13 @@ impl ProtobufParserConfig {
}
let pool = if protobuf_config.use_schema_registry {
let client = Client::new(url, &protobuf_config.client_config)?;
let loader = SchemaLoader {
let loader = SchemaLoader::Confluent(ConfluentSchemaLoader {
client,
name_strategy: protobuf_config.name_strategy,
topic: protobuf_config.topic,
key_record_name: None,
val_record_name: Some(message_name.clone()),
};
});
let (_schema_id, root_file_descriptor) = loader
.load_val_schema::<FileDescriptor>()
.await
Expand Down
181 changes: 168 additions & 13 deletions src/connector/src/schema/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,42 @@ use super::schema_registry::{
get_subject_by_strategy, handle_sr_list, name_strategy_from_str, Client, Subject,
};
use super::{
invalid_option_error, InvalidOptionError, SchemaFetchError, KEY_MESSAGE_NAME_KEY,
MESSAGE_NAME_KEY, NAME_STRATEGY_KEY, SCHEMA_REGISTRY_KEY,
invalid_option_error, malformed_response_error, InvalidOptionError, MalformedResponseError,
SchemaFetchError, AWS_GLUE_SCHEMA_ARN_KEY, KEY_MESSAGE_NAME_KEY, MESSAGE_NAME_KEY,
NAME_STRATEGY_KEY, SCHEMA_REGISTRY_KEY,
};
use crate::connector_common::AwsAuthProps;

pub struct SchemaLoader {
pub enum SchemaLoader {
Confluent(ConfluentSchemaLoader),
Glue(GlueSchemaLoader),
}

pub struct ConfluentSchemaLoader {
pub client: Client,
pub name_strategy: PbSchemaRegistryNameStrategy,
pub topic: String,
pub key_record_name: Option<String>,
pub val_record_name: Option<String>,
}

impl SchemaLoader {
pub enum GlueSchemaLoader {
Real {
client: aws_sdk_glue::Client,
schema_arn: String,
},
Mock {
schema_version_id: uuid::Uuid,
definition: String,
},
}

pub enum SchemaVersion {
Confluent(i32),
Glue(uuid::Uuid),
}

impl ConfluentSchemaLoader {
pub fn from_format_options(
topic: &str,
format_options: &BTreeMap<String, String>,
Expand Down Expand Up @@ -66,24 +89,156 @@ impl SchemaLoader {

async fn load_schema<Out: LoadedSchema, const IS_KEY: bool>(
&self,
record: Option<&str>,
) -> Result<(i32, Out), SchemaFetchError> {
) -> Result<(SchemaVersion, Out), SchemaFetchError> {
let record = match IS_KEY {
true => self.key_record_name.as_deref(),
false => self.val_record_name.as_deref(),
};
let subject = get_subject_by_strategy(&self.name_strategy, &self.topic, record, IS_KEY)?;
let (primary_subject, dependency_subjects) =
self.client.get_subject_and_references(&subject).await?;
let schema_id = primary_subject.schema.id;
let out = Out::compile(primary_subject, dependency_subjects)?;
Ok((schema_id, out))
Ok((SchemaVersion::Confluent(schema_id), out))
}
}

pub async fn load_key_schema<Out: LoadedSchema>(&self) -> Result<(i32, Out), SchemaFetchError> {
self.load_schema::<Out, true>(self.key_record_name.as_deref())
.await
impl GlueSchemaLoader {
pub async fn from_format_options(
schema_arn: &str,
format_options: &BTreeMap<String, String>,
) -> Result<Self, SchemaFetchError> {
risingwave_common::license::Feature::GlueSchemaRegistry.check_available()?;
if let Some(mock_config) = format_options.get("aws.glue.mock_config") {
// Internal format for easy testing. See `MockGlueSchemaCache` for details.
let parsed: serde_json::Value =
serde_json::from_str(mock_config).expect("mock config shall be valid json");
let schema_version_id_str = parsed
.get("arn_to_latest_id")
.unwrap()
.as_object()
.unwrap()
.get(schema_arn)
.unwrap()
.as_str()
.unwrap();
let definition = parsed
.get("by_id")
.unwrap()
.as_object()
.unwrap()
.get(schema_version_id_str)
.unwrap()
.to_string();
return Ok(Self::Mock {
schema_version_id: schema_version_id_str.parse()?,
definition,
});
};
let aws_auth_props =
serde_json::from_value::<AwsAuthProps>(serde_json::to_value(format_options).unwrap())
.map_err(|_e| invalid_option_error!(""))?;
let client = aws_sdk_glue::Client::new(
&aws_auth_props
.build_config()
.await
.map_err(SchemaFetchError::YetToMigrate)?,
);
Ok(Self::Real {
client,
schema_arn: schema_arn.to_owned(),
})
}

pub async fn load_val_schema<Out: LoadedSchema>(&self) -> Result<(i32, Out), SchemaFetchError> {
self.load_schema::<Out, false>(self.val_record_name.as_deref())
.await
async fn load_schema<Out: LoadedSchema, const IS_KEY: bool>(
&self,
) -> Result<(SchemaVersion, Out), SchemaFetchError> {
if IS_KEY {
return Err(invalid_option_error!(
"GlueSchemaRegistry cannot be key. Specify `KEY ENCODE [TEXT | BYTES]` please."
)
.into());
}
let (schema_version_id, definition) = match self {
Self::Mock {
schema_version_id,
definition,
} => (*schema_version_id, definition.clone()),
Self::Real { client, schema_arn } => {
use aws_sdk_glue::types::{SchemaId, SchemaVersionNumber};

let res = client
.get_schema_version()
.schema_id(SchemaId::builder().schema_arn(schema_arn).build())
.schema_version_number(
SchemaVersionNumber::builder().latest_version(true).build(),
)
.send()
.await
.map_err(|e| Box::new(e.into_service_error()))?;
let schema_version_id = res
.schema_version_id()
.ok_or_else(|| malformed_response_error!("missing schema_version_id"))?
.parse()?;
let definition = res
.schema_definition()
.ok_or_else(|| malformed_response_error!("missing schema_definition"))?
.to_owned();
(schema_version_id, definition)
}
};

// https://github.com/awslabs/aws-glue-schema-registry/issues/32
// No references in AWS Glue Schema Registry yet
let primary = Subject {
version: 0,
name: "".to_owned(),
schema: super::schema_registry::ConfluentSchema {
id: 0,
content: definition,
},
};
let out = Out::compile(primary, vec![])?;
Ok((SchemaVersion::Glue(schema_version_id), out))
}
}

impl SchemaLoader {
pub async fn from_format_options(
topic: &str,
format_options: &BTreeMap<String, String>,
) -> Result<Self, SchemaFetchError> {
if let Some(schema_arn) = format_options.get(AWS_GLUE_SCHEMA_ARN_KEY) {
Ok(Self::Glue(
GlueSchemaLoader::from_format_options(schema_arn, format_options).await?,
))
} else {
Ok(Self::Confluent(ConfluentSchemaLoader::from_format_options(
topic,
format_options,
)?))
}
}

async fn load_schema<Out: LoadedSchema, const IS_KEY: bool>(
&self,
) -> Result<(SchemaVersion, Out), SchemaFetchError> {
match self {
Self::Confluent(inner) => inner.load_schema::<Out, IS_KEY>().await,
Self::Glue(inner) => inner.load_schema::<Out, IS_KEY>().await,
}
}

pub async fn load_key_schema<Out: LoadedSchema>(
&self,
) -> Result<(SchemaVersion, Out), SchemaFetchError> {
self.load_schema::<Out, true>().await
}

pub async fn load_val_schema<Out: LoadedSchema>(
&self,
) -> Result<(SchemaVersion, Out), SchemaFetchError> {
self.load_schema::<Out, false>().await
}
}

Expand Down
16 changes: 15 additions & 1 deletion src/connector/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod loader;
pub mod protobuf;
pub mod schema_registry;

pub use loader::SchemaLoader;
pub use loader::{ConfluentSchemaLoader, SchemaLoader, SchemaVersion};

const MESSAGE_NAME_KEY: &str = "message";
const KEY_MESSAGE_NAME_KEY: &str = "key.message";
Expand All @@ -36,12 +36,26 @@ pub struct InvalidOptionError {
// source: Option<risingwave_common::error::BoxedError>,
}

#[derive(Debug, thiserror::Error, thiserror_ext::Macro)]
#[error("Malformed response: {message}")]
pub struct MalformedResponseError {
pub message: String,
}

#[derive(Debug, thiserror::Error)]
pub enum SchemaFetchError {
#[error(transparent)]
InvalidOption(#[from] InvalidOptionError),
#[error(transparent)]
License(#[from] risingwave_common::license::FeatureNotAvailable),
#[error(transparent)]
Request(#[from] schema_registry::ConcurrentRequestError),
#[error(transparent)]
AwsGlue(#[from] Box<aws_sdk_glue::operation::get_schema_version::GetSchemaVersionError>),
#[error(transparent)]
MalformedResponse(#[from] MalformedResponseError),
#[error("schema version id invalid: {0}")]
InvalidUuid(#[from] uuid::Error),
#[error("schema compilation error: {0}")]
SchemaCompile(
#[source]
Expand Down
10 changes: 9 additions & 1 deletion src/connector/src/schema/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,17 @@ pub async fn fetch_from_registry(
format_options: &BTreeMap<String, String>,
topic: &str,
) -> Result<(MessageDescriptor, i32), SchemaFetchError> {
let loader = SchemaLoader::from_format_options(topic, format_options)?;
let loader = SchemaLoader::from_format_options(topic, format_options).await?;

let (vid, vpb) = loader.load_val_schema::<FileDescriptor>().await?;
let vid = match vid {
super::SchemaVersion::Confluent(vid) => vid,
super::SchemaVersion::Glue(_) => {
return Err(
invalid_option_error!("Protobuf with Glue Schema Registry unsupported").into(),
)
}
};

Ok((
vpb.parent_pool().get_message_by_name(message_name).unwrap(),
Expand Down
Loading

0 comments on commit f465c23

Please sign in to comment.