From 096570d106ee275244e2de3ff7df4a8705db6832 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 1 Feb 2024 16:01:37 +0800 Subject: [PATCH] refactor(connector): avoid anyhow in `AccessError` and avoid using `RwError` if possible (#14874) Signed-off-by: Bugen Zhao --- src/common/src/error.rs | 24 +++--- .../src/parser/debezium/debezium_parser.rs | 2 +- .../src/parser/debezium/mongo_json_parser.rs | 6 +- src/connector/src/parser/protobuf/parser.rs | 82 ++++++++++--------- src/connector/src/parser/unified/avro.rs | 42 +++++----- src/connector/src/parser/unified/debezium.rs | 44 ++++++---- src/connector/src/parser/unified/mod.rs | 14 ++-- src/connector/src/parser/unified/protobuf.rs | 18 ++-- 8 files changed, 123 insertions(+), 109 deletions(-) diff --git a/src/common/src/error.rs b/src/common/src/error.rs index 791dd786633db..1f3a782914299 100644 --- a/src/common/src/error.rs +++ b/src/common/src/error.rs @@ -109,7 +109,7 @@ pub enum ErrorCode { InternalError(String), // TODO: unify with the above #[error(transparent)] - InternalErrorAnyhow( + Uncategorized( #[from] #[backtrace] anyhow::Error, @@ -236,6 +236,12 @@ pub enum ErrorCode { ), } +impl RwError { + pub fn uncategorized(err: impl Into) -> Self { + Self::from(ErrorCode::Uncategorized(err.into())) + } +} + impl From for tonic::Status { fn from(err: RwError) -> Self { use tonic::Code; @@ -278,13 +284,13 @@ impl From for RwError { impl From for RwError { fn from(join_error: JoinError) -> Self { - anyhow::anyhow!(join_error).into() + Self::uncategorized(join_error) } } impl From for RwError { fn from(addr_parse_error: std::net::AddrParseError) -> Self { - anyhow::anyhow!(addr_parse_error).into() + Self::uncategorized(addr_parse_error) } } @@ -456,7 +462,7 @@ mod tests { use anyhow::anyhow; use super::*; - use crate::error::ErrorCode::InternalErrorAnyhow; + use crate::error::ErrorCode::Uncategorized; #[test] fn test_display_internal_error() { @@ -477,7 +483,7 @@ mod tests { .unwrap_err(); assert_eq!( - RwError::from(InternalErrorAnyhow(anyhow!(err_msg))).to_string(), + RwError::from(Uncategorized(anyhow!(err_msg))).to_string(), error.to_string(), ); } @@ -490,7 +496,7 @@ mod tests { })() .unwrap_err(); assert_eq!( - RwError::from(InternalErrorAnyhow(anyhow!(err_msg))).to_string(), + RwError::from(Uncategorized(anyhow!(err_msg))).to_string(), error.to_string() ); } @@ -502,11 +508,7 @@ mod tests { })() .unwrap_err(); assert_eq!( - RwError::from(InternalErrorAnyhow(anyhow!( - "error msg with args: {}", - "xx" - ))) - .to_string(), + RwError::from(Uncategorized(anyhow!("error msg with args: {}", "xx"))).to_string(), error.to_string() ); } diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 0f79677860f8d..8ff32804a96d4 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -109,7 +109,7 @@ impl DebeziumParser { Err(err) => { // Only try to access transaction control message if the row operation access failed // to make it a fast path. - if let Ok(transaction_control) = + if let Some(transaction_control) = row_op.transaction_control(&self.source_ctx.connector_props) { Ok(ParseResult::TransactionControl(transaction_control)) diff --git a/src/connector/src/parser/debezium/mongo_json_parser.rs b/src/connector/src/parser/debezium/mongo_json_parser.rs index 899068a4cccb5..21129a816e0a3 100644 --- a/src/connector/src/parser/debezium/mongo_json_parser.rs +++ b/src/connector/src/parser/debezium/mongo_json_parser.rs @@ -50,14 +50,14 @@ impl DebeziumMongoJsonParser { ) }) .ok_or_else(|| RwError::from(ProtocolError( - "Debezuim Mongo needs a `_id` column with supported types (Varchar Jsonb int32 int64) in table".into(), + "Debezium Mongo needs a `_id` column with supported types (Varchar Jsonb int32 int64) in table".into(), )))?.clone(); let payload_column = rw_columns .iter() .find(|desc| desc.name == "payload" && matches!(desc.data_type, DataType::Jsonb)) .ok_or_else(|| { RwError::from(ProtocolError( - "Debezuim Mongo needs a `payload` column with supported types Jsonb in table" + "Debezium Mongo needs a `payload` column with supported types Jsonb in table" .into(), )) })? @@ -66,7 +66,7 @@ impl DebeziumMongoJsonParser { // _rw_{connector}_file/partition & _rw_{connector}_offset are created automatically. if rw_columns.iter().filter(|desc| desc.is_visible()).count() != 2 { return Err(RwError::from(ProtocolError( - "Debezuim Mongo needs no more columns except `_id` and `payload` in table".into(), + "Debezium Mongo needs no more columns except `_id` and `payload` in table".into(), ))); } diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index 36b12dded6029..13da5c5b86b2d 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -20,15 +20,19 @@ use prost_reflect::{ ReflectMessage, Value, }; use risingwave_common::array::{ListValue, StructValue}; -use risingwave_common::error::ErrorCode::{InternalError, ProtocolError}; +use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{Result, RwError}; use risingwave_common::try_match_expand; use risingwave_common::types::{DataType, Datum, Decimal, JsonbVal, ScalarImpl, F32, F64}; use risingwave_pb::plan_common::{AdditionalColumn, ColumnDesc, ColumnDescVersion}; +use thiserror::Error; +use thiserror_ext::{AsReport, Macro}; use super::schema_resolver::*; use crate::parser::unified::protobuf::ProtobufAccess; -use crate::parser::unified::AccessImpl; +use crate::parser::unified::{ + bail_uncategorized, uncategorized, AccessError, AccessImpl, AccessResult, +}; use crate::parser::util::bytes_from_url; use crate::parser::{AccessBuilder, EncodingProperties}; use crate::schema::schema_registry::{ @@ -157,7 +161,8 @@ impl ProtobufParserConfig { index: &mut i32, parse_trace: &mut Vec, ) -> Result { - let field_type = protobuf_type_mapping(field_descriptor, parse_trace)?; + let field_type = + protobuf_type_mapping(field_descriptor, parse_trace).map_err(RwError::uncategorized)?; if let Kind::Message(m) = field_descriptor.kind() { let field_descs = if let DataType::List { .. } = field_type { vec![] @@ -192,15 +197,22 @@ impl ProtobufParserConfig { } } -fn detect_loop_and_push(trace: &mut Vec, fd: &FieldDescriptor) -> Result<()> { +#[derive(Error, Debug, Macro)] +#[error("{0}")] +struct ProtobufTypeError(#[message] String); + +fn detect_loop_and_push( + trace: &mut Vec, + fd: &FieldDescriptor, +) -> std::result::Result<(), ProtobufTypeError> { let identifier = format!("{}({})", fd.name(), fd.full_name()); if trace.iter().any(|s| s == identifier.as_str()) { - return Err(RwError::from(ProtocolError(format!( + bail_protobuf_type_error!( "circular reference detected: {}, conflict with {}, kind {:?}", - trace.iter().join("->"), + trace.iter().format("->"), identifier, fd.kind(), - )))); + ); } trace.push(identifier); Ok(()) @@ -341,7 +353,9 @@ pub fn from_protobuf_value( field_desc: &FieldDescriptor, value: &Value, descriptor_pool: &Arc, -) -> Result { +) -> AccessResult { + let kind = field_desc.kind(); + let v = match value { Value::Bool(v) => ScalarImpl::Bool(*v), Value::I32(i) => ScalarImpl::Int32(*i), @@ -352,17 +366,13 @@ pub fn from_protobuf_value( Value::F64(f) => ScalarImpl::Float64(F64::from(*f)), Value::String(s) => ScalarImpl::Utf8(s.as_str().into()), Value::EnumNumber(idx) => { - let kind = field_desc.kind(); - let enum_desc = kind.as_enum().ok_or_else(|| { - let err_msg = format!("protobuf parse error.not a enum desc {:?}", field_desc); - RwError::from(ProtocolError(err_msg)) + let enum_desc = kind.as_enum().ok_or_else(|| AccessError::TypeError { + expected: "enum".to_owned(), + got: format!("{kind:?}"), + value: value.to_string(), })?; let enum_symbol = enum_desc.get_value(*idx).ok_or_else(|| { - let err_msg = format!( - "protobuf parse error.unknown enum index {} of enum {:?}", - idx, enum_desc - ); - RwError::from(ProtocolError(err_msg)) + uncategorized!("unknown enum index {} of enum {:?}", idx, enum_desc) })?; ScalarImpl::Utf8(enum_symbol.name().into()) } @@ -389,18 +399,14 @@ pub fn from_protobuf_value( let Some(ScalarImpl::Bytea(payload)) = from_protobuf_value(&payload_field_desc, &payload, descriptor_pool)? else { - let err_msg = "Expected ScalarImpl::Bytea for payload".to_string(); - return Err(RwError::from(ProtocolError(err_msg))); + bail_uncategorized!("expected bytes for dynamic message payload"); }; // Get the corresponding schema from the descriptor pool let msg_desc = descriptor_pool .get_message_by_name(&type_url) .ok_or_else(|| { - ProtocolError(format!( - "Cannot find message {} in from_protobuf_value.\nDescriptor pool is {:#?}", - type_url, descriptor_pool - )) + uncategorized!("message `{type_url}` not found in descriptor pool") })?; let f = msg_desc @@ -439,11 +445,10 @@ pub fn from_protobuf_value( if !dyn_msg.has_field(&field_desc) && field_desc.cardinality() == Cardinality::Required { - let err_msg = format!( - "protobuf parse error.missing required field {:?}", - field_desc - ); - return Err(RwError::from(ProtocolError(err_msg))); + return Err(AccessError::Undefined { + name: field_desc.name().to_owned(), + path: dyn_msg.descriptor().full_name().to_owned(), + }); } // use default value if dyn_msg doesn't has this field let value = dyn_msg.get_field(&field_desc); @@ -453,7 +458,8 @@ pub fn from_protobuf_value( } } Value::List(values) => { - let data_type = protobuf_type_mapping(field_desc, &mut vec![])?; + let data_type = protobuf_type_mapping(field_desc, &mut vec![]) + .map_err(|e| uncategorized!("{}", e.to_report_string()))?; let mut builder = data_type.as_list().create_array_builder(values.len()); for value in values { builder.append(from_protobuf_value(field_desc, value, descriptor_pool)?); @@ -462,11 +468,9 @@ pub fn from_protobuf_value( } Value::Bytes(value) => ScalarImpl::Bytea(value.to_vec().into_boxed_slice()), _ => { - let err_msg = format!( - "protobuf parse error.unsupported type {:?}, value {:?}", - field_desc, value - ); - return Err(RwError::from(InternalError(err_msg))); + return Err(AccessError::UnsupportedType { + ty: format!("{kind:?}"), + }); } }; Ok(Some(v)) @@ -476,7 +480,7 @@ pub fn from_protobuf_value( fn protobuf_type_mapping( field_descriptor: &FieldDescriptor, parse_trace: &mut Vec, -) -> Result { +) -> std::result::Result { detect_loop_and_push(parse_trace, field_descriptor)?; let field_type = field_descriptor.kind(); let mut t = match field_type { @@ -494,7 +498,7 @@ fn protobuf_type_mapping( let fields = m .fields() .map(|f| protobuf_type_mapping(&f, parse_trace)) - .collect::>>()?; + .try_collect()?; let field_names = m.fields().map(|f| f.name().to_string()).collect_vec(); // Note that this part is useful for actual parsing @@ -513,10 +517,10 @@ fn protobuf_type_mapping( Kind::Bytes => DataType::Bytea, }; if field_descriptor.is_map() { - return Err(RwError::from(ProtocolError(format!( - "map type is unsupported (field: '{}')", + bail_protobuf_type_error!( + "protobuf map type (on field `{}`) is not supported", field_descriptor.full_name() - )))); + ); } if field_descriptor.cardinality() == Cardinality::Repeated { t = DataType::List(Box::new(t)) diff --git a/src/connector/src/parser/unified/avro.rs b/src/connector/src/parser/unified/avro.rs index af5658331270d..1cebb0fd60dac 100644 --- a/src/connector/src/parser/unified/avro.rs +++ b/src/connector/src/parser/unified/avro.rs @@ -15,7 +15,6 @@ use std::str::FromStr; use std::sync::LazyLock; -use anyhow::anyhow; use apache_avro::schema::{DecimalSchema, RecordSchema}; use apache_avro::types::Value; use apache_avro::{Decimal as AvroDecimal, Schema}; @@ -30,7 +29,7 @@ use risingwave_common::types::{ }; use risingwave_common::util::iter_util::ZipEqFast; -use super::{Access, AccessError, AccessResult}; +use super::{bail_uncategorized, uncategorized, Access, AccessError, AccessResult}; #[derive(Clone)] /// Options for parsing an `AvroValue` into Datum, with an optional avro schema. pub struct AvroParseOptions<'a> { @@ -136,26 +135,25 @@ impl<'a> AvroParseOptions<'a> { .iter() .find(|field| field.0 == field_name) .map(|field| &field.1) + .ok_or_else(|| { + uncategorized!("`{field_name}` field not found in VariableScaleDecimal") + }) }; - let scale = match find_in_records("scale").ok_or_else(|| { - AccessError::Other(anyhow!("scale field not found in VariableScaleDecimal")) - })? { - Value::Int(scale) => Ok(*scale), - avro_value => Err(AccessError::Other(anyhow!( + let scale = match find_in_records("scale")? { + Value::Int(scale) => *scale, + avro_value => bail_uncategorized!( "scale field in VariableScaleDecimal is not int, got {:?}", avro_value - ))), - }?; - - let value: BigInt = match find_in_records("value").ok_or_else(|| { - AccessError::Other(anyhow!("value field not found in VariableScaleDecimal")) - })? { - Value::Bytes(bytes) => Ok(BigInt::from_signed_bytes_be(bytes)), - avro_value => Err(AccessError::Other(anyhow!( + ), + }; + + let value: BigInt = match find_in_records("value")? { + Value::Bytes(bytes) => BigInt::from_signed_bytes_be(bytes), + avro_value => bail_uncategorized!( "value field in VariableScaleDecimal is not bytes, got {:?}", avro_value - ))), - }?; + ), + }; let negative = value.sign() == Sign::Minus; let (lo, mid, hi) = extract_decimal(value.to_bytes_be().1)?; @@ -196,9 +194,9 @@ impl<'a> AvroParseOptions<'a> { // ---- TimestampTz ----- (Some(DataType::Timestamptz) | None, Value::TimestampMillis(ms)) => { Timestamptz::from_millis(*ms) - .ok_or(AccessError::Other(anyhow!( - "timestamptz with milliseconds {ms} * 1000 is out of range", - )))? + .ok_or_else(|| { + uncategorized!("timestamptz with milliseconds {ms} * 1000 is out of range") + })? .into() } (Some(DataType::Timestamptz) | None, Value::TimestampMicros(us)) => { @@ -350,7 +348,7 @@ pub(crate) fn avro_decimal_to_rust_decimal( )) } -pub(crate) fn extract_decimal(bytes: Vec) -> anyhow::Result<(u32, u32, u32)> { +pub(crate) fn extract_decimal(bytes: Vec) -> AccessResult<(u32, u32, u32)> { match bytes.len() { len @ 0..=4 => { let mut pad = vec![0; 4 - len]; @@ -383,7 +381,7 @@ pub(crate) fn extract_decimal(bytes: Vec) -> anyhow::Result<(u32, u32, u32)> let lo = u32::from_be_bytes(bytes[mid_end..].to_owned().try_into().unwrap()); Ok((lo, mid, hi)) } - _ => Err(anyhow!("decimal bytes len: {:?} > 12", bytes.len())), + _ => bail_uncategorized!("invalid decimal bytes length {}", bytes.len()), } } diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index 7291b1b359735..86d7e3d4e5e61 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::anyhow; use risingwave_common::types::{DataType, Datum, ScalarImpl}; -use super::{Access, AccessError, ChangeEvent, ChangeEventOperation}; +use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation}; +use crate::parser::unified::uncategorized; use crate::parser::TransactionControl; use crate::source::{ConnectorProperties, SourceColumnDesc}; @@ -41,7 +41,7 @@ pub const DEBEZIUM_TRANSACTION_STATUS_COMMIT: &str = "END"; pub fn parse_transaction_meta( accessor: &impl Access, connector_props: &ConnectorProperties, -) -> std::result::Result { +) -> AccessResult { if let (Some(ScalarImpl::Utf8(status)), Some(ScalarImpl::Utf8(id))) = ( accessor.access(&[TRANSACTION_STATUS], Some(&DataType::Varchar))?, accessor.access(&[TRANSACTION_ID], Some(&DataType::Varchar))?, @@ -103,13 +103,12 @@ where pub(crate) fn transaction_control( &self, connector_props: &ConnectorProperties, - ) -> Result { - let Some(accessor) = &self.value_accessor else { - return Err(AccessError::Other(anyhow!( - "value_accessor must be provided to parse transaction metadata" - ))); - }; - parse_transaction_meta(accessor, connector_props) + ) -> Option { + // Ignore if `value_accessor` is not provided or there's any error when + // trying to parse the transaction metadata. + self.value_accessor + .as_ref() + .and_then(|accessor| parse_transaction_meta(accessor, connector_props).ok()) } } @@ -164,10 +163,25 @@ pub struct MongoProjection { accessor: A, } -pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> anyhow::Result { +pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> AccessResult { let id_field = bson_doc .get("_id") - .ok_or_else(|| anyhow::format_err!("Debezuim Mongo requires document has a `_id` field"))?; + .ok_or_else(|| uncategorized!("Debezium Mongo requires document has a `_id` field"))?; + + let type_error = || AccessError::TypeError { + expected: id_type.to_string(), + got: match id_field { + serde_json::Value::Null => "null", + serde_json::Value::Bool(_) => "bool", + serde_json::Value::Number(_) => "number", + serde_json::Value::String(_) => "string", + serde_json::Value::Array(_) => "array", + serde_json::Value::Object(_) => "object", + } + .to_owned(), + value: id_field.to_string(), + }; + let id: Datum = match id_type { DataType::Jsonb => ScalarImpl::Jsonb(id_field.clone().into()).into(), DataType::Varchar => match id_field { @@ -175,7 +189,7 @@ pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> anyh serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8( obj["$oid"].as_str().to_owned().unwrap_or_default().into(), )), - _ => anyhow::bail!("Can not convert bson {:?} to {:?}", id_field, id_type), + _ => return Err(type_error()), }, DataType::Int32 => { if let serde_json::Value::Object(ref obj) = id_field @@ -184,7 +198,7 @@ pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> anyh let int_str = obj["$numberInt"].as_str().unwrap_or_default(); Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default())) } else { - anyhow::bail!("Can not convert bson {:?} to {:?}", id_field, id_type) + return Err(type_error()); } } DataType::Int64 => { @@ -194,7 +208,7 @@ pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> anyh let int_str = obj["$numberLong"].as_str().unwrap_or_default(); Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default())) } else { - anyhow::bail!("Can not convert bson {:?} to {:?}", id_field, id_type) + return Err(type_error()); } } _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."), diff --git a/src/connector/src/parser/unified/mod.rs b/src/connector/src/parser/unified/mod.rs index bd52f4628ef2f..33c46f359a6ca 100644 --- a/src/connector/src/parser/unified/mod.rs +++ b/src/connector/src/parser/unified/mod.rs @@ -17,6 +17,7 @@ use auto_impl::auto_impl; use risingwave_common::types::{DataType, Datum}; use thiserror::Error; +use thiserror_ext::Macro; use self::avro::AvroAccess; use self::bytes::BytesAccess; @@ -86,7 +87,8 @@ where } } -#[derive(Error, Debug)] +#[derive(Error, Debug, Macro)] +#[thiserror_ext(macro(mangle))] pub enum AccessError { #[error("Undefined field `{name}` at `{path}`")] Undefined { name: String, path: String }, @@ -98,10 +100,8 @@ pub enum AccessError { }, #[error("Unsupported data type `{ty}`")] UnsupportedType { ty: String }, - #[error(transparent)] - Other( - #[from] - #[backtrace] - anyhow::Error, - ), + + /// Errors that are not categorized into variants above. + #[error("{message}")] + Uncategorized { message: String }, } diff --git a/src/connector/src/parser/unified/protobuf.rs b/src/connector/src/parser/unified/protobuf.rs index 3fdef073faa23..cd9178c7dd08d 100644 --- a/src/connector/src/parser/unified/protobuf.rs +++ b/src/connector/src/parser/unified/protobuf.rs @@ -14,16 +14,14 @@ use std::sync::{Arc, LazyLock}; -use anyhow::anyhow; use prost_reflect::{DescriptorPool, DynamicMessage, ReflectMessage}; -use risingwave_common::error::ErrorCode::ProtocolError; -use risingwave_common::error::RwError; use risingwave_common::log::LogSuppresser; use risingwave_common::types::DataType; +use thiserror_ext::AsReport; use super::{Access, AccessResult}; use crate::parser::from_protobuf_value; -use crate::parser::unified::AccessError; +use crate::parser::unified::{uncategorized, AccessError}; pub struct ProtobufAccess { message: DynamicMessage, @@ -46,18 +44,16 @@ impl Access for ProtobufAccess { .message .descriptor() .get_field_by_name(path[0]) - .ok_or_else(|| { - let err_msg = format!("protobuf schema don't have field {}", path[0]); + .ok_or_else(|| uncategorized!("protobuf schema don't have field {}", path[0])) + .inspect_err(|e| { static LOG_SUPPERSSER: LazyLock = LazyLock::new(LogSuppresser::default); if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::error!(suppressed_count, err_msg); + tracing::error!(suppressed_count, "{}", e.as_report()); } - RwError::from(ProtocolError(err_msg)) - }) - .map_err(|e| AccessError::Other(anyhow!(e)))?; + })?; let value = self.message.get_field(&field_desc); + from_protobuf_value(&field_desc, &value, &self.descriptor_pool) - .map_err(|e| AccessError::Other(anyhow!(e))) } }