From 79807e60780e7b972eb0e9dcd156278327c55b61 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 20 Dec 2023 22:13:38 +0800 Subject: [PATCH 01/20] stash --- src/connector/src/parser/mod.rs | 12 ++++++++++-- src/connector/src/parser/util.rs | 16 ++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 448c98ec571ae..788b24cf587dd 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -50,6 +50,7 @@ use self::util::get_kafka_topic; use crate::common::AwsAuthProps; use crate::parser::maxwell::MaxwellParser; use crate::parser::unified::AccessError; +use crate::parser::util::extreact_timestamp_from_meta; use crate::schema::schema_registry::SchemaRegistryAuth; use crate::source::monitor::GLOBAL_SOURCE_METRICS; use crate::source::{ @@ -340,10 +341,17 @@ impl SourceStreamChunkRowWriter<'_> { .unwrap(), // handled all match cases in internal match, unwrap is safe )); } + (_, &AdditionalColumnType::Timestamp) => { + return Ok(A::output_for( + self.row_meta + .as_ref() + .and_then(|ele| extreact_timestamp_from_meta(ele.meta)) + .unwrap(), + )) + } ( _, - &AdditionalColumnType::Timestamp - | &AdditionalColumnType::Partition + &AdditionalColumnType::Partition | &AdditionalColumnType::Filename | &AdditionalColumnType::Offset | &AdditionalColumnType::Header, diff --git a/src/connector/src/parser/util.rs b/src/connector/src/parser/util.rs index f793adcd6dae6..356a1f7fe0114 100644 --- a/src/connector/src/parser/util.rs +++ b/src/connector/src/parser/util.rs @@ -20,9 +20,11 @@ use risingwave_common::error::ErrorCode::{ InternalError, InvalidConfigValue, InvalidParameterValue, ProtocolError, }; use risingwave_common::error::{Result, RwError}; +use risingwave_common::types::{Datum, Scalar}; use crate::aws_utils::{default_conn_config, s3_client}; use crate::common::AwsAuthProps; +use crate::source::SourceMeta; const AVRO_SCHEMA_LOCATION_S3_REGION: &str = "region"; @@ -143,3 +145,17 @@ pub(super) async fn read_schema_from_s3(url: &Url, config: &AwsAuthProps) -> Res String::from_utf8(schema_bytes) .map_err(|e| RwError::from(InternalError(format!("Avro schema not valid utf8 {}", e)))) } + +pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option { + match meta { + SourceMeta::Kafka(kafka_meta) => kafka_meta + .timestamp + .map(|ts| { + risingwave_common::cast::i64_to_timestamptz(ts) + .unwrap() + .to_scalar_value() + }) + .into(), + _ => None, + } +} From 54d60b8b3956e722c0c7b28b644718c31ae43db6 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Thu, 21 Dec 2023 16:06:07 +0800 Subject: [PATCH 02/20] change table display --- src/sqlparser/src/ast/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index d6063b8d4872d..ad2ad724c5fb1 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -1618,12 +1618,12 @@ impl fmt::Display for Statement { write!(f, " APPEND ONLY")?; } if !include_column_options.is_empty() { // (Ident, Option) - write!(f, " INCLUDE {}", display_comma_separated( + write!(f, "{}", display_comma_separated( include_column_options.iter().map(|(a, b)| { if let Some(b) = b { - format!("{} AS {}", a, b) + format!("INCLUDE {} AS {}", a, b) } else { - a.to_string() + format!("INCLUDE {}", a) } }).collect_vec().as_slice() ))?; From 3a89125486787f32692c3abf53ff5df5f21ca0be Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 26 Dec 2023 14:51:06 +0800 Subject: [PATCH 
03/20] stash --- .../src/parser/additional_columns.rs | 164 +++++++++++++----- src/connector/src/parser/mod.rs | 23 ++- src/connector/src/source/base.rs | 2 + src/connector/src/source/kinesis/mod.rs | 1 + .../src/source/kinesis/source/message.rs | 23 +++ .../src/source/kinesis/source/mod.rs | 2 + .../src/source/kinesis/source/reader.rs | 9 +- 7 files changed, 168 insertions(+), 56 deletions(-) diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index 767a656b63f44..74d44ce370735 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -73,7 +73,7 @@ fn kafka_compatible_column_vec() -> Vec<(&'static str, CompatibleAdditionalColum column_desc: ColumnDesc::named_with_additional_column( name, id, - DataType::Int64, + DataType::Varchar, AdditionalColumnType::Partition, ), is_hidden: false, @@ -87,7 +87,7 @@ fn kafka_compatible_column_vec() -> Vec<(&'static str, CompatibleAdditionalColum column_desc: ColumnDesc::named_with_additional_column( name, id, - DataType::Int64, + DataType::Varchar, AdditionalColumnType::Offset, ), is_hidden: false, @@ -110,52 +110,128 @@ fn kafka_compatible_column_vec() -> Vec<(&'static str, CompatibleAdditionalColum } fn pulsar_compatible_column_vec() -> Vec<(&'static str, CompatibleAdditionalColumnsFn)> { - vec![( - "key", - Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { - ColumnCatalog { - column_desc: ColumnDesc::named_with_additional_column( - name, - id, - DataType::Bytea, - AdditionalColumnType::Key, - ), - is_hidden: false, - } - }), - )] + vec![ + ( + "key", + Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { + ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + name, + id, + DataType::Bytea, + AdditionalColumnType::Key, + ), + is_hidden: false, + } + }), + ), + ( + "partition", + Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { + ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + name, + id, + DataType::Varchar, + AdditionalColumnType::Partition, + ), + is_hidden: false, + } + }), + ), + ( + "offset", + Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { + ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + name, + id, + DataType::Varchar, + AdditionalColumnType::Offset, + ), + is_hidden: false, + } + }), + ), + ] } fn kinesis_compatible_column_vec() -> Vec<(&'static str, CompatibleAdditionalColumnsFn)> { - vec![( - "key", - Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { - ColumnCatalog { - column_desc: ColumnDesc::named_with_additional_column( - name, - id, - DataType::Bytea, - AdditionalColumnType::Key, - ), - is_hidden: false, - } - }), - )] + vec![ + ( + "key", + Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { + ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + name, + id, + DataType::Bytea, + AdditionalColumnType::Key, + ), + is_hidden: false, + } + }), + ), + ( + "partition", + Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { + ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + name, + id, + DataType::Varchar, + AdditionalColumnType::Partition, + ), + is_hidden: false, + } + }), + ), + ( + "offset", + Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { + ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + name, + id, + DataType::Varchar, + AdditionalColumnType::Offset, + ), + is_hidden: false, + } + }), + ), + ] } fn s3_compatible_column_column_vec() -> 
Vec<(&'static str, CompatibleAdditionalColumnsFn)> { - vec![( - "file", - Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { - ColumnCatalog { - column_desc: ColumnDesc::named_with_additional_column( - name, - id, - DataType::Varchar, - AdditionalColumnType::Filename, - ), - is_hidden: false, - } - }), - )] + vec![ + ( + "file", + Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { + ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + name, + id, + DataType::Varchar, + AdditionalColumnType::Filename, + ), + is_hidden: false, + } + }), + ), + ( + "offset", + Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { + ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + name, + id, + DataType::Varchar, + AdditionalColumnType::Offset, + ), + is_hidden: false, + } + }), + ), + ] } diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 788b24cf587dd..8e1e2adb16080 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -31,7 +31,7 @@ use risingwave_common::catalog::{KAFKA_TIMESTAMP_COLUMN_NAME, TABLE_NAME_COLUMN_ use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{Result, RwError}; use risingwave_common::log::LogSuppresser; -use risingwave_common::types::{Datum, Scalar}; +use risingwave_common::types::{Datum, Scalar, ScalarImpl}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::catalog::{ SchemaRegistryNameStrategy as PbSchemaRegistryNameStrategy, StreamSourceInfo, @@ -349,12 +349,25 @@ impl SourceStreamChunkRowWriter<'_> { .unwrap(), )) } + (_, &AdditionalColumnType::Partition) => { + // the meta info does not involve spec connector + return Ok(A::output_for( + self.row_meta + .as_ref() + .map(|ele| ScalarImpl::Utf8(ele.split_id.to_string().into())), + )); + } + (_, &AdditionalColumnType::Offset) => { + // the meta info does not involve spec connector + return Ok(A::output_for( + self.row_meta + .as_ref() + .map(|ele| ScalarImpl::Utf8(ele.offset.to_string().into())), + )); + } ( _, - &AdditionalColumnType::Partition - | &AdditionalColumnType::Filename - | &AdditionalColumnType::Offset - | &AdditionalColumnType::Header, + &AdditionalColumnType::Filename | &AdditionalColumnType::Header, // AdditionalColumnType::Unspecified and AdditionalColumnType::Normal is means it comes from message payload // AdditionalColumnType::Key is processed in normal process, together with Unspecified ones ) => Err(AccessError::Other(anyhow!( diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 8db65bb9681bf..b10d203c6a640 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -40,6 +40,7 @@ use super::datagen::DatagenMeta; use super::filesystem::FsSplit; use super::google_pubsub::GooglePubsubMeta; use super::kafka::KafkaMeta; +use super::kinesis::KinesisMeta; use super::monitor::SourceMetrics; use super::nexmark::source::message::NexmarkMeta; use super::OPENDAL_S3_CONNECTOR; @@ -575,6 +576,7 @@ pub struct SourceMessage { #[derive(Debug, Clone)] pub enum SourceMeta { Kafka(KafkaMeta), + Kinesis(KinesisMeta), Nexmark(NexmarkMeta), GooglePubsub(GooglePubsubMeta), Datagen(DatagenMeta), diff --git a/src/connector/src/source/kinesis/mod.rs b/src/connector/src/source/kinesis/mod.rs index ebb3184cc501b..c641c8ff73884 100644 --- a/src/connector/src/source/kinesis/mod.rs +++ b/src/connector/src/source/kinesis/mod.rs @@ -18,6 +18,7 @@ pub mod split; use serde::Deserialize; use serde_with::{serde_as, 
DisplayFromStr}; +pub use source::KinesisMeta; use with_options::WithOptions; use crate::common::KinesisCommon; diff --git a/src/connector/src/source/kinesis/source/message.rs b/src/connector/src/source/kinesis/source/message.rs index 9e63c05be0e0f..860d47dc7446d 100644 --- a/src/connector/src/source/kinesis/source/message.rs +++ b/src/connector/src/source/kinesis/source/message.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use aws_sdk_kinesis::types::Record; use crate::source::{SourceMessage, SourceMeta, SplitId}; @@ -36,6 +38,27 @@ impl From for SourceMessage { } } +#[derive(Clone, Debug)] +pub struct KinesisMeta { + // from `approximate_arrival_timestamp` of type `Option` + timestamp: Option, +} + +pub fn from_kinesis_record(value: &Record, split_id: SplitId) -> SourceMessage { + SourceMessage { + key: Some(value.partition_key.into_bytes()), + payload: Some(value.data.into_inner()), + offset: value.sequence_number.clone(), + split_id, + meta: SourceMeta::Kinesis(KinesisMeta { + timestamp: value + .approximate_arrival_timestamp + // todo: review if safe to unwrap + .map(|dt| dt.to_millis().unwrap()), + }), + } +} + impl KinesisMessage { pub fn new(shard_id: SplitId, message: Record) -> Self { KinesisMessage { diff --git a/src/connector/src/source/kinesis/source/mod.rs b/src/connector/src/source/kinesis/source/mod.rs index cd90a79f28913..a90922229ea3d 100644 --- a/src/connector/src/source/kinesis/source/mod.rs +++ b/src/connector/src/source/kinesis/source/mod.rs @@ -14,3 +14,5 @@ mod message; pub mod reader; + +pub use message::KinesisMeta; diff --git a/src/connector/src/source/kinesis/source/reader.rs b/src/connector/src/source/kinesis/source/reader.rs index 8ba465342c2ce..d1424651be39d 100644 --- a/src/connector/src/source/kinesis/source/reader.rs +++ b/src/connector/src/source/kinesis/source/reader.rs @@ -25,7 +25,7 @@ use futures_async_stream::try_stream; use tokio_retry; use crate::parser::ParserConfig; -use crate::source::kinesis::source::message::KinesisMessage; +use crate::source::kinesis::source::message::from_kinesis_record; use crate::source::kinesis::split::{KinesisOffset, KinesisSplit}; use crate::source::kinesis::KinesisProperties; use crate::source::{ @@ -138,12 +138,7 @@ impl CommonSplitReader for KinesisSplitReader { Ok(resp) => { self.shard_iter = resp.next_shard_iterator().map(String::from); let chunk = (resp.records().iter()) - .map(|r| { - SourceMessage::from(KinesisMessage::new( - self.shard_id.clone(), - r.clone(), - )) - }) + .map(|r| from_kinesis_record(r, self.split_id.clone())) .collect::>(); if chunk.is_empty() { tokio::time::sleep(Duration::from_millis(200)).await; From 9b4dd377f3fbad9da7ccc4c4eccd5e45af9523e0 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 26 Dec 2023 17:31:15 +0800 Subject: [PATCH 04/20] fix --- Cargo.lock | 15 +++++++++++++-- src/connector/Cargo.toml | 1 + .../src/source/kinesis/source/message.rs | 10 ++++------ 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5170a8fe09187..488f4a0476977 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1252,9 +1252,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.0.1" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2fc32035dc0636a8583cf0c6dd7f1e6d5404103b836d26228b8730907a88d9f" +checksum = "2aba8136605d14ac88f57dc3a693a9f8a4eab4a3f52bc03ff13746f0cd704e97" dependencies = [ "base64-simd", 
"bytes", @@ -1274,6 +1274,16 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "aws-smithy-types-convert" +version = "0.60.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2916d0db08f25d26d2d27d3a0d339075caead06c2a41f09870a121e2f87a4893" +dependencies = [ + "aws-smithy-types", + "chrono", +] + [[package]] name = "aws-smithy-xml" version = "0.60.0" @@ -8431,6 +8441,7 @@ dependencies = [ "aws-smithy-http", "aws-smithy-runtime-api", "aws-smithy-types", + "aws-smithy-types-convert", "aws-types", "base64 0.21.4", "byteorder", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 92aa632201409..376b4f1e153e2 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -34,6 +34,7 @@ aws-sdk-s3 = { workspace = true } aws-smithy-http = { workspace = true } aws-smithy-runtime-api = { workspace = true } aws-smithy-types = { workspace = true } +aws-smithy-types-convert = { version = "0.60.1", features = ["convert-chrono"] } aws-types = { workspace = true } base64 = "0.21" byteorder = "1" diff --git a/src/connector/src/source/kinesis/source/message.rs b/src/connector/src/source/kinesis/source/message.rs index 860d47dc7446d..0e4b18140068d 100644 --- a/src/connector/src/source/kinesis/source/message.rs +++ b/src/connector/src/source/kinesis/source/message.rs @@ -12,9 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use aws_sdk_kinesis::types::Record; +use aws_smithy_types_convert::date_time::DateTimeExt; use crate::source::{SourceMessage, SourceMeta, SplitId}; @@ -46,15 +45,14 @@ pub struct KinesisMeta { pub fn from_kinesis_record(value: &Record, split_id: SplitId) -> SourceMessage { SourceMessage { - key: Some(value.partition_key.into_bytes()), - payload: Some(value.data.into_inner()), + key: Some(value.partition_key.clone().into_bytes()), + payload: Some(value.data.clone().into_inner()), offset: value.sequence_number.clone(), split_id, meta: SourceMeta::Kinesis(KinesisMeta { timestamp: value .approximate_arrival_timestamp - // todo: review if safe to unwrap - .map(|dt| dt.to_millis().unwrap()), + .map(|dt| dt.to_chrono_utc().unwrap().timestamp_millis()), }), } } From 3f96e4d0873d4a253ec473e2ffb47a50a414c184 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Thu, 28 Dec 2023 18:28:12 +0800 Subject: [PATCH 05/20] support header --- .../src/parser/additional_columns.rs | 32 +++++++++++-------- src/connector/src/parser/mod.rs | 15 ++++++--- src/connector/src/parser/util.rs | 26 ++++++++++++++- .../src/source/kafka/source/message.rs | 10 ++++-- .../src/source/kafka/source/reader.rs | 12 ++++++- 5 files changed, 74 insertions(+), 21 deletions(-) diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index 74d44ce370735..60c190cbd8b8a 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -13,7 +13,7 @@ // limitations under the License. 
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; -use risingwave_common::types::DataType; +use risingwave_common::types::{DataType, StructType}; use risingwave_pb::plan_common::AdditionalColumnType; use crate::source::{ @@ -94,21 +94,27 @@ fn kafka_compatible_column_vec() -> Vec<(&'static str, CompatibleAdditionalColum } }), ), - // Todo(tabVersion): add header column desc - // ( - // "header", - // Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { - // ColumnCatalog { - // column_desc: ColumnDesc::named(name, id, DataType::List( - // - // )), - // is_hidden: false, - // } - // }), - // ), + ( + "header", // type: struct[] + Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { + ColumnCatalog { + column_desc: ColumnDesc::named( + name, + id, + DataType::List(get_kafka_header_item_datatype().into()), + ), + is_hidden: false, + } + }), + ), ] } +pub fn get_kafka_header_item_datatype() -> DataType { + let struct_inner = vec![("key", DataType::Varchar), ("value", DataType::Bytea)]; + DataType::Struct(StructType::new(struct_inner)) +} + fn pulsar_compatible_column_vec() -> Vec<(&'static str, CompatibleAdditionalColumnsFn)> { vec![ ( diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 8e1e2adb16080..8dd3ac03facf4 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -50,7 +50,7 @@ use self::util::get_kafka_topic; use crate::common::AwsAuthProps; use crate::parser::maxwell::MaxwellParser; use crate::parser::unified::AccessError; -use crate::parser::util::extreact_timestamp_from_meta; +use crate::parser::util::{extract_headers_from_meta, extreact_timestamp_from_meta}; use crate::schema::schema_registry::SchemaRegistryAuth; use crate::source::monitor::GLOBAL_SOURCE_METRICS; use crate::source::{ @@ -365,11 +365,18 @@ impl SourceStreamChunkRowWriter<'_> { .map(|ele| ScalarImpl::Utf8(ele.offset.to_string().into())), )); } + (_, &AdditionalColumnType::Header) => { + return Ok(A::output_for( + self.row_meta + .as_ref() + .and_then(|ele| extract_headers_from_meta(ele.meta)) + .unwrap(), + )) + } ( _, - &AdditionalColumnType::Filename | &AdditionalColumnType::Header, - // AdditionalColumnType::Unspecified and AdditionalColumnType::Normal is means it comes from message payload - // AdditionalColumnType::Key is processed in normal process, together with Unspecified ones + &AdditionalColumnType::Filename, /* AdditionalColumnType::Unspecified and AdditionalColumnType::Normal is means it comes from message payload + * AdditionalColumnType::Key is processed in normal process, together with Unspecified ones */ ) => Err(AccessError::Other(anyhow!( "Column type {:?} not implemented yet", &desc.additional_column_type diff --git a/src/connector/src/parser/util.rs b/src/connector/src/parser/util.rs index 356a1f7fe0114..1f38214231bf8 100644 --- a/src/connector/src/parser/util.rs +++ b/src/connector/src/parser/util.rs @@ -15,15 +15,18 @@ use std::collections::HashMap; use std::path::Path; use bytes::Bytes; +use itertools::Itertools; +use rdkafka::message::Headers; use reqwest::Url; use risingwave_common::error::ErrorCode::{ InternalError, InvalidConfigValue, InvalidParameterValue, ProtocolError, }; use risingwave_common::error::{Result, RwError}; -use risingwave_common::types::{Datum, Scalar}; +use risingwave_common::types::{Datum, ListValue, Scalar, ScalarImpl, StructValue}; use crate::aws_utils::{default_conn_config, s3_client}; use crate::common::AwsAuthProps; +use 
crate::parser::additional_columns::get_kafka_header_item_datatype; use crate::source::SourceMeta; const AVRO_SCHEMA_LOCATION_S3_REGION: &str = "region"; @@ -159,3 +162,24 @@ pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option { _ => None, } } + +pub fn extract_headers_from_meta(meta: &SourceMeta) -> Option { + match meta { + SourceMeta::Kafka(kafka_meta) => kafka_meta.headers.as_ref().map(|header| { + let header_item: Vec = header + .iter() + .map(|header| { + Some(ScalarImpl::Struct(StructValue::new(vec![ + Some(ScalarImpl::Utf8(header.key.to_string().into())), + header.value.map(|byte| ScalarImpl::Bytea(byte.into())), + ]))) + }) + .collect_vec(); + Some(ScalarImpl::List(ListValue::from_datum_iter( + &get_kafka_header_item_datatype(), + header_item, + ))) + }), + _ => None, + } +} diff --git a/src/connector/src/source/kafka/source/message.rs b/src/connector/src/source/kafka/source/message.rs index bc3d4e38cbd2b..c5730144fca5e 100644 --- a/src/connector/src/source/kafka/source/message.rs +++ b/src/connector/src/source/kafka/source/message.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use rdkafka::message::BorrowedMessage; +use rdkafka::message::{BorrowedMessage, OwnedHeaders}; use rdkafka::Message; use crate::source::base::SourceMessage; @@ -22,10 +22,11 @@ use crate::source::SourceMeta; pub struct KafkaMeta { // timestamp(milliseconds) of message append in mq pub timestamp: Option, + pub headers: Option, } impl SourceMessage { - pub fn from_kafka_message(message: &BorrowedMessage<'_>) -> Self { + pub fn from_kafka_message(message: &BorrowedMessage<'_>, require_header: bool) -> Self { SourceMessage { // TODO(TaoWu): Possible performance improvement: avoid memory copying here. 
key: message.key().map(|p| p.to_vec()), @@ -34,6 +35,11 @@ impl SourceMessage { split_id: message.partition().to_string().into(), meta: SourceMeta::Kafka(KafkaMeta { timestamp: message.timestamp().to_millis(), + headers: if require_header { + message.headers().map(|headers| headers.detach()) + } else { + None + }, }), } } diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index 1053fa8c0aad8..66b4a730140c2 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -25,6 +25,7 @@ use rdkafka::config::RDKafkaLogLevel; use rdkafka::consumer::{Consumer, StreamConsumer}; use rdkafka::error::KafkaError; use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList}; +use risingwave_pb::plan_common::AdditionalColumnType; use crate::parser::ParserConfig; use crate::source::base::SourceMessage; @@ -194,6 +195,14 @@ impl CommonSplitReader for KafkaSplitReader { let mut num_messages = 0; let max_chunk_size = self.source_ctx.source_ctrl_opts.chunk_size; let mut res = Vec::with_capacity(max_chunk_size); + // ingest kafka message header can be expensive, do it only when required + let require_message_header = self + .parser_config + .common + .rw_columns + .iter() + .any(|col_desc| col_desc.additional_column_type == AdditionalColumnType::Header); + #[for_await] 'for_outer_loop: for msgs in self.consumer.stream().ready_chunks(max_chunk_size) { let msgs: Vec<_> = msgs @@ -218,7 +227,8 @@ impl CommonSplitReader for KafkaSplitReader { Some(payload) => payload.len(), }; num_messages += 1; - let source_message = SourceMessage::from_kafka_message(&msg); + let source_message = + SourceMessage::from_kafka_message(&msg, require_message_header); let split_id = source_message.split_id.clone(); res.push(source_message); From 7107acf8d97c07fa91834d6aeb9e88abf16d1a53 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Thu, 28 Dec 2023 20:30:02 +0800 Subject: [PATCH 06/20] fix --- src/connector/src/parser/additional_columns.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index 60c190cbd8b8a..a350ec002a0ed 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -98,10 +98,11 @@ fn kafka_compatible_column_vec() -> Vec<(&'static str, CompatibleAdditionalColum "header", // type: struct[] Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { ColumnCatalog { - column_desc: ColumnDesc::named( + column_desc: ColumnDesc::named_with_additional_column( name, id, DataType::List(get_kafka_header_item_datatype().into()), + AdditionalColumnType::Header, ), is_hidden: false, } From 587fb353d3ed2fae6526775096afec14cfb308e9 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Thu, 4 Jan 2024 14:07:55 +0800 Subject: [PATCH 07/20] stash --- scripts/source/additional_columns.py | 153 ++++++++++++++++++ .../src/parser/additional_columns.rs | 5 +- src/connector/src/parser/mod.rs | 4 +- 3 files changed, 158 insertions(+), 4 deletions(-) create mode 100644 scripts/source/additional_columns.py diff --git a/scripts/source/additional_columns.py b/scripts/source/additional_columns.py new file mode 100644 index 0000000000000..8fdb495c42b4f --- /dev/null +++ b/scripts/source/additional_columns.py @@ -0,0 +1,153 @@ +#!/usr/bin/python3 +import json +import random + +from confluent_kafka import Producer, Consumer, KafkaError, KafkaException +from 
confluent_kafka.admin import AdminClient, NewTopic + + +def get_column_name_by_connector_and_column(connector_name: str, additional_column_name: str) -> str: + f"""_rw_{connector_name}_{additional_column_name}""" + + +def get_supported_additional_columns_by_connector(connector_name: str) -> [str]: + # keep consistent with `src/connector/src/parser/additional_columns.rs` + supported_dict = { + "kafka": ["key", "partition", "offset", "timestamp", "header"], + "pulsar": ["key", "partition", "offset"], + "kinesis": ["key", "partition", "offset"], + "s3_v2": ["file", "offset"], + # todo: gcs + } + if connector_name not in supported_dict: + raise Exception("connector {} is not supported".format(connector_name)) + + return supported_dict[connector_name] + + +def get_connect_config_by_connector(connector_name: str) -> dict: + connect_config_dict = { + "kafka": { + "topic": "test_additional_columns", + "properties.bootstrap.server": "localhost:29092", # redpanda docker run here + "connector": "kafka", + } + } + if connector_name not in connect_config_dict: + raise Exception("connector {} is not supported".format(connector_name)) + + return connect_config_dict[connector_name] + + +def create_source_with_spec_connector(connector_name: str): + additional_columns = get_supported_additional_columns_by_connector(connector_name) + connect_config = get_connect_config_by_connector(connector_name) + + table_name = f"{connector_name}_test_additional_column" + base_schema = "CREATE TABLE {} (id INT, name VARCHAR, age INT) {} WITH ({}) FORMAT PLAIN ENCODE JSON" + additional_columns_str = "".join([" include {} ".format(column) for column in additional_columns]) + connect_config_str = ", ".join(["{} = '{}'".format(k, v) for k, v in connect_config.items()]) + + run_sql = base_schema.format(table_name, additional_columns_str, connect_config_str) + print(run_sql) + + return table_name + + +def drop_table(table_name: str): + sql = f"DROP TABLE IF EXISTS {table_name}" + print(sql) + + +def generate_data(): + for i in range(200): + key = "key_{}".format(i) + gen_one_row = { + "id": i, + "name": "name_{}".format(i), + "age": i % 20, + } + yield key, gen_one_row + + +def generate_kafka(connect_config: dict): + # generate kafka key, payload, header + broker_addr = connect_config["properties.bootstrap.server"] + topic = connect_config["topic"] + + kafka_conf = {'bootstrap.servers': broker_addr} + + admin_client = AdminClient(kafka_conf) + topic_list = [NewTopic(topic, num_partitions=3)] + admin_client.create_topics(topic_list) + + producer = Producer(kafka_conf) + header_pool = [("key1", "value1"), ("key2", "value2"), ("key3", "value3"), ("key4", "value4")] + + for key, one_row in generate_data(): + headers = random.sample(header_pool, random.randint(0, len(header_pool))) + producer.produce(topic=topic, value=json.dumps(one_row), key=json.dumps(key), headers=headers) + producer.flush() + + +def load_mq_for_ground_truth(): + pass + + +def load_kafka(config: dict) -> dict: + + def on_assign(consumer, partitions): + global eof_reached + eof_reached = {partition: False for partition in partitions} + + eof_reached = {} + + connect_conf = { + "bootstrap.servers": config["properties.bootstrap.server"], + "auto.offset.reset": "earliest", + "group.id": "consumer-ci", + } + topic = config["topic"] + consumer = Consumer(connect_conf) + consumer.subscribe([topic], on_assign=on_assign) + + print(eof_reached) + + result = {} + try: + while True: + msg = consumer.poll(timeout=1.0) + print(msg) + if msg is None: + print("No message") + 
continue + if msg.error(): + if msg.error().code() == KafkaError.PARTITION_EOF: + eof_reached[msg.partition()] = True + print("End of partition reached {}/{}".format(msg.topic(), msg.partition())) + if all(eof_reached.values()): + print("All partitions EOF, exiting") + break + continue + else: + raise KafkaException(msg.error()) + else: + result[msg.key()] = { + # focus on the following columns + "partition": msg.partition(), + "offset": msg.offset(), + "timestamp": msg.timestamp(), + "headers": msg.headers(), + } + + finally: + consumer.close() + + return result + + +if __name__ == '__main__': + # generate_kafka(get_connect_config_by_connector("kafka")) + + res = load_kafka(get_connect_config_by_connector("kafka")) + print(len(res)) diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index a350ec002a0ed..6186347e947e6 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -17,7 +17,8 @@ use risingwave_common::types::{DataType, StructType}; use risingwave_pb::plan_common::AdditionalColumnType; use crate::source::{ - KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, + GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR, + S3_CONNECTOR, }; pub type CompatibleAdditionalColumnsFn = @@ -30,7 +31,7 @@ pub fn get_connector_compatible_additional_columns( KAFKA_CONNECTOR => kafka_compatible_column_vec(), PULSAR_CONNECTOR => pulsar_compatible_column_vec(), KINESIS_CONNECTOR => kinesis_compatible_column_vec(), - OPENDAL_S3_CONNECTOR | S3_CONNECTOR => s3_compatible_column_column_vec(), + OPENDAL_S3_CONNECTOR | S3_CONNECTOR | GCS_CONNECTOR => s3_compatible_column_column_vec(), _ => return None, }; Some(compatible_columns) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 8dd3ac03facf4..f2ab8e51b0ea6 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -346,7 +346,7 @@ impl SourceStreamChunkRowWriter<'_> { self.row_meta .as_ref() .and_then(|ele| extreact_timestamp_from_meta(ele.meta)) - .unwrap(), + .unwrap_or(None), )) } (_, &AdditionalColumnType::Partition) => { @@ -370,7 +370,7 @@ impl SourceStreamChunkRowWriter<'_> { self.row_meta .as_ref() .and_then(|ele| extract_headers_from_meta(ele.meta)) - .unwrap(), + .unwrap_or(None), )) } ( From 64552ed7edf5127448bcdf4ec09e164a7a5b2689 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Thu, 4 Jan 2024 16:43:33 +0800 Subject: [PATCH 08/20] kafka additional col test --- e2e_test/s3/json_file.py | 6 +- e2e_test/source/basic/inlcude_key_as.slt | 36 ++++++ scripts/source/additional_columns.py | 153 ----------------------- scripts/source/prepare_ci_kafka.sh | 4 + src/connector/src/parser/mod.rs | 16 +-- 5 files changed, 53 insertions(+), 162 deletions(-) delete mode 100644 scripts/source/additional_columns.py diff --git a/e2e_test/s3/json_file.py b/e2e_test/s3/json_file.py index 585f44a7ce825..7c92f29d76226 100644 --- a/e2e_test/s3/json_file.py +++ b/e2e_test/s3/json_file.py @@ -25,7 +25,11 @@ def do_test(client, config, N, prefix): name TEXT, sex int, mark int, - ) WITH ( + ) + include file + WITH ( + + connector = 's3', match_pattern = '{prefix}*.json', s3.region_name = '{config['S3_REGION']}', diff --git a/e2e_test/source/basic/inlcude_key_as.slt b/e2e_test/source/basic/inlcude_key_as.slt index d7780d4916376..d5c073a50a865 100644 --- a/e2e_test/source/basic/inlcude_key_as.slt +++ 
b/e2e_test/source/basic/inlcude_key_as.slt @@ -48,9 +48,25 @@ WITH ( topic = 'upsert_json') FORMAT PLAIN ENCODE JSON +statement ok +create table additional_columns (a int) +include key as key_col +include partition as partition_col +include offset as offset_col +include timestamp as timestamp_col +include header as header_col +WITH ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'kafka_additional_columns') +FORMAT PLAIN ENCODE JSON + statement ok select * from upsert_students_default_key; +statement ok +select * from additional_columns; + # Wait enough time to ensure SourceExecutor consumes all Kafka data. sleep 3s @@ -59,5 +75,25 @@ select count(rw_key) from upsert_students_default_key ---- 15 +query I +SELECT count(*) +FROM additional_columns +WHERE key_col IS NOT NULL + AND partition_col IS NOT NULL + AND offset_col IS NOT NULL + AND timestamp_col IS NOT NULL + AND header_col IS NOT NULL +---- +101 + +query TT +SELECT (_rw_kafka_header[1]).key AS key,SELECT (_rw_kafka_header[1]).value::text AS value +FROM additional_columns limit 1; +---- +header1 \x7631 + statement ok drop table upsert_students_default_key + +statement ok +drop table additional_columns diff --git a/scripts/source/additional_columns.py b/scripts/source/additional_columns.py deleted file mode 100644 index 8fdb495c42b4f..0000000000000 --- a/scripts/source/additional_columns.py +++ /dev/null @@ -1,153 +0,0 @@ -#!/usr/bin/python3 -import json -import random - -from confluent_kafka import Producer, Consumer, KafkaError, KafkaException -from confluent_kafka.admin import AdminClient, NewTopic - - -def get_column_name_by_connector_and_column(connector_name: str, additional_column_name: str) -> str: - f"""_rw_{connector_name}_{additional_column_name}""" - - -def get_supported_additional_columns_by_connector(connector_name: str) -> [str]: - # keep consistent with `src/connector/src/parser/additional_columns.rs` - supported_dict = { - "kafka": ["key", "partition", "offset", "timestamp", "header"], - "pulsar": ["key", "partition", "offset"], - "kinesis": ["key", "partition", "offset"], - "s3_v2": ["file", "offset"], - # todo: gcs - } - if connector_name not in supported_dict: - raise Exception("connector {} is not supported".format(connector_name)) - - return supported_dict[connector_name] - - -def get_connect_config_by_connector(connector_name: str) -> dict: - connect_config_dict = { - "kafka": { - "topic": "test_additional_columns", - "properties.bootstrap.server": "localhost:29092", # redpanda docker run here - "connector": "kafka", - } - } - if connector_name not in connect_config_dict: - raise Exception("connector {} is not supported".format(connector_name)) - - return connect_config_dict[connector_name] - - -def create_source_with_spec_connector(connector_name: str): - additional_columns = get_supported_additional_columns_by_connector(connector_name) - connect_config = get_connect_config_by_connector(connector_name) - - table_name = f"{connector_name}_test_additional_column" - base_schema = "CREATE TABLE {} (id INT, name VARCHAR, age INT) {} WITH ({}) FORMAT PLAIN ENCODE JSON" - additional_columns_str = "".join([" include {} ".format(column) for column in additional_columns]) - connect_config_str = ", ".join(["{} = '{}'".format(k, v) for k, v in connect_config.items()]) - - run_sql = base_schema.format(table_name, additional_columns_str, connect_config_str) - print(run_sql) - - return table_name - - -def drop_table(table_name: str): - sql = f"DROP TABLE IF EXISTS {table_name}" - 
print(sql) - - -def generate_data(): - for i in range(200): - key = "key_{}".format(i) - gen_one_row = { - "id": i, - "name": "name_{}".format(i), - "age": i % 20, - } - yield key, gen_one_row - - -def generate_kafka(connect_config: dict): - # generate kafka key, payload, header - broker_addr = connect_config["properties.bootstrap.server"] - topic = connect_config["topic"] - - kafka_conf = {'bootstrap.servers': broker_addr} - - admin_client = AdminClient(kafka_conf) - topic_list = [NewTopic(topic, num_partitions=3)] - admin_client.create_topics(topic_list) - - producer = Producer(kafka_conf) - header_pool = [("key1", "value1"), ("key2", "value2"), ("key3", "value3"), ("key4", "value4")] - - for key, one_row in generate_data(): - headers = random.sample(header_pool, random.randint(0, len(header_pool))) - producer.produce(topic=topic, value=json.dumps(one_row), key=json.dumps(key), headers=headers) - producer.flush() - - -def load_mq_for_ground_truth(): - pass - - -def load_kafka(config: dict) -> dict: - - def on_assign(consumer, partitions): - global eof_reached - eof_reached = {partition: False for partition in partitions} - - eof_reached = {} - - connect_conf = { - "bootstrap.servers": config["properties.bootstrap.server"], - "auto.offset.reset": "earliest", - "group.id": "consumer-ci", - } - topic = config["topic"] - consumer = Consumer(connect_conf) - consumer.subscribe([topic], on_assign=on_assign) - - print(eof_reached) - - result = {} - try: - while True: - msg = consumer.poll(timeout=1.0) - print(msg) - if msg is None: - print("No message") - continue - if msg.error(): - if msg.error().code() == KafkaError.PARTITION_EOF: - eof_reached[msg.partition()] = True - print("End of partition reached {}/{}".format(msg.topic(), msg.partition())) - if all(eof_reached.values()): - print("All partitions EOF, exiting") - break - continue - else: - raise KafkaException(msg.error()) - else: - result[msg.key()] = { - # focus on the following columns - "partition": msg.partition(), - "offset": msg.offset(), - "timestamp": msg.timestamp(), - "headers": msg.headers(), - } - - finally: - consumer.close() - - return result - - -if __name__ == '__main__': - # generate_kafka(get_connect_config_by_connector("kafka")) - - res = load_kafka(get_connect_config_by_connector("kafka")) - print(len(res)) diff --git a/scripts/source/prepare_ci_kafka.sh b/scripts/source/prepare_ci_kafka.sh index cdc670a572967..d030f87e1f45b 100755 --- a/scripts/source/prepare_ci_kafka.sh +++ b/scripts/source/prepare_ci_kafka.sh @@ -74,6 +74,10 @@ for filename in $kafka_data_files; do ) & done +# test additional columns: produce messages with headers +ADDI_COLUMN_TOPIC="kafka_additional_columns" +for i in {0..100}; do echo "key$i:{\"a\": $i}" | kcat -P -b message_queue:29092 -t ${ADDI_COLUMN_TOPIC} -K : -H "header1=v1" -H "header2=v2"; done + # write schema with name strategy ## topic: upsert_avro_json-record, key subject: string, value subject: CPLM.OBJ_ATTRIBUTE_VALUE diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index f2ab8e51b0ea6..0b97a48de27a5 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -373,14 +373,14 @@ impl SourceStreamChunkRowWriter<'_> { .unwrap_or(None), )) } - ( - _, - &AdditionalColumnType::Filename, /* AdditionalColumnType::Unspecified and AdditionalColumnType::Normal is means it comes from message payload - * AdditionalColumnType::Key is processed in normal process, together with Unspecified ones */ - ) => Err(AccessError::Other(anyhow!( - 
"Column type {:?} not implemented yet", - &desc.additional_column_type - ))), + (_, &AdditionalColumnType::Filename) => { + // Filename is used as partition in FS connectors + return Ok(A::output_for( + self.row_meta + .as_ref() + .map(|ele| ScalarImpl::Utf8(ele.split_id.to_string().into())), + )); + } (_, _) => { // For normal columns, call the user provided closure. match f(desc) { From b1572b4b54db6e96a245cfab74399b99b5c0bf00 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Thu, 4 Jan 2024 16:55:22 +0800 Subject: [PATCH 09/20] add s3 test --- e2e_test/s3/json_file.py | 10 ++++++---- src/connector/src/parser/mod.rs | 2 -- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/e2e_test/s3/json_file.py b/e2e_test/s3/json_file.py index 7c92f29d76226..76ec86fae0dc4 100644 --- a/e2e_test/s3/json_file.py +++ b/e2e_test/s3/json_file.py @@ -26,10 +26,8 @@ def do_test(client, config, N, prefix): sex int, mark int, ) - include file + include file as file_col WITH ( - - connector = 's3', match_pattern = '{prefix}*.json', s3.region_name = '{config['S3_REGION']}', @@ -64,7 +62,6 @@ def do_test(client, config, N, prefix): cur.execute( 'select count(*), sum(id), sum(sex), sum(mark) from s3_test_jsonfile') result = cur.fetchone() - print(result) assert result[0] == total_row @@ -72,6 +69,11 @@ def do_test(client, config, N, prefix): assert result[2] == int(N / 2) assert result[3] == 0 + cur.execute('select file_col from s3_test_jsonfile') + result = cur.fetchone() + file_col = result[0] + print(file_col) + cur.execute('drop table s3_test_jsonfile') cur.close() diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 0b97a48de27a5..8732aa1541420 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -16,7 +16,6 @@ use std::collections::HashMap; use std::fmt::Debug; use std::sync::LazyLock; -use anyhow::anyhow; use auto_enums::auto_enum; pub use avro::AvroParserConfig; pub use canal::*; @@ -49,7 +48,6 @@ use self::upsert_parser::UpsertParser; use self::util::get_kafka_topic; use crate::common::AwsAuthProps; use crate::parser::maxwell::MaxwellParser; -use crate::parser::unified::AccessError; use crate::parser::util::{extract_headers_from_meta, extreact_timestamp_from_meta}; use crate::schema::schema_registry::SchemaRegistryAuth; use crate::source::monitor::GLOBAL_SOURCE_METRICS; From 512b8d3d006594207419d380379276eba8fae3c2 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Thu, 4 Jan 2024 17:02:44 +0800 Subject: [PATCH 10/20] minor fix --- src/connector/src/parser/additional_columns.rs | 14 ++++++++++++++ src/connector/src/source/kinesis/source/message.rs | 1 + 2 files changed, 15 insertions(+) diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index 6186347e947e6..cf1c9b3b18c5d 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -208,6 +208,20 @@ fn kinesis_compatible_column_vec() -> Vec<(&'static str, CompatibleAdditionalCol } }), ), + ( + "timestamp", + Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { + ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + name, + id, + DataType::Timestamptz, + AdditionalColumnType::Timestamp, + ), + is_hidden: false, + } + }), + ), ] } diff --git a/src/connector/src/source/kinesis/source/message.rs b/src/connector/src/source/kinesis/source/message.rs index 0e4b18140068d..5ae796040afc7 100644 --- a/src/connector/src/source/kinesis/source/message.rs +++ 
b/src/connector/src/source/kinesis/source/message.rs @@ -26,6 +26,7 @@ pub struct KinesisMessage { } impl From for SourceMessage { + // not in use fn from(msg: KinesisMessage) -> Self { SourceMessage { key: Some(msg.partition_key.into_bytes()), From dcd79dd0ba991276de803524353a5e644eac74ff Mon Sep 17 00:00:00 2001 From: tabVersion Date: Thu, 4 Jan 2024 17:58:23 +0800 Subject: [PATCH 11/20] fix kcat command --- scripts/source/prepare_ci_kafka.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/source/prepare_ci_kafka.sh b/scripts/source/prepare_ci_kafka.sh index d030f87e1f45b..68f69827bfc7c 100755 --- a/scripts/source/prepare_ci_kafka.sh +++ b/scripts/source/prepare_ci_kafka.sh @@ -76,7 +76,7 @@ done # test additional columns: produce messages with headers ADDI_COLUMN_TOPIC="kafka_additional_columns" -for i in {0..100}; do echo "key$i:{\"a\": $i}" | kcat -P -b message_queue:29092 -t ${ADDI_COLUMN_TOPIC} -K : -H "header1=v1" -H "header2=v2"; done +for i in {0..100}; do echo "key$i:{\"a\": $i}" | ${KCAT_BIN} -P -b message_queue:29092 -t ${ADDI_COLUMN_TOPIC} -K : -H "header1=v1" -H "header2=v2"; done # write schema with name strategy From 134f571ae2d198ef289998812c0cc0fd092d08fb Mon Sep 17 00:00:00 2001 From: tabVersion Date: Thu, 4 Jan 2024 21:42:21 +0800 Subject: [PATCH 12/20] fix e2e --- e2e_test/source/basic/inlcude_key_as.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source/basic/inlcude_key_as.slt b/e2e_test/source/basic/inlcude_key_as.slt index d5c073a50a865..be8ebac3e8d46 100644 --- a/e2e_test/source/basic/inlcude_key_as.slt +++ b/e2e_test/source/basic/inlcude_key_as.slt @@ -87,7 +87,7 @@ WHERE key_col IS NOT NULL 101 query TT -SELECT (_rw_kafka_header[1]).key AS key,SELECT (_rw_kafka_header[1]).value::text AS value +SELECT (_rw_kafka_header[1]).key AS key, (_rw_kafka_header[1]).value::text AS value FROM additional_columns limit 1; ---- header1 \x7631 From 600300a31babec3f4956f088b4f8788cdd104714 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Thu, 4 Jan 2024 22:43:32 +0800 Subject: [PATCH 13/20] fix e2e --- e2e_test/source/basic/inlcude_key_as.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source/basic/inlcude_key_as.slt b/e2e_test/source/basic/inlcude_key_as.slt index be8ebac3e8d46..ccf0df9e0277e 100644 --- a/e2e_test/source/basic/inlcude_key_as.slt +++ b/e2e_test/source/basic/inlcude_key_as.slt @@ -87,7 +87,7 @@ WHERE key_col IS NOT NULL 101 query TT -SELECT (_rw_kafka_header[1]).key AS key, (_rw_kafka_header[1]).value::text AS value +SELECT (header_col[1]).key AS key, (header_col[1]).value::text AS value FROM additional_columns limit 1; ---- header1 \x7631 From 1df7136acd67dc6a66e50927349912e03c16b8e4 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Fri, 5 Jan 2024 14:22:29 +0800 Subject: [PATCH 14/20] update madsim-rdkafka to fix missing apis Signed-off-by: Runji Wang --- Cargo.lock | 44 ++++++-------------------------------------- Cargo.toml | 2 +- 2 files changed, 7 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 97c1ac2b7b94d..2b1813d3dc5c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4724,7 +4724,7 @@ dependencies = [ "log", "murmur3", "once_cell", - "opendal 0.43.0", + "opendal", "ordered-float 3.9.1", "parquet 49.0.0", "prometheus", @@ -5473,9 +5473,9 @@ dependencies = [ [[package]] name = "madsim-rdkafka" -version = "0.3.0+0.34.0" +version = "0.3.1+0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"00f9ab2d0545a55e4f209fc72c180a7e7b45a4e7baee7b4994c4628a877c5525" +checksum = "79bf1f687bdd0146a72d6132478032d770ebe1dbe42e1f46799c4dcbd9b31cbc" dependencies = [ "async-channel", "async-trait", @@ -6191,39 +6191,6 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" -[[package]] -name = "opendal" -version = "0.43.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c04ac25bc94e2b572a0f41bcc650cb39dd09255efce72b68eb5dc188b018c52a" -dependencies = [ - "anyhow", - "async-compat", - "async-trait", - "backon", - "base64 0.21.4", - "bytes", - "chrono", - "flagset", - "futures", - "http 0.2.9", - "log", - "md-5", - "once_cell", - "parking_lot 0.12.1", - "percent-encoding", - "pin-project", - "prometheus", - "quick-xml 0.30.0", - "reqsign", - "reqwest", - "serde", - "serde_json", - "sha2", - "tokio", - "uuid", -] - [[package]] name = "opendal" version = "0.44.0" @@ -6247,6 +6214,7 @@ dependencies = [ "parking_lot 0.12.1", "percent-encoding", "pin-project", + "prometheus", "quick-xml 0.30.0", "reqsign", "reqwest", @@ -8485,7 +8453,7 @@ dependencies = [ "mysql_common", "nexmark", "num-bigint", - "opendal 0.44.0", + "opendal", "parking_lot 0.12.1", "paste", "pretty_assertions", @@ -9053,7 +9021,7 @@ dependencies = [ "itertools 0.12.0", "madsim-aws-sdk-s3", "madsim-tokio", - "opendal 0.44.0", + "opendal", "prometheus", "risingwave_common", "rustls", diff --git a/Cargo.toml b/Cargo.toml index a46874f9e3da6..c3c1237a4da31 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,7 +107,7 @@ aws-types = "1" etcd-client = { package = "madsim-etcd-client", version = "0.4" } futures-async-stream = "0.2.9" hytra = "0.1" -rdkafka = { package = "madsim-rdkafka", version = "0.3.0", features = [ +rdkafka = { package = "madsim-rdkafka", version = "0.3.1", features = [ "cmake-build", ] } hashbrown = { version = "0.14.0", features = [ From 09c654c37b09b9a27c89bba4cfc55087e4ee45e1 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 5 Jan 2024 15:07:40 +0800 Subject: [PATCH 15/20] remove kinesis message contents --- .../src/source/kinesis/source/message.rs | 31 +------------------ 1 file changed, 1 insertion(+), 30 deletions(-) diff --git a/src/connector/src/source/kinesis/source/message.rs b/src/connector/src/source/kinesis/source/message.rs index 37b935ffba2eb..51a3a9f2ed1ec 100644 --- a/src/connector/src/source/kinesis/source/message.rs +++ b/src/connector/src/source/kinesis/source/message.rs @@ -18,25 +18,7 @@ use aws_smithy_types_convert::date_time::DateTimeExt; use crate::source::{SourceMessage, SourceMeta, SplitId}; #[derive(Clone, Debug)] -pub struct KinesisMessage { - pub shard_id: SplitId, - pub sequence_number: String, - pub partition_key: String, - pub payload: Vec, -} - -impl From for SourceMessage { - // not in use - fn from(msg: KinesisMessage) -> Self { - SourceMessage { - key: Some(msg.partition_key.into_bytes()), - payload: Some(msg.payload), - offset: msg.sequence_number.clone(), - split_id: msg.shard_id, - meta: SourceMeta::Empty, - } - } -} +pub struct KinesisMessage {} #[derive(Clone, Debug)] pub struct KinesisMeta { @@ -57,14 +39,3 @@ pub fn from_kinesis_record(value: &Record, split_id: SplitId) -> SourceMessage { }), } } - -impl KinesisMessage { - pub fn new(shard_id: SplitId, message: Record) -> Self { - KinesisMessage { - shard_id, - sequence_number: message.sequence_number, - partition_key: message.partition_key, - payload: message.data.into_inner(), - } - } 
-} From b60ba68774bb0cdc2a64f278b3bedc2590885e25 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 10 Jan 2024 14:35:55 +0800 Subject: [PATCH 16/20] resolve comments --- e2e_test/source/basic/inlcude_key_as.slt | 6 ++++ src/connector/src/parser/util.rs | 30 ++-------------- .../src/source/kafka/source/message.rs | 35 ++++++++++++++++++- 3 files changed, 43 insertions(+), 28 deletions(-) diff --git a/e2e_test/source/basic/inlcude_key_as.slt b/e2e_test/source/basic/inlcude_key_as.slt index ccf0df9e0277e..83cace5eae6f6 100644 --- a/e2e_test/source/basic/inlcude_key_as.slt +++ b/e2e_test/source/basic/inlcude_key_as.slt @@ -86,6 +86,12 @@ WHERE key_col IS NOT NULL ---- 101 +# the input data is from scripts/source/prepare_ci_kafka.sh +# ``` +# for i in {0..100}; do echo "key$i:{\"a\": $i}" | ${KCAT_BIN} -P -b message_queue:29092 -t ${ADDI_COLUMN_TOPIC} -K : -H "header1=v1" -H "header2=v2"; done +# ``` +# The command generates 101 messages with key `key0` to `key100` and value `{"a": 0}` to `{"a": 100}`, with fixed headers `header1=v1` and `header2=v2`. + query TT SELECT (header_col[1]).key AS key, (header_col[1]).value::text AS value FROM additional_columns limit 1; diff --git a/src/connector/src/parser/util.rs b/src/connector/src/parser/util.rs index cf2b59ad145cf..4e0bd6f2f3f0f 100644 --- a/src/connector/src/parser/util.rs +++ b/src/connector/src/parser/util.rs @@ -14,16 +14,13 @@ use std::collections::HashMap; use bytes::Bytes; -use itertools::Itertools; -use rdkafka::message::Headers; use reqwest::Url; use risingwave_common::error::ErrorCode::{InvalidParameterValue, ProtocolError}; use risingwave_common::error::{Result, RwError}; -use risingwave_common::types::{Datum, ListValue, Scalar, ScalarImpl, StructValue}; +use risingwave_common::types::Datum; use crate::aws_utils::load_file_descriptor_from_s3; use crate::common::AwsAuthProps; -use crate::parser::additional_columns::get_kafka_header_item_datatype; use crate::source::SourceMeta; /// get kafka topic name @@ -117,35 +114,14 @@ pub(super) async fn bytes_from_url(url: &Url, config: Option<&AwsAuthProps>) -> pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option { match meta { - SourceMeta::Kafka(kafka_meta) => kafka_meta - .timestamp - .map(|ts| { - risingwave_common::cast::i64_to_timestamptz(ts) - .unwrap() - .to_scalar_value() - }) - .into(), + SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_timestamp(), _ => None, } } pub fn extract_headers_from_meta(meta: &SourceMeta) -> Option { match meta { - SourceMeta::Kafka(kafka_meta) => kafka_meta.headers.as_ref().map(|header| { - let header_item: Vec = header - .iter() - .map(|header| { - Some(ScalarImpl::Struct(StructValue::new(vec![ - Some(ScalarImpl::Utf8(header.key.to_string().into())), - header.value.map(|byte| ScalarImpl::Bytea(byte.into())), - ]))) - }) - .collect_vec(); - Some(ScalarImpl::List(ListValue::from_datum_iter( - &get_kafka_header_item_datatype(), - header_item, - ))) - }), + SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_headers(), _ => None, } } diff --git a/src/connector/src/source/kafka/source/message.rs b/src/connector/src/source/kafka/source/message.rs index 057b83c961982..52f9722533136 100644 --- a/src/connector/src/source/kafka/source/message.rs +++ b/src/connector/src/source/kafka/source/message.rs @@ -12,9 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use rdkafka::message::{BorrowedMessage, OwnedHeaders}; +use itertools::Itertools; +use rdkafka::message::{BorrowedMessage, Headers, OwnedHeaders}; use rdkafka::Message; +use risingwave_common::types::{Datum, ListValue, Scalar, ScalarImpl, StructValue}; +use crate::parser::additional_columns::get_kafka_header_item_datatype; use crate::source::base::SourceMessage; use crate::source::SourceMeta; @@ -25,6 +28,36 @@ pub struct KafkaMeta { pub headers: Option, } +impl KafkaMeta { + pub fn extract_timestamp(&self) -> Option { + self.timestamp + .map(|ts| { + risingwave_common::cast::i64_to_timestamptz(ts) + .unwrap() + .to_scalar_value() + }) + .into() + } + + pub fn extract_headers(&self) -> Option { + self.headers.as_ref().map(|headers| { + let header_item: Vec = headers + .iter() + .map(|header| { + Some(ScalarImpl::Struct(StructValue::new(vec![ + Some(ScalarImpl::Utf8(header.key.to_string().into())), + header.value.map(|byte| ScalarImpl::Bytea(byte.into())), + ]))) + }) + .collect_vec(); + Some(ScalarImpl::List(ListValue::from_datum_iter( + &get_kafka_header_item_datatype(), + header_item, + ))) + }) + } +} + impl SourceMessage { pub fn from_kafka_message(message: &BorrowedMessage<'_>, require_header: bool) -> Self { SourceMessage { From a719ee46b02cba9094ad596e8fd9dded0e2db2bd Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 10 Jan 2024 15:20:47 +0800 Subject: [PATCH 17/20] feat: Improve handling of Kafka source timestamps - Add `_rw_kafka_timestamp` column to messages from Kafka source - Handle addition of columns and bind primary key columns - Set connector to backfill mode and enable CDC sharing mode - Check and add timestamp column before generating column IDs - Throw error if source does not support PRIMARY KEY constraint - Bind source watermark based on columns - Resolve privatelink connection for Kafka source - Create PbSource object with provided properties - Import `KAFKA_TIMESTAMP_COLUMN_NAME` and handle legacy column in `trad_source.rs` Signed-off-by: tabVersion --- src/frontend/src/handler/create_source.rs | 27 +++++++++++------ .../src/from_proto/source/trad_source.rs | 29 ++++++++++++++++++- 2 files changed, 46 insertions(+), 10 deletions(-) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index c0d3248b5af8a..d314a93b9662e 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -821,16 +821,24 @@ fn check_and_add_timestamp_column( columns: &mut Vec, ) { if is_kafka_connector(with_properties) { - let kafka_timestamp_column = ColumnCatalog { - column_desc: ColumnDesc::named( - KAFKA_TIMESTAMP_COLUMN_NAME, - ColumnId::placeholder(), - DataType::Timestamptz, - ), + if columns + .iter() + .any(|col| col.column_desc.additional_column_type == AdditionalColumnType::Timestamp) + { + // already has timestamp column, no need to add a new one + return; + } - is_hidden: true, - }; - columns.push(kafka_timestamp_column); + // add a hidden column `_rw_kafka_timestamp` to each message from Kafka source + let mut catalog = get_connector_compatible_additional_columns(KAFKA_CONNECTOR) + .unwrap() + .iter() + .find(|(col_name, _)| col_name.eq(&"timestamp")) + .unwrap() + .1(ColumnId::placeholder(), KAFKA_TIMESTAMP_COLUMN_NAME); + catalog.is_hidden = true; + + columns.push(catalog); } } @@ -1158,6 +1166,7 @@ pub async fn handle_create_source( with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into()); } + // must behind `handle_addition_columns` 
check_and_add_timestamp_column(&with_properties, &mut columns); let mut col_id_gen = ColumnIdGenerator::new_initial(); diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index 7927ed3aa1827..5b93585aea4b1 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::catalog::{default_key_column_name_version_mapping, ColumnId, TableId}; +use risingwave_common::catalog::{ + default_key_column_name_version_mapping, ColumnId, TableId, KAFKA_TIMESTAMP_COLUMN_NAME, +}; use risingwave_connector::source::{ConnectorProperties, SourceCtrlOpts}; use risingwave_pb::data::data_type::TypeName as PbTypeName; use risingwave_pb::plan_common::{ @@ -83,6 +85,31 @@ impl ExecutorBuilder for SourceExecutorBuilder { } } + { + // compatible code: handle legacy column `_rw_kafka_timestamp` + // the column is auto added for all kafka source to empower batch query on source + // solution: rewrite the column `additional_column_type` to Timestamp + + let _ = source_columns.iter_mut().map(|c| { + let _ = c.column_desc.as_mut().map(|desc| { + let is_timestamp = desc + .get_column_type() + .map(|col_type| { + col_type.type_name == PbTypeName::Timestamptz as i32 + }) + .unwrap(); + if desc.name == KAFKA_TIMESTAMP_COLUMN_NAME + && is_timestamp + // the column is from a legacy version + && desc.version == ColumnDescVersion::Unspecified as i32 + { + desc.additional_column_type = + AdditionalColumnType::Timestamp as i32; + } + }); + }); + } + let source_desc_builder = SourceDescBuilder::new( source_columns.clone(), params.env.source_metrics(), From ce7adff9113d0a96f8816f152e08e4a3c6d109bf Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 10 Jan 2024 21:29:40 +0800 Subject: [PATCH 18/20] feat: Refactor and optimize Kafka data pipeline - Added new source `s10` with columns `v1` and `v2` - Included a timestamp column `some_ts` in the `s10` source - Configured `s10` source as a Kafka connector with topic, bootstrap server, and startup mode properties - Implemented a query to filter rows from `s10` based on a specific timestamp - Dropped tables `s8` and `s9` - Removed source `s9` - Removed source `s10` Signed-off-by: tabVersion --- e2e_test/source/basic/kafka_batch.slt | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/e2e_test/source/basic/kafka_batch.slt b/e2e_test/source/basic/kafka_batch.slt index cc9a50e1d4108..ef88952975d2b 100644 --- a/e2e_test/source/basic/kafka_batch.slt +++ b/e2e_test/source/basic/kafka_batch.slt @@ -76,6 +76,16 @@ create source s9 (id bytea) with ( scan.startup.mode = 'earliest' ) FORMAT PLAIN ENCODE BYTES +statement ok +create source s10 (v1 int, v2 varchar) +include timestamp as some_ts +with ( + connector = 'kafka', + topic = 'kafka_1_partition_topic', + properties.bootstrap.server = 'message_queue:29092', + scan.startup.mode = 'earliest' +) FORMAT PLAIN ENCODE JSON + query IT rowsort select * from s1 ---- @@ -219,8 +229,20 @@ select id from s9 order by id \x6b6b \x776561776566776566 +# query from a kafka timestamp column with alias +query IT rowsort +select * from s10 where some_ts > '1977-01-01 00:00:00+00:00' +---- +1 1 +2 22 +3 333 +4 4444 + statement ok drop table s8 statement ok -drop source s9 \ No newline at end of file +drop source s9 + +statement ok +drop source s10 From 
5c62b5a2d83a5f3b111fc638224c93e8d14aa3dc Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 10 Jan 2024 21:48:07 +0800 Subject: [PATCH 19/20] rerun Signed-off-by: tabVersion From c6ad3458bb975ad19c1e3f060315ef84cdb65c72 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 10 Jan 2024 22:29:11 +0800 Subject: [PATCH 20/20] fix --- e2e_test/source/basic/kafka_batch.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source/basic/kafka_batch.slt b/e2e_test/source/basic/kafka_batch.slt index ef88952975d2b..421a1e6cbc298 100644 --- a/e2e_test/source/basic/kafka_batch.slt +++ b/e2e_test/source/basic/kafka_batch.slt @@ -231,7 +231,7 @@ select id from s9 order by id # query from a kafka timestamp column with alias query IT rowsort -select * from s10 where some_ts > '1977-01-01 00:00:00+00:00' +select v1, v2 from s10 where some_ts > '1977-01-01 00:00:00+00:00' ---- 1 1 2 22
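
Taken together, the patches above let a Kafka source expose connector metadata as relational columns. A minimal end-to-end sketch, reusing the `kafka_additional_columns` topic and the `message_queue:29092` broker address from the e2e tests in this series, looks roughly like this (column types follow `src/connector/src/parser/additional_columns.rs`):

-- Each INCLUDE <meta> AS <alias> pair maps to an AdditionalColumnType
-- filled in by SourceStreamChunkRowWriter from the per-message SourceMeta.
CREATE TABLE additional_columns (a INT)
INCLUDE key       AS key_col        -- message key (BYTEA)
INCLUDE partition AS partition_col  -- split id (VARCHAR)
INCLUDE offset    AS offset_col     -- message offset (VARCHAR)
INCLUDE timestamp AS timestamp_col  -- broker append time (TIMESTAMPTZ)
INCLUDE header    AS header_col     -- Kafka headers
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'message_queue:29092',
    topic = 'kafka_additional_columns'
) FORMAT PLAIN ENCODE JSON;

-- Headers surface as a list of (key, value) structs, so a single entry
-- can be unpacked with ordinary struct/array accessors.
SELECT (header_col[1]).key, (header_col[1]).value::text
FROM additional_columns
LIMIT 1;

The header column's element type is `STRUCT<key VARCHAR, value BYTEA>` (see `get_kafka_header_item_datatype`), which is why the last query casts the value to text to inspect it, as exercised in `e2e_test/source/basic/inlcude_key_as.slt`.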