Skip to content

Commit

Permalink
refactor: avoid RwError in schema registry client (#14603)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored Jan 24, 2024
1 parent 9382565 commit c2c2032
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 114 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion e2e_test/sink/kafka/avro.slt
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ select
t Rising \x6130 3.5 4.25 22 24 23 NULL {{NULL,3},NULL,{7,NULL,2}} 2006-01-02 22:04:05+00:00 NULL NULL 12:34:56.123456 NULL
f Wave \x5a4446 1.5 NULL 11 13 12 (,foo) NULL NULL 2006-01-02 22:04:05+00:00 2021-04-01 NULL 23:45:16.654

statement error SchemaFetchError
statement error test-rw-sink-upsert-avro-err-key
create sink sink_err from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-upsert-avro-err',
Expand Down
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ strum = "0.25"
strum_macros = "0.25"
tempfile = "3"
thiserror = "1"
thiserror-ext = { workspace = true }
time = "0.3.30"
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/parser/avro/schema_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl ConfluentSchemaResolver {
self.confluent_client
.get_schema_by_subject(subject_name)
.await
.map_err(Into::into)
}

// get the writer schema by id
Expand Down
17 changes: 8 additions & 9 deletions src/connector/src/schema/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use super::schema_registry::{
SchemaRegistryAuth,
};
use super::{
SchemaFetchError, KEY_MESSAGE_NAME_KEY, MESSAGE_NAME_KEY, NAME_STRATEGY_KEY,
SCHEMA_REGISTRY_KEY,
invalid_option_error, InvalidOptionError, SchemaFetchError, KEY_MESSAGE_NAME_KEY,
MESSAGE_NAME_KEY, NAME_STRATEGY_KEY, SCHEMA_REGISTRY_KEY,
};

pub struct SchemaWithId {
Expand All @@ -36,8 +36,8 @@ impl TryFrom<ConfluentSchema> for SchemaWithId {
type Error = SchemaFetchError;

fn try_from(fetched: ConfluentSchema) -> Result<Self, Self::Error> {
let parsed =
AvroSchema::parse_str(&fetched.content).map_err(|e| SchemaFetchError(e.to_string()))?;
let parsed = AvroSchema::parse_str(&fetched.content)
.map_err(|e| SchemaFetchError::SchemaCompile(e.into()))?;
Ok(Self {
schema: Arc::new(parsed),
id: fetched.id,
Expand All @@ -52,14 +52,14 @@ pub async fn fetch_schema(
) -> Result<(SchemaWithId, SchemaWithId), SchemaFetchError> {
let schema_location = format_options
.get(SCHEMA_REGISTRY_KEY)
.ok_or_else(|| SchemaFetchError(format!("{SCHEMA_REGISTRY_KEY} required")))?
.ok_or_else(|| invalid_option_error!("{SCHEMA_REGISTRY_KEY} required"))?
.clone();
let client_config = format_options.into();
let name_strategy = format_options
.get(NAME_STRATEGY_KEY)
.map(|s| {
name_strategy_from_str(s)
.ok_or_else(|| SchemaFetchError(format!("unrecognized strategy {s}")))
.ok_or_else(|| invalid_option_error!("unrecognized strategy {s}"))
})
.transpose()?
.unwrap_or_default();
Expand All @@ -78,8 +78,7 @@ pub async fn fetch_schema(
key_record_name,
val_record_name,
)
.await
.map_err(|e| SchemaFetchError(e.to_string()))?;
.await?;

Ok((key_schema.try_into()?, val_schema.try_into()?))
}
Expand All @@ -91,7 +90,7 @@ async fn fetch_schema_inner(
topic: &str,
key_record_name: Option<&str>,
val_record_name: Option<&str>,
) -> Result<(ConfluentSchema, ConfluentSchema), risingwave_common::error::RwError> {
) -> Result<(ConfluentSchema, ConfluentSchema), SchemaFetchError> {
let urls = handle_sr_list(schema_location)?;
let client = Client::new(urls, client_config)?;

Expand Down
35 changes: 33 additions & 2 deletions src/connector/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,36 @@ const SCHEMA_LOCATION_KEY: &str = "schema.location";
const SCHEMA_REGISTRY_KEY: &str = "schema.registry";
const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy";

#[derive(Debug)]
pub struct SchemaFetchError(pub String);
/// Error describing a user-supplied option that is missing, unrecognized, or
/// otherwise unusable.
///
/// Displays as `Invalid option: {message}`.
///
/// NOTE(review): the `invalid_option_error!` macro imported by sibling modules
/// is presumably generated by the `thiserror_ext::Macro` derive below — confirm
/// against the thiserror-ext documentation.
#[derive(Debug, thiserror::Error, thiserror_ext::Macro)]
#[error("Invalid option: {message}")]
pub struct InvalidOptionError {
/// Human-readable description of what is wrong with the option.
message: String,
// NOTE(review): the two lines below are a disabled optional `source` field
// (with backtrace); the error currently carries only `message`.
// #[backtrace]
// source: Option<risingwave_common::error::BoxedError>,
}

impl From<InvalidOptionError> for risingwave_common::error::RwError {
fn from(value: InvalidOptionError) -> Self {
anyhow::anyhow!(value).into()
}
}

/// Error type returned by the schema fetching helpers in this module
/// (e.g. `fetch_schema` for Avro and `fetch_descriptor` for Protobuf).
#[derive(Debug, thiserror::Error)]
pub enum SchemaFetchError {
/// A required option is missing or has an unusable value. Displayed
/// transparently, i.e. with the inner error's own message; `#[from]` lets
/// `?` convert an `InvalidOptionError` automatically.
#[error(transparent)]
InvalidOption(#[from] InvalidOptionError),
/// The schema registry request failed. Displayed transparently; `#[from]`
/// lets `?` convert a `ConcurrentRequestError` automatically.
#[error(transparent)]
Request(#[from] schema_registry::ConcurrentRequestError),
/// The fetched schema text could not be parsed/compiled (the Avro path
/// builds this variant when `AvroSchema::parse_str` fails).
#[error("schema compilation error: {0}")]
SchemaCompile(
#[source]
#[backtrace]
risingwave_common::error::BoxedError,
),
/// Wraps a legacy `RwError` (the Protobuf path builds this variant from
/// `ProtobufParserConfig::new` failures).
/// NOTE(review): the name suggests a temporary variant for call sites not
/// yet moved off `RwError` — confirm.
#[error("{0}")] // source+{0} is effectively transparent but allows backtrace
YetToMigrate(
#[source]
#[backtrace]
risingwave_common::error::RwError,
),
}
13 changes: 8 additions & 5 deletions src/connector/src/schema/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use std::collections::BTreeMap;

use prost_reflect::MessageDescriptor;

use super::{SchemaFetchError, MESSAGE_NAME_KEY, SCHEMA_LOCATION_KEY};
use super::{
invalid_option_error, InvalidOptionError, SchemaFetchError, MESSAGE_NAME_KEY,
SCHEMA_LOCATION_KEY,
};
use crate::common::AwsAuthProps;
use crate::parser::{EncodingProperties, ProtobufParserConfig, ProtobufProperties};

Expand All @@ -27,15 +30,15 @@ pub async fn fetch_descriptor(
) -> Result<MessageDescriptor, SchemaFetchError> {
let row_schema_location = format_options
.get(SCHEMA_LOCATION_KEY)
.ok_or_else(|| SchemaFetchError(format!("{SCHEMA_LOCATION_KEY} required")))?
.ok_or_else(|| invalid_option_error!("{SCHEMA_LOCATION_KEY} required"))?
.clone();
let message_name = format_options
.get(MESSAGE_NAME_KEY)
.ok_or_else(|| SchemaFetchError(format!("{MESSAGE_NAME_KEY} required")))?
.ok_or_else(|| invalid_option_error!("{MESSAGE_NAME_KEY} required"))?
.clone();

if row_schema_location.starts_with("s3") && aws_auth_props.is_none() {
return Err(SchemaFetchError("s3 URL not supported yet".into()));
return Err(invalid_option_error!("s3 URL not supported yet").into());
}

let enc = EncodingProperties::Protobuf(ProtobufProperties {
Expand All @@ -52,6 +55,6 @@ pub async fn fetch_descriptor(
// This reversed dependency will be fixed when we support schema registry.
let conf = ProtobufParserConfig::new(enc)
.await
.map_err(|e| SchemaFetchError(e.to_string()))?;
.map_err(SchemaFetchError::YetToMigrate)?;
Ok(conf.message_descriptor)
}
58 changes: 35 additions & 23 deletions src/connector/src/schema/schema_registry/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ use std::sync::Arc;
use futures::future::select_all;
use itertools::Itertools;
use reqwest::{Method, Url};
use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{Result, RwError};
use serde::de::DeserializeOwned;
use thiserror_ext::AsReport as _;

use super::util::*;
use crate::schema::{invalid_option_error, InvalidOptionError};

pub const SCHEMA_REGISTRY_USERNAME: &str = "schema.registry.username";
pub const SCHEMA_REGISTRY_PASSWORD: &str = "schema.registry.password";
Expand Down Expand Up @@ -61,29 +61,43 @@ pub struct Client {
password: Option<String>,
}

/// Error returned when a concurrent schema registry request produced no
/// successful response.
///
/// The display output is a header line containing `context`, followed by one
/// tab-indented line per collected error, each rendered with `as_report()`.
#[derive(Debug, thiserror::Error)]
#[error("all request confluent registry all timeout, {context}\n{}", errs.iter().map(|e| format!("\t{}", e.as_report())).join("\n"))]
pub struct ConcurrentRequestError {
/// Every failure collected while waiting on the requests: `Left` holds a
/// request-level error, `Right` holds a failure to join the spawned task.
errs: Vec<itertools::Either<RequestError, tokio::task::JoinError>>,
/// Free-form description of the attempted request (the client fills this
/// with the request path and the registry URLs).
context: String,
}

impl From<ConcurrentRequestError> for risingwave_common::error::RwError {
fn from(value: ConcurrentRequestError) -> Self {
anyhow::anyhow!(value).into()
}
}

/// Result alias for schema registry client calls, which fail with
/// [`ConcurrentRequestError`].
type SrResult<T> = Result<T, ConcurrentRequestError>;

impl Client {
pub(crate) fn new(url: Vec<Url>, client_config: &SchemaRegistryAuth) -> Result<Self> {
pub(crate) fn new(
url: Vec<Url>,
client_config: &SchemaRegistryAuth,
) -> Result<Self, InvalidOptionError> {
let valid_urls = url
.iter()
.map(|url| (url.cannot_be_a_base(), url))
.filter(|(x, _)| !*x)
.map(|(_, url)| url.clone())
.collect_vec();
if valid_urls.is_empty() {
return Err(RwError::from(ProtocolError(format!(
"no valid url provided, got {:?}",
url
))));
return Err(invalid_option_error!("non-base: {}", url.iter().join(" ")));
} else {
tracing::debug!(
"schema registry client will use url {:?} to connect",
valid_urls
);
}

let inner = reqwest::Client::builder().build().map_err(|e| {
RwError::from(ProtocolError(format!("build reqwest client failed {}", e)))
})?;
// `unwrap` as the builder is not affected by any input right now
let inner = reqwest::Client::builder().build().unwrap();

Ok(Client {
inner,
Expand All @@ -97,7 +111,7 @@ impl Client {
&'a self,
method: Method,
path: &'a [&'a (impl AsRef<str> + ?Sized + Debug + ToString)],
) -> Result<T>
) -> SrResult<T>
where
T: DeserializeOwned + Send + Sync + 'static,
{
Expand All @@ -124,22 +138,20 @@ impl Client {
let _ = remaining.iter().map(|ele| ele.abort());
return Ok(res);
}
Ok(Err(e)) => errs.push(e),
Err(e) => errs.push(RwError::from(e)),
Ok(Err(e)) => errs.push(itertools::Either::Left(e)),
Err(e) => errs.push(itertools::Either::Right(e)),
}
fut_req = remaining;
}

Err(RwError::from(ProtocolError(format!(
"all request confluent registry all timeout, req path {:?}, urls {:?}, err: {:?}",
path,
self.url,
errs.iter().map(|e| e.to_string()).collect_vec()
))))
Err(ConcurrentRequestError {
errs,
context: format!("req path {:?}, urls {}", path, self.url.iter().join(" ")),
})
}

/// get schema by id
pub async fn get_schema_by_id(&self, id: i32) -> Result<ConfluentSchema> {
pub async fn get_schema_by_id(&self, id: i32) -> SrResult<ConfluentSchema> {
let res: GetByIdResp = self
.concurrent_req(Method::GET, &["schemas", "ids", &id.to_string()])
.await?;
Expand All @@ -150,12 +162,12 @@ impl Client {
}

/// get the latest schema of the subject
pub async fn get_schema_by_subject(&self, subject: &str) -> Result<ConfluentSchema> {
pub async fn get_schema_by_subject(&self, subject: &str) -> SrResult<ConfluentSchema> {
self.get_subject(subject).await.map(|s| s.schema)
}

/// get the latest version of the subject
pub async fn get_subject(&self, subject: &str) -> Result<Subject> {
pub async fn get_subject(&self, subject: &str) -> SrResult<Subject> {
let res: GetBySubjectResp = self
.concurrent_req(Method::GET, &["subjects", subject, "versions", "latest"])
.await?;
Expand All @@ -174,7 +186,7 @@ impl Client {
pub async fn get_subject_and_references(
&self,
subject: &str,
) -> Result<(Subject, Vec<Subject>)> {
) -> SrResult<(Subject, Vec<Subject>)> {
let mut subjects = vec![];
let mut visited = HashSet::new();
let mut queue = vec![(subject.to_owned(), "latest".to_owned())];
Expand Down
30 changes: 14 additions & 16 deletions src/connector/src/schema/schema_registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
mod client;
mod util;
pub use client::*;
use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::RwError;
use risingwave_pb::catalog::SchemaRegistryNameStrategy as PbSchemaRegistryNameStrategy;
pub(crate) use util::*;

use super::{invalid_option_error, InvalidOptionError};

pub fn name_strategy_from_str(value: &str) -> Option<PbSchemaRegistryNameStrategy> {
match value {
"topic_name_strategy" => Some(PbSchemaRegistryNameStrategy::Unspecified),
Expand All @@ -34,29 +34,27 @@ pub fn get_subject_by_strategy(
topic: &str,
record: Option<&str>,
is_key: bool,
) -> Result<String, RwError> {
let build_error_lack_field = |ns: &PbSchemaRegistryNameStrategy, expect: &str| -> RwError {
RwError::from(ProtocolError(format!(
"{} expect num-empty field {}",
ns.as_str_name(),
expect,
)))
};
) -> Result<String, InvalidOptionError> {
let record_option_name = if is_key { "key.message" } else { "message" };
let build_error_lack_field = || {
invalid_option_error!(
"{} expect non-empty field {}",
name_strategy.as_str_name(),
record_option_name,
)
};
match name_strategy {
PbSchemaRegistryNameStrategy::Unspecified => {
// default behavior
let suffix = if is_key { "key" } else { "value" };
Ok(format!("{topic}-{suffix}",))
}
ns @ PbSchemaRegistryNameStrategy::RecordNameStrategy => {
let record_name =
record.ok_or_else(|| build_error_lack_field(ns, record_option_name))?;
PbSchemaRegistryNameStrategy::RecordNameStrategy => {
let record_name = record.ok_or_else(build_error_lack_field)?;
Ok(record_name.to_string())
}
ns @ PbSchemaRegistryNameStrategy::TopicRecordNameStrategy => {
let record_name =
record.ok_or_else(|| build_error_lack_field(ns, record_option_name))?;
PbSchemaRegistryNameStrategy::TopicRecordNameStrategy => {
let record_name = record.ok_or_else(build_error_lack_field)?;
Ok(format!("{topic}-{record_name}"))
}
}
Expand Down
Loading

0 comments on commit c2c2032

Please sign in to comment.