diff --git a/e2e_test/source/basic/inlcude_key_as.slt b/e2e_test/source/basic/inlcude_key_as.slt
index ccf0df9e0277..83cace5eae6f 100644
--- a/e2e_test/source/basic/inlcude_key_as.slt
+++ b/e2e_test/source/basic/inlcude_key_as.slt
@@ -86,6 +86,12 @@ WHERE key_col IS NOT NULL
 ----
 101
 
+# the input data is from scripts/source/prepare_ci_kafka.sh
+# ```
+# for i in {0..100}; do echo "key$i:{\"a\": $i}" | ${KCAT_BIN} -P -b message_queue:29092 -t ${ADDI_COLUMN_TOPIC} -K : -H "header1=v1" -H "header2=v2"; done
+# ```
+# The command generates 101 messages with keys `key0` to `key100` and values `{"a": 0}` to `{"a": 100}`, with fixed headers `header1=v1` and `header2=v2`.
+
 query TT
 SELECT (header_col[1]).key AS key, (header_col[1]).value::text AS value
 FROM additional_columns limit 1;
diff --git a/src/connector/src/parser/util.rs b/src/connector/src/parser/util.rs
index cf2b59ad145c..4e0bd6f2f3f0 100644
--- a/src/connector/src/parser/util.rs
+++ b/src/connector/src/parser/util.rs
@@ -14,16 +14,13 @@
 use std::collections::HashMap;
 
 use bytes::Bytes;
-use itertools::Itertools;
-use rdkafka::message::Headers;
 use reqwest::Url;
 use risingwave_common::error::ErrorCode::{InvalidParameterValue, ProtocolError};
 use risingwave_common::error::{Result, RwError};
-use risingwave_common::types::{Datum, ListValue, Scalar, ScalarImpl, StructValue};
+use risingwave_common::types::Datum;
 
 use crate::aws_utils::load_file_descriptor_from_s3;
 use crate::common::AwsAuthProps;
-use crate::parser::additional_columns::get_kafka_header_item_datatype;
 use crate::source::SourceMeta;
 
 /// get kafka topic name
@@ -117,35 +114,14 @@ pub(super) async fn bytes_from_url(url: &Url, config: Option<&AwsAuthProps>) -> Result<Vec<u8>> {
 
 pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option<Datum> {
     match meta {
-        SourceMeta::Kafka(kafka_meta) => kafka_meta
-            .timestamp
-            .map(|ts| {
-                risingwave_common::cast::i64_to_timestamptz(ts)
-                    .unwrap()
-                    .to_scalar_value()
-            })
-            .into(),
+        SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_timestamp(),
         _ => None,
     }
 }
 
 pub fn extract_headers_from_meta(meta: &SourceMeta) -> Option<Datum> {
     match meta {
-        SourceMeta::Kafka(kafka_meta) => kafka_meta.headers.as_ref().map(|header| {
-            let header_item: Vec<Datum> = header
-                .iter()
-                .map(|header| {
-                    Some(ScalarImpl::Struct(StructValue::new(vec![
-                        Some(ScalarImpl::Utf8(header.key.to_string().into())),
-                        header.value.map(|byte| ScalarImpl::Bytea(byte.into())),
-                    ])))
-                })
-                .collect_vec();
-            Some(ScalarImpl::List(ListValue::from_datum_iter(
-                &get_kafka_header_item_datatype(),
-                header_item,
-            )))
-        }),
+        SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_headers(),
         _ => None,
     }
 }
diff --git a/src/connector/src/source/kafka/source/message.rs b/src/connector/src/source/kafka/source/message.rs
index 057b83c96198..52f972253313 100644
--- a/src/connector/src/source/kafka/source/message.rs
+++ b/src/connector/src/source/kafka/source/message.rs
@@ -12,9 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use rdkafka::message::{BorrowedMessage, OwnedHeaders};
+use itertools::Itertools;
+use rdkafka::message::{BorrowedMessage, Headers, OwnedHeaders};
 use rdkafka::Message;
+use risingwave_common::types::{Datum, ListValue, Scalar, ScalarImpl, StructValue};
 
+use crate::parser::additional_columns::get_kafka_header_item_datatype;
 use crate::source::base::SourceMessage;
 use crate::source::SourceMeta;
 
@@ -25,6 +28,36 @@ pub struct KafkaMeta {
     pub headers: Option<OwnedHeaders>,
 }
 
+impl KafkaMeta {
+    pub fn extract_timestamp(&self) -> Option<Datum> {
+        self.timestamp
+            .map(|ts| {
+                risingwave_common::cast::i64_to_timestamptz(ts)
+                    .unwrap()
+                    .to_scalar_value()
+            })
+            .into()
+    }
+
+    pub fn extract_headers(&self) -> Option<Datum> {
+        self.headers.as_ref().map(|headers| {
+            let header_item: Vec<Datum> = headers
+                .iter()
+                .map(|header| {
+                    Some(ScalarImpl::Struct(StructValue::new(vec![
+                        Some(ScalarImpl::Utf8(header.key.to_string().into())),
+                        header.value.map(|byte| ScalarImpl::Bytea(byte.into())),
+                    ])))
+                })
+                .collect_vec();
+            Some(ScalarImpl::List(ListValue::from_datum_iter(
+                &get_kafka_header_item_datatype(),
+                header_item,
+            )))
+        })
+    }
+}
+
 impl SourceMessage {
     pub fn from_kafka_message(message: &BorrowedMessage<'_>, require_header: bool) -> Self {
         SourceMessage {
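The sketch below (not part of the patch) illustrates how the relocated helpers could be exercised. It assumes `KafkaMeta` has exactly the `timestamp` and `headers` fields shown in the diff, that it is reachable at the module path used here, and that rdkafka's `Header`/`OwnedHeaders` builder API is available; the `header1`/`header2` values mirror the headers attached by `prepare_ci_kafka.sh`.

```rust
// Hypothetical usage sketch; the import path and field set are assumptions.
use rdkafka::message::{Header, OwnedHeaders};
use risingwave_common::types::Datum;

use crate::source::kafka::source::message::KafkaMeta; // path assumed for illustration

fn example_header_datum() -> Option<Datum> {
    let meta = KafkaMeta {
        // Broker-assigned append timestamp in epoch milliseconds.
        timestamp: Some(1_700_000_000_000),
        // The same fixed headers that prepare_ci_kafka.sh attaches to every message.
        headers: Some(
            OwnedHeaders::new()
                .insert(Header { key: "header1", value: Some("v1") })
                .insert(Header { key: "header2", value: Some("v2") }),
        ),
    };
    // Produces a List of Struct { key: varchar, value: bytea }, which is the shape
    // read by `(header_col[1]).key` / `(header_col[1]).value` in the slt test.
    meta.extract_headers()
}
```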