diff --git a/src/connector/benches/json_parser.rs b/src/connector/benches/json_parser.rs index 90c78044d622..5a12dec735ca 100644 --- a/src/connector/benches/json_parser.rs +++ b/src/connector/benches/json_parser.rs @@ -23,7 +23,7 @@ use risingwave_common::catalog::ColumnId; use risingwave_common::types::{DataType, Date, Timestamp}; use risingwave_connector::parser::plain_parser::PlainParser; use risingwave_connector::parser::{ - DebeziumParser, SourceStreamChunkBuilder, SpecificParserConfig, + DebeziumParser, JsonParser, SourceStreamChunkBuilder, SpecificParserConfig, }; use risingwave_connector::source::{SourceColumnDesc, SourceContext}; @@ -137,13 +137,12 @@ fn bench_json_parser(c: &mut Criterion) { b.to_async(&rt).iter_batched( || records.clone(), |records| async { - let mut parser = rt - .block_on(PlainParser::new( - SpecificParserConfig::DEFAULT_PLAIN_JSON, - descs.clone(), - ctx.clone(), - )) - .unwrap(); + let mut parser = block_on(PlainParser::new( + SpecificParserConfig::DEFAULT_PLAIN_JSON, + descs.clone(), + ctx.clone(), + )) + .unwrap(); let mut builder = SourceStreamChunkBuilder::with_capacity(descs.clone(), NUM_RECORDS); for record in records { @@ -159,9 +158,66 @@ fn bench_json_parser(c: &mut Criterion) { }); } +fn bench_plain_parser_and_json_parser(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let records = generate_json_rows(); + + let mut group = c.benchmark_group("plain parser and json parser comparison"); + + group.bench_function("plain_parser", |b| { + b.to_async(&rt).iter_batched( + || { + let parser = block_on(PlainParser::new( + SpecificParserConfig::DEFAULT_PLAIN_JSON, + get_descs(), + SourceContext::dummy().into(), + )) + .unwrap(); + (parser, records.clone()) + }, + |(mut parser, records)| async move { + let mut builder = SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS); + for record in records { + let writer = builder.row_writer(); + parser + .parse_inner(None, Some(record), writer) + .await + .unwrap(); + } + }, + BatchSize::SmallInput, + ) + }); + + group.bench_function("json_parser", |b| { + b.to_async(&rt).iter_batched( + || { + let parser = JsonParser::new( + SpecificParserConfig::DEFAULT_PLAIN_JSON, + get_descs(), + SourceContext::dummy().into(), + ) + .unwrap(); + (parser, records.clone()) + }, + |(parser, records)| async move { + let mut builder = SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS); + for record in records { + let writer = builder.row_writer(); + parser.parse_inner(record, writer).await.unwrap(); + } + }, + BatchSize::SmallInput, + ) + }); + + group.finish(); +} + criterion_group!( benches, bench_json_parser, + bench_plain_parser_and_json_parser, bench_debezium_json_parser_create, bench_debezium_json_parser_read, bench_debezium_json_parser_update,