Skip to content

Commit

Permalink
feat(source): parse avro map into jsonb (#16948)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored May 28, 2024
1 parent 469c380 commit 2b52536
Show file tree
Hide file tree
Showing 11 changed files with 258 additions and 44 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ icelake = { workspace = true }
indexmap = { version = "1.9.3", features = ["serde"] }
itertools = { workspace = true }
jni = { version = "0.21.1", features = ["invocation"] }
jsonbb = { workspace = true }
jsonwebtoken = "9.2.0"
jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
maplit = "1.0.2"
Expand Down
43 changes: 28 additions & 15 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::error::ConnectorResult;
use crate::parser::unified::avro::{AvroAccess, AvroParseOptions};
use crate::parser::unified::AccessImpl;
use crate::parser::util::bytes_from_url;
use crate::parser::{AccessBuilder, EncodingProperties, EncodingType};
use crate::parser::{AccessBuilder, AvroProperties, EncodingProperties, EncodingType, MapHandling};
use crate::schema::schema_registry::{
extract_schema_id, get_subject_by_strategy, handle_sr_list, Client,
};
Expand Down Expand Up @@ -101,35 +101,46 @@ pub struct AvroParserConfig {
pub schema: Arc<Schema>,
pub key_schema: Option<Arc<Schema>>,
pub schema_resolver: Option<Arc<ConfluentSchemaResolver>>,

pub map_handling: Option<MapHandling>,
}

impl AvroParserConfig {
pub async fn new(encoding_properties: EncodingProperties) -> ConnectorResult<Self> {
let avro_config = try_match_expand!(encoding_properties, EncodingProperties::Avro)?;
let schema_location = &avro_config.row_schema_location;
let enable_upsert = avro_config.enable_upsert;
let AvroProperties {
use_schema_registry,
row_schema_location: schema_location,
client_config,
aws_auth_props,
topic,
enable_upsert,
record_name,
key_record_name,
name_strategy,
map_handling,
} = try_match_expand!(encoding_properties, EncodingProperties::Avro)?;
let url = handle_sr_list(schema_location.as_str())?;
if avro_config.use_schema_registry {
let client = Client::new(url, &avro_config.client_config)?;
if use_schema_registry {
let client = Client::new(url, &client_config)?;
let resolver = ConfluentSchemaResolver::new(client);

let subject_key = if enable_upsert {
Some(get_subject_by_strategy(
&avro_config.name_strategy,
avro_config.topic.as_str(),
avro_config.key_record_name.as_deref(),
&name_strategy,
topic.as_str(),
key_record_name.as_deref(),
true,
)?)
} else {
if let Some(name) = &avro_config.key_record_name {
if let Some(name) = &key_record_name {
bail!("key.message = {name} not used");
}
None
};
let subject_value = get_subject_by_strategy(
&avro_config.name_strategy,
avro_config.topic.as_str(),
avro_config.record_name.as_deref(),
&name_strategy,
topic.as_str(),
record_name.as_deref(),
false,
)?;
tracing::debug!("infer key subject {subject_key:?}, value subject {subject_value}");
Expand All @@ -142,25 +153,27 @@ impl AvroParserConfig {
None
},
schema_resolver: Some(Arc::new(resolver)),
map_handling,
})
} else {
if enable_upsert {
bail!("avro upsert without schema registry is not supported");
}
let url = url.first().unwrap();
let schema_content = bytes_from_url(url, avro_config.aws_auth_props.as_ref()).await?;
let schema_content = bytes_from_url(url, aws_auth_props.as_ref()).await?;
let schema = Schema::parse_reader(&mut schema_content.as_slice())
.context("failed to parse avro schema")?;
Ok(Self {
schema: Arc::new(schema),
key_schema: None,
schema_resolver: None,
map_handling,
})
}
}

/// Infer column descriptors from the (value) Avro schema.
///
/// `self.map_handling` controls how Avro `map` types are mapped to a
/// RisingWave type; per the error in `avro_type_mapping`, `jsonb` is the
/// only mode currently supported, and `None` makes maps an error.
pub fn map_to_columns(&self) -> ConnectorResult<Vec<ColumnDesc>> {
    avro_schema_to_column_descs(self.schema.as_ref(), self.map_handling)
}
}

Expand Down
173 changes: 164 additions & 9 deletions src/connector/src/parser/avro/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,28 @@
use std::sync::LazyLock;

use apache_avro::schema::{DecimalSchema, RecordSchema, Schema};
use apache_avro::types::{Value, ValueKind};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{DataType, Decimal};
use risingwave_pb::plan_common::{AdditionalColumn, ColumnDesc, ColumnDescVersion};

use crate::error::ConnectorResult;
use crate::parser::unified::bail_uncategorized;
use crate::parser::{AccessError, MapHandling};

pub fn avro_schema_to_column_descs(schema: &Schema) -> ConnectorResult<Vec<ColumnDesc>> {
pub fn avro_schema_to_column_descs(
schema: &Schema,
map_handling: Option<MapHandling>,
) -> ConnectorResult<Vec<ColumnDesc>> {
if let Schema::Record(RecordSchema { fields, .. }) = schema {
let mut index = 0;
let fields = fields
.iter()
.map(|field| avro_field_to_column_desc(&field.name, &field.schema, &mut index))
.map(|field| {
avro_field_to_column_desc(&field.name, &field.schema, &mut index, map_handling)
})
.collect::<ConnectorResult<Vec<_>>>()?;
Ok(fields)
} else {
Expand All @@ -43,8 +51,9 @@ fn avro_field_to_column_desc(
name: &str,
schema: &Schema,
index: &mut i32,
map_handling: Option<MapHandling>,
) -> ConnectorResult<ColumnDesc> {
let data_type = avro_type_mapping(schema)?;
let data_type = avro_type_mapping(schema, map_handling)?;
match schema {
Schema::Record(RecordSchema {
name: schema_name,
Expand All @@ -53,7 +62,7 @@ fn avro_field_to_column_desc(
}) => {
let vec_column = fields
.iter()
.map(|f| avro_field_to_column_desc(&f.name, &f.schema, index))
.map(|f| avro_field_to_column_desc(&f.name, &f.schema, index, map_handling))
.collect::<ConnectorResult<Vec<_>>>()?;
*index += 1;
Ok(ColumnDesc {
Expand Down Expand Up @@ -83,7 +92,10 @@ fn avro_field_to_column_desc(
}
}

fn avro_type_mapping(schema: &Schema) -> ConnectorResult<DataType> {
fn avro_type_mapping(
schema: &Schema,
map_handling: Option<MapHandling>,
) -> ConnectorResult<DataType> {
let data_type = match schema {
Schema::String => DataType::Varchar,
Schema::Int => DataType::Int32,
Expand Down Expand Up @@ -125,13 +137,13 @@ fn avro_type_mapping(schema: &Schema) -> ConnectorResult<DataType> {

let struct_fields = fields
.iter()
.map(|f| avro_type_mapping(&f.schema))
.map(|f| avro_type_mapping(&f.schema, map_handling))
.collect::<ConnectorResult<Vec<_>>>()?;
let struct_names = fields.iter().map(|f| f.name.clone()).collect_vec();
DataType::new_struct(struct_fields, struct_names)
}
Schema::Array(item_schema) => {
let item_type = avro_type_mapping(item_schema.as_ref())?;
let item_type = avro_type_mapping(item_schema.as_ref(), map_handling)?;
DataType::List(Box::new(item_type))
}
Schema::Union(union_schema) => {
Expand All @@ -141,7 +153,7 @@ fn avro_type_mapping(schema: &Schema) -> ConnectorResult<DataType> {
.find_or_first(|s| !matches!(s, Schema::Null))
.ok_or_else(|| anyhow::format_err!("unsupported Avro type: {:?}", union_schema))?;

avro_type_mapping(nested_schema)?
avro_type_mapping(nested_schema, map_handling)?
}
Schema::Ref { name } => {
if name.name == DBZ_VARIABLE_SCALE_DECIMAL_NAME
Expand All @@ -152,10 +164,153 @@ fn avro_type_mapping(schema: &Schema) -> ConnectorResult<DataType> {
bail!("unsupported Avro type: {:?}", schema);
}
}
Schema::Map(_) | Schema::Null | Schema::Fixed(_) | Schema::Uuid => {
Schema::Map(value_schema) => {
// TODO: support native map type
match map_handling {
Some(MapHandling::Jsonb) => {
if supported_avro_to_json_type(value_schema) {
DataType::Jsonb
} else {
bail!(
"unsupported Avro type, cannot convert map to jsonb: {:?}",
schema
)
}
}
None => {
bail!("`map.handling.mode` not specified in ENCODE AVRO (...). Currently supported modes: `jsonb`")
}
}
}
Schema::Null | Schema::Fixed(_) | Schema::Uuid => {
bail!("unsupported Avro type: {:?}", schema)
}
};

Ok(data_type)
}

/// Whether [`avro_to_jsonb`] supports values of this `schema`.
///
/// Keep this predicate in sync with the match in [`avro_to_jsonb`]: a schema
/// is accepted here iff the corresponding `Value` variant converts there.
fn supported_avro_to_json_type(schema: &Schema) -> bool {
    match schema {
        // Containers are convertible whenever their element/value type is.
        Schema::Array(value_schema) | Schema::Map(value_schema) => {
            supported_avro_to_json_type(value_schema)
        }
        // A record becomes a JSON object; every field must be convertible.
        Schema::Record(RecordSchema { fields, .. }) => fields
            .iter()
            .all(|field| supported_avro_to_json_type(&field.schema)),
        // Scalars with an unambiguous JSON representation.
        Schema::Null | Schema::Boolean | Schema::Int | Schema::String => true,
        // Everything below is rejected for now. The variants are spelled out
        // (instead of a `_` catch-all) so that a new `Schema` variant forces
        // an explicit decision here.
        Schema::Long
        | Schema::Float
        | Schema::Double
        | Schema::Bytes
        | Schema::Date
        | Schema::Decimal(_)
        | Schema::TimeMillis
        | Schema::TimeMicros
        | Schema::TimestampMillis
        | Schema::TimestampMicros
        | Schema::LocalTimestampMillis
        | Schema::LocalTimestampMicros
        | Schema::Duration
        | Schema::Enum(_)
        | Schema::Fixed(_)
        | Schema::Uuid
        | Schema::Ref { name: _ }
        | Schema::Union(_) => false,
    }
}

/// Append an Avro [`Value`] to `builder` as its JSON representation.
///
/// Supported inputs are exactly those accepted by
/// [`supported_avro_to_json_type`]: null, boolean, int and string scalars,
/// plus maps, records and arrays of supported values. Any other variant
/// fails with an uncategorized access error (via `bail_uncategorized!`).
///
/// NOTE(review): on error, `builder` may be left with a partially written
/// object/array — presumably callers discard the builder on `Err`; confirm.
pub(crate) fn avro_to_jsonb(
    avro: &Value,
    builder: &mut jsonbb::Builder,
) -> crate::parser::AccessResult<()> {
    match avro {
        Value::Null => builder.add_null(),
        Value::Boolean(b) => builder.add_bool(*b),
        // Avro `int` is 32-bit, so widening to i64 is lossless.
        Value::Int(i) => builder.add_i64(*i as i64),
        Value::String(s) => builder.add_string(s),
        Value::Map(m) => {
            // A map becomes a JSON object: emit each key, then recurse for
            // its value.
            builder.begin_object();
            for (k, v) in m {
                builder.add_string(k);
                avro_to_jsonb(v, builder)?;
            }
            builder.end_object()
        }
        // same representation as map
        Value::Record(r) => {
            builder.begin_object();
            for (k, v) in r {
                builder.add_string(k);
                avro_to_jsonb(v, builder)?;
            }
            builder.end_object()
        }
        Value::Array(a) => {
            builder.begin_array();
            for v in a {
                avro_to_jsonb(v, builder)?;
            }
            builder.end_array()
        }

        // TODO: figure out where the following encoding is reasonable before enabling them.
        // See discussions: https://github.com/risingwavelabs/risingwave/pull/16948

        // jsonbb supports int64, but JSON spec does not allow it. How should we handle it?
        // BTW, protobuf canonical JSON converts int64 to string.
        // Value::Long(l) => builder.add_i64(*l),
        // Value::Float(f) => {
        //     if f.is_nan() || f.is_infinite() {
        //         // XXX: pad null or return err here?
        //         builder.add_null()
        //     } else {
        //         builder.add_f64(*f as f64)
        //     }
        // }
        // Value::Double(f) => {
        //     if f.is_nan() || f.is_infinite() {
        //         // XXX: pad null or return err here?
        //         builder.add_null()
        //     } else {
        //         builder.add_f64(*f)
        //     }
        // }
        // // XXX: What encoding to use?
        // // ToText is \x plus hex string.
        // Value::Bytes(b) => builder.add_string(&ToText::to_text(&b.as_slice())),
        // Value::Enum(_, symbol) => {
        //     builder.add_string(&symbol);
        // }
        // Value::Uuid(id) => builder.add_string(&id.as_hyphenated().to_string()),
        // // For Union, one concern is that the avro union is tagged (like rust enum) but json union is untagged (like c union).
        // // When the union consists of multiple records, it is possible to distinguish which variant is active in avro, but in json they will all become jsonb objects and indistinguishable.
        // Value::Union(_, v) => avro_to_jsonb(v, builder)?
        // XXX: pad null or return err here?
        //
        // This arm must stay in agreement with `supported_avro_to_json_type`:
        // every variant listed here is one that predicate rejects.
        v @ (Value::Long(_)
        | Value::Float(_)
        | Value::Double(_)
        | Value::Bytes(_)
        | Value::Enum(_, _)
        | Value::Fixed(_, _)
        | Value::Date(_)
        | Value::Decimal(_)
        | Value::TimeMillis(_)
        | Value::TimeMicros(_)
        | Value::TimestampMillis(_)
        | Value::TimestampMicros(_)
        | Value::LocalTimestampMillis(_)
        | Value::LocalTimestampMicros(_)
        | Value::Duration(_)
        | Value::Uuid(_)
        | Value::Union(_, _)) => {
            bail_uncategorized!(
                "unimplemented conversion from avro to jsonb: {:?}",
                ValueKind::from(v)
            )
        }
    }
    Ok(())
}
Loading

0 comments on commit 2b52536

Please sign in to comment.