From 3445741da6136c4c0e99876794fe9f4e33889487 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 18 Apr 2024 18:14:41 +0800 Subject: [PATCH] refactor(source): remove `Default` impl from `SourceContext` and `SourceEnumeratorContext` (#16379) Signed-off-by: Richard Chien --- src/batch/src/executor/source.rs | 2 +- src/bench/sink_bench/main.rs | 16 ++++--- src/compute/tests/integration_tests.rs | 5 +- src/connector/benches/json_parser.rs | 2 +- src/connector/benches/nexmark_integration.rs | 5 +- src/connector/benches/parser.rs | 4 +- src/connector/src/lib.rs | 1 + src/connector/src/parser/avro/parser.rs | 4 +- src/connector/src/parser/bytes_parser.rs | 3 +- .../src/parser/canal/simd_json_parser.rs | 6 +-- src/connector/src/parser/csv_parser.rs | 4 +- .../src/parser/debezium/avro_parser.rs | 11 +++-- .../src/parser/debezium/debezium_parser.rs | 13 ++--- .../src/parser/debezium/mongo_json_parser.rs | 6 ++- .../src/parser/debezium/simd_json_parser.rs | 33 ++++++------- src/connector/src/parser/json_parser.rs | 17 +++---- .../src/parser/maxwell/simd_json_parser.rs | 4 +- src/connector/src/parser/plain_parser.rs | 4 +- src/connector/src/sink/kafka.rs | 2 +- src/connector/src/source/base.rs | 47 ++++++++++++++----- .../src/source/datagen/source/reader.rs | 7 +-- .../src/source/filesystem/s3/enumerator.rs | 2 +- .../src/source/filesystem/s3/source/reader.rs | 8 ++-- .../src/source/kinesis/source/reader.rs | 7 +-- .../src/source/nexmark/source/reader.rs | 10 ++-- src/frontend/src/scheduler/plan_fragmenter.rs | 4 +- src/meta/service/src/cloud_service.rs | 2 +- src/meta/src/rpc/ddl_controller.rs | 2 +- .../src/executor/backfill/cdc/cdc_backfill.rs | 2 +- .../src/executor/source/fetch_executor.rs | 2 +- .../src/executor/source/fs_source_executor.rs | 2 +- .../source/source_backfill_executor.rs | 2 +- .../src/executor/source/source_executor.rs | 16 +++++-- .../src/from_proto/source/trad_source.rs | 5 +- 34 files changed, 156 insertions(+), 104 deletions(-) diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs index 9a348b25a4025..cbc69444af7f9 100644 --- a/src/batch/src/executor/source.rs +++ b/src/batch/src/executor/source.rs @@ -171,10 +171,10 @@ impl SourceExecutor { u32::MAX, self.source_id, u32::MAX, + "NA".to_owned(), // source name was not passed in batch plan self.metrics, self.source_ctrl_opts.clone(), ConnectorProperties::default(), - "NA".to_owned(), // source name was not passed in batch plan )); let stream = self .source diff --git a/src/bench/sink_bench/main.rs b/src/bench/sink_bench/main.rs index 577a8d21d613d..91ebe6bd44d0f 100644 --- a/src/bench/sink_bench/main.rs +++ b/src/bench/sink_bench/main.rs @@ -46,7 +46,9 @@ use risingwave_connector::sink::{ use risingwave_connector::source::datagen::{ DatagenProperties, DatagenSplitEnumerator, DatagenSplitReader, }; -use risingwave_connector::source::{Column, DataType, SplitEnumerator, SplitReader}; +use risingwave_connector::source::{ + Column, DataType, SourceContext, SourceEnumeratorContext, SplitEnumerator, SplitReader, +}; use risingwave_pb::connector_service::SinkPayloadFormat; use risingwave_stream::executor::test_utils::prelude::ColumnDesc; use risingwave_stream::executor::{Barrier, Message, MessageStreamItem, StreamExecutorError}; @@ -200,10 +202,12 @@ impl MockDatagenSource { rows_per_second, fields: HashMap::default(), }; - let mut datagen_enumerator = - DatagenSplitEnumerator::new(properties.clone(), Default::default()) - .await - .unwrap(); + let mut datagen_enumerator = DatagenSplitEnumerator::new( + 
properties.clone(), + SourceEnumeratorContext::dummy().into(), + ) + .await + .unwrap(); let parser_config = ParserConfig { specific: SpecificParserConfig { key_encoding_config: None, @@ -220,7 +224,7 @@ impl MockDatagenSource { properties.clone(), vec![splits], parser_config.clone(), - Default::default(), + SourceContext::dummy().into(), Some(source_schema.clone()), ) .await diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index e25027b61573a..49f54c18a5ff6 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -173,7 +173,10 @@ async fn test_table_materialize() -> StreamResult<()> { Arc::new(StreamingMetrics::unused()), barrier_rx, system_params_manager.get_params(), - SourceCtrlOpts::default(), + SourceCtrlOpts { + chunk_size: 1024, + rate_limit: None, + }, ) .boxed(), ); diff --git a/src/connector/benches/json_parser.rs b/src/connector/benches/json_parser.rs index ffbcbb1492444..90c78044d6221 100644 --- a/src/connector/benches/json_parser.rs +++ b/src/connector/benches/json_parser.rs @@ -132,7 +132,7 @@ fn bench_json_parser(c: &mut Criterion) { .build() .unwrap(); let records = generate_json_rows(); - let ctx = Arc::new(SourceContext::default()); + let ctx = Arc::new(SourceContext::dummy()); c.bench_function("json_parser", |b| { b.to_async(&rt).iter_batched( || records.clone(), diff --git a/src/connector/benches/nexmark_integration.rs b/src/connector/benches/nexmark_integration.rs index e6388ed4b0d25..1c05147eeafbb 100644 --- a/src/connector/benches/nexmark_integration.rs +++ b/src/connector/benches/nexmark_integration.rs @@ -26,7 +26,8 @@ use risingwave_connector::parser::{ ByteStreamSourceParser, JsonParser, SourceParserIntoStreamExt, SpecificParserConfig, }; use risingwave_connector::source::{ - BoxChunkSourceStream, BoxSourceStream, SourceColumnDesc, SourceMessage, SourceMeta, + BoxChunkSourceStream, BoxSourceStream, SourceColumnDesc, SourceContext, SourceMessage, + SourceMeta, }; use tracing::Level; use tracing_subscriber::prelude::*; @@ -87,7 +88,7 @@ fn make_parser() -> impl ByteStreamSourceParser { let props = SpecificParserConfig::DEFAULT_PLAIN_JSON; - JsonParser::new(props, columns, Default::default()).unwrap() + JsonParser::new(props, columns, SourceContext::dummy().into()).unwrap() } fn make_stream_iter() -> impl Iterator { diff --git a/src/connector/benches/parser.rs b/src/connector/benches/parser.rs index f0527e119086a..21ce72dd1b2b1 100644 --- a/src/connector/benches/parser.rs +++ b/src/connector/benches/parser.rs @@ -20,7 +20,7 @@ use risingwave_connector::parser::{ EncodingProperties, JsonParser, JsonProperties, ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig, }; -use risingwave_connector::source::SourceColumnDesc; +use risingwave_connector::source::{SourceColumnDesc, SourceContext}; use serde_json::json; use tokio::runtime::Runtime; @@ -81,7 +81,7 @@ fn create_parser( }), protocol_config: ProtocolProperties::Plain, }; - let parser = JsonParser::new(props, desc.clone(), Default::default()).unwrap(); + let parser = JsonParser::new(props, desc.clone(), SourceContext::dummy().into()).unwrap(); let input = gen_input(mode, chunk_size, chunk_num); (parser, desc, input) } diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 1153a133eb1d0..f28f3cdce5e77 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -32,6 +32,7 @@ #![feature(iterator_try_collect)] #![feature(try_blocks)] #![feature(error_generic_member_access)] 
+#![feature(negative_impls)] #![feature(register_tool)] #![register_tool(rw)] #![recursion_limit = "256"] diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index 6125e87d069bf..2e9a2ef6c18a9 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -201,7 +201,7 @@ mod test { use crate::parser::{ AccessBuilderImpl, EncodingType, SourceStreamChunkBuilder, SpecificParserConfig, }; - use crate::source::SourceColumnDesc; + use crate::source::{SourceColumnDesc, SourceContext}; fn test_data_path(file_name: &str) -> String { let curr_dir = env::current_dir().unwrap().into_os_string(); @@ -279,7 +279,7 @@ mod test { EncodingType::Value, )?), rw_columns: Vec::default(), - source_ctx: Default::default(), + source_ctx: SourceContext::dummy().into(), transaction_meta_builder: None, }) } diff --git a/src/connector/src/parser/bytes_parser.rs b/src/connector/src/parser/bytes_parser.rs index 4f353ce2c60e6..255c3ef829c79 100644 --- a/src/connector/src/parser/bytes_parser.rs +++ b/src/connector/src/parser/bytes_parser.rs @@ -54,6 +54,7 @@ mod tests { BytesProperties, EncodingProperties, ProtocolProperties, SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig, }; + use crate::source::SourceContext; fn get_payload() -> Vec> { vec![br#"t"#.to_vec(), br#"random"#.to_vec()] @@ -66,7 +67,7 @@ mod tests { encoding_config: EncodingProperties::Bytes(BytesProperties { column_name: None }), protocol_config: ProtocolProperties::Plain, }; - let mut parser = PlainParser::new(props, descs.clone(), Default::default()) + let mut parser = PlainParser::new(props, descs.clone(), SourceContext::dummy().into()) .await .unwrap(); diff --git a/src/connector/src/parser/canal/simd_json_parser.rs b/src/connector/src/parser/canal/simd_json_parser.rs index 75e6656fd7a7a..9bc236392eb48 100644 --- a/src/connector/src/parser/canal/simd_json_parser.rs +++ b/src/connector/src/parser/canal/simd_json_parser.rs @@ -158,7 +158,7 @@ mod tests { ]; let parser = CanalJsonParser::new( descs.clone(), - Default::default(), + SourceContext::dummy().into(), &JsonProperties::default(), ) .unwrap(); @@ -229,7 +229,7 @@ mod tests { let parser = CanalJsonParser::new( descs.clone(), - Default::default(), + SourceContext::dummy().into(), &JsonProperties::default(), ) .unwrap(); @@ -283,7 +283,7 @@ mod tests { let parser = CanalJsonParser::new( descs.clone(), - Default::default(), + SourceContext::dummy().into(), &JsonProperties::default(), ) .unwrap(); diff --git a/src/connector/src/parser/csv_parser.rs b/src/connector/src/parser/csv_parser.rs index 8a8bb211da327..eaf09ab839891 100644 --- a/src/connector/src/parser/csv_parser.rs +++ b/src/connector/src/parser/csv_parser.rs @@ -192,7 +192,7 @@ mod tests { delimiter: b',', has_header: false, }, - Default::default(), + SourceContext::dummy().into(), ) .unwrap(); let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4); @@ -299,7 +299,7 @@ mod tests { delimiter: b',', has_header: true, }, - Default::default(), + SourceContext::dummy().into(), ) .unwrap(); let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4); diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index ca1574af3d6b2..29d9139d221cf 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -154,7 +154,7 @@ mod tests { use crate::parser::{ DebeziumAvroParserConfig, DebeziumParser, SourceStreamChunkBuilder, 
SpecificParserConfig, }; - use crate::source::SourceColumnDesc; + use crate::source::{SourceColumnDesc, SourceContext}; const DEBEZIUM_AVRO_DATA: &[u8] = b"\x00\x00\x00\x00\x06\x00\x02\xd2\x0f\x0a\x53\x61\x6c\x6c\x79\x0c\x54\x68\x6f\x6d\x61\x73\x2a\x73\x61\x6c\x6c\x79\x2e\x74\x68\x6f\x6d\x61\x73\x40\x61\x63\x6d\x65\x2e\x63\x6f\x6d\x16\x32\x2e\x31\x2e\x32\x2e\x46\x69\x6e\x61\x6c\x0a\x6d\x79\x73\x71\x6c\x12\x64\x62\x73\x65\x72\x76\x65\x72\x31\xc0\xb4\xe8\xb7\xc9\x61\x00\x30\x66\x69\x72\x73\x74\x5f\x69\x6e\x5f\x64\x61\x74\x61\x5f\x63\x6f\x6c\x6c\x65\x63\x74\x69\x6f\x6e\x12\x69\x6e\x76\x65\x6e\x74\x6f\x72\x79\x00\x02\x12\x63\x75\x73\x74\x6f\x6d\x65\x72\x73\x00\x00\x20\x6d\x79\x73\x71\x6c\x2d\x62\x69\x6e\x2e\x30\x30\x30\x30\x30\x33\x8c\x06\x00\x00\x00\x02\x72\x02\x92\xc3\xe8\xb7\xc9\x61\x00"; @@ -367,9 +367,12 @@ mod tests { .map(CatColumnDesc::from) .map(|c| SourceColumnDesc::from(&c)) .collect_vec(); - let parser = - DebeziumParser::new(parser_config, columns.clone(), Arc::new(Default::default())) - .await?; + let parser = DebeziumParser::new( + parser_config, + columns.clone(), + SourceContext::dummy().into(), + ) + .await?; let [(op, row)]: [_; 1] = parse_one(parser, columns, DEBEZIUM_AVRO_DATA.to_vec()) .await .try_into() diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 7fb497c399c61..52d1e4e4a15a2 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -24,9 +24,8 @@ use crate::parser::unified::debezium::DebeziumChangeEvent; use crate::parser::unified::json::TimestamptzHandling; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; use crate::parser::{ - AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, JsonProperties, - ParseResult, ParserFormat, ProtocolProperties, SourceStreamChunkRowWriter, - SpecificParserConfig, + AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, ParseResult, + ParserFormat, ProtocolProperties, SourceStreamChunkRowWriter, SpecificParserConfig, }; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; @@ -112,6 +111,8 @@ impl DebeziumParser { } pub async fn new_for_test(rw_columns: Vec) -> ConnectorResult { + use crate::parser::JsonProperties; + let props = SpecificParserConfig { key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { @@ -120,7 +121,7 @@ impl DebeziumParser { }), protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()), }; - Self::new(props, rw_columns, Default::default()).await + Self::new(props, rw_columns, SourceContext::dummy().into()).await } pub async fn parse_inner( @@ -199,7 +200,7 @@ mod tests { use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; use super::*; - use crate::parser::{SourceStreamChunkBuilder, TransactionControl}; + use crate::parser::{JsonProperties, SourceStreamChunkBuilder, TransactionControl}; use crate::source::{ConnectorProperties, DataType}; #[tokio::test] @@ -228,7 +229,7 @@ mod tests { }; let source_ctx = SourceContext { connector_props: ConnectorProperties::PostgresCdc(Box::default()), - ..Default::default() + ..SourceContext::dummy() }; let mut parser = DebeziumParser::new(props, columns.clone(), Arc::new(source_ctx)) .await diff --git a/src/connector/src/parser/debezium/mongo_json_parser.rs b/src/connector/src/parser/debezium/mongo_json_parser.rs index ad0bc9e7f1475..2de46a9aa5460 100644 --- 
a/src/connector/src/parser/debezium/mongo_json_parser.rs +++ b/src/connector/src/parser/debezium/mongo_json_parser.rs @@ -182,7 +182,8 @@ mod tests { SourceColumnDesc::simple("_id", DataType::Varchar, ColumnId::from(0)), SourceColumnDesc::simple("payload", DataType::Jsonb, ColumnId::from(1)), ]; - let mut parser = DebeziumMongoJsonParser::new(columns.clone(), Default::default()).unwrap(); + let mut parser = + DebeziumMongoJsonParser::new(columns.clone(), SourceContext::dummy().into()).unwrap(); let mut builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 3); let writer = builder.row_writer(); parser @@ -218,7 +219,8 @@ mod tests { ]; for data in input { let mut parser = - DebeziumMongoJsonParser::new(columns.clone(), Default::default()).unwrap(); + DebeziumMongoJsonParser::new(columns.clone(), SourceContext::dummy().into()) + .unwrap(); let mut builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 3); diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index 72c1688d95e75..63cb939d81238 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -115,7 +115,7 @@ mod tests { DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties, SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig, }; - use crate::source::SourceContextRef; + use crate::source::SourceContext; fn assert_json_eq(parse_result: &Option, json_str: &str) { if let Some(ScalarImpl::Jsonb(json_val)) = parse_result { @@ -130,10 +130,7 @@ mod tests { } } - async fn build_parser( - rw_columns: Vec, - source_ctx: SourceContextRef, - ) -> DebeziumParser { + async fn build_parser(rw_columns: Vec) -> DebeziumParser { let props = SpecificParserConfig { key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { @@ -142,7 +139,7 @@ mod tests { }), protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()), }; - DebeziumParser::new(props, rw_columns, source_ctx) + DebeziumParser::new(props, rw_columns, SourceContext::dummy().into()) .await .unwrap() } @@ -197,7 +194,7 @@ mod tests { let columns = get_test1_columns(); for data in input { - let parser = build_parser(columns.clone(), Default::default()).await; + let parser = build_parser(columns.clone()).await; let [(_op, row)]: [_; 1] = parse_one(parser, columns.clone(), data) .await .try_into() @@ -228,7 +225,7 @@ mod tests { let columns = get_test1_columns(); for data in input { - let parser = build_parser(columns.clone(), Default::default()).await; + let parser = build_parser(columns.clone()).await; let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data) .await .try_into() @@ -259,7 +256,7 @@ mod tests { for data in input { let columns = get_test1_columns(); - let parser = build_parser(columns.clone(), Default::default()).await; + let parser = build_parser(columns.clone()).await; let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data) .await .try_into() @@ -297,7 +294,7 @@ mod tests { let columns = get_test1_columns(); for data in input { - let parser = build_parser(columns.clone(), Default::default()).await; + let parser = build_parser(columns.clone()).await; let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data) .await .try_into() @@ -356,7 +353,7 @@ mod tests { let columns = get_test2_columns(); - let parser = build_parser(columns.clone(), Default::default()).await; + let parser = 
build_parser(columns.clone()).await; let [(_op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec()) .await @@ -391,7 +388,7 @@ mod tests { let data = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678088861000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":789,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678088861249,"transaction":null}}"#; let columns = get_test2_columns(); - let parser = build_parser(columns.clone(), Default::default()).await; + let parser = build_parser(columns.clone()).await; let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec()) .await .try_into() @@ -426,7 +423,7 @@ mod tests { let data = br#"{"payload":{"before":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\":\"v1_updated\",\"k2\":33}"},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090653000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1643,"row":0,"thread":4,"query":null},"op":"d","ts_ms":1678090653611,"transaction":null}}"#; let columns = get_test2_columns(); - let parser = build_parser(columns.clone(), Default::default()).await; + let parser = build_parser(columns.clone()).await; let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec()) .await .try_into() @@ -463,7 +460,7 @@ mod tests { let columns = get_test2_columns(); - let parser = build_parser(columns.clone(), Default::default()).await; + let parser = build_parser(columns.clone()).await; let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec()) .await .try_into() @@ -506,7 +503,7 @@ mod tests { SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)), SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)), ]; - let mut parser = build_parser(columns.clone(), Default::default()).await; + let mut parser = build_parser(columns.clone()).await; let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2); @@ -647,7 +644,7 @@ mod tests { // this test covers an insert event on the table above let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_time_0":40271000000,"o_time_6":40271000010,"o_timez_0":"11:11:11Z","o_timez_6":"11:11:11.00001Z","o_timestamp_0":1321009871000,"o_timestamp_6":1321009871123456,"o_timestampz_0":"2011-11-11T03:11:11Z","o_timestampz_6":"2011-11-11T03:11:11.123456Z","o_interval":"P1Y2M3DT4H5M6.78S","o_date":"1999-09-09"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684733351963,"snapshot":"last","db":"test","sequence":"[null,\"26505352\"]","schema":"public","table":"orders","txId":729,"lsn":26505352,"xmin":null},"op":"r","ts_ms":1684733352110,"transaction":null}}"#; let columns = get_temporal_test_columns(); - let parser = build_parser(columns.clone(), Default::default()).await; + let parser = build_parser(columns.clone()).await; let [(op, 
row)]: [_; 1] = parse_one(parser, columns, data.to_vec()) .await .try_into() @@ -702,7 +699,7 @@ mod tests { // this test covers an insert event on the table above let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_smallint":32767,"o_integer":2147483647,"o_bigint":9223372036854775807,"o_real":9.999,"o_double":9.999999,"o_numeric":123456.789,"o_numeric_6_3":123.456,"o_money":123.12},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684404343201,"snapshot":"last","db":"test","sequence":"[null,\"26519216\"]","schema":"public","table":"orders","txId":729,"lsn":26519216,"xmin":null},"op":"r","ts_ms":1684404343349,"transaction":null}}"#; let columns = get_numeric_test_columns(); - let parser = build_parser(columns.clone(), Default::default()).await; + let parser = build_parser(columns.clone()).await; let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec()) .await .try_into() @@ -741,7 +738,7 @@ mod tests { // this test covers an insert event on the table above let data = br#"{"payload":{"before":null,"after":{"o_key":1,"o_boolean":false,"o_bit":true,"o_bytea":"ASNFZ4mrze8=","o_json":"{\"k1\": \"v1\", \"k2\": 11}","o_xml":"","o_uuid":"60f14fe2-f857-404a-b586-3b5375b3259f","o_point":{"x":1.0,"y":2.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAABA","srid":null},"o_enum":"polar","o_char":"h","o_varchar":"ha","o_character":"h","o_character_varying":"hahaha"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684743927178,"snapshot":"last","db":"test","sequence":"[null,\"26524528\"]","schema":"public","table":"orders","txId":730,"lsn":26524528,"xmin":null},"op":"r","ts_ms":1684743927343,"transaction":null}}"#; let columns = get_other_types_test_columns(); - let parser = build_parser(columns.clone(), Default::default()).await; + let parser = build_parser(columns.clone()).await; let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec()) .await .try_into() diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index 05e689f07be72..f010b8e6b7df6 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -106,10 +106,11 @@ impl JsonParser { }) } + #[cfg(test)] pub fn new_for_test(rw_columns: Vec) -> ConnectorResult { Ok(Self { rw_columns, - source_ctx: Default::default(), + source_ctx: SourceContext::dummy().into(), payload_start_idx: 0, }) } @@ -218,7 +219,7 @@ mod tests { EncodingProperties, JsonProperties, ProtocolProperties, SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig, }; - use crate::source::SourceColumnType; + use crate::source::{SourceColumnType, SourceContext}; fn get_payload() -> Vec> { vec![ @@ -251,7 +252,7 @@ mod tests { let parser = JsonParser::new( SpecificParserConfig::DEFAULT_PLAIN_JSON, descs.clone(), - Default::default(), + SourceContext::dummy().into(), ) .unwrap(); @@ -361,7 +362,7 @@ mod tests { let parser = JsonParser::new( SpecificParserConfig::DEFAULT_PLAIN_JSON, descs.clone(), - Default::default(), + SourceContext::dummy().into(), ) .unwrap(); let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 3); @@ -432,7 +433,7 @@ mod tests { let parser = JsonParser::new( SpecificParserConfig::DEFAULT_PLAIN_JSON, descs.clone(), - Default::default(), + SourceContext::dummy().into(), ) .unwrap(); let payload = br#" @@ -504,7 +505,7 @@ mod tests { let parser = JsonParser::new( SpecificParserConfig::DEFAULT_PLAIN_JSON, descs.clone(), - Default::default(), 
+ SourceContext::dummy().into(), ) .unwrap(); let payload = br#" @@ -550,7 +551,7 @@ mod tests { let parser = JsonParser::new( SpecificParserConfig::DEFAULT_PLAIN_JSON, descs.clone(), - Default::default(), + SourceContext::dummy().into(), ) .unwrap(); let payload = br#" @@ -614,7 +615,7 @@ mod tests { }), protocol_config: ProtocolProperties::Upsert, }; - let mut parser = UpsertParser::new(props, descs.clone(), Default::default()) + let mut parser = UpsertParser::new(props, descs.clone(), SourceContext::dummy().into()) .await .unwrap(); let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4); diff --git a/src/connector/src/parser/maxwell/simd_json_parser.rs b/src/connector/src/parser/maxwell/simd_json_parser.rs index d018ef73b0506..5db6cdd52e90c 100644 --- a/src/connector/src/parser/maxwell/simd_json_parser.rs +++ b/src/connector/src/parser/maxwell/simd_json_parser.rs @@ -23,6 +23,8 @@ mod tests { EncodingProperties, JsonProperties, ProtocolProperties, SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig, }; + use crate::source::SourceContext; + #[tokio::test] async fn test_json_parser() { let descs = vec![ @@ -40,7 +42,7 @@ mod tests { }), protocol_config: ProtocolProperties::Maxwell, }; - let mut parser = MaxwellParser::new(props, descs.clone(), Default::default()) + let mut parser = MaxwellParser::new(props, descs.clone(), SourceContext::dummy().into()) .await .unwrap(); diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index b50f0d57645fe..c526366905938 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -195,7 +195,7 @@ mod tests { let source_ctx = SourceContext { connector_props: ConnectorProperties::PostgresCdc(Box::default()), - ..Default::default() + ..SourceContext::dummy() }; let source_ctx = Arc::new(source_ctx); // format plain encode json parser @@ -348,7 +348,7 @@ mod tests { // format plain encode json parser let source_ctx = SourceContext { connector_props: ConnectorProperties::MysqlCdc(Box::default()), - ..Default::default() + ..SourceContext::dummy() }; let mut parser = PlainParser::new( SpecificParserConfig::DEFAULT_PLAIN_JSON, diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index c6598f163a977..8d56441e53488 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -363,7 +363,7 @@ impl Sink for KafkaSink { // use enumerator to validate broker reachability and existence of topic let check = KafkaSplitEnumerator::new( KafkaProperties::from(self.config.clone()), - Arc::new(SourceEnumeratorContext::default()), + Arc::new(SourceEnumeratorContext::dummy()), ) .await?; if !check.check_reachability().await { diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 420194fe4a941..6f594c09366d8 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -138,27 +138,33 @@ pub struct SourceCtrlOpts { pub rate_limit: Option, } -impl Default for SourceCtrlOpts { - fn default() -> Self { - Self { - chunk_size: MAX_CHUNK_SIZE, - rate_limit: None, - } - } -} +// The options in `SourceCtrlOpts` are so important that we don't want to impl `Default` for it, +// so that we can prevent any unintentional use of the default value. 
+impl !Default for SourceCtrlOpts {} -#[derive(Debug, Default)] +#[derive(Debug)] pub struct SourceEnumeratorContext { pub info: SourceEnumeratorInfo, pub metrics: Arc, } -#[derive(Clone, Debug, Default)] +impl SourceEnumeratorContext { + /// Create a dummy `SourceEnumeratorContext` for testing purpose, or for the situation + /// where the real context doesn't matter. + pub fn dummy() -> SourceEnumeratorContext { + SourceEnumeratorContext { + info: SourceEnumeratorInfo { source_id: 0 }, + metrics: Arc::new(EnumeratorMetrics::default()), + } + } +} + +#[derive(Clone, Debug)] pub struct SourceEnumeratorInfo { pub source_id: u32, } -#[derive(Debug, Default)] +#[derive(Debug)] pub struct SourceContext { pub actor_id: u32, pub source_id: TableId, @@ -175,10 +181,10 @@ impl SourceContext { actor_id: u32, source_id: TableId, fragment_id: u32, + source_name: String, metrics: Arc, source_ctrl_opts: SourceCtrlOpts, connector_props: ConnectorProperties, - source_name: String, ) -> Self { Self { actor_id, @@ -190,6 +196,23 @@ impl SourceContext { connector_props, } } + + /// Create a dummy `SourceContext` for testing purpose, or for the situation + /// where the real context doesn't matter. + pub fn dummy() -> Self { + Self::new( + 0, + TableId::new(0), + 0, + "dummy".to_string(), + Arc::new(SourceMetrics::default()), + SourceCtrlOpts { + chunk_size: MAX_CHUNK_SIZE, + rate_limit: None, + }, + ConnectorProperties::default(), + ) + } } #[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs index 0836fef581a3c..87f798d59f38b 100644 --- a/src/connector/src/source/datagen/source/reader.rs +++ b/src/connector/src/source/datagen/source/reader.rs @@ -337,6 +337,7 @@ mod tests { use super::*; use crate::parser::SpecificParserConfig; + use crate::source::SourceContext; #[tokio::test] async fn test_generator() -> Result<()> { @@ -403,7 +404,7 @@ mod tests { }, ..Default::default() }, - Default::default(), + SourceContext::dummy().into(), Some(mock_datum), ) .await? @@ -465,7 +466,7 @@ mod tests { properties.clone(), state, parser_config.clone(), - Default::default(), + SourceContext::dummy().into(), Some(mock_datum.clone()), ) .await? @@ -482,7 +483,7 @@ mod tests { properties, state, parser_config, - Default::default(), + SourceContext::dummy().into(), Some(mock_datum), ) .await? 
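The `impl !Default for SourceCtrlOpts {}` hunk above relies on the nightly `negative_impls` feature enabled in `lib.rs` earlier in this patch: a negative impl states that the type will never implement the trait, so the compiler rejects any later `impl Default` or `#[derive(Default)]`, and every caller has to spell the fields out. A minimal standalone sketch of the pattern, using a hypothetical `PlaceholderOpts` type rather than the real struct:

    #![feature(negative_impls)] // nightly-only, as enabled in src/connector/src/lib.rs

    pub struct PlaceholderOpts {
        pub chunk_size: usize,
        pub rate_limit: Option<u32>,
    }

    // Promise that `PlaceholderOpts` never implements `Default`; adding a positive impl
    // (including via `#[derive(Default)]`) anywhere else now fails to compile.
    impl !Default for PlaceholderOpts {}

    fn main() {
        // Callers must construct the options explicitly, field by field.
        let opts = PlaceholderOpts { chunk_size: 1024, rate_limit: None };
        // let bad = PlaceholderOpts::default(); // error: no `Default` impl exists
        let _ = opts;
    }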
diff --git a/src/connector/src/source/filesystem/s3/enumerator.rs b/src/connector/src/source/filesystem/s3/enumerator.rs index ae671ad2265f9..7491cac0df7c6 100644 --- a/src/connector/src/source/filesystem/s3/enumerator.rs +++ b/src/connector/src/source/filesystem/s3/enumerator.rs @@ -140,7 +140,7 @@ mod tests { endpoint_url: None, }; let mut enumerator = - S3SplitEnumerator::new(props.into(), SourceEnumeratorContext::default().into()) + S3SplitEnumerator::new(props.into(), SourceEnumeratorContext::dummy().into()) .await .unwrap(); let splits = enumerator.list_splits().await.unwrap(); diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index ff5dc9c9c1482..129b708a61521 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -266,7 +266,9 @@ mod tests { }; use crate::source::filesystem::s3::S3PropertiesCommon; use crate::source::filesystem::S3SplitEnumerator; - use crate::source::{SourceColumnDesc, SourceEnumeratorContext, SplitEnumerator}; + use crate::source::{ + SourceColumnDesc, SourceContext, SourceEnumeratorContext, SplitEnumerator, + }; #[tokio::test] #[ignore] @@ -281,7 +283,7 @@ mod tests { } .into(); let mut enumerator = - S3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into()) + S3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::dummy().into()) .await .unwrap(); let splits = enumerator.list_splits().await.unwrap(); @@ -307,7 +309,7 @@ mod tests { }, }; - let reader = S3FileReader::new(props, splits, config, Default::default(), None) + let reader = S3FileReader::new(props, splits, config, SourceContext::dummy().into(), None) .await .unwrap(); diff --git a/src/connector/src/source/kinesis/source/reader.rs b/src/connector/src/source/kinesis/source/reader.rs index 959940bd84415..c9026428d1df0 100644 --- a/src/connector/src/source/kinesis/source/reader.rs +++ b/src/connector/src/source/kinesis/source/reader.rs @@ -307,6 +307,7 @@ mod tests { use super::*; use crate::connector_common::KinesisCommon; use crate::source::kinesis::split::KinesisSplit; + use crate::source::SourceContext; #[tokio::test] async fn test_reject_redundant_seq_props() { @@ -335,7 +336,7 @@ mod tests { end_position: KinesisOffset::None, }], Default::default(), - Default::default(), + SourceContext::dummy().into(), None, ) .await; @@ -371,7 +372,7 @@ mod tests { end_position: KinesisOffset::None, }], Default::default(), - Default::default(), + SourceContext::dummy().into(), None, ) .await? @@ -389,7 +390,7 @@ mod tests { end_position: KinesisOffset::None, }], Default::default(), - Default::default(), + SourceContext::dummy().into(), None, ) .await? 
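The test updates above all use the same `SourceContext::dummy().into()` / `SourceEnumeratorContext::dummy().into()` idiom: the readers and enumerators take `Arc`-wrapped contexts, and `.into()` goes through the standard library's `From<T> for Arc<T>` conversion. A rough sketch, assuming the `risingwave_connector::source` re-exports already imported by the benches in this patch (the helper function itself is hypothetical):

    use std::sync::Arc;

    use risingwave_connector::source::{SourceContext, SourceEnumeratorContext};

    // Hypothetical helper: build the Arc-wrapped dummy contexts that the split
    // readers and enumerators expect in tests.
    fn dummy_contexts() -> (Arc<SourceContext>, Arc<SourceEnumeratorContext>) {
        // `.into()` resolves to `From<T> for Arc<T>` thanks to the annotated return type.
        (SourceContext::dummy().into(), SourceEnumeratorContext::dummy().into())
    }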
diff --git a/src/connector/src/source/nexmark/source/reader.rs b/src/connector/src/source/nexmark/source/reader.rs index 7b402cbc13680..6441baa154ae4 100644 --- a/src/connector/src/source/nexmark/source/reader.rs +++ b/src/connector/src/source/nexmark/source/reader.rs @@ -211,7 +211,7 @@ impl NexmarkSplitReader { mod tests { use super::*; use crate::source::nexmark::{NexmarkProperties, NexmarkSplitEnumerator}; - use crate::source::{SourceEnumeratorContext, SplitEnumerator}; + use crate::source::{SourceContext, SourceEnumeratorContext, SplitEnumerator}; #[tokio::test] async fn test_nexmark_split_reader() -> crate::error::ConnectorResult<()> { @@ -224,7 +224,7 @@ mod tests { }; let mut enumerator = - NexmarkSplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into()) + NexmarkSplitEnumerator::new(props.clone(), SourceEnumeratorContext::dummy().into()) .await?; let list_splits_resp: Vec<_> = enumerator.list_splits().await?.into_iter().collect(); @@ -236,7 +236,7 @@ mod tests { props.clone(), state, Default::default(), - Default::default(), + SourceContext::dummy().into(), None, ) .await? @@ -261,7 +261,7 @@ mod tests { }; let mut enumerator = - NexmarkSplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into()) + NexmarkSplitEnumerator::new(props.clone(), SourceEnumeratorContext::dummy().into()) .await?; let list_splits_resp: Vec<_> = enumerator.list_splits().await?.into_iter().collect(); @@ -271,7 +271,7 @@ mod tests { props.clone(), state, Default::default(), - Default::default(), + SourceContext::dummy().into(), None, ) .await? diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index a578fd30d2539..9fc9316d02814 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -294,7 +294,7 @@ impl SourceScanInfo { match fetch_info.connector { ConnectorProperties::Kafka(prop) => { let mut kafka_enumerator = - KafkaSplitEnumerator::new(*prop, SourceEnumeratorContext::default().into()) + KafkaSplitEnumerator::new(*prop, SourceEnumeratorContext::dummy().into()) .await?; let split_info = kafka_enumerator .list_splits_batch(fetch_info.timebound.0, fetch_info.timebound.1) @@ -329,7 +329,7 @@ impl SourceScanInfo { } ConnectorProperties::Iceberg(prop) => { let iceberg_enumerator = - IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::default().into()) + IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::dummy().into()) .await?; let time_travel_info = match fetch_info.as_of { diff --git a/src/meta/service/src/cloud_service.rs b/src/meta/service/src/cloud_service.rs index 9c213bd7cb9e2..b77b751b281e7 100644 --- a/src/meta/service/src/cloud_service.rs +++ b/src/meta/service/src/cloud_service.rs @@ -159,7 +159,7 @@ impl CloudService for CloudServiceImpl { async fn new_enumerator( props: P, ) -> ConnectorResult { - P::SplitEnumerator::new(props, SourceEnumeratorContext::default().into()).await + P::SplitEnumerator::new(props, SourceEnumeratorContext::dummy().into()).await } dispatch_source_prop!(props.unwrap(), props, { diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 1c551f46b6fc1..c0aa62750facb 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -845,7 +845,7 @@ impl DdlController { async fn new_enumerator_for_validate( source_props: P, ) -> Result { - P::SplitEnumerator::new(source_props, SourceEnumeratorContext::default().into()).await + 
P::SplitEnumerator::new(source_props, SourceEnumeratorContext::dummy().into()).await } for actor in &stream_scan_fragment.actors { diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 234eceea9fc9b..96a53ace0b25b 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -626,7 +626,7 @@ pub async fn transform_upstream(upstream: BoxedMessageStream, schema: &Schema) { let mut parser = DebeziumParser::new( props, get_rw_columns(schema), - Arc::new(SourceContext::default()), + Arc::new(SourceContext::dummy()), ) .await .map_err(StreamExecutorError::connector_error)?; diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index 1d6e409c63a24..c555fb036d94c 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -172,10 +172,10 @@ impl FsFetchExecutor { self.actor_ctx.id, source_id, self.actor_ctx.fragment_id, + source_name.to_owned(), source_desc.metrics.clone(), self.source_ctrl_opts.clone(), source_desc.source.config.clone(), - source_name.to_owned(), ) } diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 13b876c05c2be..36b7783950232 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -100,10 +100,10 @@ impl FsSourceExecutor { self.actor_ctx.id, self.stream_source_core.source_id, self.actor_ctx.fragment_id, + self.stream_source_core.source_name.clone(), source_desc.metrics.clone(), self.source_ctrl_opts.clone(), source_desc.source.config.clone(), - self.stream_source_core.source_name.clone(), ); let stream = source_desc .source diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index be752b4598a3a..7b3c594cab5af 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -210,10 +210,10 @@ impl SourceBackfillExecutorInner { self.actor_ctx.id, self.stream_source_core.source_id, self.actor_ctx.fragment_id, + self.stream_source_core.source_name.clone(), source_desc.metrics.clone(), self.source_ctrl_opts.clone(), source_desc.source.config.clone(), - self.stream_source_core.source_name.clone(), ); let stream = source_desc .source diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index d86a85ec5bde4..cf54166afd396 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -160,14 +160,14 @@ impl SourceExecutor { self.actor_ctx.id, self.stream_source_core.as_ref().unwrap().source_id, self.actor_ctx.fragment_id, - source_desc.metrics.clone(), - self.source_ctrl_opts.clone(), - source_desc.source.config.clone(), self.stream_source_core .as_ref() .unwrap() .source_name .clone(), + source_desc.metrics.clone(), + self.source_ctrl_opts.clone(), + source_desc.source.config.clone(), ); let stream = source_desc .source @@ -839,7 +839,10 @@ mod tests { Arc::new(StreamingMetrics::unused()), barrier_rx, system_params_manager.get_params(), - SourceCtrlOpts::default(), + SourceCtrlOpts { + chunk_size: 1024, + rate_limit: None, + }, ); let mut executor = executor.boxed().execute(); @@ -927,7 +930,10 @@ mod tests { 
Arc::new(StreamingMetrics::unused()), barrier_rx, system_params_manager.get_params(), - SourceCtrlOpts::default(), + SourceCtrlOpts { + chunk_size: 1024, + rate_limit: None, + }, ); let mut handler = executor.boxed().execute(); diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index ca057ced81f6f..3610c8a95caf0 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -269,7 +269,10 @@ impl ExecutorBuilder for SourceExecutorBuilder { barrier_receiver, system_params, // we don't expect any data in, so no need to set chunk_sizes - SourceCtrlOpts::default(), + SourceCtrlOpts { + chunk_size: 0, + rate_limit: None, + }, ); Ok((params.info, exec).into()) }
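Besides replacing `Default::default()` with explicit dummies, the executor hunks also move `source_name` ahead of `metrics` in `SourceContext::new`. A sketch of the resulting call shape, assembled from the `base.rs` and `source_executor.rs` hunks above; the helper, its argument values, and the import paths for `SourceMetrics` and `TableId` are illustrative guesses rather than code from this patch:

    use std::sync::Arc;

    use risingwave_common::catalog::TableId;
    use risingwave_connector::source::monitor::SourceMetrics; // assumed re-export path
    use risingwave_connector::source::{ConnectorProperties, SourceContext, SourceCtrlOpts};

    // Hypothetical helper mirroring what the stream executors now do.
    fn build_source_ctx(actor_id: u32, fragment_id: u32) -> SourceContext {
        SourceContext::new(
            actor_id,
            TableId::new(0),
            fragment_id,
            "my_source".to_owned(),          // source name now comes right after fragment_id
            Arc::new(SourceMetrics::default()),
            SourceCtrlOpts {
                chunk_size: 1024,
                rate_limit: None,
            },
            ConnectorProperties::default(),
        )
    }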