Skip to content

Commit

Permalink
feat: improve error of building key encoder (#18563)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh authored and xxchan committed Sep 20, 2024
1 parent 9b0c53a commit 8d6b742
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 20 deletions.
3 changes: 3 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ profile:
# - use: kafka
# persist-data: true

# To enable Confluent schema registry, uncomment the following line
# - use: schema-registry

default-v6:
steps:
- use: meta-node
Expand Down
22 changes: 11 additions & 11 deletions src/connector/src/sink/encoder/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ mod tests {
}
]
}"#,
"encode q error: avro name ref unsupported yet",
"encode 'q' error: avro name ref unsupported yet",
);

test_err(
Expand All @@ -836,7 +836,7 @@ mod tests {
i64::MAX,
))),
r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#,
"encode error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration",
"encode '' error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration",
);

let avro_schema = AvroSchema::parse_str(
Expand Down Expand Up @@ -911,7 +911,7 @@ mod tests {
};
assert_eq!(
err.to_string(),
"Encode error: encode req error: field not present but required"
"Encode error: encode 'req' error: field not present but required"
);

let schema = Schema::new(vec![
Expand All @@ -924,7 +924,7 @@ mod tests {
};
assert_eq!(
err.to_string(),
"Encode error: encode extra error: field not in avro"
"Encode error: encode 'extra' error: field not in avro"
);

let avro_schema = AvroSchema::parse_str(r#"["null", "long"]"#).unwrap();
Expand All @@ -934,14 +934,14 @@ mod tests {
};
assert_eq!(
err.to_string(),
r#"Encode error: encode error: expect avro record but got ["null","long"]"#
r#"Encode error: encode '' error: expect avro record but got ["null","long"]"#
);

test_err(
&DataType::Struct(StructType::new(vec![("f0", DataType::Boolean)])),
(),
r#"{"type": "record", "name": "T", "fields": [{"name": "f0", "type": "int"}]}"#,
"encode f0 error: cannot encode boolean column as \"int\" field",
"encode 'f0' error: cannot encode boolean column as \"int\" field",
);
}

Expand All @@ -963,7 +963,7 @@ mod tests {
&DataType::List(DataType::Int32.into()),
Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))).to_datum_ref(),
avro_schema,
"encode error: found null but required",
"encode '' error: found null but required",
);

test_ok(
Expand Down Expand Up @@ -1002,7 +1002,7 @@ mod tests {
&DataType::List(DataType::Boolean.into()),
(),
r#"{"type": "array", "items": "int"}"#,
"encode error: cannot encode boolean column as \"int\" field",
"encode '' error: cannot encode boolean column as \"int\" field",
);
}

Expand Down Expand Up @@ -1036,14 +1036,14 @@ mod tests {
t,
datum.to_datum_ref(),
both,
r#"encode error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#,
r#"encode '' error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#,
);

test_err(
t,
datum.to_datum_ref(),
empty,
"encode error: cannot encode timestamp with time zone column as [] field",
"encode '' error: cannot encode timestamp with time zone column as [] field",
);

test_ok(
Expand All @@ -1052,7 +1052,7 @@ mod tests {
one,
Value::Union(0, Value::TimestampMillis(1).into()),
);
test_err(t, None, one, "encode error: found null but required");
test_err(t, None, one, "encode '' error: found null but required");

test_ok(
t,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/encoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl std::fmt::Display for FieldEncodeError {

write!(
f,
"encode {} error: {}",
"encode '{}' error: {}",
self.rev_path.iter().rev().join("."),
self.message
)
Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/sink/encoder/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ mod tests {
.unwrap_err();
assert_eq!(
err.to_string(),
"encode repeated_int_field error: cannot encode integer[] column as int32 field"
"encode 'repeated_int_field' error: cannot encode integer[] column as int32 field"
);

let schema = Schema::new(vec![Field::with_name(
Expand All @@ -554,7 +554,7 @@ mod tests {
.unwrap_err();
assert_eq!(
err.to_string(),
"encode repeated_int_field error: array containing null not allowed as repeated field"
"encode 'repeated_int_field' error: array containing null not allowed as repeated field"
);
}

Expand All @@ -573,7 +573,7 @@ mod tests {
.unwrap_err();
assert_eq!(
err.to_string(),
"encode not_exists error: field not in proto"
"encode 'not_exists' error: field not in proto"
);

let err = validate_fields(
Expand All @@ -583,7 +583,7 @@ mod tests {
.unwrap_err();
assert_eq!(
err.to_string(),
"encode map_field error: field not in proto"
"encode 'map_field' error: field not in proto"
);
}
}
10 changes: 7 additions & 3 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use risingwave_common::array::StreamChunk;

use crate::sink::{Result, SinkError};
Expand Down Expand Up @@ -279,8 +279,12 @@ impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for AppendOnlyFormatter<

impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for UpsertFormatter<KE, VE> {
async fn build(b: FormatterParams<'_>) -> Result<Self> {
let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices)).await?;
let val_encoder = VE::build(b.builder, None).await?;
let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices))
.await
.with_context(|| "Failed to build key encoder")?;
let val_encoder = VE::build(b.builder, None)
.await
.with_context(|| "Failed to build value encoder")?;
Ok(UpsertFormatter::new(key_encoder, val_encoder))
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ pub enum SinkError {
#[backtrace]
anyhow::Error,
),
#[error("Internal error: {0}")]
#[error(transparent)]
Internal(
#[from]
#[backtrace]
Expand Down

0 comments on commit 8d6b742

Please sign in to comment.