Skip to content

Commit

Permalink
test(connector): add benchmark for parsing combined (struct-field) nexmark events
Browse files Browse the repository at this point in the history
  • Loading branch information
BugenZhao committed Jun 10, 2024
1 parent 3e01795 commit fea625f
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 24 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ redundant_explicit_links = "allow"
[profile.dev]
lto = 'off'

[profile.bench]
lto = 'off'

[profile.release]
debug = "full"
split-debuginfo = "packed"
Expand Down
78 changes: 54 additions & 24 deletions src/connector/benches/nexmark_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use futures::{FutureExt, StreamExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::ColumnId;
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::types::DataType;
use risingwave_connector::parser::{
ByteStreamSourceParserImpl, CommonParserConfig, ParserConfig, SpecificParserConfig,
Expand All @@ -37,15 +37,12 @@ use risingwave_connector::source::{
use tracing::Level;
use tracing_subscriber::prelude::*;

static BATCH: LazyLock<Vec<SourceMessage>> = LazyLock::new(make_batch);
/// Lazily-built batch of source messages produced by `make_batch(false)`;
/// each payload is the JSON serialization of the bare bid record.
static BATCH: LazyLock<Vec<SourceMessage>> = LazyLock::new(|| make_batch(false));
/// Lazily-built batch of source messages produced by `make_batch(true)`;
/// each payload is the JSON serialization of the whole nexmark event.
static STRUCT_BATCH: LazyLock<Vec<SourceMessage>> = LazyLock::new(|| make_batch(true));

fn make_batch() -> Vec<SourceMessage> {
fn make_batch(use_struct: bool) -> Vec<SourceMessage> {
let mut generator = nexmark::EventGenerator::default()
.with_type_filter(nexmark::event::EventType::Bid)
.map(|e| match e {
nexmark::event::Event::Bid(bid) => bid, // extract the bid event
_ => unreachable!(),
})
.enumerate();

let message_base = SourceMessage {
Expand All @@ -59,8 +56,15 @@ fn make_batch() -> Vec<SourceMessage> {
generator
.by_ref()
.take(16384)
.map(|(i, e)| {
let payload = serde_json::to_vec(&e).unwrap();
.map(|(i, event)| {
let payload = if use_struct {
serde_json::to_vec(&event).unwrap()
} else {
let nexmark::event::Event::Bid(bid) = event else {
unreachable!()
};
serde_json::to_vec(&bid).unwrap()
};
SourceMessage {
payload: Some(payload),
offset: i.to_string(),
Expand All @@ -70,26 +74,42 @@ fn make_batch() -> Vec<SourceMessage> {
.collect_vec()
}

fn make_data_stream() -> BoxSourceStream {
futures::future::ready(Ok(BATCH.clone()))
.into_stream()
.boxed()
/// Builds the input stream fed to the parser in the benchmarks.
///
/// Clones one of the pre-generated batches (`STRUCT_BATCH` when `use_struct`
/// is true, `BATCH` otherwise), wraps it in an already-resolved `Ok` future,
/// converts that future into a stream and boxes it.
fn make_data_stream(use_struct: bool) -> BoxSourceStream {
futures::future::ready(Ok(if use_struct {
STRUCT_BATCH.clone()
} else {
BATCH.clone()
}))
.into_stream()
.boxed()
}

fn make_parser() -> ByteStreamSourceParserImpl {
let rw_columns = [
fn make_parser(use_struct: bool) -> ByteStreamSourceParserImpl {
let fields = vec![
("auction", DataType::Int64),
("bidder", DataType::Int64),
("price", DataType::Int64),
("channel", DataType::Varchar),
("url", DataType::Varchar),
("date_time", DataType::Timestamp),
("extra", DataType::Varchar),
]
.into_iter()
.enumerate()
.map(|(i, (n, t))| SourceColumnDesc::simple(n, t, ColumnId::new(i as _)))
.collect_vec();
];

let rw_columns = if use_struct {
let fields = fields
.into_iter()
.enumerate()
.map(|(i, (n, t))| ColumnDesc::named(n, ColumnId::new(i as _), t))
.collect();
let struct_col = ColumnDesc::new_struct("bid", 114514, "bid", fields);
vec![(&struct_col).into()]
} else {
fields
.into_iter()
.enumerate()
.map(|(i, (n, t))| SourceColumnDesc::simple(n, t, ColumnId::new(i as _)))
.collect_vec()
};

let config = ParserConfig {
common: CommonParserConfig { rw_columns },
Expand All @@ -99,8 +119,10 @@ fn make_parser() -> ByteStreamSourceParserImpl {
ByteStreamSourceParserImpl::create_for_test(config).unwrap()
}

fn make_stream_iter() -> impl Iterator<Item = StreamChunk> {
let mut stream: BoxChunkSourceStream = make_parser().into_stream(make_data_stream()).boxed();
fn make_stream_iter(use_struct: bool) -> impl Iterator<Item = StreamChunk> {
let mut stream: BoxChunkSourceStream = make_parser(use_struct)
.into_stream(make_data_stream(use_struct))
.boxed();

std::iter::from_fn(move || {
stream
Expand All @@ -116,7 +138,7 @@ fn make_stream_iter() -> impl Iterator<Item = StreamChunk> {
fn bench(c: &mut Criterion) {
c.bench_function("parse_nexmark", |b| {
b.iter_batched(
make_stream_iter,
|| make_stream_iter(false),
|mut iter| iter.next().unwrap(),
BatchSize::SmallInput,
)
Expand All @@ -135,11 +157,19 @@ fn bench(c: &mut Criterion) {
.into();

b.iter_batched(
make_stream_iter,
|| make_stream_iter(false),
|mut iter| tracing::dispatcher::with_default(&dispatch, || iter.next().unwrap()),
BatchSize::SmallInput,
)
});

c.bench_function("parse_nexmark_struct_type", |b| {
b.iter_batched(
|| make_stream_iter(true),
|mut iter| iter.next().unwrap(),
BatchSize::SmallInput,
)
});
}

criterion_group!(benches, bench);
Expand Down

0 comments on commit fea625f

Please sign in to comment.