diff --git a/Cargo.lock b/Cargo.lock index 9fc9ca7f523eb..dadff8e950e0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11094,6 +11094,8 @@ dependencies = [ "apache-avro 0.16.0", "chrono", "easy-ext", + "expect-test", + "hex", "itertools 0.12.1", "jsonbb", "jsonschema-transpiler", diff --git a/docs/developer-guide.md b/docs/developer-guide.md index 9b03c60f3d9a1..0f012a1452f5a 100644 --- a/docs/developer-guide.md +++ b/docs/developer-guide.md @@ -33,20 +33,21 @@ http://ecotrust-canada.github.io/markdown-toc/ * [Logging](#logging) - [Test your code changes](#test-your-code-changes) * [Lint](#lint) - * [Unit tests](#unit-tests) + * [Unit and integration tests](#unit-and-integration-tests) * [Planner tests](#planner-tests) * [End-to-end tests](#end-to-end-tests) * [End-to-end tests on CI](#end-to-end-tests-on-ci) * [Fuzzing tests](#fuzzing-tests) * [DocSlt tests](#docslt-tests) * [Deterministic simulation tests](#deterministic-simulation-tests) + * [Deterministic Simulation Integration tests](#deterministic-simulation-integration-tests) + * [Backwards Compatibility tests](#backwards-compatibility-tests) - [Miscellaneous checks](#miscellaneous-checks) - [Update Grafana dashboard](#update-grafana-dashboard) - [Add new files](#add-new-files) - [Add new dependencies](#add-new-dependencies) - [Submit PRs](#submit-prs) -- [Profiling](#benchmarking-and-profiling) -- [Understanding RisingWave Macros](#understanding-risingwave-macros) +- [Benchmarking and Profiling](#benchmarking-and-profiling) - [CI Labels Guide](#ci-labels-guide) ## Read the design docs @@ -305,7 +306,7 @@ RisingWave requires all code to pass fmt, clippy, sort and hakari checks. Run th ./risedev c # Run all checks. Shortcut for ./risedev check ``` -### Unit tests +### Unit and integration tests RiseDev runs unit tests with cargo-nextest. To run unit tests: @@ -320,10 +321,13 @@ If you want to see the coverage report, run this command: ./risedev test-cov ``` -Some unit tests will not work if the `/tmp` directory is on a TmpFS file system: these unit tests will fail with this -error message: `Attempting to create cache file on a TmpFS file system. TmpFS cannot be used because it does not support Direct IO.`. -If this happens you can override the use of `/tmp` by setting the environment variable `RISINGWAVE_TEST_DIR` to a -directory that is on a non-TmpFS filesystem, the unit tests will then place temporary files under your specified path. +Some ideas and caveats for writing tests: +- Use [expect_test](https://github.com/rust-analyzer/expect-test) to write data driven tests that can automatically update results. +- It's recommended to write new tests as *integration tests* (i.e. in `tests/` directory) instead of *unit tests* (i.e. in `src/` directory). + + Besides, put integration tests under `tests/integration_tests/*.rs`, instead of `tests/*.rs`. See [Delete Cargo Integration Tests](https://matklad.github.io/2021/02/27/delete-cargo-integration-tests.html) and [#9878](https://github.com/risingwavelabs/risingwave/issues/9878), for more details. + +You might want to read [How to Test](https://matklad.github.io/2021/05/31/how-to-test.html) for more good ideas on testing. ### Planner tests @@ -562,4 +566,4 @@ https://github.com/risingwavelabs/risingwave/pull/17197 To run `e2e-test` and `e2e-source-test` for `main-cron` in your pull request: 1. Add `ci/run-e2e-test`. 2. Add `ci/run-e2e-source-tests`. -3. Add `ci/main-cron/run-selected` to skip all other steps which were not selected with `ci/run-xxx`. \ No newline at end of file +3. Add `ci/main-cron/run-selected` to skip all other steps which were not selected with `ci/run-xxx`. diff --git a/src/connector/codec/Cargo.toml b/src/connector/codec/Cargo.toml index d9e6e274f0fef..1878a799a0a5e 100644 --- a/src/connector/codec/Cargo.toml +++ b/src/connector/codec/Cargo.toml @@ -35,8 +35,16 @@ thiserror-ext = { workspace = true } time = "0.3.30" tracing = "0.1" +[dev-dependencies] +expect-test = "1" +hex = "0.4" + [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../../workspace-hack" } [lints] workspace = true + +[lib] +test = false +doctest = false diff --git a/src/connector/codec/src/decoder/avro/schema.rs b/src/connector/codec/src/decoder/avro/schema.rs index 4bd0a469041ad..fe96495d089ea 100644 --- a/src/connector/codec/src/decoder/avro/schema.rs +++ b/src/connector/codec/src/decoder/avro/schema.rs @@ -74,6 +74,8 @@ impl MapHandling { /// This function expects resolved schema (no `Ref`). /// FIXME: require passing resolved schema here. +/// TODO: change `map_handling` to some `Config`, and also unify debezium. +/// TODO: use `ColumnDesc` in common instead of PB. pub fn avro_schema_to_column_descs( schema: &Schema, map_handling: Option, diff --git a/src/connector/codec/src/decoder/mod.rs b/src/connector/codec/src/decoder/mod.rs index 61d761410fae6..c7e04ab210a6e 100644 --- a/src/connector/codec/src/decoder/mod.rs +++ b/src/connector/codec/src/decoder/mod.rs @@ -44,7 +44,7 @@ pub enum AccessError { pub type AccessResult = std::result::Result; -/// Access to a field in the data structure. +/// Access to a field in the data structure. Created by `AccessBuilder`. pub trait Access { /// Accesses `path` in the data structure (*parsed* Avro/JSON/Protobuf data), /// and then converts it to RisingWave `Datum`. diff --git a/src/connector/codec/src/lib.rs b/src/connector/codec/src/lib.rs index 6b1335ab32e26..651fa84e109fb 100644 --- a/src/connector/codec/src/lib.rs +++ b/src/connector/codec/src/lib.rs @@ -44,6 +44,7 @@ pub mod decoder; pub use apache_avro::schema::Schema as AvroSchema; +pub use apache_avro::types::{Value as AvroValue, ValueKind as AvroValueKind}; pub use risingwave_pb::plan_common::ColumnDesc; pub struct JsonSchema(pub serde_json::Value); impl JsonSchema { diff --git a/src/connector/codec/tests/integration_tests/avro.rs b/src/connector/codec/tests/integration_tests/avro.rs new file mode 100644 index 0000000000000..9ded8d1b6de33 --- /dev/null +++ b/src/connector/codec/tests/integration_tests/avro.rs @@ -0,0 +1,400 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use apache_avro::from_avro_datum; +use risingwave_connector_codec::decoder::avro::{ + avro_schema_to_column_descs, AvroAccess, AvroParseOptions, MapHandling, ResolvedAvroSchema, +}; +use risingwave_connector_codec::decoder::Access; +use risingwave_connector_codec::AvroSchema; + +use crate::utils::*; + +/// Refer to `AvroAccessBuilder::parse_avro_value` for how Avro data looks like. +enum TestDataEncoding { + /// Each data is a JSON encoded Avro value. + /// + /// TODO: Not supported yet, because `apache_avro` doesn't support decoding JSON encoded avro.. + #[allow(dead_code)] + Json, + /// Each data is a binary encoded Avro value, converted to a hex string. + /// + /// Tool convert Avro JSON to hex string: https://xxchan-vercel-playground.vercel.app/avro + HexBinary, +} + +struct Config { + map_handling: Option, + data_encoding: TestDataEncoding, +} + +/// ## Arguments +/// - `avro_schema`: Avro schema in JSON format. +/// - `avro_data`: list of Avro data. According to [`TestDataEncoding`] +/// +/// ## Why not directly test the uppermost layer `AvroParserConfig` and `AvroAccessBuilder`? +/// +/// Because their interface are not clean enough, and have complex logic like schema registry. +/// We might need to separate logic to make them clenaer and then we can use it directly for testing. +/// +/// ## If we reimplement a similar logic here, what are we testing? +/// +/// Basically unit tests of `avro_schema_to_column_descs`, `convert_to_datum`, i.e., the type mapping. +/// +/// It makes some sense, as the data parsing logic is generally quite simple (one-liner), and the most +/// complex and error-prone part is the type mapping. +/// +/// ## Why test schema mapping and data mapping together? +/// +/// Because the expected data type for data mapping comes from the schema mapping. +fn check( + avro_schema: &str, + avro_data: &[&str], + config: Config, + expected_risingwave_schema: expect_test::Expect, + expected_risingwave_data: expect_test::Expect, +) { + // manually implement some logic in AvroParserConfig::map_to_columns + let avro_schema = AvroSchema::parse_str(avro_schema).expect("failed to parse Avro schema"); + let resolved_schema = + ResolvedAvroSchema::create(avro_schema.into()).expect("failed to resolve Avro schema"); + + let rw_schema = + avro_schema_to_column_descs(&resolved_schema.resolved_schema, config.map_handling) + .expect("failed to convert Avro schema to RisingWave schema") + .iter() + .map(ColumnDesc::from) + .collect_vec(); + expected_risingwave_schema.assert_eq(&format!( + "{:#?}", + rw_schema.iter().map(ColumnDescTestDisplay).collect_vec() + )); + + // manually implement some logic in AvroAccessBuilder, and some in PlainParser::parse_inner + let mut data_str = vec![]; + for data in avro_data { + let parser = AvroParseOptions::create(&resolved_schema.resolved_schema); + + match config.data_encoding { + TestDataEncoding::Json => todo!(), + TestDataEncoding::HexBinary => { + let data = hex::decode(data).expect("failed to decode hex string"); + let avro_data = + from_avro_datum(&resolved_schema.original_schema, &mut data.as_slice(), None) + .expect("failed to parse Avro data"); + let access = AvroAccess::new(&avro_data, parser); + + for col in &rw_schema { + let rw_data = access + .access(&[&col.name], &col.data_type) + .expect("failed to access"); + data_str.push(format!("{:?}", rw_data)); + } + } + } + } + expected_risingwave_data.assert_eq(&format!("{}", data_str.iter().format("\n"))); +} + +#[test] +fn test_simple() { + check( + r#" +{ + "name": "test_student", + "type": "record", + "fields": [ + { + "name": "id", + "type": "int", + "default": 0 + }, + { + "name": "sequence_id", + "type": "long", + "default": 0 + }, + { + "name": "name", + "type": ["null", "string"] + }, + { + "name": "score", + "type": "float", + "default": 0.0 + }, + { + "name": "avg_score", + "type": "double", + "default": 0.0 + }, + { + "name": "is_lasted", + "type": "boolean", + "default": false + }, + { + "name": "entrance_date", + "type": "int", + "logicalType": "date", + "default": 0 + }, + { + "name": "birthday", + "type": "long", + "logicalType": "timestamp-millis", + "default": 0 + }, + { + "name": "anniversary", + "type": "long", + "logicalType": "timestamp-micros", + "default": 0 + }, + { + "name": "passed", + "type": { + "name": "interval", + "type": "fixed", + "size": 12 + }, + "logicalType": "duration" + }, + { + "name": "bytes", + "type": "bytes", + "default": "" + } + ] +} + "#, + &[ + // {"id":32,"sequence_id":64,"name":{"string":"str_value"},"score":32.0,"avg_score":64.0,"is_lasted":true,"entrance_date":0,"birthday":0,"anniversary":0,"passed":"\u0001\u0000\u0000\u0000\u0001\u0000\u0000\u0000\u00E8\u0003\u0000\u0000","bytes":"\u0001\u0002\u0003\u0004\u0005"} + "40800102127374725f76616c7565000000420000000000005040010000000100000001000000e80300000a0102030405" + ], + Config { + map_handling: None, + data_encoding: TestDataEncoding::HexBinary, + }, + expect![[r#" + [ + id(#1): Int32, + sequence_id(#2): Int64, + name(#3): Varchar, + score(#4): Float32, + avg_score(#5): Float64, + is_lasted(#6): Boolean, + entrance_date(#7): Date, + birthday(#8): Timestamptz, + anniversary(#9): Timestamptz, + passed(#10): Interval, + bytes(#11): Bytea, + ]"#]], + expect![[r#" + Owned(Some(Int32(32))) + Owned(Some(Int64(64))) + Borrowed(Some(Utf8("str_value"))) + Owned(Some(Float32(OrderedFloat(32.0)))) + Owned(Some(Float64(OrderedFloat(64.0)))) + Owned(Some(Bool(true))) + Owned(Some(Date(Date(1970-01-01)))) + Owned(Some(Timestamptz(Timestamptz(0)))) + Owned(Some(Timestamptz(Timestamptz(0)))) + Owned(Some(Interval(Interval { months: 1, days: 1, usecs: 1000000 }))) + Borrowed(Some(Bytea([1, 2, 3, 4, 5])))"#]], + ) +} + +/// From `e2e_test/source_inline/kafka/avro/upsert_avro_json` +#[test] +fn test_1() { + check( + r#" +{ + "type": "record", + "name": "OBJ_ATTRIBUTE_VALUE", + "namespace": "CPLM", + "fields": [ + { + "name": "op_type", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "ID", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "CLASS_ID", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "ITEM_ID", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "ATTR_ID", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "ATTR_VALUE", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "ORG_ID", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "UNIT_INFO", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "UPD_TIME", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "DEC_VAL", + "type": [ + { + "type": "bytes", + "logicalType": "decimal", + "precision": 10, + "scale": 2 + }, + "null" + ], + "default": "\u00ff" + }, + { + "name": "REFERRED", + "type": [ + "null", + { + "type": "record", + "name": "REFERRED_TYPE", + "fields": [ + { + "name": "a", + "type": "string" + } + ] + } + ], + "default": null + }, + { + "name": "REF", + "type": [ + "null", + "REFERRED_TYPE" + ], + "default": null + }, + { + "name": "uuid", + "type": [ + "null", + { + "type": "string", + "logicalType": "uuid" + } + ], + "default": null + }, + { + "name": "rate", + "type": "double", + "default": "NaN" + } + ], + "connect.name": "CPLM.OBJ_ATTRIBUTE_VALUE" +} +"#, + &[ + // {"op_type": {"string": "update"}, "ID": {"string": "id1"}, "CLASS_ID": {"string": "1"}, "ITEM_ID": {"string": "6768"}, "ATTR_ID": {"string": "6970"}, "ATTR_VALUE": {"string": "value9"}, "ORG_ID": {"string": "7172"}, "UNIT_INFO": {"string": "info9"}, "UPD_TIME": {"string": "2021-05-18T07:59:58.714Z"}, "DEC_VAL": {"bytes": "\u0002\u0054\u000b\u00e3\u00ff"}} + "020c7570646174650206696431020231020836373638020836393730020c76616c756539020837313732020a696e666f390230323032312d30352d31385430373a35393a35382e3731345a000a02540be3ff000000000000000000f87f" + ], + Config { + map_handling: None, + data_encoding: TestDataEncoding::HexBinary, + }, + expect![[r#" + [ + op_type(#1): Varchar, + ID(#2): Varchar, + CLASS_ID(#3): Varchar, + ITEM_ID(#4): Varchar, + ATTR_ID(#5): Varchar, + ATTR_VALUE(#6): Varchar, + ORG_ID(#7): Varchar, + UNIT_INFO(#8): Varchar, + UPD_TIME(#9): Varchar, + DEC_VAL(#10): Decimal, + REFERRED(#11): Struct(StructType { field_names: ["a"], field_types: [Varchar] }), + REF(#12): Struct(StructType { field_names: ["a"], field_types: [Varchar] }), + uuid(#13): Varchar, + rate(#14): Float64, + ]"#]], + expect![[r#" + Borrowed(Some(Utf8("update"))) + Borrowed(Some(Utf8("id1"))) + Borrowed(Some(Utf8("1"))) + Borrowed(Some(Utf8("6768"))) + Borrowed(Some(Utf8("6970"))) + Borrowed(Some(Utf8("value9"))) + Borrowed(Some(Utf8("7172"))) + Borrowed(Some(Utf8("info9"))) + Borrowed(Some(Utf8("2021-05-18T07:59:58.714Z"))) + Owned(Some(Decimal(Normalized(99999999.99)))) + Owned(None) + Owned(None) + Owned(None) + Owned(Some(Float64(OrderedFloat(NaN))))"#]], + ); +} diff --git a/src/connector/codec/tests/integration_tests/main.rs b/src/connector/codec/tests/integration_tests/main.rs new file mode 100644 index 0000000000000..8c718f918d0a6 --- /dev/null +++ b/src/connector/codec/tests/integration_tests/main.rs @@ -0,0 +1,17 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod avro; + +pub mod utils; diff --git a/src/connector/codec/tests/integration_tests/utils.rs b/src/connector/codec/tests/integration_tests/utils.rs new file mode 100644 index 0000000000000..2046f2504de8a --- /dev/null +++ b/src/connector/codec/tests/integration_tests/utils.rs @@ -0,0 +1,59 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub use expect_test::{expect, Expect}; +pub use itertools::Itertools; +pub use risingwave_common::catalog::ColumnDesc; +use risingwave_pb::plan_common::AdditionalColumn; + +/// More concise display for ColumnDesc, to use in tests. +pub struct ColumnDescTestDisplay<'a>(pub &'a ColumnDesc); + +impl<'a> std::fmt::Debug for ColumnDescTestDisplay<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let ColumnDesc { + data_type, + column_id, + name, + field_descs, + type_name, + generated_or_default_column, + description, + additional_column: AdditionalColumn { column_type }, + version: _, + } = &self.0; + + write!(f, "{name}(#{column_id}): {data_type:?}")?; + if !type_name.is_empty() { + write!(f, ", type_name: {:?}", type_name)?; + } + if !field_descs.is_empty() { + write!(f, ", field_descs: {:?}", field_descs)?; + } + if let Some(generated_or_default_column) = generated_or_default_column { + write!( + f, + ", generated_or_default_column: {:?}", + generated_or_default_column + )?; + } + if let Some(description) = description { + write!(f, ", description: {:?}", description)?; + } + if let Some(column_type) = column_type { + write!(f, ", additional_column: {:?}", column_type)?; + } + Ok(()) + } +} diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index f6abe86a17f0d..5bd0038e3e839 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -72,6 +72,24 @@ impl AvroAccessBuilder { /// Note: we should use unresolved schema to parsing bytes into avro value. /// Otherwise it's an invalid schema and parsing will fail. (Avro error: Two named schema defined for same fullname) + /// + /// # Notes about how Avro data looks like + /// + /// First, it has two [serialization encodings: binary and JSON](https://avro.apache.org/docs/1.11.1/specification/#encodings). + /// They don't have magic bytes and cannot be distinguished on their own. + /// + /// But in different cases, it starts with different headers, or magic bytes, which can be confusing. + /// + /// ## `apache_avro` API and headers + /// + /// - `apache_avro::Reader`: [Object Container Files](https://avro.apache.org/docs/1.11.1/specification/#object-container-files): contains file header, starting with 4 bytes `Obj1`. This is a batch file encoding. We don't use it. + /// - `apache_avro::GenericSingleObjectReader`: [Single-object encoding](https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding): starts with 2 bytes `0xC301`. This is designed to be used in places like Kafka, but Confluent schema registry doesn't use it. + /// - `apache_avro::from_avro_datum`: no header, binary encoding. This is what we should use. + /// + /// ## Confluent schema registry + /// + /// - In Kafka ([Confluent schema registry wire format](https://docs.confluent.io/platform/7.6/schema-registry/fundamentals/serdes-develop/index.html#wire-format)): + /// starts with 5 bytes`0x00{schema_id:08x}` followed by Avro binary encoding. async fn parse_avro_value(&self, payload: &[u8]) -> ConnectorResult> { // parse payload to avro value // if use confluent schema, get writer schema from confluent schema registry @@ -84,6 +102,7 @@ impl AvroAccessBuilder { Some(&self.schema.original_schema), )?)) } else { + // FIXME: we should not use `Reader` (file header) here. See comment above and https://github.com/risingwavelabs/risingwave/issues/12871 let mut reader = Reader::with_schema(&self.schema.original_schema, payload)?; match reader.next() { Some(Ok(v)) => Ok(Some(v)), @@ -185,46 +204,17 @@ impl AvroParserConfig { #[cfg(test)] mod test { use std::env; - use std::fs::OpenOptions; - use std::io::Write; - use std::ops::Sub; - use std::path::PathBuf; - use apache_avro::schema::RecordSchema; - use apache_avro::types::Record; - use apache_avro::{Codec, Days, Duration, Millis, Months, Writer}; - use itertools::Itertools; - use risingwave_common::array::Op; - use risingwave_common::catalog::ColumnId; - use risingwave_common::row::Row; - use risingwave_common::types::{DataType, Date}; - use risingwave_common::util::iter_util::ZipEqFast; - use risingwave_pb::catalog::StreamSourceInfo; - use risingwave_pb::plan_common::{PbEncodeType, PbFormatType}; use url::Url; use super::*; use crate::connector_common::AwsAuthProps; - use crate::parser::plain_parser::PlainParser; - use crate::parser::{AccessBuilderImpl, SourceStreamChunkBuilder, SpecificParserConfig}; - use crate::source::{SourceColumnDesc, SourceContext}; fn test_data_path(file_name: &str) -> String { let curr_dir = env::current_dir().unwrap().into_os_string(); curr_dir.into_string().unwrap() + "/src/test_data/" + file_name } - fn e2e_file_path(file_name: &str) -> String { - let curr_dir = env::current_dir().unwrap().into_os_string(); - let binding = PathBuf::from(curr_dir); - let dir = binding.parent().unwrap().parent().unwrap(); - dir.join("scripts/source/test_data/") - .join(file_name) - .to_str() - .unwrap() - .to_string() - } - #[tokio::test] #[ignore] async fn test_load_schema_from_s3() { @@ -261,290 +251,4 @@ mod test { assert!(schema.is_ok()); println!("schema = {:?}", schema.unwrap()); } - - async fn new_avro_conf_from_local(file_name: &str) -> ConnectorResult { - let schema_path = format!("file://{}", test_data_path(file_name)); - let info = StreamSourceInfo { - row_schema_location: schema_path.clone(), - use_schema_registry: false, - format: PbFormatType::Plain.into(), - row_encode: PbEncodeType::Avro.into(), - ..Default::default() - }; - let parser_config = SpecificParserConfig::new(&info, &Default::default())?; - AvroParserConfig::new(parser_config.encoding_config).await - } - - async fn new_avro_parser_from_local(file_name: &str) -> ConnectorResult { - let conf = new_avro_conf_from_local(file_name).await?; - - Ok(PlainParser { - key_builder: None, - payload_builder: AccessBuilderImpl::Avro(AvroAccessBuilder::new( - conf, - EncodingType::Value, - )?), - rw_columns: Vec::default(), - source_ctx: SourceContext::dummy().into(), - transaction_meta_builder: None, - }) - } - - #[tokio::test] - async fn test_avro_parser() { - let mut parser = new_avro_parser_from_local("simple-schema.avsc") - .await - .unwrap(); - let builder = try_match_expand!(&parser.payload_builder, AccessBuilderImpl::Avro).unwrap(); - let schema = builder.schema.original_schema.clone(); - let record = build_avro_data(&schema); - assert_eq!(record.fields.len(), 11); - let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Snappy); - writer.append(record.clone()).unwrap(); - let flush = writer.flush().unwrap(); - assert!(flush > 0); - let input_data = writer.into_inner().unwrap(); - let columns = build_rw_columns(); - let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 1); - { - let writer = builder.row_writer(); - parser - .parse_inner(None, Some(input_data), writer) - .await - .unwrap(); - } - let chunk = builder.finish(); - let (op, row) = chunk.rows().next().unwrap(); - assert_eq!(op, Op::Insert); - - expect_test::expect![[r#" - ("id", Int(32)) => Some(Int32(32)) - ("sequence_id", Long(64)) => Some(Int64(64)) - ("name", Union(1, String("str_value"))) => Some(Utf8("str_value")) - ("score", Float(32.0)) => Some(Float32(OrderedFloat(32.0))) - ("avg_score", Double(64.0)) => Some(Float64(OrderedFloat(64.0))) - ("is_lasted", Boolean(true)) => Some(Bool(true)) - ("entrance_date", Date(0)) => Some(Date(Date(1970-01-01))) - ("birthday", TimestampMillis(0)) => Some(Timestamptz(Timestamptz(0))) - ("anniversary", TimestampMicros(0)) => Some(Timestamptz(Timestamptz(0))) - ("passed", Duration(Duration { months: Months(1), days: Days(1), millis: Millis(1000) })) => Some(Interval(Interval { months: 1, days: 1, usecs: 1000000 })) - ("bytes", Bytes([1, 2, 3, 4, 5])) => Some(Bytea([1, 2, 3, 4, 5]))"#]].assert_eq(&format!( - "{}", - record - .fields - .iter() - .zip_eq_fast(row.iter()) - .format_with("\n", |(avro, datum), f| { - f(&format_args!("{:?} => {:?}", avro, datum)) - }) - )); - } - - fn build_rw_columns() -> Vec { - vec![ - SourceColumnDesc::simple("id", DataType::Int32, ColumnId::from(0)), - SourceColumnDesc::simple("sequence_id", DataType::Int64, ColumnId::from(1)), - SourceColumnDesc::simple("name", DataType::Varchar, ColumnId::from(2)), - SourceColumnDesc::simple("score", DataType::Float32, ColumnId::from(3)), - SourceColumnDesc::simple("avg_score", DataType::Float64, ColumnId::from(4)), - SourceColumnDesc::simple("is_lasted", DataType::Boolean, ColumnId::from(5)), - SourceColumnDesc::simple("entrance_date", DataType::Date, ColumnId::from(6)), - SourceColumnDesc::simple("birthday", DataType::Timestamptz, ColumnId::from(7)), - SourceColumnDesc::simple("anniversary", DataType::Timestamptz, ColumnId::from(8)), - SourceColumnDesc::simple("passed", DataType::Interval, ColumnId::from(9)), - SourceColumnDesc::simple("bytes", DataType::Bytea, ColumnId::from(10)), - ] - } - - fn build_field(schema: &Schema) -> Option { - match schema { - Schema::String => Some(Value::String("str_value".to_string())), - Schema::Int => Some(Value::Int(32_i32)), - Schema::Long => Some(Value::Long(64_i64)), - Schema::Float => Some(Value::Float(32_f32)), - Schema::Double => Some(Value::Double(64_f64)), - Schema::Boolean => Some(Value::Boolean(true)), - Schema::Bytes => Some(Value::Bytes(vec![1, 2, 3, 4, 5])), - - Schema::Date => { - let original_date = Date::from_ymd_uncheck(1970, 1, 1).and_hms_uncheck(0, 0, 0); - let naive_date = Date::from_ymd_uncheck(1970, 1, 1).and_hms_uncheck(0, 0, 0); - let num_days = naive_date.0.sub(original_date.0).num_days() as i32; - Some(Value::Date(num_days)) - } - Schema::TimestampMillis => { - let datetime = Date::from_ymd_uncheck(1970, 1, 1).and_hms_uncheck(0, 0, 0); - let timestamp_mills = - Value::TimestampMillis(datetime.0.and_utc().timestamp() * 1_000); - Some(timestamp_mills) - } - Schema::TimestampMicros => { - let datetime = Date::from_ymd_uncheck(1970, 1, 1).and_hms_uncheck(0, 0, 0); - let timestamp_micros = - Value::TimestampMicros(datetime.0.and_utc().timestamp() * 1_000_000); - Some(timestamp_micros) - } - Schema::Duration => { - let months = Months::new(1); - let days = Days::new(1); - let millis = Millis::new(1000); - Some(Value::Duration(Duration::new(months, days, millis))) - } - - Schema::Union(union_schema) => { - let inner_schema = union_schema - .variants() - .iter() - .find_or_first(|s| !matches!(s, &&Schema::Null)) - .unwrap(); - - match build_field(inner_schema) { - None => { - let index_of_union = union_schema - .find_schema_with_known_schemata::<&Schema>(&Value::Null, None, &None) - .unwrap() - .0 as u32; - Some(Value::Union(index_of_union, Box::new(Value::Null))) - } - Some(value) => { - let index_of_union = union_schema - .find_schema_with_known_schemata::<&Schema>(&value, None, &None) - .unwrap() - .0 as u32; - Some(Value::Union(index_of_union, Box::new(value))) - } - } - } - _ => None, - } - } - - fn build_avro_data(schema: &Schema) -> Record<'_> { - let mut record = Record::new(schema).unwrap(); - if let Schema::Record(RecordSchema { - name: _, fields, .. - }) = schema.clone() - { - for field in &fields { - let value = build_field(&field.schema) - .unwrap_or_else(|| panic!("No value defined for field, {}", field.name)); - record.put(field.name.as_str(), value) - } - } - record - } - - #[tokio::test] - async fn test_map_to_columns() { - let conf = new_avro_conf_from_local("simple-schema.avsc") - .await - .unwrap(); - let columns = conf.map_to_columns().unwrap(); - assert_eq!(columns.len(), 11); - println!("{:?}", columns); - } - - #[tokio::test] - async fn test_new_avro_parser() { - let avro_parser_rs = new_avro_parser_from_local("simple-schema.avsc").await; - let avro_parser = avro_parser_rs.unwrap(); - println!("avro_parser = {:?}", avro_parser); - } - - #[tokio::test] - async fn test_avro_union_type() { - let parser = new_avro_parser_from_local("union-schema.avsc") - .await - .unwrap(); - let builder = try_match_expand!(&parser.payload_builder, AccessBuilderImpl::Avro).unwrap(); - let schema = &builder.schema.original_schema; - let mut null_record = Record::new(schema).unwrap(); - null_record.put("id", Value::Int(5)); - null_record.put("age", Value::Union(0, Box::new(Value::Null))); - null_record.put("sequence_id", Value::Union(0, Box::new(Value::Null))); - null_record.put("name", Value::Union(0, Box::new(Value::Null))); - null_record.put("score", Value::Union(1, Box::new(Value::Null))); - null_record.put("avg_score", Value::Union(0, Box::new(Value::Null))); - null_record.put("is_lasted", Value::Union(0, Box::new(Value::Null))); - null_record.put("entrance_date", Value::Union(0, Box::new(Value::Null))); - null_record.put("birthday", Value::Union(0, Box::new(Value::Null))); - null_record.put("anniversary", Value::Union(0, Box::new(Value::Null))); - - let mut writer = Writer::new(schema, Vec::new()); - writer.append(null_record).unwrap(); - writer.flush().unwrap(); - - let record = build_avro_data(schema); - writer.append(record).unwrap(); - writer.flush().unwrap(); - - let records = writer.into_inner().unwrap(); - - let reader: Vec<_> = Reader::with_schema(schema, &records[..]).unwrap().collect(); - assert_eq!(2, reader.len()); - let null_record_expected: Vec<(String, Value)> = vec![ - ("id".to_string(), Value::Int(5)), - ("age".to_string(), Value::Union(0, Box::new(Value::Null))), - ( - "sequence_id".to_string(), - Value::Union(0, Box::new(Value::Null)), - ), - ("name".to_string(), Value::Union(0, Box::new(Value::Null))), - ("score".to_string(), Value::Union(1, Box::new(Value::Null))), - ( - "avg_score".to_string(), - Value::Union(0, Box::new(Value::Null)), - ), - ( - "is_lasted".to_string(), - Value::Union(0, Box::new(Value::Null)), - ), - ( - "entrance_date".to_string(), - Value::Union(0, Box::new(Value::Null)), - ), - ( - "birthday".to_string(), - Value::Union(0, Box::new(Value::Null)), - ), - ( - "anniversary".to_string(), - Value::Union(0, Box::new(Value::Null)), - ), - ]; - let null_record_value = reader.first().unwrap().as_ref().unwrap(); - match null_record_value { - Value::Record(values) => { - assert_eq!(values, &null_record_expected) - } - _ => unreachable!(), - } - } - - // run this script when updating `simple-schema.avsc`, the script will generate new value in - // `avro_simple_schema_bin.1` - #[ignore] - #[tokio::test] - async fn update_avro_payload() { - let conf = new_avro_conf_from_local("simple-schema.avsc") - .await - .unwrap(); - let mut writer = Writer::new(conf.schema.original_schema.as_ref(), Vec::new()); - let record = build_avro_data(conf.schema.original_schema.as_ref()); - writer.append(record).unwrap(); - let encoded = writer.into_inner().unwrap(); - println!("path = {:?}", e2e_file_path("avro_simple_schema_bin.1")); - let mut file = OpenOptions::new() - .read(true) - .write(true) - .create(true) - .truncate(true) - .open(e2e_file_path("avro_simple_schema_bin.1")) - .unwrap(); - file.write_all(encoded.as_slice()).unwrap(); - println!( - "encoded = {:?}", - String::from_utf8_lossy(encoded.as_slice()) - ); - } } diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index c11c833457a99..af9b1df16d6cf 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -842,6 +842,7 @@ async fn into_chunk_stream_inner( } } +/// Parses raw bytes into a specific format (avro, json, protobuf, ...), and then builds an [`Access`] from the parsed data. pub trait AccessBuilder { async fn generate_accessor(&mut self, payload: Vec) -> ConnectorResult>; } diff --git a/src/connector/src/schema/schema_registry/util.rs b/src/connector/src/schema/schema_registry/util.rs index 44b7a350e6823..a02d1deaf390d 100644 --- a/src/connector/src/schema/schema_registry/util.rs +++ b/src/connector/src/schema/schema_registry/util.rs @@ -53,11 +53,15 @@ pub enum WireFormatError { ParseMessageIndexes, } -/// extract the magic number and `schema_id` at the front of payload +/// Returns `(schema_id, payload)` /// -/// 0 -> magic number -/// 1-4 -> schema id -/// 5-... -> message payload +/// Refer to [Confluent schema registry wire format](https://docs.confluent.io/platform/7.6/schema-registry/fundamentals/serdes-develop/index.html#wire-format) +/// +/// | Bytes | Area | Description | +/// |-------|-------------|----------------------------------------------------------------------------------------------------| +/// | 0 | Magic Byte | Confluent serialization format version number; currently always `0`. | +/// | 1-4 | Schema ID | 4-byte schema ID as returned by Schema Registry. | +/// | 5-... | Data | Serialized data for the specified schema format (for example, binary encoding for Avro or Protobuf.| pub(crate) fn extract_schema_id(payload: &[u8]) -> Result<(i32, &[u8]), WireFormatError> { use byteorder::{BigEndian, ReadBytesExt as _}; diff --git a/src/stream/tests/README.md b/src/stream/tests/README.md deleted file mode 120000 index 9b94e2d61ad05..0000000000000 --- a/src/stream/tests/README.md +++ /dev/null @@ -1 +0,0 @@ -src/stream/README.md \ No newline at end of file