diff --git a/Cargo.toml b/Cargo.toml index 2380045fb0c50..ed677ebc2b2f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -266,6 +266,9 @@ redundant_explicit_links = "allow" [profile.dev] lto = 'off' +[profile.bench] +lto = 'off' + [profile.release] debug = "full" split-debuginfo = "packed" diff --git a/src/connector/benches/nexmark_integration.rs b/src/connector/benches/nexmark_integration.rs index c2c735d1c2089..172931562efef 100644 --- a/src/connector/benches/nexmark_integration.rs +++ b/src/connector/benches/nexmark_integration.rs @@ -26,7 +26,7 @@ use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use futures::{FutureExt, StreamExt, TryStreamExt}; use itertools::Itertools; use risingwave_common::array::StreamChunk; -use risingwave_common::catalog::ColumnId; +use risingwave_common::catalog::{ColumnDesc, ColumnId}; use risingwave_common::types::DataType; use risingwave_connector::parser::{ ByteStreamSourceParserImpl, CommonParserConfig, ParserConfig, SpecificParserConfig, @@ -37,15 +37,12 @@ use risingwave_connector::source::{ use tracing::Level; use tracing_subscriber::prelude::*; -static BATCH: LazyLock> = LazyLock::new(make_batch); +static BATCH: LazyLock> = LazyLock::new(|| make_batch(false)); +static STRUCT_BATCH: LazyLock> = LazyLock::new(|| make_batch(true)); -fn make_batch() -> Vec { +fn make_batch(use_struct: bool) -> Vec { let mut generator = nexmark::EventGenerator::default() .with_type_filter(nexmark::event::EventType::Bid) - .map(|e| match e { - nexmark::event::Event::Bid(bid) => bid, // extract the bid event - _ => unreachable!(), - }) .enumerate(); let message_base = SourceMessage { @@ -59,8 +56,15 @@ fn make_batch() -> Vec { generator .by_ref() .take(16384) - .map(|(i, e)| { - let payload = serde_json::to_vec(&e).unwrap(); + .map(|(i, event)| { + let payload = if use_struct { + serde_json::to_vec(&event).unwrap() + } else { + let nexmark::event::Event::Bid(bid) = event else { + unreachable!() + }; + serde_json::to_vec(&bid).unwrap() + }; SourceMessage { payload: Some(payload), offset: i.to_string(), @@ -70,14 +74,18 @@ fn make_batch() -> Vec { .collect_vec() } -fn make_data_stream() -> BoxSourceStream { - futures::future::ready(Ok(BATCH.clone())) - .into_stream() - .boxed() +fn make_data_stream(use_struct: bool) -> BoxSourceStream { + futures::future::ready(Ok(if use_struct { + STRUCT_BATCH.clone() + } else { + BATCH.clone() + })) + .into_stream() + .boxed() } -fn make_parser() -> ByteStreamSourceParserImpl { - let rw_columns = [ +fn make_parser(use_struct: bool) -> ByteStreamSourceParserImpl { + let fields = vec![ ("auction", DataType::Int64), ("bidder", DataType::Int64), ("price", DataType::Int64), @@ -85,11 +93,23 @@ fn make_parser() -> ByteStreamSourceParserImpl { ("url", DataType::Varchar), ("date_time", DataType::Timestamp), ("extra", DataType::Varchar), - ] - .into_iter() - .enumerate() - .map(|(i, (n, t))| SourceColumnDesc::simple(n, t, ColumnId::new(i as _))) - .collect_vec(); + ]; + + let rw_columns = if use_struct { + let fields = fields + .into_iter() + .enumerate() + .map(|(i, (n, t))| ColumnDesc::named(n, ColumnId::new(i as _), t)) + .collect(); + let struct_col = ColumnDesc::new_struct("bid", 114514, "bid", fields); + vec![(&struct_col).into()] + } else { + fields + .into_iter() + .enumerate() + .map(|(i, (n, t))| SourceColumnDesc::simple(n, t, ColumnId::new(i as _))) + .collect_vec() + }; let config = ParserConfig { common: CommonParserConfig { rw_columns }, @@ -99,8 +119,10 @@ fn make_parser() -> ByteStreamSourceParserImpl { ByteStreamSourceParserImpl::create_for_test(config).unwrap() } -fn make_stream_iter() -> impl Iterator { - let mut stream: BoxChunkSourceStream = make_parser().into_stream(make_data_stream()).boxed(); +fn make_stream_iter(use_struct: bool) -> impl Iterator { + let mut stream: BoxChunkSourceStream = make_parser(use_struct) + .into_stream(make_data_stream(use_struct)) + .boxed(); std::iter::from_fn(move || { stream @@ -116,7 +138,7 @@ fn make_stream_iter() -> impl Iterator { fn bench(c: &mut Criterion) { c.bench_function("parse_nexmark", |b| { b.iter_batched( - make_stream_iter, + || make_stream_iter(false), |mut iter| iter.next().unwrap(), BatchSize::SmallInput, ) @@ -135,11 +157,19 @@ fn bench(c: &mut Criterion) { .into(); b.iter_batched( - make_stream_iter, + || make_stream_iter(false), |mut iter| tracing::dispatcher::with_default(&dispatch, || iter.next().unwrap()), BatchSize::SmallInput, ) }); + + c.bench_function("parse_nexmark_struct_type", |b| { + b.iter_batched( + || make_stream_iter(true), + |mut iter| iter.next().unwrap(), + BatchSize::SmallInput, + ) + }); } criterion_group!(benches, bench);