From 8d6b742a8cee1b0c02c6e63a487a753f6aedc030 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 19 Sep 2024 10:19:15 +0800 Subject: [PATCH] feat: improve error of building key encoder (#18563) --- risedev.yml | 3 +++ src/connector/src/sink/encoder/avro.rs | 22 +++++++++++----------- src/connector/src/sink/encoder/mod.rs | 2 +- src/connector/src/sink/encoder/proto.rs | 8 ++++---- src/connector/src/sink/formatter/mod.rs | 10 +++++++--- src/connector/src/sink/mod.rs | 2 +- 6 files changed, 27 insertions(+), 20 deletions(-) diff --git a/risedev.yml b/risedev.yml index 3c7f8e0e09be4..c0e89c5bfa27c 100644 --- a/risedev.yml +++ b/risedev.yml @@ -46,6 +46,9 @@ profile: # - use: kafka # persist-data: true + # To enable Confluent schema registry, uncomment the following line + # - use: schema-registry + default-v6: steps: - use: meta-node diff --git a/src/connector/src/sink/encoder/avro.rs b/src/connector/src/sink/encoder/avro.rs index 3c45c8f572cc5..d658f78492925 100644 --- a/src/connector/src/sink/encoder/avro.rs +++ b/src/connector/src/sink/encoder/avro.rs @@ -825,7 +825,7 @@ mod tests { } ] }"#, - "encode q error: avro name ref unsupported yet", + "encode 'q' error: avro name ref unsupported yet", ); test_err( @@ -836,7 +836,7 @@ mod tests { i64::MAX, ))), r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#, - "encode error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration", + "encode '' error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration", ); let avro_schema = AvroSchema::parse_str( @@ -911,7 +911,7 @@ mod tests { }; assert_eq!( err.to_string(), - "Encode error: encode req error: field not present but required" + "Encode error: encode 'req' error: field not present but required" ); let schema = Schema::new(vec![ @@ -924,7 +924,7 @@ mod tests { }; assert_eq!( err.to_string(), - "Encode error: encode extra error: field not in avro" + "Encode error: encode 'extra' error: field not in avro" ); let avro_schema = AvroSchema::parse_str(r#"["null", "long"]"#).unwrap(); @@ -934,14 +934,14 @@ mod tests { }; assert_eq!( err.to_string(), - r#"Encode error: encode error: expect avro record but got ["null","long"]"# + r#"Encode error: encode '' error: expect avro record but got ["null","long"]"# ); test_err( &DataType::Struct(StructType::new(vec![("f0", DataType::Boolean)])), (), r#"{"type": "record", "name": "T", "fields": [{"name": "f0", "type": "int"}]}"#, - "encode f0 error: cannot encode boolean column as \"int\" field", + "encode 'f0' error: cannot encode boolean column as \"int\" field", ); } @@ -963,7 +963,7 @@ mod tests { &DataType::List(DataType::Int32.into()), Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))).to_datum_ref(), avro_schema, - "encode error: found null but required", + "encode '' error: found null but required", ); test_ok( @@ -1002,7 +1002,7 @@ mod tests { &DataType::List(DataType::Boolean.into()), (), r#"{"type": "array", "items": "int"}"#, - "encode error: cannot encode boolean column as \"int\" field", + "encode '' error: cannot encode boolean column as \"int\" field", ); } @@ -1036,14 +1036,14 @@ mod tests { t, datum.to_datum_ref(), both, - r#"encode error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#, + r#"encode '' error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#, ); test_err( t, datum.to_datum_ref(), empty, - "encode error: cannot encode timestamp with time zone column as [] field", + "encode '' error: cannot encode timestamp with time zone column as [] field", ); test_ok( @@ -1052,7 +1052,7 @@ mod tests { one, Value::Union(0, Value::TimestampMillis(1).into()), ); - test_err(t, None, one, "encode error: found null but required"); + test_err(t, None, one, "encode '' error: found null but required"); test_ok( t, diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs index 0a8a9e5abce73..dc00a89143c57 100644 --- a/src/connector/src/sink/encoder/mod.rs +++ b/src/connector/src/sink/encoder/mod.rs @@ -201,7 +201,7 @@ impl std::fmt::Display for FieldEncodeError { write!( f, - "encode {} error: {}", + "encode '{}' error: {}", self.rev_path.iter().rev().join("."), self.message ) diff --git a/src/connector/src/sink/encoder/proto.rs b/src/connector/src/sink/encoder/proto.rs index df24cdc875884..2eee8efb33369 100644 --- a/src/connector/src/sink/encoder/proto.rs +++ b/src/connector/src/sink/encoder/proto.rs @@ -529,7 +529,7 @@ mod tests { .unwrap_err(); assert_eq!( err.to_string(), - "encode repeated_int_field error: cannot encode integer[] column as int32 field" + "encode 'repeated_int_field' error: cannot encode integer[] column as int32 field" ); let schema = Schema::new(vec![Field::with_name( @@ -554,7 +554,7 @@ mod tests { .unwrap_err(); assert_eq!( err.to_string(), - "encode repeated_int_field error: array containing null not allowed as repeated field" + "encode 'repeated_int_field' error: array containing null not allowed as repeated field" ); } @@ -573,7 +573,7 @@ mod tests { .unwrap_err(); assert_eq!( err.to_string(), - "encode not_exists error: field not in proto" + "encode 'not_exists' error: field not in proto" ); let err = validate_fields( @@ -583,7 +583,7 @@ mod tests { .unwrap_err(); assert_eq!( err.to_string(), - "encode map_field error: field not in proto" + "encode 'map_field' error: field not in proto" ); } } diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 9ac7d2114e458..bb0a41f63c33d 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use risingwave_common::array::StreamChunk; use crate::sink::{Result, SinkError}; @@ -279,8 +279,12 @@ impl FormatterBuild for AppendOnlyFormatter< impl FormatterBuild for UpsertFormatter { async fn build(b: FormatterParams<'_>) -> Result { - let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices)).await?; - let val_encoder = VE::build(b.builder, None).await?; + let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices)) + .await + .with_context(|| "Failed to build key encoder")?; + let val_encoder = VE::build(b.builder, None) + .await + .with_context(|| "Failed to build value encoder")?; Ok(UpsertFormatter::new(key_encoder, val_encoder)) } } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 0cef07be55200..9bc6599871881 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -643,7 +643,7 @@ pub enum SinkError { #[backtrace] anyhow::Error, ), - #[error("Internal error: {0}")] + #[error(transparent)] Internal( #[from] #[backtrace]