diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 5f06db0d2596..661559ee741f 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -55,6 +55,7 @@ use crate::parser::maxwell::MaxwellParser; use crate::parser::simd_json_parser::DebeziumMongoJsonAccessBuilder; use crate::parser::util::{ extract_cdc_meta_column, extract_header_inner_from_meta, extract_headers_from_meta, + extreact_timestamp_from_meta, }; use crate::schema::schema_registry::SchemaRegistryAuth; use crate::schema::InvalidOptionError; @@ -406,10 +407,18 @@ impl SourceStreamChunkRowWriter<'_> { | &Some(ref col @ AdditionalColumnType::TableName(_)) | &Some(ref col @ AdditionalColumnType::Timestamp(_)), ) => match self.row_meta { - Some(row_meta) => Ok(A::output_for( - extract_cdc_meta_column(row_meta.meta, col, desc.name.as_str())? - .unwrap_or(None), - )), + Some(row_meta) => { + if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.meta { + Ok(A::output_for( + extract_cdc_meta_column(cdc_meta, col, desc.name.as_str())? + .unwrap_or(None), + )) + } else { + Ok(A::output_for( + extreact_timestamp_from_meta(row_meta.meta).unwrap_or(None), + )) + } + } None => parse_field(desc), // parse from payload }, (_, &Some(AdditionalColumnType::CollectionName(_))) => { diff --git a/src/connector/src/parser/util.rs b/src/connector/src/parser/util.rs index 5b2a2e01ac08..f423c8639b68 100644 --- a/src/connector/src/parser/util.rs +++ b/src/connector/src/parser/util.rs @@ -43,6 +43,7 @@ use risingwave_pb::plan_common::additional_column; use risingwave_pb::plan_common::additional_column::ColumnType; use crate::parser::{AccessError, AccessResult}; +use crate::source::cdc::DebeziumCdcMeta; /// get kafka topic name pub(super) fn get_kafka_topic(props: &HashMap) -> ConnectorResult<&String> { @@ -132,18 +133,18 @@ pub(super) async fn bytes_from_url( } } +pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option { + match meta { + SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_timestamp(), + _ => None, + } +} + pub fn extract_cdc_meta_column( - meta: &SourceMeta, + cdc_meta: &DebeziumCdcMeta, column_type: &additional_column::ColumnType, column_name: &str, ) -> AccessResult> { - assert_matches!(meta, &SourceMeta::DebeziumCdc(_)); - - let cdc_meta = match meta { - SourceMeta::DebeziumCdc(cdc_meta) => cdc_meta, - _ => unreachable!(), - }; - match column_type { ColumnType::Timestamp(_) => Ok(cdc_meta.extract_timestamp()), ColumnType::DatabaseName(_) => Ok(cdc_meta.extract_database_name()),