Skip to content

Commit

Permalink
fix: support Kinesis with include timestamp (#19211)
Browse files Browse the repository at this point in the history
Co-authored-by: tabversion <[email protected]>
  • Loading branch information
2 people authored and tabVersion committed Nov 5, 2024
1 parent ff2572f commit 624855a
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 35 deletions.
7 changes: 7 additions & 0 deletions integration_tests/kinesis-s3-source/create_mv.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ FROM
ai.ad_id
) AS ad_clicks ON ad_impressions.ad_id = ad_clicks.ad_id;

CREATE MATERIALIZED VIEW kinesis_timestamp AS
SELECT
ad_impression._rw_kinesis_timestamp AS timestamp
FROM
ad_impression
WHERE ad_impression._rw_kinesis_timestamp IS NOT NULL AND ad_impression._rw_kinesis_timestamp >= '2024-01-01 00:00:00+00'::TIMESTAMPTZ;

CREATE MATERIALIZED VIEW ad_ctr_5min AS
SELECT
ac.ad_id AS ad_id,
Expand Down
8 changes: 7 additions & 1 deletion integration_tests/kinesis-s3-source/create_source.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ CREATE SOURCE ad_impression (
bid_id BIGINT,
ad_id BIGINT,
impression_timestamp TIMESTAMPTZ
) WITH (
)
INCLUDE key
INCLUDE partition
INCLUDE offset
INCLUDE timestamp
INCLUDE payload
WITH (
connector = 'kinesis',
stream = 'ad-impression',
aws.region='us-east-1',
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/kinesis-s3-source/data_check
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ad_ctr,ad_ctr_5min
ad_ctr,ad_ctr_5min,kinesis_timestamp
26 changes: 10 additions & 16 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ impl<'a> MessageMeta<'a> {
/// Extract the value for the given column.
///
/// Returns `None` if the column is not a meta column.
fn value_for_column(self, desc: &SourceColumnDesc) -> Option<DatumRef<'a>> {
fn value_for_column(self, desc: &SourceColumnDesc) -> DatumRef<'a> {
let datum: DatumRef<'_> = match desc.column_type {
// Row id columns are filled with `NULL` here and will be filled with the real
// row id generated by `RowIdGenExecutor` later.
Expand All @@ -229,11 +229,7 @@ impl<'a> MessageMeta<'a> {
KAFKA_TIMESTAMP_COLUMN_NAME,
"unexpected kafka meta column name"
);
kafka_meta.timestamp.map(|ts| {
risingwave_common::cast::i64_to_timestamptz(ts)
.unwrap()
.into()
})
kafka_meta.extract_timestamp()
}
SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.meta => {
assert_eq!(
Expand All @@ -248,7 +244,7 @@ impl<'a> MessageMeta<'a> {
SourceColumnType::Meta | SourceColumnType::Normal => return None,
};

Some(datum)
datum
}
}

Expand Down Expand Up @@ -392,8 +388,7 @@ impl SourceStreamChunkRowWriter<'_> {
Ok(A::output_for(
self.row_meta
.as_ref()
.and_then(|row_meta| row_meta.value_for_column(desc))
.unwrap(), // handled all match cases in internal match, unwrap is safe
.and_then(|row_meta| row_meta.value_for_column(desc)),
))
}
(&SourceColumnType::Meta, _)
Expand All @@ -403,12 +398,11 @@ impl SourceStreamChunkRowWriter<'_> {
) =>
{
// SourceColumnType is for CDC source only.
return Ok(A::output_for(
Ok(A::output_for(
self.row_meta
.as_ref()
.and_then(|row_meta| row_meta.value_for_column(desc))
.unwrap(), // handled all match cases in internal match, unwrap is safe
));
.and_then(|row_meta| row_meta.value_for_column(desc)),
))
}

(
Expand All @@ -434,9 +428,9 @@ impl SourceStreamChunkRowWriter<'_> {
}
}
(_, &Some(AdditionalColumnType::Timestamp(_))) => match self.row_meta {
Some(row_meta) => Ok(A::output_for(
extreact_timestamp_from_meta(row_meta.meta).unwrap_or(None),
)),
Some(row_meta) => {
Ok(A::output_for(extreact_timestamp_from_meta(row_meta.meta)))
}
None => parse_field(desc), // parse from payload
},
(_, &Some(AdditionalColumnType::CollectionName(_))) => {
Expand Down
5 changes: 3 additions & 2 deletions src/connector/src/parser/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,11 @@ pub(super) async fn bytes_from_url(
}
}

pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option<DatumRef<'_>> {
pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> DatumRef<'_> {
match meta {
SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_timestamp(),
SourceMeta::DebeziumCdc(cdc_meta) => Some(cdc_meta.extract_timestamp()),
SourceMeta::DebeziumCdc(cdc_meta) => cdc_meta.extract_timestamp(),
SourceMeta::Kinesis(kinesis_meta) => kinesis_meta.extract_timestamp(),
_ => None,
}
}
Expand Down
17 changes: 7 additions & 10 deletions src/connector/src/source/kafka/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::borrow::Cow;

use itertools::Itertools;
use rdkafka::message::{BorrowedMessage, Headers, OwnedHeaders};
use rdkafka::Message;
use rdkafka::{Message, Timestamp};
use risingwave_common::types::{
Datum, DatumCow, DatumRef, ListValue, ScalarImpl, ScalarRefImpl, StructValue,
};
Expand All @@ -29,18 +29,15 @@ use crate::source::SourceMeta;

#[derive(Debug, Clone)]
pub struct KafkaMeta {
// timestamp(milliseconds) of message append in mq
pub timestamp: Option<i64>,
pub timestamp: Timestamp,
pub headers: Option<OwnedHeaders>,
}

impl KafkaMeta {
pub fn extract_timestamp(&self) -> Option<DatumRef<'_>> {
self.timestamp.map(|ts| {
Some(ScalarRefImpl::Timestamptz(
risingwave_common::cast::i64_to_timestamptz(ts).unwrap(),
))
})
pub fn extract_timestamp(&self) -> DatumRef<'_> {
Some(
risingwave_common::types::Timestamptz::from_millis(self.timestamp.to_millis()?)?.into(),
)
}

pub fn extract_header_inner<'a>(
Expand Down Expand Up @@ -101,7 +98,7 @@ impl SourceMessage {
offset: message.offset().to_string(),
split_id: message.partition().to_string().into(),
meta: SourceMeta::Kafka(KafkaMeta {
timestamp: message.timestamp().to_millis(),
timestamp: message.timestamp(),
headers: if require_header {
message.headers().map(|headers| headers.detach())
} else {
Expand Down
17 changes: 12 additions & 5 deletions src/connector/src/source/kinesis/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,24 @@
// limitations under the License.

use aws_sdk_kinesis::types::Record;
use aws_smithy_types::DateTime;
use aws_smithy_types_convert::date_time::DateTimeExt;
use risingwave_common::types::{DatumRef, ScalarRefImpl};

use crate::source::{SourceMessage, SourceMeta, SplitId};

#[derive(Clone, Debug)]
pub struct KinesisMeta {
// from `approximate_arrival_timestamp` of type `Option<aws_smithy_types::DateTime>`
#[expect(dead_code)]
timestamp: Option<i64>,
timestamp: Option<DateTime>,
}

impl KinesisMeta {
pub fn extract_timestamp(&self) -> DatumRef<'_> {
Some(ScalarRefImpl::Timestamptz(
self.timestamp?.to_chrono_utc().ok()?.into(),
))
}
}

pub fn from_kinesis_record(value: &Record, split_id: SplitId) -> SourceMessage {
Expand All @@ -31,9 +40,7 @@ pub fn from_kinesis_record(value: &Record, split_id: SplitId) -> SourceMessage {
offset: value.sequence_number.clone(),
split_id,
meta: SourceMeta::Kinesis(KinesisMeta {
timestamp: value
.approximate_arrival_timestamp
.map(|dt| dt.to_chrono_utc().unwrap().timestamp_millis()),
timestamp: value.approximate_arrival_timestamp,
}),
}
}

0 comments on commit 624855a

Please sign in to comment.