diff --git a/src/connector/benches/json_vs_plain_parser.rs b/src/connector/benches/json_vs_plain_parser.rs
index 5e904c88786e..a176e3b2b020 100644
--- a/src/connector/benches/json_vs_plain_parser.rs
+++ b/src/connector/benches/json_vs_plain_parser.rs
@@ -20,10 +20,90 @@ mod json_common;
 
 use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
 use futures::executor::block_on;
 use json_common::*;
+use old_json_parser::JsonParser;
 use risingwave_connector::parser::plain_parser::PlainParser;
-use risingwave_connector::parser::{JsonParser, SourceStreamChunkBuilder, SpecificParserConfig};
+use risingwave_connector::parser::{SourceStreamChunkBuilder, SpecificParserConfig};
 use risingwave_connector::source::SourceContext;
 
+// The original implementation used to parse JSON prior to #13707.
+mod old_json_parser {
+    use anyhow::Context as _;
+    use itertools::{Either, Itertools as _};
+    use risingwave_common::{bail, try_match_expand};
+    use risingwave_connector::error::ConnectorResult;
+    use risingwave_connector::parser::{
+        Access as _, EncodingProperties, JsonAccess, SourceStreamChunkRowWriter,
+    };
+    use risingwave_connector::source::{SourceColumnDesc, SourceContextRef};
+
+    use super::*;
+
+    /// Parser for JSON format
+    #[derive(Debug)]
+    pub struct JsonParser {
+        _rw_columns: Vec<SourceColumnDesc>,
+        _source_ctx: SourceContextRef,
+        // If schema registry is used, the starting index of payload is 5.
+        payload_start_idx: usize,
+    }
+
+    impl JsonParser {
+        pub fn new(
+            props: SpecificParserConfig,
+            rw_columns: Vec<SourceColumnDesc>,
+            source_ctx: SourceContextRef,
+        ) -> ConnectorResult<Self> {
+            let json_config = try_match_expand!(props.encoding_config, EncodingProperties::Json)?;
+            let payload_start_idx = if json_config.use_schema_registry {
+                5
+            } else {
+                0
+            };
+            Ok(Self {
+                _rw_columns: rw_columns,
+                _source_ctx: source_ctx,
+                payload_start_idx,
+            })
+        }
+
+        #[allow(clippy::unused_async)]
+        pub async fn parse_inner(
+            &self,
+            mut payload: Vec<u8>,
+            mut writer: SourceStreamChunkRowWriter<'_>,
+        ) -> ConnectorResult<()> {
+            let value = simd_json::to_borrowed_value(&mut payload[self.payload_start_idx..])
+                .context("failed to parse json payload")?;
+            let values = if let simd_json::BorrowedValue::Array(arr) = value {
+                Either::Left(arr.into_iter())
+            } else {
+                Either::Right(std::iter::once(value))
+            };
+
+            let mut errors = Vec::new();
+            for value in values {
+                let accessor = JsonAccess::new(value);
+                match writer
+                    .insert(|column| accessor.access(&[&column.name], Some(&column.data_type)))
+                {
+                    Ok(_) => {}
+                    Err(err) => errors.push(err),
+                }
+            }
+
+            if errors.is_empty() {
+                Ok(())
+            } else {
+                bail!(
+                    "failed to parse {} row(s) in a single json message: {}",
+                    errors.len(),
+                    errors.iter().format(", ")
+                );
+            }
+        }
+    }
+}
+
 fn generate_json_rows() -> Vec<Vec<u8>> {
     let mut rng = rand::thread_rng();
     let mut records = Vec::with_capacity(NUM_RECORDS);
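Vendoring the old implementation into the bench (rather than importing it) keeps the benchmark compiling after the struct is deleted from the library. A minimal sketch, not part of the patch, of how a Criterion case can drive the vendored parser; `get_descs()` is a hypothetical stand-in for whatever column-descriptor setup `json_common` actually provides:

```rust
fn bench_old_json_parser(c: &mut Criterion) {
    let descs = get_descs(); // hypothetical: Vec<SourceColumnDesc> from `json_common`
    let parser = JsonParser::new(
        SpecificParserConfig::DEFAULT_PLAIN_JSON,
        descs.clone(),
        SourceContext::dummy().into(),
    )
    .unwrap();

    c.bench_function("old_json_parser", |b| {
        b.iter_batched(
            generate_json_rows,
            |records| {
                // Rebuild the chunk builder per iteration so each run starts clean.
                let mut builder =
                    SourceStreamChunkBuilder::with_capacity(descs.clone(), NUM_RECORDS);
                for record in records {
                    let writer = builder.row_writer();
                    block_on(parser.parse_inner(record, writer)).unwrap();
                }
                builder.finish()
            },
            BatchSize::SmallInput,
        )
    });
}
```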
diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs
index f9f5b1c848c4..701fa7832296 100644
--- a/src/connector/src/parser/json_parser.rs
+++ b/src/connector/src/parser/json_parser.rs
@@ -12,29 +12,31 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Note on this file:
+//
+// There's no struct named `JsonParser` anymore since #13707. `ENCODE JSON` is now
+// dispatched to `PlainParser` or `UpsertParser` with `JsonAccessBuilder` instead.
+//
+// This file now only contains utilities and tests for JSON parsing. Also, to avoid
+// relying on the internal implementation and to allow it to change, the tests use
+// `ByteStreamSourceParserImpl` to create a parser instance.
+
 use std::collections::HashMap;
 
 use anyhow::Context as _;
 use apache_avro::Schema;
-use itertools::{Either, Itertools};
 use jst::{convert_avro, Context};
-use risingwave_common::{bail, try_match_expand};
 use risingwave_pb::plan_common::ColumnDesc;
 
 use super::avro::schema_resolver::ConfluentSchemaCache;
-use super::unified::Access;
 use super::util::{bytes_from_url, get_kafka_topic};
-use super::{EncodingProperties, JsonProperties, SchemaRegistryAuth, SpecificParserConfig};
+use super::{JsonProperties, SchemaRegistryAuth};
 use crate::error::ConnectorResult;
-use crate::only_parse_payload;
 use crate::parser::avro::util::avro_schema_to_column_descs;
 use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
 use crate::parser::unified::AccessImpl;
-use crate::parser::{
-    AccessBuilder, ByteStreamSourceParser, ParserFormat, SourceStreamChunkRowWriter,
-};
+use crate::parser::AccessBuilder;
 use crate::schema::schema_registry::{handle_sr_list, Client};
-use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
 
 #[derive(Debug)]
 pub struct JsonAccessBuilder {
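To make the dispatch described in the note concrete, here is a hedged sketch of the configuration that routes `ENCODE JSON` through `PlainParser`. The field layout is copied from the removed test code further down in this patch; `ProtocolProperties::Plain` is assumed to be the plain-protocol counterpart of the `Upsert` variant shown there, and the types are assumed to be re-exported from `risingwave_connector::parser` like the neighboring ones:

```rust
use risingwave_connector::parser::{
    EncodingProperties, JsonProperties, ProtocolProperties, SpecificParserConfig,
};

// `FORMAT PLAIN ENCODE JSON`: the encoding is carried as a property, and the
// protocol decides which parser handles it -- `PlainParser` for `Plain`,
// `UpsertParser` for `Upsert`. Both consume JSON via `JsonAccessBuilder`.
fn plain_json_config() -> SpecificParserConfig {
    SpecificParserConfig {
        key_encoding_config: None,
        encoding_config: EncodingProperties::Json(JsonProperties {
            use_schema_registry: false,
            timestamptz_handling: None,
        }),
        protocol_config: ProtocolProperties::Plain,
    }
}
```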
@@ -78,80 +80,6 @@ impl JsonAccessBuilder {
     }
 }
 
-/// Parser for JSON format
-#[derive(Debug)]
-pub struct JsonParser {
-    rw_columns: Vec<SourceColumnDesc>,
-    source_ctx: SourceContextRef,
-    // If schema registry is used, the starting index of payload is 5.
-    payload_start_idx: usize,
-}
-
-impl JsonParser {
-    pub fn new(
-        props: SpecificParserConfig,
-        rw_columns: Vec<SourceColumnDesc>,
-        source_ctx: SourceContextRef,
-    ) -> ConnectorResult<Self> {
-        let json_config = try_match_expand!(props.encoding_config, EncodingProperties::Json)?;
-        let payload_start_idx = if json_config.use_schema_registry {
-            5
-        } else {
-            0
-        };
-        Ok(Self {
-            rw_columns,
-            source_ctx,
-            payload_start_idx,
-        })
-    }
-
-    #[cfg(test)]
-    pub fn new_for_test(rw_columns: Vec<SourceColumnDesc>) -> ConnectorResult<Self> {
-        Ok(Self {
-            rw_columns,
-            source_ctx: SourceContext::dummy().into(),
-            payload_start_idx: 0,
-        })
-    }
-
-    #[allow(clippy::unused_async)]
-    pub async fn parse_inner(
-        &self,
-        mut payload: Vec<u8>,
-        mut writer: SourceStreamChunkRowWriter<'_>,
-    ) -> ConnectorResult<()> {
-        let value = simd_json::to_borrowed_value(&mut payload[self.payload_start_idx..])
-            .context("failed to parse json payload")?;
-        let values = if let simd_json::BorrowedValue::Array(arr) = value {
-            Either::Left(arr.into_iter())
-        } else {
-            Either::Right(std::iter::once(value))
-        };
-
-        let mut errors = Vec::new();
-        for value in values {
-            let accessor = JsonAccess::new(value);
-            match writer.insert(|column| accessor.access(&[&column.name], Some(&column.data_type)))
-            {
-                Ok(_) => {}
-                Err(err) => errors.push(err),
-            }
-        }
-
-        if errors.is_empty() {
-            Ok(())
-        } else {
-            // TODO(error-handling): multiple errors
-            bail!(
-                "failed to parse {} row(s) in a single json message: {}",
-                errors.len(),
-                errors.iter().format(", ")
-            );
-        }
-    }
-}
-
 pub async fn schema_to_columns(
     schema_location: &str,
     schema_registry_auth: Option<SchemaRegistryAuth>,
@@ -179,29 +107,6 @@ pub async fn schema_to_columns(
     avro_schema_to_column_descs(&schema, None)
 }
 
-impl ByteStreamSourceParser for JsonParser {
-    fn columns(&self) -> &[SourceColumnDesc] {
-        &self.rw_columns
-    }
-
-    fn source_ctx(&self) -> &SourceContext {
-        &self.source_ctx
-    }
-
-    fn parser_format(&self) -> ParserFormat {
-        ParserFormat::Json
-    }
-
-    async fn parse_one<'a>(
-        &'a mut self,
-        _key: Option<Vec<u8>>,
-        payload: Option<Vec<u8>>,
-        writer: SourceStreamChunkRowWriter<'a>,
-    ) -> ConnectorResult<()> {
-        only_parse_payload!(self, payload, writer)
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::vec;
@@ -215,13 +120,31 @@ mod tests {
     use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
     use risingwave_pb::plan_common::{AdditionalColumn, AdditionalColumnKey};
 
-    use super::JsonParser;
-    use crate::parser::upsert_parser::UpsertParser;
+    use crate::parser::test_utils::ByteStreamSourceParserImplTestExt as _;
     use crate::parser::{
-        EncodingProperties, JsonProperties, ProtocolProperties, SourceColumnDesc,
-        SourceStreamChunkBuilder, SpecificParserConfig,
+        ByteStreamSourceParserImpl, CommonParserConfig, ParserConfig, ProtocolProperties,
+        SourceColumnDesc, SpecificParserConfig,
     };
-    use crate::source::{SourceColumnType, SourceContext};
+    use crate::source::SourceColumnType;
+
+    fn make_parser(rw_columns: Vec<SourceColumnDesc>) -> ByteStreamSourceParserImpl {
+        ByteStreamSourceParserImpl::create_for_test(ParserConfig {
+            common: CommonParserConfig { rw_columns },
+            specific: SpecificParserConfig::DEFAULT_PLAIN_JSON,
+        })
+        .unwrap()
+    }
+
+    fn make_upsert_parser(rw_columns: Vec<SourceColumnDesc>) -> ByteStreamSourceParserImpl {
+        ByteStreamSourceParserImpl::create_for_test(ParserConfig {
+            common: CommonParserConfig { rw_columns },
+            specific: SpecificParserConfig {
+                protocol_config: ProtocolProperties::Upsert,
+                ..SpecificParserConfig::DEFAULT_PLAIN_JSON
+            },
+        })
+        .unwrap()
+    }
 
     fn get_payload() -> Vec<Vec<u8>> {
         vec![
@@ -251,21 +174,8 @@
             SourceColumnDesc::simple("interval", DataType::Interval, 11.into()),
         ];
 
-        let parser = JsonParser::new(
-            SpecificParserConfig::DEFAULT_PLAIN_JSON,
-            descs.clone(),
-            SourceContext::dummy().into(),
-        )
-        .unwrap();
-
-        let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);
-
-        for payload in get_payload() {
-            let writer = builder.row_writer();
-            parser.parse_inner(payload, writer).await.unwrap();
-        }
-
-        let chunk = builder.finish();
+        let parser = make_parser(descs);
+        let chunk = parser.parse(get_payload()).await;
 
         let mut rows = chunk.rows();
@@ -361,38 +271,20 @@
             SourceColumnDesc::simple("v2", DataType::Int16, 1.into()),
             SourceColumnDesc::simple("v3", DataType::Varchar, 2.into()),
         ];
-        let parser = JsonParser::new(
-            SpecificParserConfig::DEFAULT_PLAIN_JSON,
-            descs.clone(),
-            SourceContext::dummy().into(),
-        )
-        .unwrap();
-        let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 3);
-
-        // Parse a correct record.
-        {
-            let writer = builder.row_writer();
-            let payload = br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec();
-            parser.parse_inner(payload, writer).await.unwrap();
-        }
-
-        // Parse an incorrect record.
-        {
-            let writer = builder.row_writer();
+        let parser = make_parser(descs);
+        let payloads = vec![
+            // Parse a correct record.
+            br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec(),
+            // Parse an incorrect record.
            // `v2` overflowed.
-            let payload = br#"{"v1": 1, "v2": 65536, "v3": "3"}"#.to_vec();
            // ignored the error, and fill None at v2.
-            parser.parse_inner(payload, writer).await.unwrap();
-        }
-
-        // Parse a correct record.
-        {
-            let writer = builder.row_writer();
-            let payload = br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec();
-            parser.parse_inner(payload, writer).await.unwrap();
-        }
+            br#"{"v1": 1, "v2": 65536, "v3": "3"}"#.to_vec(),
+            // Parse a correct record.
+            br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec(),
+        ];
+        let chunk = parser.parse(payloads).await;
 
-        let chunk = builder.finish();
         assert!(chunk.valid());
         assert_eq!(chunk.cardinality(), 3);
@@ -432,12 +324,7 @@
             .map(SourceColumnDesc::from)
             .collect_vec();
 
-        let parser = JsonParser::new(
-            SpecificParserConfig::DEFAULT_PLAIN_JSON,
-            descs.clone(),
-            SourceContext::dummy().into(),
-        )
-        .unwrap();
+        let parser = make_parser(descs);
         let payload = br#"
         {
             "data": {
                 "created_at": "2022-07-13 20:48:37.07",
                 "id": "1732524418112319151",
                 "params": "blah"
             },
             "author": {
                 "created_at": "2022-07-13 20:48:37.07",
                 "id": "1732524418112319151",
                 "name": "blah"
             },
             "I64CastToVarchar": 1598197865760800768,
             "VarcharCastToI64": "1598197865760800768"
         }
         "#.to_vec();
-        let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1);
-        {
-            let writer = builder.row_writer();
-            parser.parse_inner(payload, writer).await.unwrap();
-        }
-        let chunk = builder.finish();
+        let chunk = parser.parse(vec![payload]).await;
+
         let (op, row) = chunk.rows().next().unwrap();
         assert_eq!(op, Op::Insert);
         let row = row.into_owned_row().into_inner();
@@ -504,24 +387,15 @@
             .map(SourceColumnDesc::from)
             .collect_vec();
 
-        let parser = JsonParser::new(
-            SpecificParserConfig::DEFAULT_PLAIN_JSON,
-            descs.clone(),
-            SourceContext::dummy().into(),
-        )
-        .unwrap();
+        let parser = make_parser(descs);
         let payload = br#"
         {
             "struct": "{\"varchar\": \"varchar\", \"boolean\": true}"
         }
         "#
         .to_vec();
-        let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1);
-        {
-            let writer = builder.row_writer();
-            parser.parse_inner(payload, writer).await.unwrap();
-        }
-        let chunk = builder.finish();
+        let chunk = parser.parse(vec![payload]).await;
+
         let (op, row) = chunk.rows().next().unwrap();
         assert_eq!(op, Op::Insert);
         let row = row.into_owned_row().into_inner();
@@ -550,12 +424,7 @@
             .map(SourceColumnDesc::from)
             .collect_vec();
 
-        let parser = JsonParser::new(
-            SpecificParserConfig::DEFAULT_PLAIN_JSON,
-            descs.clone(),
-            SourceContext::dummy().into(),
-        )
-        .unwrap();
+        let parser = make_parser(descs);
         let payload = br#"
         {
             "struct": {
                 "varchar": "varchar",
                 "boolean": true
             }
         }
         "#
         .to_vec();
-        let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1);
-        {
-            let writer = builder.row_writer();
-            parser.parse_inner(payload, writer).await.unwrap();
-        }
-        let chunk = builder.finish();
+        let chunk = parser.parse(vec![payload]).await;
+
         let (op, row) = chunk.rows().next().unwrap();
         assert_eq!(op, Op::Insert);
         let row = row.into_owned_row().into_inner();
@@ -591,7 +456,10 @@
             (r#"{"a":2}"#, r#"{"a":2,"b":2}"#),
             (r#"{"a":2}"#, r#""#),
         ]
-        .to_vec();
+        .into_iter()
+        .map(|(k, v)| (k.as_bytes().to_vec(), v.as_bytes().to_vec()))
+        .collect_vec();
+
         let key_column_desc = SourceColumnDesc {
             name: "rw_key".into(),
             data_type: DataType::Bytea,
             SourceColumnDesc::simple("b", DataType::Int32, 1.into()),
             key_column_desc,
         ];
-        let props = SpecificParserConfig {
-            key_encoding_config: None,
-            encoding_config: EncodingProperties::Json(JsonProperties {
-                use_schema_registry: false,
-                timestamptz_handling: None,
-            }),
-            protocol_config: ProtocolProperties::Upsert,
-        };
-        let mut parser = UpsertParser::new(props, descs.clone(), SourceContext::dummy().into())
-            .await
-            .unwrap();
-        let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4);
-        for item in items {
-            parser
-                .parse_inner(
-                    Some(item.0.as_bytes().to_vec()),
-                    if !item.1.is_empty() {
-                        Some(item.1.as_bytes().to_vec())
-                    } else {
-                        None
-                    },
-                    builder.row_writer(),
-                )
-                .await
-                .unwrap();
-        }
-        let chunk = builder.finish();
+        let parser = make_upsert_parser(descs);
+        let chunk = parser.parse_upsert(items).await;
 
         // expected chunk
         // +---+---+---+------------------+
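Taken together, the rewritten tests collapse the old builder/row-writer boilerplate into two calls. A condensed, hypothetical test in the new style (the column set is invented; `make_parser` is the helper added above, and `parse` is the `test_utils` extension introduced in `mod.rs` below):

```rust
#[tokio::test]
async fn test_plain_json_smoke() {
    let descs = vec![
        SourceColumnDesc::simple("v1", DataType::Int32, 0.into()),
        SourceColumnDesc::simple("v2", DataType::Varchar, 1.into()),
    ];
    let parser = make_parser(descs);

    // One JSON message becomes one row in the resulting chunk.
    let chunk = parser
        .parse(vec![br#"{"v1": 1, "v2": "foo"}"#.to_vec()])
        .await;
    assert_eq!(chunk.cardinality(), 1);
}
```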
diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index 2c0643af6710..be697d990a39 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -45,7 +45,8 @@ pub use self::mysql::mysql_row_to_owned_row;
 use self::plain_parser::PlainParser;
 pub use self::postgres::postgres_row_to_owned_row;
 use self::simd_json_parser::DebeziumJsonAccessBuilder;
-pub use self::unified::json::TimestamptzHandling;
+pub use self::unified::json::{JsonAccess, TimestamptzHandling};
+pub use self::unified::Access;
 use self::unified::AccessImpl;
 use self::upsert_parser::UpsertParser;
 use self::util::get_kafka_topic;
@@ -868,7 +869,6 @@ impl AccessBuilderImpl {
 #[derive(Debug)]
 pub enum ByteStreamSourceParserImpl {
     Csv(CsvParser),
-    Json(JsonParser),
     Debezium(DebeziumParser),
     Plain(PlainParser),
     Upsert(UpsertParser),
@@ -883,7 +883,6 @@ impl ByteStreamSourceParserImpl {
         #[auto_enum(futures03::Stream)]
         let stream = match self {
             Self::Csv(parser) => parser.into_stream(msg_stream),
-            Self::Json(parser) => parser.into_stream(msg_stream),
             Self::Debezium(parser) => parser.into_stream(msg_stream),
             Self::DebeziumMongoJson(parser) => parser.into_stream(msg_stream),
             Self::Maxwell(parser) => parser.into_stream(msg_stream),
@@ -944,6 +943,53 @@ impl ByteStreamSourceParserImpl {
     }
 }
 
+/// Test utilities for [`ByteStreamSourceParserImpl`].
+#[cfg(test)]
+pub mod test_utils {
+    use futures::StreamExt as _;
+    use itertools::Itertools as _;
+
+    use super::*;
+
+    #[easy_ext::ext(ByteStreamSourceParserImplTestExt)]
+    pub(crate) impl ByteStreamSourceParserImpl {
+        /// Parse the given payloads into a [`StreamChunk`].
+        async fn parse(self, payloads: Vec<Vec<u8>>) -> StreamChunk {
+            let source_messages = payloads
+                .into_iter()
+                .map(|p| SourceMessage {
+                    payload: (!p.is_empty()).then_some(p),
+                    ..SourceMessage::dummy()
+                })
+                .collect_vec();
+
+            self.into_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
+                .next()
+                .await
+                .unwrap()
+                .unwrap()
+        }
+
+        /// Parse the given key-value pairs into a [`StreamChunk`].
+        async fn parse_upsert(self, kvs: Vec<(Vec<u8>, Vec<u8>)>) -> StreamChunk {
+            let source_messages = kvs
+                .into_iter()
+                .map(|(k, v)| SourceMessage {
+                    key: (!k.is_empty()).then_some(k),
+                    payload: (!v.is_empty()).then_some(v),
+                    ..SourceMessage::dummy()
+                })
+                .collect_vec();
+
+            self.into_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
+                .next()
+                .await
+                .unwrap()
+                .unwrap()
+        }
+    }
+}
+
 #[derive(Debug, Clone, Default)]
 pub struct ParserConfig {
     pub common: CommonParserConfig,
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index b670568dc6e4..a4996eabbf82 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -316,8 +316,16 @@ pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct>
 /// Stream of [`SourceMessage`].
 pub type BoxSourceStream = BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
 
-pub trait ChunkSourceStream =
-    Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static;
+// Manually expand the trait alias to improve IDE experience.
+pub trait ChunkSourceStream:
+    Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
+{
+}
+impl<T> ChunkSourceStream for T where
+    T: Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
+{
+}
+
 pub type BoxChunkSourceStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
 pub type BoxTryStream<M> = BoxStream<'static, crate::error::ConnectorResult<M>>;
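The `base.rs` change applies a standard workaround for the nightly-only `trait_alias` feature: an empty trait plus a blanket impl behaves like the alias on stable Rust and is easier for rust-analyzer to resolve. A self-contained illustration of the pattern, with invented names:

```rust
use std::fmt::Debug;

// Before (nightly only): pub trait DebugSend = Debug + Send + 'static;

// After (stable): an empty trait carrying the same bounds...
pub trait DebugSend: Debug + Send + 'static {}

// ...plus a blanket impl so every qualifying type satisfies it automatically.
impl<T> DebugSend for T where T: Debug + Send + 'static {}

fn takes_alias(x: impl DebugSend) {
    println!("{x:?}");
}

fn main() {
    takes_alias(42); // i32 is Debug + Send + 'static
}
```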