fix: support Kinesis with include timestamp #19211

Merged · 2 commits · Nov 5, 2024
7 changes: 7 additions & 0 deletions integration_tests/kinesis-s3-source/create_mv.sql
@@ -23,6 +23,13 @@ FROM
ai.ad_id
) AS ad_clicks ON ad_impressions.ad_id = ad_clicks.ad_id;

CREATE MATERIALIZED VIEW kinesis_timestamp AS
SELECT
ad_impression._rw_kinesis_timestamp AS timestamp
FROM
ad_impression
WHERE ad_impression._rw_kinesis_timestamp IS NOT NULL AND ad_impression._rw_kinesis_timestamp >= '2024-01-01 00:00:00+00'::TIMESTAMPTZ;

CREATE MATERIALIZED VIEW ad_ctr_5min AS
SELECT
ac.ad_id AS ad_id,
8 changes: 7 additions & 1 deletion integration_tests/kinesis-s3-source/create_source.sql
@@ -2,7 +2,13 @@ CREATE SOURCE ad_impression (
bid_id BIGINT,
ad_id BIGINT,
impression_timestamp TIMESTAMPTZ
) WITH (
)
INCLUDE key
INCLUDE partition
INCLUDE offset
INCLUDE timestamp
INCLUDE payload
WITH (
connector = 'kinesis',
stream = 'ad-impression',
aws.region='us-east-1',
2 changes: 1 addition & 1 deletion integration_tests/kinesis-s3-source/data_check
@@ -1 +1 @@
ad_ctr,ad_ctr_5min
ad_ctr,ad_ctr_5min,kinesis_timestamp
26 changes: 10 additions & 16 deletions src/connector/src/parser/mod.rs
@@ -215,7 +215,7 @@ impl<'a> MessageMeta<'a> {
/// Extract the value for the given column.
///
/// Returns `None` if the column is not a meta column.
fn value_for_column(self, desc: &SourceColumnDesc) -> Option<DatumRef<'a>> {
fn value_for_column(self, desc: &SourceColumnDesc) -> DatumRef<'a> {
let datum: DatumRef<'_> = match desc.column_type {
// Row id columns are filled with `NULL` here and will be filled with the real
// row id generated by `RowIdGenExecutor` later.
@@ -229,11 +229,7 @@ impl<'a> MessageMeta<'a> {
KAFKA_TIMESTAMP_COLUMN_NAME,
"unexpected kafka meta column name"
);
kafka_meta.timestamp.map(|ts| {
risingwave_common::cast::i64_to_timestamptz(ts)
.unwrap()
.into()
})
kafka_meta.extract_timestamp()
}
SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.meta => {
assert_eq!(
@@ -248,7 +244,7 @@ impl<'a> MessageMeta<'a> {
SourceColumnType::Meta | SourceColumnType::Normal => return None,
};

Some(datum)
datum
}
}

@@ -392,8 +388,7 @@ impl SourceStreamChunkRowWriter<'_> {
Ok(A::output_for(
self.row_meta
.as_ref()
.and_then(|row_meta| row_meta.value_for_column(desc))
.unwrap(), // handled all match cases in internal match, unwrap is safe
.and_then(|row_meta| row_meta.value_for_column(desc)),
))
}
(&SourceColumnType::Meta, _)
@@ -403,12 +398,11 @@ impl SourceStreamChunkRowWriter<'_> {
) =>
{
// SourceColumnType is for CDC source only.
return Ok(A::output_for(
Ok(A::output_for(
self.row_meta
.as_ref()
.and_then(|row_meta| row_meta.value_for_column(desc))
.unwrap(), // handled all match cases in internal match, unwrap is safe
));
.and_then(|row_meta| row_meta.value_for_column(desc)),
))
}

(
@@ -434,9 +428,9 @@ impl SourceStreamChunkRowWriter<'_> {
}
}
(_, &Some(AdditionalColumnType::Timestamp(_))) => match self.row_meta {
Some(row_meta) => Ok(A::output_for(
extreact_timestamp_from_meta(row_meta.meta).unwrap_or(None),
)),
Some(row_meta) => {
Ok(A::output_for(extreact_timestamp_from_meta(row_meta.meta)))
}
None => parse_field(desc), // parse from payload
},
(_, &Some(AdditionalColumnType::CollectionName(_))) => {
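The `mod.rs` changes hinge on `DatumRef<'a>` already being an `Option<ScalarRefImpl<'a>>`, where a NULL cell is `None`, so "no meta value" and "NULL datum" can share one level of `Option`. A minimal sketch of the call-site effect, using stand-in types rather than the real RisingWave ones:

```rust
// Stand-ins: in RisingWave, DatumRef<'a> = Option<ScalarRefImpl<'a>>,
// and a NULL cell is represented as `None`.
type ScalarRef = i64;
type DatumRef = Option<ScalarRef>;

struct RowMeta {
    timestamp_millis: Option<i64>,
}

impl RowMeta {
    // New shape: a missing meta value is simply a NULL datum, so there is
    // no `Option<DatumRef>` nesting and no `.unwrap()` at the call site.
    fn value_for_column(&self) -> DatumRef {
        self.timestamp_millis
    }
}

fn write_cell(row_meta: Option<&RowMeta>) -> DatumRef {
    // `and_then` collapses "no row meta at all" and "meta has no value"
    // into a single NULL, mirroring the `.and_then(...)` in the diff.
    row_meta.and_then(|m| m.value_for_column())
}
```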
5 changes: 3 additions & 2 deletions src/connector/src/parser/util.rs
@@ -116,10 +116,11 @@ pub(super) async fn bytes_from_url(
}
}

pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option<DatumRef<'_>> {
pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> DatumRef<'_> {
match meta {
SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_timestamp(),
SourceMeta::DebeziumCdc(cdc_meta) => Some(cdc_meta.extract_timestamp()),
SourceMeta::DebeziumCdc(cdc_meta) => cdc_meta.extract_timestamp(),
SourceMeta::Kinesis(kinesis_meta) => kinesis_meta.extract_timestamp(),
_ => None,
}
}
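With the flattened return type, the `_ => None` arm in `util.rs` now reads as "NULL datum" rather than "nothing to unwrap", and adding Kinesis is one more match arm. A simplified model of the dispatch (stand-in types; the timestamp is sketched as epoch milliseconds, and `Other` is illustrative):

```rust
// Stand-in for DatumRef: a nullable timestamp as epoch milliseconds.
type DatumRef = Option<i64>;

enum SourceMeta {
    Kafka(Option<i64>),
    Kinesis(Option<i64>),
    Other, // illustrative: a connector carrying no meta timestamp
}

fn extract_timestamp_from_meta(meta: &SourceMeta) -> DatumRef {
    match meta {
        // Kafka and Kinesis may or may not carry a timestamp per message.
        SourceMeta::Kafka(ts) | SourceMeta::Kinesis(ts) => *ts,
        // Everything else yields NULL rather than an error or nested Option.
        SourceMeta::Other => None,
    }
}

fn main() {
    let meta = SourceMeta::Kinesis(Some(1_730_764_800_000)); // 2024-11-05 UTC
    assert_eq!(extract_timestamp_from_meta(&meta), Some(1_730_764_800_000));
    assert_eq!(extract_timestamp_from_meta(&SourceMeta::Other), None);
}
```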
17 changes: 7 additions & 10 deletions src/connector/src/source/kafka/source/message.rs
@@ -16,7 +16,7 @@ use std::borrow::Cow;

use itertools::Itertools;
use rdkafka::message::{BorrowedMessage, Headers, OwnedHeaders};
use rdkafka::Message;
use rdkafka::{Message, Timestamp};
use risingwave_common::types::{
Datum, DatumCow, DatumRef, ListValue, ScalarImpl, ScalarRefImpl, StructValue,
};
@@ -29,18 +29,15 @@ use crate::source::SourceMeta;

#[derive(Debug, Clone)]
pub struct KafkaMeta {
// timestamp(milliseconds) of message append in mq
pub timestamp: Option<i64>,
pub timestamp: Timestamp,
pub headers: Option<OwnedHeaders>,
}

impl KafkaMeta {
pub fn extract_timestamp(&self) -> Option<DatumRef<'_>> {
self.timestamp.map(|ts| {
Some(ScalarRefImpl::Timestamptz(
risingwave_common::cast::i64_to_timestamptz(ts).unwrap(),
))
})
pub fn extract_timestamp(&self) -> DatumRef<'_> {
Some(
risingwave_common::types::Timestamptz::from_millis(self.timestamp.to_millis()?)?.into(),
)
}

pub fn extract_header_inner<'a>(
@@ -101,7 +98,7 @@ impl SourceMessage {
offset: message.offset().to_string(),
split_id: message.partition().to_string().into(),
meta: SourceMeta::Kafka(KafkaMeta {
timestamp: message.timestamp().to_millis(),
timestamp: message.timestamp(),
headers: if require_header {
message.headers().map(|headers| headers.detach())
} else {
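Storing the raw `rdkafka::Timestamp` (which can be `NotAvailable`, `CreateTime(i64)`, or `LogAppendTime(i64)`) defers the millisecond conversion to `extract_timestamp`, and swapping `i64_to_timestamptz(...).unwrap()` for `Timestamptz::from_millis(...)?` turns an out-of-range value into NULL instead of a panic. A minimal chrono-only sketch of that conversion path, assuming nothing beyond `to_millis()` returning `Option<i64>`:

```rust
use chrono::{DateTime, Utc};

// Mirrors the new behavior: a missing broker timestamp or an out-of-range
// millisecond value becomes None (NULL) rather than panicking.
fn millis_to_utc(ms: Option<i64>) -> Option<DateTime<Utc>> {
    DateTime::<Utc>::from_timestamp_millis(ms?)
}

fn main() {
    assert!(millis_to_utc(Some(1_730_764_800_000)).is_some()); // 2024-11-05 UTC
    assert!(millis_to_utc(None).is_none());           // Timestamp::NotAvailable
    assert!(millis_to_utc(Some(i64::MAX)).is_none()); // out of range: NULL, no panic
}
```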
17 changes: 12 additions & 5 deletions src/connector/src/source/kinesis/source/message.rs
@@ -13,15 +13,24 @@
// limitations under the License.

use aws_sdk_kinesis::types::Record;
use aws_smithy_types::DateTime;
use aws_smithy_types_convert::date_time::DateTimeExt;
use risingwave_common::types::{DatumRef, ScalarRefImpl};

use crate::source::{SourceMessage, SourceMeta, SplitId};

#[derive(Clone, Debug)]
pub struct KinesisMeta {
// from `approximate_arrival_timestamp` of type `Option<aws_smithy_types::DateTime>`
#[expect(dead_code)]
timestamp: Option<i64>,
timestamp: Option<DateTime>,
}

impl KinesisMeta {
pub fn extract_timestamp(&self) -> DatumRef<'_> {
Some(ScalarRefImpl::Timestamptz(
self.timestamp?.to_chrono_utc().ok()?.into(),
))
}
}

pub fn from_kinesis_record(value: &Record, split_id: SplitId) -> SourceMessage {
Expand All @@ -31,9 +40,7 @@ pub fn from_kinesis_record(value: &Record, split_id: SplitId) -> SourceMessage {
offset: value.sequence_number.clone(),
split_id,
meta: SourceMeta::Kinesis(KinesisMeta {
timestamp: value
.approximate_arrival_timestamp
.map(|dt| dt.to_chrono_utc().unwrap().timestamp_millis()),
timestamp: value.approximate_arrival_timestamp,
}),
}
}
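`approximate_arrival_timestamp` is an `aws_smithy_types::DateTime` carrying seconds plus subsecond nanoseconds; `to_chrono_utc()` (from `aws_smithy_types_convert`, returning a `Result`) reassembles those into a `chrono::DateTime<Utc>`, and any absence or conversion failure falls through to NULL via the `?`/`.ok()?` chain. A hedged sketch of the equivalent reassembly, with a stand-in struct instead of the real smithy type:

```rust
use chrono::{DateTime, Utc};

// Stand-in for aws_smithy_types::DateTime's internal representation.
struct SmithyDateTime {
    seconds: i64,
    subsec_nanos: u32,
}

// Roughly what `to_chrono_utc` does; failure maps to None, i.e. a NULL
// `_rw_kinesis_timestamp`, rather than an error.
fn to_chrono_utc(dt: &SmithyDateTime) -> Option<DateTime<Utc>> {
    DateTime::<Utc>::from_timestamp(dt.seconds, dt.subsec_nanos)
}

fn main() {
    let arrival = SmithyDateTime { seconds: 1_730_764_800, subsec_nanos: 0 };
    assert_eq!(
        to_chrono_utc(&arrival).unwrap().to_rfc3339(),
        "2024-11-05T00:00:00+00:00"
    );
}
```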