Skip to content

Commit

Permalink
chore(bench): add plain_parser and json_parser comparison bench (#16526)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuefengze authored Apr 29, 2024
1 parent 45e9f77 commit fbc3a6c
Showing 1 changed file with 64 additions and 8 deletions.
72 changes: 64 additions & 8 deletions src/connector/benches/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use risingwave_common::catalog::ColumnId;
use risingwave_common::types::{DataType, Date, Timestamp};
use risingwave_connector::parser::plain_parser::PlainParser;
use risingwave_connector::parser::{
DebeziumParser, SourceStreamChunkBuilder, SpecificParserConfig,
DebeziumParser, JsonParser, SourceStreamChunkBuilder, SpecificParserConfig,
};
use risingwave_connector::source::{SourceColumnDesc, SourceContext};

Expand Down Expand Up @@ -137,13 +137,12 @@ fn bench_json_parser(c: &mut Criterion) {
b.to_async(&rt).iter_batched(
|| records.clone(),
|records| async {
let mut parser = rt
.block_on(PlainParser::new(
SpecificParserConfig::DEFAULT_PLAIN_JSON,
descs.clone(),
ctx.clone(),
))
.unwrap();
let mut parser = block_on(PlainParser::new(
SpecificParserConfig::DEFAULT_PLAIN_JSON,
descs.clone(),
ctx.clone(),
))
.unwrap();
let mut builder =
SourceStreamChunkBuilder::with_capacity(descs.clone(), NUM_RECORDS);
for record in records {
Expand All @@ -159,9 +158,66 @@ fn bench_json_parser(c: &mut Criterion) {
});
}

fn bench_plain_parser_and_json_parser(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
let records = generate_json_rows();

let mut group = c.benchmark_group("plain parser and json parser comparison");

group.bench_function("plain_parser", |b| {
b.to_async(&rt).iter_batched(
|| {
let parser = block_on(PlainParser::new(
SpecificParserConfig::DEFAULT_PLAIN_JSON,
get_descs(),
SourceContext::dummy().into(),
))
.unwrap();
(parser, records.clone())
},
|(mut parser, records)| async move {
let mut builder = SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS);
for record in records {
let writer = builder.row_writer();
parser
.parse_inner(None, Some(record), writer)
.await
.unwrap();
}
},
BatchSize::SmallInput,
)
});

group.bench_function("json_parser", |b| {
b.to_async(&rt).iter_batched(
|| {
let parser = JsonParser::new(
SpecificParserConfig::DEFAULT_PLAIN_JSON,
get_descs(),
SourceContext::dummy().into(),
)
.unwrap();
(parser, records.clone())
},
|(parser, records)| async move {
let mut builder = SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS);
for record in records {
let writer = builder.row_writer();
parser.parse_inner(record, writer).await.unwrap();
}
},
BatchSize::SmallInput,
)
});

group.finish();
}

criterion_group!(
benches,
bench_json_parser,
bench_plain_parser_and_json_parser,
bench_debezium_json_parser_create,
bench_debezium_json_parser_read,
bench_debezium_json_parser_update,
Expand Down

0 comments on commit fbc3a6c

Please sign in to comment.