Skip to content

Commit

Permalink
InvalidOptionError to cover SrUrlError + ClientInitError + SubjectError
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Jan 17, 2024
1 parent fd2c8df commit 2ed20c9
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 70 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ strum = "0.25"
strum_macros = "0.25"
tempfile = "3"
thiserror = "1"
thiserror-ext = { workspace = true }
time = "0.3.30"
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
Expand Down
23 changes: 9 additions & 14 deletions src/connector/src/schema/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use super::schema_registry::{
SchemaRegistryAuth,
};
use super::{
SchemaFetchError, KEY_MESSAGE_NAME_KEY, MESSAGE_NAME_KEY, NAME_STRATEGY_KEY,
SCHEMA_REGISTRY_KEY,
invalid_option_error, InvalidOptionError, SchemaFetchError, KEY_MESSAGE_NAME_KEY,
MESSAGE_NAME_KEY, NAME_STRATEGY_KEY, SCHEMA_REGISTRY_KEY,
};

pub struct SchemaWithId {
Expand Down Expand Up @@ -52,15 +52,14 @@ pub async fn fetch_schema(
) -> Result<(SchemaWithId, SchemaWithId), SchemaFetchError> {
let schema_location = format_options
.get(SCHEMA_REGISTRY_KEY)
.ok_or_else(|| SchemaFetchError::InvalidOption(format!("{SCHEMA_REGISTRY_KEY} required")))?
.ok_or_else(|| invalid_option_error!("{SCHEMA_REGISTRY_KEY} required"))?
.clone();
let client_config = format_options.into();
let name_strategy = format_options
.get(NAME_STRATEGY_KEY)
.map(|s| {
name_strategy_from_str(s).ok_or_else(|| {
SchemaFetchError::InvalidOption(format!("unrecognized strategy {s}"))
})
name_strategy_from_str(s)
.ok_or_else(|| invalid_option_error!("unrecognized strategy {s}"))
})
.transpose()?
.unwrap_or_default();
Expand Down Expand Up @@ -92,20 +91,16 @@ async fn fetch_schema_inner(
key_record_name: Option<&str>,
val_record_name: Option<&str>,
) -> Result<(ConfluentSchema, ConfluentSchema), SchemaFetchError> {
let urls = handle_sr_list(schema_location)
.map_err(|e| SchemaFetchError::InvalidOptionInner(e.into()))?;
let client = Client::new(urls, client_config)
.map_err(|e| SchemaFetchError::InvalidOptionInner(e.into()))?;
let urls = handle_sr_list(schema_location)?;
let client = Client::new(urls, client_config)?;

let key_subject = get_subject_by_strategy(name_strategy, topic, key_record_name, true)
.map_err(|e| SchemaFetchError::InvalidOptionInner(e.into()))?;
let key_subject = get_subject_by_strategy(name_strategy, topic, key_record_name, true)?;
let key_schema = client
.get_schema_by_subject(&key_subject)
.await
.map_err(SchemaFetchError::Request)?;

let val_subject = get_subject_by_strategy(name_strategy, topic, val_record_name, false)
.map_err(|e| SchemaFetchError::InvalidOptionInner(e.into()))?;
let val_subject = get_subject_by_strategy(name_strategy, topic, val_record_name, false)?;
let val_schema = client
.get_schema_by_subject(&val_subject)
.await
Expand Down
17 changes: 14 additions & 3 deletions src/connector/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,23 @@ const SCHEMA_LOCATION_KEY: &str = "schema.location";
const SCHEMA_REGISTRY_KEY: &str = "schema.registry";
const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy";

#[derive(Debug, thiserror::Error, thiserror_ext::Macro)]
#[error("Invalid option: {message}")]
pub struct InvalidOptionError {
message: String,
// source: Option<risingwave_common::error::BoxedError>,
}

impl From<InvalidOptionError> for risingwave_common::error::RwError {
fn from(value: InvalidOptionError) -> Self {
anyhow::anyhow!(value).into()
}
}

#[derive(Debug, thiserror::Error)]
pub enum SchemaFetchError {
#[error("{0}")]
InvalidOption(String),
#[error(transparent)]
InvalidOptionInner(risingwave_common::error::BoxedError),
InvalidOption(#[from] InvalidOptionError),
#[error("schema registry client error")]
Request(#[source] schema_registry::ConcurrentRequestError),
#[error("schema compilation error")]
Expand Down
13 changes: 7 additions & 6 deletions src/connector/src/schema/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use std::collections::BTreeMap;

use prost_reflect::MessageDescriptor;

use super::{SchemaFetchError, MESSAGE_NAME_KEY, SCHEMA_LOCATION_KEY};
use super::{
invalid_option_error, InvalidOptionError, SchemaFetchError, MESSAGE_NAME_KEY,
SCHEMA_LOCATION_KEY,
};
use crate::common::AwsAuthProps;
use crate::parser::{EncodingProperties, ProtobufParserConfig, ProtobufProperties};

Expand All @@ -27,17 +30,15 @@ pub async fn fetch_descriptor(
) -> Result<MessageDescriptor, SchemaFetchError> {
let row_schema_location = format_options
.get(SCHEMA_LOCATION_KEY)
.ok_or_else(|| SchemaFetchError::InvalidOption(format!("{SCHEMA_LOCATION_KEY} required")))?
.ok_or_else(|| invalid_option_error!("{SCHEMA_LOCATION_KEY} required"))?
.clone();
let message_name = format_options
.get(MESSAGE_NAME_KEY)
.ok_or_else(|| SchemaFetchError::InvalidOption(format!("{MESSAGE_NAME_KEY} required")))?
.ok_or_else(|| invalid_option_error!("{MESSAGE_NAME_KEY} required"))?
.clone();

if row_schema_location.starts_with("s3") && aws_auth_props.is_none() {
return Err(SchemaFetchError::InvalidOption(
"s3 URL not supported yet".into(),
));
return Err(invalid_option_error!("s3 URL not supported yet").into());
}

let enc = EncodingProperties::Protobuf(ProtobufProperties {
Expand Down
22 changes: 5 additions & 17 deletions src/connector/src/schema/schema_registry/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ use futures::future::select_all;
use itertools::Itertools;
use reqwest::{Method, Url};
use serde::de::DeserializeOwned;
use thiserror_ext::AsReport as _;

use super::util::*;
use crate::schema::{invalid_option_error, InvalidOptionError};

pub const SCHEMA_REGISTRY_USERNAME: &str = "schema.registry.username";
pub const SCHEMA_REGISTRY_PASSWORD: &str = "schema.registry.password";
Expand Down Expand Up @@ -59,20 +61,6 @@ pub struct Client {
password: Option<String>,
}

#[derive(Debug, thiserror::Error)]
pub enum ClientInitError {
#[error("no valid url provided, got {0:?}")]
InvalidUrls(Vec<Url>),
#[error("build reqwest client failed")]
Builder(#[source] reqwest::Error),
}

impl From<ClientInitError> for risingwave_common::error::RwError {
fn from(value: ClientInitError) -> Self {
anyhow::anyhow!(value).into()
}
}

#[derive(Debug, thiserror::Error)]
#[error("{context}, errs: {errs:?}")]
pub struct ConcurrentRequestError {
Expand All @@ -92,15 +80,15 @@ impl Client {
pub(crate) fn new(
url: Vec<Url>,
client_config: &SchemaRegistryAuth,
) -> Result<Self, ClientInitError> {
) -> Result<Self, InvalidOptionError> {
let valid_urls = url
.iter()
.map(|url| (url.cannot_be_a_base(), url))
.filter(|(x, _)| !*x)
.map(|(_, url)| url.clone())
.collect_vec();
if valid_urls.is_empty() {
return Err(ClientInitError::InvalidUrls(url));
return Err(invalid_option_error!("no valid url provided, got {url:?}"));
} else {
tracing::debug!(
"schema registry client will use url {:?} to connect",
Expand All @@ -110,7 +98,7 @@ impl Client {

let inner = reqwest::Client::builder()
.build()
.map_err(ClientInitError::Builder)?;
.map_err(|e| invalid_option_error!("build reqwest client failed {}", e.as_report()))?;

Ok(Client {
inner,
Expand Down
26 changes: 9 additions & 17 deletions src/connector/src/schema/schema_registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pub use client::*;
use risingwave_pb::catalog::SchemaRegistryNameStrategy as PbSchemaRegistryNameStrategy;
pub(crate) use util::*;

use super::{invalid_option_error, InvalidOptionError};

pub fn name_strategy_from_str(value: &str) -> Option<PbSchemaRegistryNameStrategy> {
match value {
"topic_name_strategy" => Some(PbSchemaRegistryNameStrategy::Unspecified),
Expand All @@ -27,29 +29,19 @@ pub fn name_strategy_from_str(value: &str) -> Option<PbSchemaRegistryNameStrateg
}
}

#[derive(Debug, thiserror::Error)]
#[error("{name_strategy} expect non-empty field {record_option}")]
pub struct SubjectError {
name_strategy: &'static str,
record_option: &'static str,
}

impl From<SubjectError> for risingwave_common::error::RwError {
fn from(value: SubjectError) -> Self {
anyhow::anyhow!(value).into()
}
}

pub fn get_subject_by_strategy(
name_strategy: &PbSchemaRegistryNameStrategy,
topic: &str,
record: Option<&str>,
is_key: bool,
) -> Result<String, SubjectError> {
) -> Result<String, InvalidOptionError> {
let record_option_name = if is_key { "key.message" } else { "message" };
let build_error_lack_field = || SubjectError {
name_strategy: name_strategy.as_str_name(),
record_option: record_option_name,
let build_error_lack_field = || {
invalid_option_error!(
"{} expect non-empty field {}",
name_strategy.as_str_name(),
record_option_name,
)
};
match name_strategy {
PbSchemaRegistryNameStrategy::Unspecified => {
Expand Down
16 changes: 3 additions & 13 deletions src/connector/src/schema/schema_registry/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,9 @@ use serde::de::DeserializeOwned;
use serde_derive::Deserialize;
use url::{ParseError, Url};

#[derive(Debug, thiserror::Error)]
#[error("no valid url provided, got {errs:?}")]
pub struct SrUrlError {
errs: Vec<ParseError>,
}

impl From<SrUrlError> for risingwave_common::error::RwError {
fn from(value: SrUrlError) -> Self {
anyhow::anyhow!(value).into()
}
}
use crate::schema::{bail_invalid_option_error, InvalidOptionError};

pub fn handle_sr_list(addr: &str) -> Result<Vec<Url>, SrUrlError> {
pub fn handle_sr_list(addr: &str) -> Result<Vec<Url>, InvalidOptionError> {
let segment = addr.split(',').collect::<Vec<&str>>();
let mut errs: Vec<ParseError> = Vec::with_capacity(segment.len());
let mut urls = Vec::with_capacity(segment.len());
Expand All @@ -43,7 +33,7 @@ pub fn handle_sr_list(addr: &str) -> Result<Vec<Url>, SrUrlError> {
}
}
if urls.is_empty() {
return Err(SrUrlError { errs });
bail_invalid_option_error!("no valid url provided, got {errs:?}");
}
tracing::debug!(
"schema registry client will use url {:?} to connect, the rest failed because: {:?}",
Expand Down

0 comments on commit 2ed20c9

Please sign in to comment.