diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index d8bd4a9dbcd5..22cce134906c 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -215,7 +215,7 @@ impl<'a> MessageMeta<'a> {
     /// Extract the value for the given column.
     ///
     /// Returns `None` if the column is not a meta column.
-    fn value_for_column(self, desc: &SourceColumnDesc) -> Option<DatumRef<'a>> {
+    fn value_for_column(self, desc: &SourceColumnDesc) -> DatumRef<'a> {
         let datum: DatumRef<'_> = match desc.column_type {
             // Row id columns are filled with `NULL` here and will be filled with the real
             // row id generated by `RowIdGenExecutor` later.
@@ -229,11 +229,7 @@ impl<'a> MessageMeta<'a> {
                     KAFKA_TIMESTAMP_COLUMN_NAME,
                     "unexpected kafka meta column name"
                 );
-                kafka_meta.timestamp.map(|ts| {
-                    risingwave_common::cast::i64_to_timestamptz(ts)
-                        .unwrap()
-                        .into()
-                })
+                kafka_meta.extract_timestamp()
             }
             SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.meta => {
                 assert_eq!(
@@ -248,7 +244,7 @@ impl<'a> MessageMeta<'a> {
             SourceColumnType::Meta | SourceColumnType::Normal => return None,
         };
 
-        Some(datum)
+        datum
     }
 }
 
@@ -392,8 +388,7 @@ impl SourceStreamChunkRowWriter<'_> {
                 Ok(A::output_for(
                     self.row_meta
                         .as_ref()
-                        .and_then(|row_meta| row_meta.value_for_column(desc))
-                        .unwrap(), // handled all match cases in internal match, unwrap is safe
+                        .and_then(|row_meta| row_meta.value_for_column(desc)),
                 ))
             }
             (&SourceColumnType::Meta, _)
@@ -403,12 +398,11 @@
                 ) =>
             {
                 // SourceColumnType is for CDC source only.
-                return Ok(A::output_for(
+                Ok(A::output_for(
                     self.row_meta
                         .as_ref()
-                        .and_then(|row_meta| row_meta.value_for_column(desc))
-                        .unwrap(), // handled all match cases in internal match, unwrap is safe
-                ));
+                        .and_then(|row_meta| row_meta.value_for_column(desc)),
+                ))
             }
 
             (
@@ -434,9 +428,9 @@ impl SourceStreamChunkRowWriter<'_> {
                 }
             }
             (_, &Some(AdditionalColumnType::Timestamp(_))) => match self.row_meta {
-                Some(row_meta) => Ok(A::output_for(
-                    extreact_timestamp_from_meta(row_meta.meta).unwrap_or(None),
-                )),
+                Some(row_meta) => {
+                    Ok(A::output_for(extreact_timestamp_from_meta(row_meta.meta)))
+                }
                 None => parse_field(desc), // parse from payload
             },
             (_, &Some(AdditionalColumnType::CollectionName(_))) => {
diff --git a/src/connector/src/parser/util.rs b/src/connector/src/parser/util.rs
index 590ba927854d..ef748ec0e2eb 100644
--- a/src/connector/src/parser/util.rs
+++ b/src/connector/src/parser/util.rs
@@ -116,10 +116,11 @@ pub(super) async fn bytes_from_url(
     }
 }
 
-pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option<DatumRef<'_>> {
+pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> DatumRef<'_> {
     match meta {
         SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_timestamp(),
-        SourceMeta::DebeziumCdc(cdc_meta) => Some(cdc_meta.extract_timestamp()),
+        SourceMeta::DebeziumCdc(cdc_meta) => cdc_meta.extract_timestamp(),
+        SourceMeta::Kinesis(kinesis_meta) => kinesis_meta.extract_timestamp(),
         _ => None,
     }
 }
diff --git a/src/connector/src/source/kafka/source/message.rs b/src/connector/src/source/kafka/source/message.rs
index 247166a15676..38c781deab64 100644
--- a/src/connector/src/source/kafka/source/message.rs
+++ b/src/connector/src/source/kafka/source/message.rs
@@ -16,7 +16,7 @@ use std::borrow::Cow;
 
 use itertools::Itertools;
 use rdkafka::message::{BorrowedMessage, Headers, OwnedHeaders};
-use rdkafka::Message;
+use rdkafka::{Message, Timestamp};
 use risingwave_common::types::{
     Datum, DatumCow, DatumRef, ListValue, ScalarImpl, ScalarRefImpl, StructValue,
 };
@@ -29,18 +29,15 @@ use crate::source::SourceMeta;
 
 #[derive(Debug, Clone)]
 pub struct KafkaMeta {
-    // timestamp(milliseconds) of message append in mq
-    pub timestamp: Option<i64>,
+    pub timestamp: Timestamp,
     pub headers: Option<OwnedHeaders>,
 }
 
 impl KafkaMeta {
-    pub fn extract_timestamp(&self) -> Option<DatumRef<'_>> {
-        self.timestamp.map(|ts| {
-            Some(ScalarRefImpl::Timestamptz(
-                risingwave_common::cast::i64_to_timestamptz(ts).unwrap(),
-            ))
-        })
+    pub fn extract_timestamp(&self) -> DatumRef<'_> {
+        Some(
+            risingwave_common::types::Timestamptz::from_millis(self.timestamp.to_millis()?)?.into(),
+        )
     }
 
     pub fn extract_header_inner<'a>(
@@ -101,7 +98,7 @@ impl SourceMessage {
             offset: message.offset().to_string(),
             split_id: message.partition().to_string().into(),
             meta: SourceMeta::Kafka(KafkaMeta {
-                timestamp: message.timestamp().to_millis(),
+                timestamp: message.timestamp(),
                 headers: if require_header {
                     message.headers().map(|headers| headers.detach())
                 } else {
diff --git a/src/connector/src/source/kinesis/source/message.rs b/src/connector/src/source/kinesis/source/message.rs
index c12679437c1f..599609634048 100644
--- a/src/connector/src/source/kinesis/source/message.rs
+++ b/src/connector/src/source/kinesis/source/message.rs
@@ -13,15 +13,24 @@
 // limitations under the License.
 
 use aws_sdk_kinesis::types::Record;
+use aws_smithy_types::DateTime;
 use aws_smithy_types_convert::date_time::DateTimeExt;
+use risingwave_common::types::{DatumRef, ScalarRefImpl};
 
 use crate::source::{SourceMessage, SourceMeta, SplitId};
 
 #[derive(Clone, Debug)]
 pub struct KinesisMeta {
     // from `approximate_arrival_timestamp` of type `Option<DateTime>`
-    #[expect(dead_code)]
-    timestamp: Option<i64>,
+    timestamp: Option<DateTime>,
+}
+
+impl KinesisMeta {
+    pub fn extract_timestamp(&self) -> DatumRef<'_> {
+        Some(ScalarRefImpl::Timestamptz(
+            self.timestamp?.to_chrono_utc().ok()?.into(),
+        ))
+    }
 }
 
 pub fn from_kinesis_record(value: &Record, split_id: SplitId) -> SourceMessage {
@@ -31,9 +40,7 @@ pub fn from_kinesis_record(value: &Record, split_id: SplitId) -> SourceMessage {
         offset: value.sequence_number.clone(),
         split_id,
         meta: SourceMeta::Kinesis(KinesisMeta {
-            timestamp: value
-                .approximate_arrival_timestamp
-                .map(|dt| dt.to_chrono_utc().unwrap().timestamp_millis()),
+            timestamp: value.approximate_arrival_timestamp,
         }),
     }
 }
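Context for the signature changes above: in `risingwave_common`, `DatumRef<'a>` is an alias for `Option<ScalarRefImpl<'a>>`, where `None` already means SQL NULL. Returning the alias directly instead of `Option<DatumRef>` collapses the nested option and lets `?` replace the `unwrap()` calls. A minimal, self-contained sketch of that pattern, using simplified stand-in types rather than the real RisingWave ones:

```rust
/// Stand-in for `ScalarRefImpl`: one concrete scalar value.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ScalarRef {
    Timestamptz(i64),
}

/// Stand-in for RisingWave's `DatumRef<'a>` alias: an optional scalar,
/// where `None` is SQL NULL rather than an error.
type DatumRef = Option<ScalarRef>;

/// Stand-in for `KafkaMeta` with an optional broker timestamp in millis.
struct KafkaMeta {
    timestamp_millis: Option<i64>,
}

impl KafkaMeta {
    /// Returning `DatumRef` directly (not `Option<DatumRef>`) means a
    /// missing timestamp propagates as NULL via `?`, with no `unwrap()`
    /// needed at call sites.
    fn extract_timestamp(&self) -> DatumRef {
        Some(ScalarRef::Timestamptz(self.timestamp_millis?))
    }
}

fn main() {
    let with_ts = KafkaMeta { timestamp_millis: Some(1_700_000_000_000) };
    let without_ts = KafkaMeta { timestamp_millis: None };
    assert_eq!(
        with_ts.extract_timestamp(),
        Some(ScalarRef::Timestamptz(1_700_000_000_000))
    );
    assert_eq!(without_ts.extract_timestamp(), None); // NULL, not a panic
}
```

With this shape, a meta column that legitimately has no value and a non-meta column both surface as `None`, which is why the `SourceStreamChunkRowWriter` call sites can drop the "handled all match cases" justification for `unwrap()`.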
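On the Kinesis side, the diff stores the raw `aws_smithy_types::DateTime` and defers conversion to read time. A hedged sketch of that conversion chain, using the same crates the diff imports (`aws-smithy-types`, `aws-smithy-types-convert` with the `convert-chrono` feature, and `chrono`); `arrival_to_chrono` is a hypothetical helper name, while the real logic lives in `KinesisMeta::extract_timestamp` and wraps the result in `ScalarRefImpl::Timestamptz`:

```rust
use aws_smithy_types::DateTime;
use aws_smithy_types_convert::date_time::DateTimeExt;

/// Hypothetical helper mirroring the new extraction: both a missing
/// `approximate_arrival_timestamp` and a failed chrono conversion fall
/// through to `None` (NULL) via `?`, where the old code called `unwrap()`.
fn arrival_to_chrono(ts: Option<DateTime>) -> Option<chrono::DateTime<chrono::Utc>> {
    ts?.to_chrono_utc().ok()
}

fn main() {
    let ts = DateTime::from_secs(1_700_000_000);
    assert!(arrival_to_chrono(Some(ts)).is_some());
    assert!(arrival_to_chrono(None).is_none());
}
```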