refactor(source): consolidate duplicate schema loading logic for avro/protobuf/json-schema (#14281)
xiangjinwu authored Jan 2, 2024
1 parent 4f5b167 commit 1b691dd
Showing 6 changed files with 44 additions and 152 deletions.
2 changes: 1 addition & 1 deletion e2e_test/sink/kafka/protobuf.slt
@@ -66,7 +66,7 @@ select
t Rising \x6130 3.5 4.25 22 23 24 0 26 27 (1,"") {4,0,4} (1136239445,0) 42
f Wave \x5a4446 1.5 0 11 12 13 14 15 16 (4,foo) {} (0,0) 0

statement error failed to read file
statement error No such file
create sink sink_err from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf',
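The expected error text changes because the consolidated loader propagates the raw `std::io::Error` from `std::fs::read` (see `bytes_from_url` in `util.rs` below) instead of wrapping it in a bespoke "failed to read file" message. A minimal sketch of the observable difference, assuming a Unix platform where `ErrorKind::NotFound` renders as "No such file or directory":

```rust
use std::io::ErrorKind;

fn main() {
    // Reading a missing path yields an io::Error; its Display output on Unix
    // is "No such file or directory (os error 2)", which the updated
    // `statement error No such file` expectation now matches.
    let err = std::fs::read("/path/that/does/not/exist").unwrap_err();
    assert_eq!(err.kind(), ErrorKind::NotFound);
    println!("{err}");
}
```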
49 changes: 14 additions & 35 deletions src/connector/src/parser/avro/parser.rs
@@ -22,11 +22,11 @@ use risingwave_common::error::{Result, RwError};
use risingwave_common::try_match_expand;
use risingwave_pb::plan_common::ColumnDesc;

use super::schema_resolver::*;
use super::schema_resolver::ConfluentSchemaResolver;
use super::util::avro_schema_to_column_descs;
use crate::parser::unified::avro::{AvroAccess, AvroParseOptions};
use crate::parser::unified::AccessImpl;
use crate::parser::util::{read_schema_from_http, read_schema_from_local, read_schema_from_s3};
use crate::parser::util::bytes_from_url;
use crate::parser::{AccessBuilder, EncodingProperties, EncodingType};
use crate::schema::schema_registry::{
extract_schema_id, get_subject_by_strategy, handle_sr_list, Client,
@@ -156,18 +156,8 @@ impl AvroParserConfig {
)));
}
let url = url.first().unwrap();
let schema_content = match url.scheme() {
"file" => read_schema_from_local(url.path()),
"s3" => {
read_schema_from_s3(url, avro_config.aws_auth_props.as_ref().unwrap()).await
}
"https" | "http" => read_schema_from_http(url).await,
scheme => Err(RwError::from(ProtocolError(format!(
"path scheme {} is not supported",
scheme
)))),
}?;
let schema = Schema::parse_str(&schema_content).map_err(|e| {
let schema_content = bytes_from_url(url, avro_config.aws_auth_props.as_ref()).await?;
let schema = Schema::parse_reader(&mut schema_content.as_slice()).map_err(|e| {
RwError::from(InternalError(format!("Avro schema parse error {}", e)))
})?;
Ok(Self {
@@ -208,15 +198,11 @@ mod test {
use risingwave_common::catalog::ColumnId;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, Date, Interval, ScalarImpl, Timestamptz};
use risingwave_common::{error, try_match_expand};
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::{PbEncodeType, PbFormatType};
use url::Url;

use super::{
read_schema_from_http, read_schema_from_local, read_schema_from_s3, AvroAccessBuilder,
AvroParserConfig,
};
use super::*;
use crate::common::AwsAuthProps;
use crate::parser::plain_parser::PlainParser;
use crate::parser::unified::avro::unix_epoch_days;
@@ -241,33 +227,26 @@
.to_string()
}

#[tokio::test]
async fn test_read_schema_from_local() {
let schema_path = test_data_path("complex-schema.avsc");
let content_rs = read_schema_from_local(schema_path);
assert!(content_rs.is_ok());
}

#[tokio::test]
#[ignore]
async fn test_load_schema_from_s3() {
let schema_location = "s3://mingchao-schemas/complex-schema.avsc".to_string();
let url = Url::parse(&schema_location).unwrap();
let aws_auth_config: AwsAuthProps =
serde_json::from_str(r#"{"region": "ap-southeast-1"}"#).unwrap();
let schema_content = read_schema_from_s3(&url, &aws_auth_config).await;
let schema_content = bytes_from_url(&url, Some(&aws_auth_config)).await;
assert!(schema_content.is_ok());
let schema = Schema::parse_str(&schema_content.unwrap());
let schema = Schema::parse_reader(&mut schema_content.unwrap().as_slice());
assert!(schema.is_ok());
println!("schema = {:?}", schema.unwrap());
}

#[tokio::test]
async fn test_load_schema_from_local() {
let schema_location = test_data_path("complex-schema.avsc");
let schema_content = read_schema_from_local(schema_location);
let schema_location = Url::from_file_path(test_data_path("complex-schema.avsc")).unwrap();
let schema_content = bytes_from_url(&schema_location, None).await;
assert!(schema_content.is_ok());
let schema = Schema::parse_str(&schema_content.unwrap());
let schema = Schema::parse_reader(&mut schema_content.unwrap().as_slice());
assert!(schema.is_ok());
println!("schema = {:?}", schema.unwrap());
}
@@ -278,14 +257,14 @@
let schema_location =
"https://mingchao-schemas.s3.ap-southeast-1.amazonaws.com/complex-schema.avsc";
let url = Url::parse(schema_location).unwrap();
let schema_content = read_schema_from_http(&url).await;
let schema_content = bytes_from_url(&url, None).await;
assert!(schema_content.is_ok());
let schema = Schema::parse_str(&schema_content.unwrap());
let schema = Schema::parse_reader(&mut schema_content.unwrap().as_slice());
assert!(schema.is_ok());
println!("schema = {:?}", schema.unwrap());
}

async fn new_avro_conf_from_local(file_name: &str) -> error::Result<AvroParserConfig> {
async fn new_avro_conf_from_local(file_name: &str) -> Result<AvroParserConfig> {
let schema_path = "file://".to_owned() + &test_data_path(file_name);
let info = StreamSourceInfo {
row_schema_location: schema_path.clone(),
@@ -298,7 +277,7 @@
AvroParserConfig::new(parser_config.encoding_config).await
}

async fn new_avro_parser_from_local(file_name: &str) -> error::Result<PlainParser> {
async fn new_avro_parser_from_local(file_name: &str) -> Result<PlainParser> {
let conf = new_avro_conf_from_local(file_name).await?;

Ok(PlainParser {
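The Avro path now fetches schema bytes through the shared `bytes_from_url` helper and parses them via the reader-based API, skipping the old bytes-to-`String` detour through `parse_str`. A minimal standalone sketch of the parse step, assuming the `apache-avro` crate used here:

```rust
use apache_avro::Schema;

// Parse an Avro schema straight from raw bytes, as the new config code does
// with `Schema::parse_reader(&mut schema_content.as_slice())`.
fn parse_schema(schema_bytes: &[u8]) -> Result<Schema, apache_avro::Error> {
    Schema::parse_reader(&mut &schema_bytes[..])
}

fn main() {
    let raw = br#"{"type": "record", "name": "t", "fields": [{"name": "id", "type": "long"}]}"#;
    let schema = parse_schema(raw).expect("valid Avro schema");
    println!("schema = {schema:?}");
}
```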
20 changes: 7 additions & 13 deletions src/connector/src/parser/json_parser.rs
@@ -23,7 +23,7 @@ use risingwave_common::try_match_expand;
use risingwave_pb::plan_common::ColumnDesc;

use super::avro::schema_resolver::ConfluentSchemaResolver;
use super::util::{get_kafka_topic, read_schema_from_http, read_schema_from_local};
use super::util::{bytes_from_url, get_kafka_topic};
use super::{EncodingProperties, SchemaRegistryAuth, SpecificParserConfig};
use crate::only_parse_payload;
use crate::parser::avro::util::avro_schema_to_column_descs;
@@ -149,26 +149,20 @@ pub async fn schema_to_columns(
props: &HashMap<String, String>,
) -> anyhow::Result<Vec<ColumnDesc>> {
let url = handle_sr_list(schema_location)?;
let schema_content = if let Some(schema_registry_auth) = schema_registry_auth {
let json_schema = if let Some(schema_registry_auth) = schema_registry_auth {
let client = Client::new(url, &schema_registry_auth)?;
let topic = get_kafka_topic(props)?;
let resolver = ConfluentSchemaResolver::new(client);
resolver
let content = resolver
.get_raw_schema_by_subject_name(&format!("{}-value", topic))
.await?
.content
.content;
serde_json::from_str(&content)?
} else {
let url = url.first().unwrap();
match url.scheme() {
"file" => read_schema_from_local(url.path()),
"https" | "http" => read_schema_from_http(url).await,
scheme => Err(RwError::from(ProtocolError(format!(
"path scheme {} is not supported",
scheme
)))),
}?
let bytes = bytes_from_url(url, None).await?;
serde_json::from_slice(&bytes)?
};
let json_schema = serde_json::from_str(&schema_content)?;
let context = Context::default();
let avro_schema = convert_avro(&json_schema, context).to_string();
let schema = Schema::parse_str(&avro_schema)
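Both branches of `schema_to_columns` now deserialize directly into a JSON value — `serde_json::from_str` on the registry's string content, `serde_json::from_slice` on the bytes returned by `bytes_from_url` — so no intermediate `String` is built for the URL case. A hedged sketch of the two entry points, with hypothetical inputs:

```rust
use serde_json::Value;

fn from_registry(content: &str) -> serde_json::Result<Value> {
    serde_json::from_str(content) // the schema registry hands back a String
}

fn from_url(bytes: &[u8]) -> serde_json::Result<Value> {
    serde_json::from_slice(bytes) // bytes_from_url hands back Vec<u8>
}

fn main() {
    let v = from_url(br#"{"type": "object"}"#).unwrap();
    assert_eq!(v["type"], "object");
}
```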
42 changes: 2 additions & 40 deletions src/connector/src/parser/protobuf/parser.rs
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::path::Path;
use std::sync::Arc;

use itertools::Itertools;
@@ -28,9 +27,9 @@ use risingwave_common::types::{DataType, Datum, Decimal, JsonbVal, ScalarImpl, F
use risingwave_pb::plan_common::{AdditionalColumnType, ColumnDesc, ColumnDescVersion};

use super::schema_resolver::*;
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::parser::unified::protobuf::ProtobufAccess;
use crate::parser::unified::AccessImpl;
use crate::parser::util::bytes_from_url;
use crate::parser::{AccessBuilder, EncodingProperties};
use crate::schema::schema_registry::{
extract_schema_id, get_subject_by_strategy, handle_sr_list, Client,
@@ -112,33 +111,7 @@ impl ProtobufParserConfig {
compile_file_descriptor_from_schema_registry(schema_value.as_str(), &client).await?
} else {
let url = url.first().unwrap();
match url.scheme() {
// TODO(Tao): support local file only when it's compiled in debug mode.
"file" => {
let path = url.to_file_path().map_err(|_| {
RwError::from(InternalError(format!("illegal path: {}", location)))
})?;

if path.is_dir() {
return Err(RwError::from(ProtocolError(
"schema file location must not be a directory".to_string(),
)));
}
Self::local_read_to_bytes(&path)
}
"s3" => {
load_file_descriptor_from_s3(
url,
protobuf_config.aws_auth_props.as_ref().unwrap(),
)
.await
}
"https" | "http" => load_file_descriptor_from_http(url).await,
scheme => Err(RwError::from(ProtocolError(format!(
"path scheme {} is not supported",
scheme
)))),
}?
bytes_from_url(url, protobuf_config.aws_auth_props.as_ref()).await?
};

let pool = DescriptorPool::decode(schema_bytes.as_slice()).map_err(|e| {
@@ -162,17 +135,6 @@
})
}

/// read binary schema from a local file
fn local_read_to_bytes(path: &Path) -> Result<Vec<u8>> {
std::fs::read(path).map_err(|e| {
RwError::from(InternalError(format!(
"failed to read file {}: {}",
path.display(),
e
)))
})
}

/// Maps the protobuf schema to relational schema.
pub fn map_to_columns(&self) -> Result<Vec<ColumnDesc>> {
let mut columns = Vec::with_capacity(self.message_descriptor.fields().len());
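With the scheme dispatch gone, the Protobuf config keeps only the decode step: the compiled file-descriptor-set bytes, however they were fetched, go straight into `prost-reflect`. A minimal sketch of that step, assuming the `prost-reflect` crate used here and a descriptor set produced by, e.g., `protoc --descriptor_set_out`:

```rust
use prost_reflect::DescriptorPool;

// Decode a serialized FileDescriptorSet into a descriptor pool, mirroring
// `DescriptorPool::decode(schema_bytes.as_slice())` in the diff above.
fn pool_from_bytes(schema_bytes: &[u8]) -> Result<DescriptorPool, prost_reflect::DescriptorError> {
    DescriptorPool::decode(schema_bytes)
}
```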
9 changes: 0 additions & 9 deletions src/connector/src/parser/protobuf/schema_resolver.rs
@@ -22,18 +22,9 @@ use protobuf_native::compiler::{
use protobuf_native::MessageLite;
use risingwave_common::error::ErrorCode::{InternalError, ProtocolError};
use risingwave_common::error::{Result, RwError};
use url::Url;

use crate::parser::util::download_from_http;
use crate::schema::schema_registry::Client;

const PB_SCHEMA_LOCATION_S3_REGION: &str = "region";

pub(super) async fn load_file_descriptor_from_http(location: &Url) -> Result<Vec<u8>> {
let schema_bytes = download_from_http(location).await?;
Ok(schema_bytes.to_vec())
}

macro_rules! embed_wkts {
[$( $path:literal ),+ $(,)?] => {
&[$(
74 changes: 20 additions & 54 deletions src/connector/src/parser/util.rs
@@ -12,20 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::path::Path;

use bytes::Bytes;
use reqwest::Url;
use risingwave_common::error::ErrorCode::{
InternalError, InvalidConfigValue, InvalidParameterValue, ProtocolError,
};
use risingwave_common::error::ErrorCode::{InvalidParameterValue, ProtocolError};
use risingwave_common::error::{Result, RwError};

use crate::aws_utils::{default_conn_config, s3_client};
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::common::AwsAuthProps;

const AVRO_SCHEMA_LOCATION_S3_REGION: &str = "region";

/// get kafka topic name
pub(super) fn get_kafka_topic(props: &HashMap<String, String>) -> Result<&String> {
const KAFKA_TOPIC_KEY1: &str = "kafka.topic";
@@ -94,52 +89,23 @@ macro_rules! extract_key_config {
};
}

/// Read schema from local file. For on-premise or testing.
pub(super) fn read_schema_from_local(path: impl AsRef<Path>) -> Result<String> {
std::fs::read_to_string(path.as_ref()).map_err(|e| e.into())
}

/// Read schema from http/https. For common usage.
pub(super) async fn read_schema_from_http(location: &Url) -> Result<String> {
let schema_bytes = download_from_http(location).await?;

String::from_utf8(schema_bytes.into()).map_err(|e| {
RwError::from(InternalError(format!(
"read schema string from https failed {}",
e
)))
})
}

/// Read schema from s3 bucket.
/// S3 file location format: <s3://bucket_name/file_name>
pub(super) async fn read_schema_from_s3(url: &Url, config: &AwsAuthProps) -> Result<String> {
let bucket = url
.domain()
.ok_or_else(|| RwError::from(InternalError(format!("Illegal s3 path {}", url))))?;
if config.region.is_none() {
return Err(RwError::from(InvalidConfigValue {
config_entry: AVRO_SCHEMA_LOCATION_S3_REGION.to_string(),
config_value: "NONE".to_string(),
}));
/// Load raw bytes from:
/// * local file, for on-premise or testing.
/// * http/https, for common usage.
/// * s3 file location format: <s3://bucket_name/file_name>
pub(super) async fn bytes_from_url(url: &Url, config: Option<&AwsAuthProps>) -> Result<Vec<u8>> {
match (url.scheme(), config) {
// TODO(Tao): support local file only when it's compiled in debug mode.
("file", _) => {
let path = url
.to_file_path()
.map_err(|()| InvalidParameterValue(format!("illegal path: {url}")))?;
Ok(std::fs::read(path)?)
}
("https" | "http", _) => Ok(download_from_http(url).await?.into()),
("s3", Some(config)) => load_file_descriptor_from_s3(url, config).await,
(scheme, _) => Err(RwError::from(InvalidParameterValue(format!(
"path scheme {scheme} is not supported",
)))),
}
let key = url.path().replace('/', "");
let sdk_config = config.build_config().await?;
let s3_client = s3_client(&sdk_config, Some(default_conn_config()));
let response = s3_client
.get_object()
.bucket(bucket.to_string())
.key(key)
.send()
.await
.map_err(|e| RwError::from(InternalError(e.to_string())))?;
let body_bytes = response.body.collect().await.map_err(|e| {
RwError::from(InternalError(format!(
"Read Avro schema file from s3 {}",
e
)))
})?;
let schema_bytes = body_bytes.into_bytes().to_vec();
String::from_utf8(schema_bytes)
.map_err(|e| RwError::from(InternalError(format!("Avro schema not valid utf8 {}", e))))
}
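Taken together, the three parsers now share this one loader. A hedged usage sketch, written as if inside this module (so `Result`, `RwError`, `InvalidParameterValue`, `AwsAuthProps`, and `bytes_from_url` are in scope; the wrapper function itself is hypothetical):

```rust
use reqwest::Url;

// Hypothetical wrapper showing the call shape shared by the avro, json-schema,
// and protobuf configs after this refactor.
async fn load_schema_bytes(location: &str, aws: Option<&AwsAuthProps>) -> Result<Vec<u8>> {
    let url = Url::parse(location)
        .map_err(|e| RwError::from(InvalidParameterValue(e.to_string())))?;
    // "file" reads from disk, "http"/"https" downloads, "s3" requires
    // Some(aws); any other scheme is rejected with InvalidParameterValue.
    bytes_from_url(&url, aws).await
}
```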
