Skip to content

Commit

Permalink
e2e with mock_config
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Jul 11, 2024
1 parent a4797fd commit 2bc5fe5
Show file tree
Hide file tree
Showing 5 changed files with 272 additions and 23 deletions.
121 changes: 121 additions & 0 deletions e2e_test/source_inline/kafka/avro/glue.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# End-to-end test for a Kafka source using `encode avro` with `aws.glue.schema_arn`,
# where the schemas come from the inline `aws.glue.mock_config` option.
control substitution on

# Recreate the topic: delete it, then create it again.
system ok
rpk topic delete 'glue-sample-my-event'

system ok
rpk topic create 'glue-sample-my-event'

# Produce the first message, given as a hex string (presumably `%v{hex}` makes rpk decode
# the value from hex -- confirm against the rpk docs).
# After the leading `0300`, the next 16 bytes (`5af405ef...5a7346`) spell the schema
# version id `5af405ef-11b5-4442-81a2-e0563e5a7346` used in the mock config of this test.
system ok
rpk topic produce -f '%v{hex}\n' 'glue-sample-my-event' <<EOF
03005af405ef11b5444281a2e0563e5a734606666f6f80868dc8ebd98404
EOF

# Create the source. The mock config holds a single schema version (fields `f1`: string,
# `f2`: timestamp-micros long), and `arn_to_latest_id` maps the schema ARN to that version.
statement ok
create source t with (
connector = 'kafka',
properties.bootstrap.server='${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}',
topic = 'glue-sample-my-event')
format plain encode avro (
aws.glue.schema_arn = 'arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent',
aws.glue.mock_config = '{
"by_id":{
"5af405ef-11b5-4442-81a2-e0563e5a7346": {
"type": "record",
"name": "MyEvent",
"fields": [
{
"name": "f1",
"type": "string"
},
{
"name": "f2",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
}
}
]
}
},
"arn_to_latest_id":{
"arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent": "5af405ef-11b5-4442-81a2-e0563e5a7346"
}
}');

# The one message produced so far is expected to decode into two columns (f1, f2).
query TT
select * from t;
----
foo 2006-01-02 22:04:05.123456+00:00

# Alter the source with a mock config that keeps the first schema version and adds a second
# one (`4516411b-...`) with an extra nullable `f3` bytes field (default null).
# `arn_to_latest_id` now maps the ARN to the second version.
statement ok
alter source t format plain encode avro (
aws.glue.schema_arn = 'arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent',
aws.glue.mock_config = '{
"by_id":{
"5af405ef-11b5-4442-81a2-e0563e5a7346": {
"type": "record",
"name": "MyEvent",
"fields": [
{
"name": "f1",
"type": "string"
},
{
"name": "f2",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
}
}
]
},
"4516411b-b1e7-4e67-839f-3ef1b8c29280": {
"type": "record",
"name": "MyEvent",
"fields": [
{
"name": "f1",
"type": "string"
},
{
"name": "f2",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
}
},
{
"name": "f3",
"type": ["null", "bytes"],
"default": null
}
]
}
},
"arn_to_latest_id":{
"arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent": "4516411b-b1e7-4e67-839f-3ef1b8c29280"
}
}');

# The source now has three columns; the message written with the first schema version
# is expected to show NULL for the new `f3` column.
query TTT
select * from t;
----
foo 2006-01-02 22:04:05.123456+00:00 NULL

# Produce a second message. After the leading `0300`, the next 16 bytes
# (`4516411b...c29280`) spell the second schema version id from the altered mock config,
# and the payload ends with the bytes `deadbeef`.
system ok
rpk topic produce -f '%v{hex}\n' 'glue-sample-my-event' <<EOF
03004516411bb1e74e67839f3ef1b8c292800441428089b5e9a886ee050208deadbeef
EOF

# Both messages are expected to be readable; ordering by the second column (f2) keeps the
# expected output deterministic.
query TTT
select * from t order by 2;
----
foo 2006-01-02 22:04:05.123456+00:00 NULL
AB 2022-04-08 00:00:00.123456+00:00 \xdeadbeef

# Clean up the source and the topic created by this test.
statement ok
drop source t;

system ok
rpk topic delete 'glue-sample-my-event'
156 changes: 138 additions & 18 deletions src/connector/src/parser/avro/glue_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Context;
Expand All @@ -20,6 +21,7 @@ use aws_sdk_glue::types::{SchemaId, SchemaVersionNumber};
use aws_sdk_glue::Client;
use moka::future::Cache;

use crate::connector_common::AwsAuthProps;
use crate::error::ConnectorResult;

/// Fetch schemas from AWS Glue schema registry and cache them.
Expand All @@ -29,23 +31,80 @@ use crate::error::ConnectorResult;
/// convert it with the reader schema. (This is also why Avro has to be used with a schema registry instead of a static schema file.)
///
/// TODO: support protobuf (not sure if it's needed)
pub trait GlueSchemaCache {
/// Gets a specific schema by its schema version id, which is used as the *writer schema*.
async fn get_by_id(&self, schema_version_id: uuid::Uuid) -> ConnectorResult<Arc<Schema>>;
/// Gets the latest schema by ARN, which is used as the *reader schema*.
async fn get_by_name(&self, schema_arn: &str) -> ConnectorResult<Arc<Schema>>;
}

/// Enum wrapper over the two [`GlueSchemaCache`] implementations in this file, so callers
/// can hold either one behind a single concrete type.
#[derive(Debug)]
pub enum GlueSchemaCacheImpl {
/// Wraps a [`RealGlueSchemaCache`], which holds an AWS Glue client.
Real(RealGlueSchemaCache),
/// Wraps a [`MockGlueSchemaCache`], which is built from a JSON mock config.
Mock(MockGlueSchemaCache),
}

impl GlueSchemaCacheImpl {
    /// Builds the schema cache variant selected by `mock_config`.
    ///
    /// * `Some(config)`: returns the `Mock` variant built from `config`; `aws_auth_props`
    ///   is not used on this path.
    /// * `None`: returns the `Real` variant built from `aws_auth_props`, propagating any
    ///   error from its constructor.
    pub async fn new(
        aws_auth_props: &AwsAuthProps,
        mock_config: Option<&str>,
    ) -> ConnectorResult<Self> {
        match mock_config {
            Some(config) => Ok(Self::Mock(MockGlueSchemaCache::new(config))),
            None => {
                let real = RealGlueSchemaCache::new(aws_auth_props).await?;
                Ok(Self::Real(real))
            }
        }
    }
}

impl GlueSchemaCache for GlueSchemaCacheImpl {
async fn get_by_id(&self, schema_version_id: uuid::Uuid) -> ConnectorResult<Arc<Schema>> {
match self {
Self::Real(inner) => inner.get_by_id(schema_version_id).await,
Self::Mock(inner) => inner.get_by_id(schema_version_id).await,
}
}

async fn get_by_name(&self, schema_arn: &str) -> ConnectorResult<Arc<Schema>> {
match self {
Self::Real(inner) => inner.get_by_name(schema_arn).await,
Self::Mock(inner) => inner.get_by_name(schema_arn).await,
}
}
}

#[derive(Debug)]
pub struct GlueSchemaCache {
pub struct RealGlueSchemaCache {
writer_schemas: Cache<uuid::Uuid, Arc<Schema>>,
glue_client: Client,
}

impl GlueSchemaCache {
impl RealGlueSchemaCache {
/// Create a new `RealGlueSchemaCache`
pub fn new(client: Client) -> Self {
Self {
pub async fn new(aws_auth_props: &AwsAuthProps) -> ConnectorResult<Self> {
let client = Client::new(&aws_auth_props.build_config().await?);
Ok(Self {
writer_schemas: Cache::new(u64::MAX),
glue_client: client,
}
})
}

/// Parses `content` as an Avro schema, stores it in `writer_schemas` under
/// `schema_version_id`, and returns the shared parsed schema.
///
/// Fails with the context "failed to parse avro schema" when `content` cannot be
/// parsed; nothing is inserted into the cache in that case.
async fn parse_and_cache_schema(
    &self,
    schema_version_id: uuid::Uuid,
    content: &str,
) -> ConnectorResult<Arc<Schema>> {
    let parsed = Schema::parse_str(content)
        .map(Arc::new)
        .context("failed to parse avro schema")?;
    let cached = Arc::clone(&parsed);
    self.writer_schemas.insert(schema_version_id, cached).await;
    Ok(parsed)
}
}

impl GlueSchemaCache for RealGlueSchemaCache {
/// Gets a specific schema by id, which is used as the *writer schema*.
pub async fn get_by_id(&self, schema_version_id: uuid::Uuid) -> ConnectorResult<Arc<Schema>> {
async fn get_by_id(&self, schema_version_id: uuid::Uuid) -> ConnectorResult<Arc<Schema>> {
if let Some(schema) = self.writer_schemas.get(&schema_version_id).await {
return Ok(schema);
}
Expand All @@ -64,7 +123,7 @@ impl GlueSchemaCache {
}

/// Gets the latest schema by arn, which is used as *reader schema*.
pub async fn get_by_name(&self, schema_arn: &str) -> ConnectorResult<Arc<Schema>> {
async fn get_by_name(&self, schema_arn: &str) -> ConnectorResult<Arc<Schema>> {
let res = self
.glue_client
.get_schema_version()
Expand All @@ -84,17 +143,78 @@ impl GlueSchemaCache {
self.parse_and_cache_schema(schema_version_id, definition)
.await
}
}

async fn parse_and_cache_schema(
&self,
schema_version_id: uuid::Uuid,
content: &str,
) -> ConnectorResult<Arc<Schema>> {
let schema = Schema::parse_str(content).context("failed to parse avro schema")?;
let schema = Arc::new(schema);
self.writer_schemas
.insert(schema_version_id, Arc::clone(&schema))
.await;
Ok(schema)
/// In-memory [`GlueSchemaCache`] built from a JSON mock config; see
/// `MockGlueSchemaCache::new` for the accepted format.
#[derive(Debug)]
pub struct MockGlueSchemaCache {
// Parsed schemas keyed by schema version id.
by_id: HashMap<uuid::Uuid, Arc<Schema>>,
// Maps a schema ARN to the schema version id treated as its latest version.
arn_to_latest_id: HashMap<String, uuid::Uuid>,
}

impl MockGlueSchemaCache {
pub fn new(mock_config: &str) -> Self {
// The `mock_config` accepted is a JSON that looks like:
// {
// "by_id": {
// "4dc80ccf-2d0c-4846-9325-7e1c9e928121": {
// "type": "record",
// "name": "MyEvent",
// "fields": [...]
// },
// "3df022f4-b16d-4afe-bdf7-cf4baf8d01d3": {
// ...
// }
// },
// "arn_to_latest_id": {
// "arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent": "3df022f4-b16d-4afe-bdf7-cf4baf8d01d3"
// }
// }
//
// The format is not public and we can make breaking changes to it.
// Current format only supports avsc.
let parsed: serde_json::Value =
serde_json::from_str(mock_config).expect("mock config shall be valid json");
let by_id = parsed
.get("by_id")
.unwrap()
.as_object()
.unwrap()
.iter()
.map(|(schema_version_id, schema)| {
let schema_version_id = schema_version_id.parse().unwrap();
let schema = Schema::parse(schema).unwrap();
(schema_version_id, Arc::new(schema))
})
.collect();
let arn_to_latest_id = parsed
.get("arn_to_latest_id")
.unwrap()
.as_object()
.unwrap()
.iter()
.map(|(arn, latest_id)| (arn.clone(), latest_id.as_str().unwrap().parse().unwrap()))
.collect();
Self {
by_id,
arn_to_latest_id,
}
}
}

impl GlueSchemaCache for MockGlueSchemaCache {
    /// Looks up the schema registered under `schema_version_id` in the mock registry.
    ///
    /// Returns an error with the context "schema version id not found in mock registry"
    /// when the id is unknown.
    async fn get_by_id(&self, schema_version_id: uuid::Uuid) -> ConnectorResult<Arc<Schema>> {
        let schema = self
            .by_id
            .get(&schema_version_id)
            .context("schema version id not found in mock registry")?;
        Ok(Arc::clone(schema))
    }

    /// Resolves `schema_arn` to its latest schema version id, then looks that id up.
    ///
    /// Returns an error with the context "schema arn not found in mock registry" when the
    /// ARN is unknown, or the `get_by_id` error when the mapped id has no schema.
    async fn get_by_name(&self, schema_arn: &str) -> ConnectorResult<Arc<Schema>> {
        let latest_id = *self
            .arn_to_latest_id
            .get(schema_arn)
            .context("schema arn not found in mock registry")?;
        self.get_by_id(latest_id).await
    }
}
2 changes: 1 addition & 1 deletion src/connector/src/parser/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ mod glue_resolver;
mod parser;

pub use confluent_resolver::ConfluentSchemaCache;
pub use glue_resolver::GlueSchemaCache;
pub use glue_resolver::{GlueSchemaCache, GlueSchemaCacheImpl};
pub use parser::{AvroAccessBuilder, AvroParserConfig};
9 changes: 5 additions & 4 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_connector_codec::decoder::avro::{
};
use risingwave_pb::plan_common::ColumnDesc;

use super::{ConfluentSchemaCache, GlueSchemaCache};
use super::{ConfluentSchemaCache, GlueSchemaCache as _, GlueSchemaCacheImpl};
use crate::error::ConnectorResult;
use crate::parser::unified::AccessImpl;
use crate::parser::util::bytes_from_url;
Expand Down Expand Up @@ -159,7 +159,7 @@ pub struct AvroParserConfig {
#[derive(Debug, Clone)]
enum WriterSchemaCache {
Confluent(Arc<ConfluentSchemaCache>),
Glue(Arc<GlueSchemaCache>),
Glue(Arc<GlueSchemaCacheImpl>),
File,
}

Expand Down Expand Up @@ -241,9 +241,10 @@ impl AvroParserConfig {
SchemaLocation::Glue {
schema_arn,
aws_auth_props,
mock_config,
} => {
let client = aws_sdk_glue::Client::new(&aws_auth_props.build_config().await?);
let resolver = GlueSchemaCache::new(client);
let resolver =
GlueSchemaCacheImpl::new(&aws_auth_props, mock_config.as_deref()).await?;
let schema = resolver.get_by_name(&schema_arn).await?;
Ok(Self {
schema: Arc::new(ResolvedAvroSchema::create(schema)?),
Expand Down
7 changes: 7 additions & 0 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,8 @@ pub enum SchemaLocation {
Glue {
schema_arn: String,
aws_auth_props: AwsAuthProps,
// When `Some(_)`, ignore AWS and load schemas from provided config
mock_config: Option<String>,
},
}

Expand Down Expand Up @@ -1223,6 +1225,11 @@ impl SpecificParserConfig {
serde_json::to_value(info.format_encode_options.clone()).unwrap(),
)
.map_err(|e| anyhow::anyhow!(e))?,
// The option `mock_config` is not public and we can break compatibility.
mock_config: info
.format_encode_options
.get("aws.glue.mock_config")
.cloned(),
}
} else if info.use_schema_registry {
SchemaLocation::Confluent {
Expand Down

0 comments on commit 2bc5fe5

Please sign in to comment.