From 9f9ccfe33e64acdb9c460fbcac176a082704984c Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 14 Apr 2023 18:18:53 +0800 Subject: [PATCH 1/3] add bench for json parser Signed-off-by: tabVersion --- Cargo.lock | 1 + src/connector/Cargo.toml | 5 ++ src/connector/benches/parser.rs | 131 ++++++++++++++++++++++++++++++++ 3 files changed, 137 insertions(+) create mode 100644 src/connector/benches/parser.rs diff --git a/Cargo.lock b/Cargo.lock index b74b79059712b..34c5e16a2cf7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6061,6 +6061,7 @@ dependencies = [ "byteorder", "bytes", "chrono", + "criterion", "csv", "duration-str", "enum-as-inner", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 7a485a0ff0277..d01a871a270be 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -99,6 +99,11 @@ urlencoding = "2" workspace-hack = { path = "../workspace-hack" } [dev-dependencies] +criterion = { version = "0.4", features = ["async_tokio", "async"] } rand = "0.8" tempfile = "3" wiremock = "0.5" + +[[bench]] +name = "parser" +harness = false diff --git a/src/connector/benches/parser.rs b/src/connector/benches/parser.rs new file mode 100644 index 0000000000000..110bf44b3ce83 --- /dev/null +++ b/src/connector/benches/parser.rs @@ -0,0 +1,131 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion}; +use maplit::hashmap; +use rand::Rng; +use risingwave_common::types::DataType; +use risingwave_connector::parser::{JsonParser, SourceStreamChunkBuilder}; +use risingwave_connector::source::SourceColumnDesc; +use serde_json::json; +use tokio::runtime::Runtime; + +fn gen_input(mode: &str, chunk_size: usize, chunk_num: usize) -> Vec>> { + let mut input = Vec::with_capacity(chunk_num); + for _ in 0..chunk_num { + let mut input_inner = Vec::with_capacity(chunk_size); + for _ in 0..chunk_size { + input_inner.push(match mode { + "match" => r#"{"alpha": 1, "bravo": 2, "charlie": 3, "delta": 4}"# + .as_bytes() + .to_vec(), + "mismatch" => { + let convert_case = |s: &str| -> String { + let mut rng = rand::thread_rng(); + let mut result = "".to_string(); + for char in s.chars() { + if rng.gen_bool(0.5) { + result.push(char.to_uppercase().to_string().parse().unwrap()); + } else { + result.push(char.to_lowercase().to_string().parse().unwrap()); + } + } + result + }; + let value = hashmap! { + convert_case("alpha") => json!(1), + convert_case("bravo") => json!(2), + convert_case("charlie") => json!(3), + convert_case("delta") => json!(4), + }; + serde_json::to_string(&value).unwrap().as_bytes().to_vec() + } + _ => unreachable!(), + }); + } + input.push(input_inner); + } + input +} + +fn create_parser( + chunk_size: usize, + chunk_num: usize, + mode: &str, +) -> (JsonParser, Vec, Vec>>) { + let desc = vec![ + SourceColumnDesc::simple("alpha", DataType::Int16, 0.into()), + SourceColumnDesc::simple("bravo", DataType::Int32, 1.into()), + SourceColumnDesc::simple("charlie", DataType::Int64, 2.into()), + SourceColumnDesc::simple("delta", DataType::Int256, 3.into()), + ]; + let parser = JsonParser::new(desc.clone(), Default::default()).unwrap(); + let input = gen_input(mode, chunk_size, chunk_num); + (parser, desc, input) +} + +async fn parse(parser: JsonParser, column_desc: Vec, input: Vec>>) { + for input_inner in input { + let mut builder = + SourceStreamChunkBuilder::with_capacity(column_desc.clone(), input_inner.len()); + for payload in input_inner { + let row_writer = builder.row_writer(); + parser.parse_inner(payload, row_writer).await.unwrap(); + } + builder.finish(); + } +} + +fn bench_parse_match_case(c: &mut Criterion) { + const TOTAL_SIZE: usize = 1024 * 1024usize; + let rt = Runtime::new().unwrap(); + + for chunk_size in &[32, 128, 512, 1024, 2048, 4096] { + c.bench_with_input( + BenchmarkId::new("ParseMatchCase", chunk_size), + chunk_size, + |b, &chunk_size| { + let chunk_num = TOTAL_SIZE / chunk_size; + b.to_async(&rt).iter_batched( + || create_parser(chunk_size, chunk_num, "match"), + |(parser, column_desc, input)| parse(parser, column_desc, input), + BatchSize::SmallInput, + ); + }, + ); + } +} + +fn bench_parse_not_match_case(c: &mut Criterion) { + const TOTAL_SIZE: usize = 1024 * 1024usize; + let rt = Runtime::new().unwrap(); + + for chunk_size in &[32, 128, 512, 1024, 2048, 4096] { + c.bench_with_input( + BenchmarkId::new("ParseMatchCase", chunk_size), + chunk_size, + |b, &chunk_size| { + let chunk_num = TOTAL_SIZE / chunk_size; + b.to_async(&rt).iter_batched( + || create_parser(chunk_size, chunk_num, "mismatch"), + |(parser, column_desc, input)| parse(parser, column_desc, input), + BatchSize::SmallInput, + ); + }, + ); + } +} + +criterion_group!(benches, bench_parse_match_case, bench_parse_not_match_case); +criterion_main!(benches); From ae35518622110b341561456787f18f6216e9dc65 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 14 Apr 2023 18:22:03 +0800 Subject: [PATCH 2/3] fix Signed-off-by: tabVersion --- src/connector/benches/parser.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/benches/parser.rs b/src/connector/benches/parser.rs index 110bf44b3ce83..b0456a33ac4a8 100644 --- a/src/connector/benches/parser.rs +++ b/src/connector/benches/parser.rs @@ -113,7 +113,7 @@ fn bench_parse_not_match_case(c: &mut Criterion) { for chunk_size in &[32, 128, 512, 1024, 2048, 4096] { c.bench_with_input( - BenchmarkId::new("ParseMatchCase", chunk_size), + BenchmarkId::new("ParseMisMatchCase", chunk_size), chunk_size, |b, &chunk_size| { let chunk_num = TOTAL_SIZE / chunk_size; From 6476bd2b9ddd63f47ed2022989eb9870ad066aa0 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 14 Apr 2023 19:25:00 +0800 Subject: [PATCH 3/3] fix Signed-off-by: tabVersion --- src/connector/benches/parser.rs | 30 +++++++++--------------------- 1 file changed, 9 insertions(+), 21 deletions(-) diff --git a/src/connector/benches/parser.rs b/src/connector/benches/parser.rs index b0456a33ac4a8..5a2390c3e4fe8 100644 --- a/src/connector/benches/parser.rs +++ b/src/connector/benches/parser.rs @@ -68,7 +68,7 @@ fn create_parser( SourceColumnDesc::simple("alpha", DataType::Int16, 0.into()), SourceColumnDesc::simple("bravo", DataType::Int32, 1.into()), SourceColumnDesc::simple("charlie", DataType::Int64, 2.into()), - SourceColumnDesc::simple("delta", DataType::Int256, 3.into()), + SourceColumnDesc::simple("delta", DataType::Int64, 3.into()), ]; let parser = JsonParser::new(desc.clone(), Default::default()).unwrap(); let input = gen_input(mode, chunk_size, chunk_num); @@ -87,7 +87,7 @@ async fn parse(parser: JsonParser, column_desc: Vec, input: Ve } } -fn bench_parse_match_case(c: &mut Criterion) { +fn do_bench(c: &mut Criterion, mode: &str) { const TOTAL_SIZE: usize = 1024 * 1024usize; let rt = Runtime::new().unwrap(); @@ -98,7 +98,7 @@ fn bench_parse_match_case(c: &mut Criterion) { |b, &chunk_size| { let chunk_num = TOTAL_SIZE / chunk_size; b.to_async(&rt).iter_batched( - || create_parser(chunk_size, chunk_num, "match"), + || create_parser(chunk_size, chunk_num, mode), |(parser, column_desc, input)| parse(parser, column_desc, input), BatchSize::SmallInput, ); @@ -107,25 +107,13 @@ fn bench_parse_match_case(c: &mut Criterion) { } } -fn bench_parse_not_match_case(c: &mut Criterion) { - const TOTAL_SIZE: usize = 1024 * 1024usize; - let rt = Runtime::new().unwrap(); +fn bench_parse_match_case(c: &mut Criterion) { + do_bench(c, "match"); +} - for chunk_size in &[32, 128, 512, 1024, 2048, 4096] { - c.bench_with_input( - BenchmarkId::new("ParseMisMatchCase", chunk_size), - chunk_size, - |b, &chunk_size| { - let chunk_num = TOTAL_SIZE / chunk_size; - b.to_async(&rt).iter_batched( - || create_parser(chunk_size, chunk_num, "mismatch"), - |(parser, column_desc, input)| parse(parser, column_desc, input), - BatchSize::SmallInput, - ); - }, - ); - } +fn bench_parse_mismatch_case(c: &mut Criterion) { + do_bench(c, "mismatch"); } -criterion_group!(benches, bench_parse_match_case, bench_parse_not_match_case); +criterion_group!(benches, bench_parse_match_case, bench_parse_mismatch_case); criterion_main!(benches);