Skip to content

Commit

Permalink
cherrypick feat(sink): support sink map to protobuf (#18552) to branc…
Browse files Browse the repository at this point in the history
…h release-2.0 (#18616)

Signed-off-by: xxchan <[email protected]>
Co-authored-by: Eric Fu <[email protected]>
  • Loading branch information
xxchan and fuyufjh authored Sep 23, 2024
1 parent 73cb4ef commit da6dd46
Show file tree
Hide file tree
Showing 7 changed files with 490 additions and 104 deletions.
3 changes: 3 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ profile:
# - use: kafka
# persist-data: true

# To enable Confluent schema registry, uncomment the following line
# - use: schema-registry

default-v6:
steps:
- use: meta-node
Expand Down
Binary file modified src/connector/codec/tests/test_data/all-types.pb
Binary file not shown.
22 changes: 11 additions & 11 deletions src/connector/src/sink/encoder/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ mod tests {
}
]
}"#,
"encode q error: avro name ref unsupported yet",
"encode 'q' error: avro name ref unsupported yet",
);

test_err(
Expand All @@ -836,7 +836,7 @@ mod tests {
i64::MAX,
))),
r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#,
"encode error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration",
"encode '' error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration",
);

let avro_schema = AvroSchema::parse_str(
Expand Down Expand Up @@ -911,7 +911,7 @@ mod tests {
};
assert_eq!(
err.to_string(),
"Encode error: encode req error: field not present but required"
"Encode error: encode 'req' error: field not present but required"
);

let schema = Schema::new(vec![
Expand All @@ -924,7 +924,7 @@ mod tests {
};
assert_eq!(
err.to_string(),
"Encode error: encode extra error: field not in avro"
"Encode error: encode 'extra' error: field not in avro"
);

let avro_schema = AvroSchema::parse_str(r#"["null", "long"]"#).unwrap();
Expand All @@ -934,14 +934,14 @@ mod tests {
};
assert_eq!(
err.to_string(),
r#"Encode error: encode error: expect avro record but got ["null","long"]"#
r#"Encode error: encode '' error: expect avro record but got ["null","long"]"#
);

test_err(
&DataType::Struct(StructType::new(vec![("f0", DataType::Boolean)])),
(),
r#"{"type": "record", "name": "T", "fields": [{"name": "f0", "type": "int"}]}"#,
"encode f0 error: cannot encode boolean column as \"int\" field",
"encode 'f0' error: cannot encode boolean column as \"int\" field",
);
}

Expand All @@ -963,7 +963,7 @@ mod tests {
&DataType::List(DataType::Int32.into()),
Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))).to_datum_ref(),
avro_schema,
"encode error: found null but required",
"encode '' error: found null but required",
);

test_ok(
Expand Down Expand Up @@ -1002,7 +1002,7 @@ mod tests {
&DataType::List(DataType::Boolean.into()),
(),
r#"{"type": "array", "items": "int"}"#,
"encode error: cannot encode boolean column as \"int\" field",
"encode '' error: cannot encode boolean column as \"int\" field",
);
}

Expand Down Expand Up @@ -1036,14 +1036,14 @@ mod tests {
t,
datum.to_datum_ref(),
both,
r#"encode error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#,
r#"encode '' error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#,
);

test_err(
t,
datum.to_datum_ref(),
empty,
"encode error: cannot encode timestamp with time zone column as [] field",
"encode '' error: cannot encode timestamp with time zone column as [] field",
);

test_ok(
Expand All @@ -1052,7 +1052,7 @@ mod tests {
one,
Value::Union(0, Value::TimestampMillis(1).into()),
);
test_err(t, None, one, "encode error: found null but required");
test_err(t, None, one, "encode '' error: found null but required");

test_ok(
t,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/encoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl std::fmt::Display for FieldEncodeError {

write!(
f,
"encode {} error: {}",
"encode '{}' error: {}",
self.rev_path.iter().rev().join("."),
self.message
)
Expand Down
Loading

0 comments on commit da6dd46

Please sign in to comment.