Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: deprecate StreamChunkWithState #14524

Merged
merged 47 commits into from
Jan 25, 2024
Merged
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
976ed2a
add offset/partition cols upon building SourceDesc
Rossil2012 Jan 11, 2024
f2d9466
batch rename
Rossil2012 Jan 12, 2024
095f3a1
refactor nexmark reader
Rossil2012 Jan 12, 2024
2217dc6
fix nexmark
Rossil2012 Jan 12, 2024
9fd4670
refactor datagen
Rossil2012 Jan 12, 2024
e3ac101
fmt
Rossil2012 Jan 12, 2024
f696d25
refactor opendal reader
Rossil2012 Jan 12, 2024
3fe8df9
fix test of plain_parser
Rossil2012 Jan 12, 2024
df51578
refactor pulsar-iceberg source
Rossil2012 Jan 15, 2024
c40634e
refactor fetch_executor roughly
Rossil2012 Jan 15, 2024
5d570fc
refactor parser
Rossil2012 Jan 15, 2024
7035694
fix misc
Rossil2012 Jan 15, 2024
e8ea63f
fix executors
Rossil2012 Jan 15, 2024
0ffdf3b
fix misc
Rossil2012 Jan 15, 2024
a544eb0
const additional column name
Rossil2012 Jan 15, 2024
8c1f930
refactor fetch_executor
Rossil2012 Jan 15, 2024
633c710
simplify import
Rossil2012 Jan 15, 2024
d5203d2
fix comments
Rossil2012 Jan 16, 2024
4b70038
refactor column_catalogs_to_source_column_descs
Rossil2012 Jan 17, 2024
1df6e82
fix test_source_executor
Rossil2012 Jan 17, 2024
f8f3a45
prune chunk conditionally
Rossil2012 Jan 17, 2024
7098e96
fix e2e-source-test
Rossil2012 Jan 17, 2024
9fdc2a1
debug
Rossil2012 Jan 17, 2024
9fc26d2
debug
Rossil2012 Jan 17, 2024
b0a3b04
fix hidden columns
Rossil2012 Jan 19, 2024
830e389
fix debezuim mongo
Rossil2012 Jan 19, 2024
7f07678
Merge branch 'main' into kanzhen/deprecate_chunk_with_state
Rossil2012 Jan 19, 2024
40b5024
fix confict
Rossil2012 Jan 19, 2024
3216a23
fix mongo test
Rossil2012 Jan 19, 2024
2aae930
Merge branch 'main' into kanzhen/deprecate_chunk_with_state
Rossil2012 Jan 20, 2024
e569e5d
fix simulation test
Rossil2012 Jan 22, 2024
6e32500
fix simulation test
Rossil2012 Jan 22, 2024
91480d4
fix simulation test
Rossil2012 Jan 22, 2024
62df41d
determine hidden without name
Rossil2012 Jan 22, 2024
052f852
fix unit test
Rossil2012 Jan 22, 2024
a18f361
avoid building a source_desc in from_proto
Rossil2012 Jan 22, 2024
08f8f12
chores
Rossil2012 Jan 23, 2024
e5ce20d
fix comments
Rossil2012 Jan 25, 2024
8ac8f68
refactor datagen with StreamChunkBuilder
Rossil2012 Jan 25, 2024
ab1e53a
refactor nexmark with StreamChunkBuilder
Rossil2012 Jan 25, 2024
487fc1c
fix warning
Rossil2012 Jan 25, 2024
60aa198
resolve conflicts
Rossil2012 Jan 25, 2024
f78c256
fix unit test
Rossil2012 Jan 25, 2024
23f480c
fix conflict
Rossil2012 Jan 25, 2024
a2888cf
add comments
Rossil2012 Jan 25, 2024
3625684
fix conflict
Rossil2012 Jan 25, 2024
e39c486
fix conflicts
Rossil2012 Jan 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor datagen with StreamChunkBuilder
Rossil2012 committed Jan 25, 2024
commit 8ac8f68787bee8d54d8740398654a84cf148de47
26 changes: 13 additions & 13 deletions src/connector/src/source/datagen/source/generator.rs
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::Result;
use futures_async_stream::try_stream;
use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::error::RwError;
use risingwave_common::field_generator::FieldGeneratorImpl;
@@ -25,10 +26,6 @@ use risingwave_common::util::iter_util::ZipEqFast;
use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig};
use crate::source::{SourceMessage, SourceMeta, SplitId};

fn get_addition_key_name(field_name: &str) -> Option<&str> {
field_name.strip_prefix("_rw_datagen_")
}

pub enum FieldDesc {
// field is invisible, generate None
Invisible,
@@ -170,23 +167,25 @@ impl DatagenEventGenerator {
// generate `partition_rows_per_second` rows per second
interval.tick().await;
let mut rows_generated_this_second = 0;
let mut chunk_builder =
StreamChunkBuilder::new(MAX_ROWS_PER_YIELD as usize, self.data_types.clone());
while rows_generated_this_second < self.partition_rows_per_second {
let mut rows = vec![];
self.offset += 1;
let num_rows_to_generate = std::cmp::min(
MAX_ROWS_PER_YIELD,
self.partition_rows_per_second - rows_generated_this_second,
);
'outer: for _ in 0..num_rows_to_generate {
'outer: for i in 0..num_rows_to_generate {
let mut row = Vec::with_capacity(self.data_types.len());
for (field_generator, field_name) in
self.fields_vec.iter_mut().zip_eq_fast(&self.field_names)
{
let datum = match field_generator {
FieldDesc::Invisible => match get_addition_key_name(field_name) {
Some("partition") => {
FieldDesc::Invisible => match field_name.as_str() {
"_rw_datagen_partition" => {
Some(ScalarImpl::Utf8(self.split_id.as_ref().into()))
}
Some("offset") => {
"_rw_datagen_offset" => {
Some(ScalarImpl::Utf8(self.offset.to_string().into_boxed_str()))
}
_ => None,
@@ -210,13 +209,14 @@ impl DatagenEventGenerator {
row.push(datum);
}

self.offset += 1;
rows.push((Op::Insert, OwnedRow::new(row)));
rows_generated_this_second += 1;
if let Some(chunk) = chunk_builder.append_row(Op::Insert, OwnedRow::new(row)) {
yield chunk;
}
}

if !rows.is_empty() {
yield StreamChunk::from_rows(&rows, &self.data_types);
if let Some(chunk) = chunk_builder.take() {
yield chunk;
}

if reach_end {