Skip to content

Commit

Permalink
refactor(source): remove Default impl from `SourceContext` and `SourceEnumeratorContext` (#16379)
Browse files Browse the repository at this point in the history

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Apr 18, 2024
1 parent 9d9d205 commit 3445741
Show file tree
Hide file tree
Showing 34 changed files with 156 additions and 104 deletions.
2 changes: 1 addition & 1 deletion src/batch/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ impl SourceExecutor {
u32::MAX,
self.source_id,
u32::MAX,
"NA".to_owned(), // source name was not passed in batch plan
self.metrics,
self.source_ctrl_opts.clone(),
ConnectorProperties::default(),
"NA".to_owned(), // source name was not passed in batch plan
));
let stream = self
.source
Expand Down
16 changes: 10 additions & 6 deletions src/bench/sink_bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ use risingwave_connector::sink::{
use risingwave_connector::source::datagen::{
DatagenProperties, DatagenSplitEnumerator, DatagenSplitReader,
};
use risingwave_connector::source::{Column, DataType, SplitEnumerator, SplitReader};
use risingwave_connector::source::{
Column, DataType, SourceContext, SourceEnumeratorContext, SplitEnumerator, SplitReader,
};
use risingwave_pb::connector_service::SinkPayloadFormat;
use risingwave_stream::executor::test_utils::prelude::ColumnDesc;
use risingwave_stream::executor::{Barrier, Message, MessageStreamItem, StreamExecutorError};
Expand Down Expand Up @@ -200,10 +202,12 @@ impl MockDatagenSource {
rows_per_second,
fields: HashMap::default(),
};
let mut datagen_enumerator =
DatagenSplitEnumerator::new(properties.clone(), Default::default())
.await
.unwrap();
let mut datagen_enumerator = DatagenSplitEnumerator::new(
properties.clone(),
SourceEnumeratorContext::dummy().into(),
)
.await
.unwrap();
let parser_config = ParserConfig {
specific: SpecificParserConfig {
key_encoding_config: None,
Expand All @@ -220,7 +224,7 @@ impl MockDatagenSource {
properties.clone(),
vec![splits],
parser_config.clone(),
Default::default(),
SourceContext::dummy().into(),
Some(source_schema.clone()),
)
.await
Expand Down
5 changes: 4 additions & 1 deletion src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ async fn test_table_materialize() -> StreamResult<()> {
Arc::new(StreamingMetrics::unused()),
barrier_rx,
system_params_manager.get_params(),
SourceCtrlOpts::default(),
SourceCtrlOpts {
chunk_size: 1024,
rate_limit: None,
},
)
.boxed(),
);
Expand Down
2 changes: 1 addition & 1 deletion src/connector/benches/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ fn bench_json_parser(c: &mut Criterion) {
.build()
.unwrap();
let records = generate_json_rows();
let ctx = Arc::new(SourceContext::default());
let ctx = Arc::new(SourceContext::dummy());
c.bench_function("json_parser", |b| {
b.to_async(&rt).iter_batched(
|| records.clone(),
Expand Down
5 changes: 3 additions & 2 deletions src/connector/benches/nexmark_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use risingwave_connector::parser::{
ByteStreamSourceParser, JsonParser, SourceParserIntoStreamExt, SpecificParserConfig,
};
use risingwave_connector::source::{
BoxChunkSourceStream, BoxSourceStream, SourceColumnDesc, SourceMessage, SourceMeta,
BoxChunkSourceStream, BoxSourceStream, SourceColumnDesc, SourceContext, SourceMessage,
SourceMeta,
};
use tracing::Level;
use tracing_subscriber::prelude::*;
Expand Down Expand Up @@ -87,7 +88,7 @@ fn make_parser() -> impl ByteStreamSourceParser {

let props = SpecificParserConfig::DEFAULT_PLAIN_JSON;

JsonParser::new(props, columns, Default::default()).unwrap()
JsonParser::new(props, columns, SourceContext::dummy().into()).unwrap()
}

fn make_stream_iter() -> impl Iterator<Item = StreamChunk> {
Expand Down
4 changes: 2 additions & 2 deletions src/connector/benches/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_connector::parser::{
EncodingProperties, JsonParser, JsonProperties, ProtocolProperties, SourceStreamChunkBuilder,
SpecificParserConfig,
};
use risingwave_connector::source::SourceColumnDesc;
use risingwave_connector::source::{SourceColumnDesc, SourceContext};
use serde_json::json;
use tokio::runtime::Runtime;

Expand Down Expand Up @@ -81,7 +81,7 @@ fn create_parser(
}),
protocol_config: ProtocolProperties::Plain,
};
let parser = JsonParser::new(props, desc.clone(), Default::default()).unwrap();
let parser = JsonParser::new(props, desc.clone(), SourceContext::dummy().into()).unwrap();
let input = gen_input(mode, chunk_size, chunk_num);
(parser, desc, input)
}
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#![feature(iterator_try_collect)]
#![feature(try_blocks)]
#![feature(error_generic_member_access)]
#![feature(negative_impls)]
#![feature(register_tool)]
#![register_tool(rw)]
#![recursion_limit = "256"]
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ mod test {
use crate::parser::{
AccessBuilderImpl, EncodingType, SourceStreamChunkBuilder, SpecificParserConfig,
};
use crate::source::SourceColumnDesc;
use crate::source::{SourceColumnDesc, SourceContext};

fn test_data_path(file_name: &str) -> String {
let curr_dir = env::current_dir().unwrap().into_os_string();
Expand Down Expand Up @@ -279,7 +279,7 @@ mod test {
EncodingType::Value,
)?),
rw_columns: Vec::default(),
source_ctx: Default::default(),
source_ctx: SourceContext::dummy().into(),
transaction_meta_builder: None,
})
}
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/parser/bytes_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ mod tests {
BytesProperties, EncodingProperties, ProtocolProperties, SourceColumnDesc,
SourceStreamChunkBuilder, SpecificParserConfig,
};
use crate::source::SourceContext;

fn get_payload() -> Vec<Vec<u8>> {
vec![br#"t"#.to_vec(), br#"random"#.to_vec()]
Expand All @@ -66,7 +67,7 @@ mod tests {
encoding_config: EncodingProperties::Bytes(BytesProperties { column_name: None }),
protocol_config: ProtocolProperties::Plain,
};
let mut parser = PlainParser::new(props, descs.clone(), Default::default())
let mut parser = PlainParser::new(props, descs.clone(), SourceContext::dummy().into())
.await
.unwrap();

Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/parser/canal/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ mod tests {
];
let parser = CanalJsonParser::new(
descs.clone(),
Default::default(),
SourceContext::dummy().into(),
&JsonProperties::default(),
)
.unwrap();
Expand Down Expand Up @@ -229,7 +229,7 @@ mod tests {

let parser = CanalJsonParser::new(
descs.clone(),
Default::default(),
SourceContext::dummy().into(),
&JsonProperties::default(),
)
.unwrap();
Expand Down Expand Up @@ -283,7 +283,7 @@ mod tests {

let parser = CanalJsonParser::new(
descs.clone(),
Default::default(),
SourceContext::dummy().into(),
&JsonProperties::default(),
)
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/csv_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ mod tests {
delimiter: b',',
has_header: false,
},
Default::default(),
SourceContext::dummy().into(),
)
.unwrap();
let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4);
Expand Down Expand Up @@ -299,7 +299,7 @@ mod tests {
delimiter: b',',
has_header: true,
},
Default::default(),
SourceContext::dummy().into(),
)
.unwrap();
let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4);
Expand Down
11 changes: 7 additions & 4 deletions src/connector/src/parser/debezium/avro_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ mod tests {
use crate::parser::{
DebeziumAvroParserConfig, DebeziumParser, SourceStreamChunkBuilder, SpecificParserConfig,
};
use crate::source::SourceColumnDesc;
use crate::source::{SourceColumnDesc, SourceContext};

const DEBEZIUM_AVRO_DATA: &[u8] = b"\x00\x00\x00\x00\x06\x00\x02\xd2\x0f\x0a\x53\x61\x6c\x6c\x79\x0c\x54\x68\x6f\x6d\x61\x73\x2a\x73\x61\x6c\x6c\x79\x2e\x74\x68\x6f\x6d\x61\x73\x40\x61\x63\x6d\x65\x2e\x63\x6f\x6d\x16\x32\x2e\x31\x2e\x32\x2e\x46\x69\x6e\x61\x6c\x0a\x6d\x79\x73\x71\x6c\x12\x64\x62\x73\x65\x72\x76\x65\x72\x31\xc0\xb4\xe8\xb7\xc9\x61\x00\x30\x66\x69\x72\x73\x74\x5f\x69\x6e\x5f\x64\x61\x74\x61\x5f\x63\x6f\x6c\x6c\x65\x63\x74\x69\x6f\x6e\x12\x69\x6e\x76\x65\x6e\x74\x6f\x72\x79\x00\x02\x12\x63\x75\x73\x74\x6f\x6d\x65\x72\x73\x00\x00\x20\x6d\x79\x73\x71\x6c\x2d\x62\x69\x6e\x2e\x30\x30\x30\x30\x30\x33\x8c\x06\x00\x00\x00\x02\x72\x02\x92\xc3\xe8\xb7\xc9\x61\x00";

Expand Down Expand Up @@ -367,9 +367,12 @@ mod tests {
.map(CatColumnDesc::from)
.map(|c| SourceColumnDesc::from(&c))
.collect_vec();
let parser =
DebeziumParser::new(parser_config, columns.clone(), Arc::new(Default::default()))
.await?;
let parser = DebeziumParser::new(
parser_config,
columns.clone(),
SourceContext::dummy().into(),
)
.await?;
let [(op, row)]: [_; 1] = parse_one(parser, columns, DEBEZIUM_AVRO_DATA.to_vec())
.await
.try_into()
Expand Down
13 changes: 7 additions & 6 deletions src/connector/src/parser/debezium/debezium_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ use crate::parser::unified::debezium::DebeziumChangeEvent;
use crate::parser::unified::json::TimestamptzHandling;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, JsonProperties,
ParseResult, ParserFormat, ProtocolProperties, SourceStreamChunkRowWriter,
SpecificParserConfig,
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, ParseResult,
ParserFormat, ProtocolProperties, SourceStreamChunkRowWriter, SpecificParserConfig,
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

Expand Down Expand Up @@ -112,6 +111,8 @@ impl DebeziumParser {
}

pub async fn new_for_test(rw_columns: Vec<SourceColumnDesc>) -> ConnectorResult<Self> {
use crate::parser::JsonProperties;

let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
Expand All @@ -120,7 +121,7 @@ impl DebeziumParser {
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
Self::new(props, rw_columns, Default::default()).await
Self::new(props, rw_columns, SourceContext::dummy().into()).await
}

pub async fn parse_inner(
Expand Down Expand Up @@ -199,7 +200,7 @@ mod tests {
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};

use super::*;
use crate::parser::{SourceStreamChunkBuilder, TransactionControl};
use crate::parser::{JsonProperties, SourceStreamChunkBuilder, TransactionControl};
use crate::source::{ConnectorProperties, DataType};

#[tokio::test]
Expand Down Expand Up @@ -228,7 +229,7 @@ mod tests {
};
let source_ctx = SourceContext {
connector_props: ConnectorProperties::PostgresCdc(Box::default()),
..Default::default()
..SourceContext::dummy()
};
let mut parser = DebeziumParser::new(props, columns.clone(), Arc::new(source_ctx))
.await
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/parser/debezium/mongo_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ mod tests {
SourceColumnDesc::simple("_id", DataType::Varchar, ColumnId::from(0)),
SourceColumnDesc::simple("payload", DataType::Jsonb, ColumnId::from(1)),
];
let mut parser = DebeziumMongoJsonParser::new(columns.clone(), Default::default()).unwrap();
let mut parser =
DebeziumMongoJsonParser::new(columns.clone(), SourceContext::dummy().into()).unwrap();
let mut builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 3);
let writer = builder.row_writer();
parser
Expand Down Expand Up @@ -218,7 +219,8 @@ mod tests {
];
for data in input {
let mut parser =
DebeziumMongoJsonParser::new(columns.clone(), Default::default()).unwrap();
DebeziumMongoJsonParser::new(columns.clone(), SourceContext::dummy().into())
.unwrap();

let mut builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 3);

Expand Down
Loading

0 comments on commit 3445741

Please sign in to comment.