Skip to content

Commit

Permalink
simplify ConcurrentRequestError msg composition
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Jan 17, 2024
1 parent 2ed20c9 commit 12c0d2b
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 15 deletions.
10 changes: 2 additions & 8 deletions src/connector/src/schema/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,10 @@ async fn fetch_schema_inner(
let client = Client::new(urls, client_config)?;

let key_subject = get_subject_by_strategy(name_strategy, topic, key_record_name, true)?;
let key_schema = client
.get_schema_by_subject(&key_subject)
.await
.map_err(SchemaFetchError::Request)?;
let key_schema = client.get_schema_by_subject(&key_subject).await?;

let val_subject = get_subject_by_strategy(name_strategy, topic, val_record_name, false)?;
let val_schema = client
.get_schema_by_subject(&val_subject)
.await
.map_err(SchemaFetchError::Request)?;
let val_schema = client.get_schema_by_subject(&val_subject).await?;

Ok((key_schema, val_schema))
}
4 changes: 2 additions & 2 deletions src/connector/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ impl From<InvalidOptionError> for risingwave_common::error::RwError {
pub enum SchemaFetchError {
#[error(transparent)]
InvalidOption(#[from] InvalidOptionError),
#[error("schema registry client error")]
Request(#[source] schema_registry::ConcurrentRequestError),
#[error(transparent)]
Request(#[from] schema_registry::ConcurrentRequestError),
#[error("schema compilation error")]
SchemaCompile(#[source] risingwave_common::error::BoxedError),
#[error(transparent)]
Expand Down
7 changes: 2 additions & 5 deletions src/connector/src/schema/schema_registry/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub struct Client {
}

#[derive(Debug, thiserror::Error)]
#[error("{context}, errs: {errs:?}")]
#[error("all request confluent registry all timeout, {context}, errs: {errs:?}")]
pub struct ConcurrentRequestError {
errs: Vec<itertools::Either<RequestError, tokio::task::JoinError>>,
context: String,
Expand Down Expand Up @@ -147,10 +147,7 @@ impl Client {

Err(ConcurrentRequestError {
errs,
context: format!(
"all request confluent registry all timeout, req path {:?}, urls {:?}",
path, self.url
),
context: format!("req path {:?}, urls {:?}", path, self.url),
})
}

Expand Down

0 comments on commit 12c0d2b

Please sign in to comment.