Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tabVersion committed Jan 10, 2024
1 parent 671f365 commit b60ba68
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 28 deletions.
6 changes: 6 additions & 0 deletions e2e_test/source/basic/inlcude_key_as.slt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ WHERE key_col IS NOT NULL
----
101

# the input data is from scripts/source/prepare_ci_kafka.sh
# ```
# for i in {0..100}; do echo "key$i:{\"a\": $i}" | ${KCAT_BIN} -P -b message_queue:29092 -t ${ADDI_COLUMN_TOPIC} -K : -H "header1=v1" -H "header2=v2"; done
# ```
# The command generates 101 messages with key `key0` to `key100` and value `{"a": 0}` to `{"a": 100}`, with fixed headers `header1=v1` and `header2=v2`.

query TT
SELECT (header_col[1]).key AS key, (header_col[1]).value::text AS value
FROM additional_columns limit 1;
Expand Down
30 changes: 3 additions & 27 deletions src/connector/src/parser/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,13 @@
use std::collections::HashMap;

use bytes::Bytes;
use itertools::Itertools;
use rdkafka::message::Headers;
use reqwest::Url;
use risingwave_common::error::ErrorCode::{InvalidParameterValue, ProtocolError};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::{Datum, ListValue, Scalar, ScalarImpl, StructValue};
use risingwave_common::types::Datum;

use crate::aws_utils::load_file_descriptor_from_s3;
use crate::common::AwsAuthProps;
use crate::parser::additional_columns::get_kafka_header_item_datatype;
use crate::source::SourceMeta;

/// get kafka topic name
Expand Down Expand Up @@ -117,35 +114,14 @@ pub(super) async fn bytes_from_url(url: &Url, config: Option<&AwsAuthProps>) ->

pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option<Datum> {
match meta {
SourceMeta::Kafka(kafka_meta) => kafka_meta
.timestamp
.map(|ts| {
risingwave_common::cast::i64_to_timestamptz(ts)
.unwrap()
.to_scalar_value()
})
.into(),
SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_timestamp(),
_ => None,
}
}

pub fn extract_headers_from_meta(meta: &SourceMeta) -> Option<Datum> {
match meta {
SourceMeta::Kafka(kafka_meta) => kafka_meta.headers.as_ref().map(|header| {
let header_item: Vec<Datum> = header
.iter()
.map(|header| {
Some(ScalarImpl::Struct(StructValue::new(vec![
Some(ScalarImpl::Utf8(header.key.to_string().into())),
header.value.map(|byte| ScalarImpl::Bytea(byte.into())),
])))
})
.collect_vec();
Some(ScalarImpl::List(ListValue::from_datum_iter(
&get_kafka_header_item_datatype(),
header_item,
)))
}),
SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_headers(),
_ => None,
}
}
35 changes: 34 additions & 1 deletion src/connector/src/source/kafka/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use rdkafka::message::{BorrowedMessage, OwnedHeaders};
use itertools::Itertools;
use rdkafka::message::{BorrowedMessage, Headers, OwnedHeaders};
use rdkafka::Message;
use risingwave_common::types::{Datum, ListValue, Scalar, ScalarImpl, StructValue};

use crate::parser::additional_columns::get_kafka_header_item_datatype;
use crate::source::base::SourceMessage;
use crate::source::SourceMeta;

Expand All @@ -25,6 +28,36 @@ pub struct KafkaMeta {
pub headers: Option<OwnedHeaders>,
}

impl KafkaMeta {
pub fn extract_timestamp(&self) -> Option<Datum> {
self.timestamp
.map(|ts| {
risingwave_common::cast::i64_to_timestamptz(ts)
.unwrap()
.to_scalar_value()
})
.into()
}

pub fn extract_headers(&self) -> Option<Datum> {
self.headers.as_ref().map(|headers| {
let header_item: Vec<Datum> = headers
.iter()
.map(|header| {
Some(ScalarImpl::Struct(StructValue::new(vec![
Some(ScalarImpl::Utf8(header.key.to_string().into())),
header.value.map(|byte| ScalarImpl::Bytea(byte.into())),
])))
})
.collect_vec();
Some(ScalarImpl::List(ListValue::from_datum_iter(
&get_kafka_header_item_datatype(),
header_item,
)))
})
}
}

impl SourceMessage {
pub fn from_kafka_message(message: &BorrowedMessage<'_>, require_header: bool) -> Self {
SourceMessage {
Expand Down

0 comments on commit b60ba68

Please sign in to comment.