diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index 7c1beac58ebb2..af9f18dcbf8f6 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -589,7 +589,8 @@ impl P {
     ///
     /// A [`ChunkSourceStream`] which is a stream of parsed messages.
     pub fn into_stream(self, data_stream: BoxSourceStream) -> impl ChunkSourceStream {
-        let source_info = self.source_ctx().source_info.clone();
+        let actor_id = self.source_ctx().actor_id;
+        let source_id = self.source_ctx().source_id.table_id();

         // Ensure chunk size is smaller than rate limit
         let data_stream = if let Some(rate_limit) = &self.source_ctx().source_ctrl_opts.rate_limit {
@@ -600,13 +601,8 @@ impl P {

         // The parser stream will be long-lived. We use `instrument_with` here to create
         // a new span for the polling of each chunk.
-        into_chunk_stream(self, data_stream).instrument_with(move || {
-            tracing::info_span!(
-                "source_parse_chunk",
-                actor_id = source_info.actor_id,
-                source_id = source_info.source_id.table_id()
-            )
-        })
+        into_chunk_stream(self, data_stream)
+            .instrument_with(move || tracing::info_span!("source_parse_chunk", actor_id, source_id))
     }
 }

@@ -718,9 +714,9 @@ async fn into_chunk_stream(mut parser: P, data_stream
                     let context = parser.source_ctx();
                     GLOBAL_ERROR_METRICS.user_source_error.report([
                         error.variant_name().to_string(),
-                        context.source_info.source_id.to_string(),
-                        context.source_info.source_name.clone(),
-                        context.source_info.fragment_id.to_string(),
+                        context.source_id.to_string(),
+                        context.source_name.clone(),
+                        context.fragment_id.to_string(),
                     ]);
                 }
             }
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index d6a8f441286c2..5fa12b5c515ab 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -161,7 +161,11 @@ pub struct SourceEnumeratorInfo {
 #[derive(Debug, Default)]
 pub struct SourceContext {
     pub connector_client: Option<ConnectorClient>,
-    pub source_info: SourceInfo,
+    pub actor_id: u32,
+    pub source_id: TableId,
+    // There should be a 1-1 mapping between `source_id` & `fragment_id`
+    pub fragment_id: u32,
+    pub source_name: String,
     pub metrics: Arc<SourceMetrics>,
     pub source_ctrl_opts: SourceCtrlOpts,
     pub connector_props: ConnectorProperties,
@@ -170,7 +174,7 @@
 impl SourceContext {
     pub fn new(
         actor_id: u32,
-        table_id: TableId,
+        source_id: TableId,
         fragment_id: u32,
         metrics: Arc<SourceMetrics>,
         source_ctrl_opts: SourceCtrlOpts,
@@ -180,12 +184,10 @@
     ) -> Self {
         Self {
             connector_client,
-            source_info: SourceInfo {
-                actor_id,
-                source_id: table_id,
-                fragment_id,
-                source_name,
-            },
+            actor_id,
+            source_id,
+            fragment_id,
+            source_name,
             metrics,
             source_ctrl_opts,
             connector_props,
@@ -193,15 +195,6 @@
     }
 }

-#[derive(Clone, Debug, Default)]
-pub struct SourceInfo {
-    pub actor_id: u32,
-    pub source_id: TableId,
-    // There should be a 1-1 mapping between `source_id` & `fragment_id`
-    pub fragment_id: u32,
-    pub source_name: String,
-}
-
 #[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
 pub enum SourceFormat {
     #[default]
diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs
index e8ee3bbd2bbd3..029be2a6e30ea 100644
--- a/src/connector/src/source/cdc/source/reader.rs
+++ b/src/connector/src/source/cdc/source/reader.rs
@@ -217,8 +217,8 @@ impl CommonSplitReader for CdcSplitReader {
                 GLOBAL_ERROR_METRICS.user_source_error.report([
                     "cdc_source".to_owned(),
                     source_id.clone(),
-                    self.source_ctx.source_info.source_name.clone(),
-                    self.source_ctx.source_info.fragment_id.to_string(),
+                    self.source_ctx.source_name.clone(),
+                    self.source_ctx.fragment_id.to_string(),
                 ]);
                 Err(e)?;
             }
diff --git a/src/connector/src/source/common.rs b/src/connector/src/source/common.rs
index c145bd7f403da..8fcd1318401f2 100644
--- a/src/connector/src/source/common.rs
+++ b/src/connector/src/source/common.rs
@@ -30,10 +30,10 @@ pub(crate) async fn into_chunk_stream(
     parser_config: ParserConfig,
     source_ctx: SourceContextRef,
 ) {
-    let actor_id = source_ctx.source_info.actor_id.to_string();
-    let fragment_id = source_ctx.source_info.fragment_id.to_string();
-    let source_id = source_ctx.source_info.source_id.to_string();
-    let source_name = source_ctx.source_info.source_name.to_string();
+    let actor_id = source_ctx.actor_id.to_string();
+    let fragment_id = source_ctx.fragment_id.to_string();
+    let source_id = source_ctx.source_id.to_string();
+    let source_name = source_ctx.source_name.to_string();
     let metrics = source_ctx.metrics.clone();

     let data_stream = reader.into_data_stream();
diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs
index 0b522c4e7c938..0836fef581a3c 100644
--- a/src/connector/src/source/datagen/source/reader.rs
+++ b/src/connector/src/source/datagen/source/reader.rs
@@ -149,10 +149,10 @@ impl SplitReader for DatagenSplitReader {
             &self.parser_config.specific.encoding_config,
         ) {
             (ProtocolProperties::Native, EncodingProperties::Native) => {
-                let actor_id = self.source_ctx.source_info.actor_id.to_string();
-                let fragment_id = self.source_ctx.source_info.fragment_id.to_string();
-                let source_id = self.source_ctx.source_info.source_id.to_string();
-                let source_name = self.source_ctx.source_info.source_name.to_string();
+                let actor_id = self.source_ctx.actor_id.to_string();
+                let fragment_id = self.source_ctx.fragment_id.to_string();
+                let source_id = self.source_ctx.source_id.to_string();
+                let source_name = self.source_ctx.source_name.to_string();
                 let split_id = self.split_id.to_string();
                 let metrics = self.source_ctx.metrics.clone();
                 spawn_data_generation_stream(
diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
index 99173327be76f..79a4925f4bd24 100644
--- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
+++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
@@ -71,10 +71,10 @@ impl OpendalReader {
     #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
     async fn into_chunk_stream(self) {
         for split in self.splits {
-            let actor_id = self.source_ctx.source_info.actor_id.to_string();
-            let fragment_id = self.source_ctx.source_info.fragment_id.to_string();
-            let source_id = self.source_ctx.source_info.source_id.to_string();
-            let source_name = self.source_ctx.source_info.source_name.to_string();
+            let actor_id = self.source_ctx.actor_id.to_string();
+            let fragment_id = self.source_ctx.fragment_id.to_string();
+            let source_id = self.source_ctx.source_id.to_string();
+            let source_name = self.source_ctx.source_name.to_string();
             let source_ctx = self.source_ctx.clone();

             let split_id = split.id();
@@ -114,10 +114,10 @@ impl OpendalReader {
         split: OpendalFsSplit,
         source_ctx: SourceContextRef,
     ) {
-        let actor_id = source_ctx.source_info.actor_id.to_string();
-        let fragment_id = source_ctx.source_info.fragment_id.to_string();
-        let source_id = source_ctx.source_info.source_id.to_string();
-        let source_name = source_ctx.source_info.source_name.to_string();
+        let actor_id = source_ctx.actor_id.to_string();
+        let fragment_id = source_ctx.fragment_id.to_string();
+        let source_id = source_ctx.source_id.to_string();
+        let source_name = source_ctx.source_name.to_string();
         let max_chunk_size = source_ctx.source_ctrl_opts.chunk_size;
         let split_id = split.id();

diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs
index 3f485dc22383a..272bf41c1d0e9 100644
--- a/src/connector/src/source/filesystem/s3/source/reader.rs
+++ b/src/connector/src/source/filesystem/s3/source/reader.rs
@@ -62,10 +62,10 @@ impl S3FileReader {
         split: FsSplit,
         source_ctx: SourceContextRef,
     ) {
-        let actor_id = source_ctx.source_info.actor_id.to_string();
-        let fragment_id = source_ctx.source_info.fragment_id.to_string();
-        let source_id = source_ctx.source_info.source_id.to_string();
-        let source_name = source_ctx.source_info.source_name.to_string();
+        let actor_id = source_ctx.actor_id.to_string();
+        let fragment_id = source_ctx.fragment_id.to_string();
+        let source_id = source_ctx.source_id.to_string();
+        let source_name = source_ctx.source_name.to_string();
         let max_chunk_size = source_ctx.source_ctrl_opts.chunk_size;
         let split_id = split.id();

@@ -212,10 +212,10 @@ impl S3FileReader {
     #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
     async fn into_chunk_stream(self) {
         for split in self.splits {
-            let actor_id = self.source_ctx.source_info.actor_id.to_string();
-            let fragment_id = self.source_ctx.source_info.fragment_id.to_string();
-            let source_id = self.source_ctx.source_info.source_id.to_string();
-            let source_name = self.source_ctx.source_info.source_name.to_string();
+            let actor_id = self.source_ctx.actor_id.to_string();
+            let fragment_id = self.source_ctx.fragment_id.to_string();
+            let source_id = self.source_ctx.source_id.to_string();
+            let source_name = self.source_ctx.source_name.to_string();
             let source_ctx = self.source_ctx.clone();

             let split_id = split.id();
diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs
index d67e45fee3837..ea1d2d847f51c 100644
--- a/src/connector/src/source/kafka/source/reader.rs
+++ b/src/connector/src/source/kafka/source/reader.rs
@@ -79,7 +79,7 @@ impl SplitReader for KafkaSplitReader {
             "group.id",
             format!(
                 "rw-consumer-{}-{}",
-                source_ctx.source_info.fragment_id, source_ctx.source_info.actor_id
+                source_ctx.fragment_id, source_ctx.actor_id
             ),
         );

@@ -87,9 +87,7 @@
             broker_rewrite_map,
             Some(format!(
                 "fragment-{}-source-{}-actor-{}",
-                source_ctx.source_info.fragment_id,
-                source_ctx.source_info.source_id,
-                source_ctx.source_info.actor_id
+                source_ctx.fragment_id, source_ctx.source_id, source_ctx.actor_id
             )),
             // thread consumer will keep polling in the background, we don't need to call `poll`
             // explicitly
@@ -160,8 +158,8 @@ impl KafkaSplitReader {
                 .latest_message_id
                 .with_label_values(&[
                     // source name is not available here
-                    &self.source_ctx.source_info.source_id.to_string(),
-                    &self.source_ctx.source_info.actor_id.to_string(),
+                    &self.source_ctx.source_id.to_string(),
+                    &self.source_ctx.actor_id.to_string(),
                     split_id,
                 ])
                 .set(offset);
diff --git a/src/connector/src/source/nexmark/source/reader.rs b/src/connector/src/source/nexmark/source/reader.rs
index fd68348d6faf6..dbed90d99ae26 100644
--- a/src/connector/src/source/nexmark/source/reader.rs
+++ b/src/connector/src/source/nexmark/source/reader.rs
@@ -107,10 +107,10 @@ impl SplitReader for NexmarkSplitReader {
     }

     fn into_stream(self) -> BoxChunkSourceStream {
-        let actor_id = self.source_ctx.source_info.actor_id.to_string();
-        let fragment_id = self.source_ctx.source_info.fragment_id.to_string();
-        let source_id = self.source_ctx.source_info.source_id.to_string();
-        let source_name = self.source_ctx.source_info.source_name.to_string();
+        let actor_id = self.source_ctx.actor_id.to_string();
+        let fragment_id = self.source_ctx.fragment_id.to_string();
+        let source_id = self.source_ctx.source_id.to_string();
+        let source_name = self.source_ctx.source_name.to_string();
         let split_id = self.split_id.clone();
         let metrics = self.source_ctx.metrics.clone();
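
The hunks above are one mechanical refactor: the nested `SourceInfo` struct is removed and its four fields (`actor_id`, `source_id`, `fragment_id`, `source_name`) are hoisted directly onto `SourceContext`, so every reader drops the intermediate `source_info.` hop when labelling metrics and tracing spans. The standalone Rust sketch below illustrates the resulting shape; `TableId` and `SourceMetrics` are simplified placeholders rather than the real risingwave_common types, and the struct is trimmed to the fields this diff touches.

// Standalone sketch of the flattened `SourceContext` introduced by this diff.
// `TableId` and `SourceMetrics` are simplified placeholders, not the real
// risingwave_common definitions; only the field layout mirrors the change.

use std::sync::Arc;

#[derive(Clone, Copy, Debug, Default)]
struct TableId(u32);

impl TableId {
    fn table_id(&self) -> u32 {
        self.0
    }
}

#[derive(Debug, Default)]
struct SourceMetrics;

#[derive(Debug, Default)]
struct SourceContext {
    actor_id: u32,
    source_id: TableId,
    // There should be a 1-1 mapping between `source_id` & `fragment_id`.
    fragment_id: u32,
    source_name: String,
    metrics: Arc<SourceMetrics>,
}

impl SourceContext {
    fn new(
        actor_id: u32,
        source_id: TableId,
        fragment_id: u32,
        metrics: Arc<SourceMetrics>,
        source_name: String,
    ) -> Self {
        // No nested `SourceInfo` to build any more; the fields land directly
        // on the context.
        Self {
            actor_id,
            source_id,
            fragment_id,
            source_name,
            metrics,
        }
    }
}

fn main() {
    let ctx = SourceContext::new(1, TableId(42), 7, Arc::new(SourceMetrics), "orders".into());

    // Readers now label metrics straight off the context instead of going
    // through `ctx.source_info.*`.
    let labels = [
        ctx.actor_id.to_string(),
        ctx.source_id.table_id().to_string(),
        ctx.fragment_id.to_string(),
        ctx.source_name.clone(),
    ];
    let _metrics = Arc::clone(&ctx.metrics);
    println!("{labels:?}");
}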