From 3e0179535abfc4195badc20505fd1cde01543ed9 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 7 Jun 2024 17:03:02 +0800 Subject: [PATCH] pref(connector): avoid allocation for metadata column when parsing --- src/connector/src/parser/mod.rs | 63 ++++++++++--------- src/connector/src/parser/util.rs | 18 +++--- .../src/source/cdc/source/message.rs | 27 ++++---- .../src/source/kafka/source/message.rs | 61 ++++++++++-------- 4 files changed, 86 insertions(+), 83 deletions(-) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index d439da2491e6b..f7667a66a3747 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -30,7 +30,7 @@ use risingwave_common::bail; use risingwave_common::catalog::{KAFKA_TIMESTAMP_COLUMN_NAME, TABLE_NAME_COLUMN_NAME}; use risingwave_common::log::LogSuppresser; use risingwave_common::metrics::GLOBAL_ERROR_METRICS; -use risingwave_common::types::{Datum, DatumCow, Scalar, ScalarImpl}; +use risingwave_common::types::{Datum, DatumCow, DatumRef, ScalarRefImpl}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::tracing::InstrumentStream; use risingwave_connector_codec::decoder::avro::MapHandling; @@ -203,17 +203,17 @@ pub struct MessageMeta<'a> { offset: &'a str, } -impl MessageMeta<'_> { +impl<'a> MessageMeta<'a> { /// Extract the value for the given column. /// /// Returns `None` if the column is not a meta column. - fn value_for_column(self, desc: &SourceColumnDesc) -> Option { - match desc.column_type { + fn value_for_column(self, desc: &SourceColumnDesc) -> Option> { + let datum: DatumRef<'_> = match desc.column_type { // Row id columns are filled with `NULL` here and will be filled with the real // row id generated by `RowIdGenExecutor` later. - SourceColumnType::RowId => Datum::None.into(), + SourceColumnType::RowId => None, // Extract the offset from the meta data. - SourceColumnType::Offset => Datum::Some(self.offset.into()).into(), + SourceColumnType::Offset => Some(self.offset.into()), // Extract custom meta data per connector. SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.meta => { assert_eq!( @@ -221,14 +221,11 @@ impl MessageMeta<'_> { KAFKA_TIMESTAMP_COLUMN_NAME, "unexpected kafka meta column name" ); - kafka_meta - .timestamp - .map(|ts| { - risingwave_common::cast::i64_to_timestamptz(ts) - .unwrap() - .to_scalar_value() - }) - .into() + kafka_meta.timestamp.map(|ts| { + risingwave_common::cast::i64_to_timestamptz(ts) + .unwrap() + .into() + }) } SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.meta => { assert_eq!( @@ -236,19 +233,21 @@ impl MessageMeta<'_> { TABLE_NAME_COLUMN_NAME, "unexpected cdc meta column name" ); - Datum::Some(cdc_meta.full_table_name.as_str().into()).into() + Some(cdc_meta.full_table_name.as_str().into()) } // For other cases, return `None`. - SourceColumnType::Meta | SourceColumnType::Normal => None, - } + SourceColumnType::Meta | SourceColumnType::Normal => return None, + }; + + Some(datum) } } trait OpAction { type Output<'a>; - fn output_for<'a>(datum: Datum) -> Self::Output<'a>; + fn output_for<'a>(datum: impl Into>) -> Self::Output<'a>; fn apply(builder: &mut ArrayBuilderImpl, output: Self::Output<'_>); @@ -263,7 +262,7 @@ impl OpAction for OpActionInsert { type Output<'a> = DatumCow<'a>; #[inline(always)] - fn output_for<'a>(datum: Datum) -> Self::Output<'a> { + fn output_for<'a>(datum: impl Into>) -> Self::Output<'a> { datum.into() } @@ -289,7 +288,7 @@ impl OpAction for OpActionDelete { type Output<'a> = DatumCow<'a>; #[inline(always)] - fn output_for<'a>(datum: Datum) -> Self::Output<'a> { + fn output_for<'a>(datum: impl Into>) -> Self::Output<'a> { datum.into() } @@ -315,8 +314,9 @@ impl OpAction for OpActionUpdate { type Output<'a> = (DatumCow<'a>, DatumCow<'a>); #[inline(always)] - fn output_for<'a>(datum: Datum) -> Self::Output<'a> { - (datum.clone().into(), datum.into()) + fn output_for<'a>(datum: impl Into>) -> Self::Output<'a> { + let datum = datum.into(); + (datum.clone(), datum) } #[inline(always)] @@ -345,7 +345,7 @@ impl SourceStreamChunkRowWriter<'_> { } fn do_action<'a, A: OpAction>( - &mut self, + &'a mut self, mut f: impl FnMut(&SourceColumnDesc) -> AccessResult>, ) -> AccessResult<()> { let mut parse_field = |desc: &SourceColumnDesc| { @@ -411,10 +411,11 @@ impl SourceStreamChunkRowWriter<'_> { match self.row_meta { Some(row_meta) => { if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.meta { - Ok(A::output_for( - extract_cdc_meta_column(cdc_meta, col, desc.name.as_str())? - .unwrap_or(None), - )) + Ok(A::output_for(extract_cdc_meta_column( + cdc_meta, + col, + desc.name.as_str(), + )?)) } else { Err(AccessError::Uncategorized { message: "CDC metadata not found in the message".to_string(), @@ -439,7 +440,7 @@ impl SourceStreamChunkRowWriter<'_> { return Ok(A::output_for( self.row_meta .as_ref() - .map(|ele| ScalarImpl::Utf8(ele.split_id.to_string().into())), + .map(|ele| ScalarRefImpl::Utf8(ele.split_id)), )); } (_, &Some(AdditionalColumnType::Offset(_))) => { @@ -447,7 +448,7 @@ impl SourceStreamChunkRowWriter<'_> { return Ok(A::output_for( self.row_meta .as_ref() - .map(|ele| ScalarImpl::Utf8(ele.offset.to_string().into())), + .map(|ele| ScalarRefImpl::Utf8(ele.offset)), )); } (_, &Some(AdditionalColumnType::HeaderInner(ref header_inner))) => { @@ -461,7 +462,7 @@ impl SourceStreamChunkRowWriter<'_> { header_inner.data_type.as_ref(), ) }) - .unwrap_or(None), + .unwrap_or(Datum::None.into()), )) } (_, &Some(AdditionalColumnType::Headers(_))) => { @@ -477,7 +478,7 @@ impl SourceStreamChunkRowWriter<'_> { return Ok(A::output_for( self.row_meta .as_ref() - .map(|ele| ScalarImpl::Utf8(ele.split_id.to_string().into())), + .map(|ele| ScalarRefImpl::Utf8(ele.split_id)), )); } (_, _) => { diff --git a/src/connector/src/parser/util.rs b/src/connector/src/parser/util.rs index 4afbbb718d54a..30cb1fbf7d62e 100644 --- a/src/connector/src/parser/util.rs +++ b/src/connector/src/parser/util.rs @@ -17,7 +17,7 @@ use anyhow::Context; use bytes::Bytes; use reqwest::Url; use risingwave_common::bail; -use risingwave_common::types::Datum; +use risingwave_common::types::{Datum, DatumCow, DatumRef}; use risingwave_pb::data::DataType as PbDataType; use crate::aws_utils::load_file_descriptor_from_s3; @@ -132,19 +132,19 @@ pub(super) async fn bytes_from_url( } } -pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option { +pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option> { match meta { SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_timestamp(), - SourceMeta::DebeziumCdc(cdc_meta) => cdc_meta.extract_timestamp(), + SourceMeta::DebeziumCdc(cdc_meta) => Some(cdc_meta.extract_timestamp()), _ => None, } } -pub fn extract_cdc_meta_column( - cdc_meta: &DebeziumCdcMeta, +pub fn extract_cdc_meta_column<'a>( + cdc_meta: &'a DebeziumCdcMeta, column_type: &additional_column::ColumnType, column_name: &str, -) -> AccessResult> { +) -> AccessResult> { match column_type { ColumnType::Timestamp(_) => Ok(cdc_meta.extract_timestamp()), ColumnType::DatabaseName(_) => Ok(cdc_meta.extract_database_name()), @@ -162,11 +162,11 @@ pub fn extract_headers_from_meta(meta: &SourceMeta) -> Option { } } -pub fn extract_header_inner_from_meta( - meta: &SourceMeta, +pub fn extract_header_inner_from_meta<'a>( + meta: &'a SourceMeta, inner_field: &str, data_type: Option<&PbDataType>, -) -> Option { +) -> Option> { match meta { SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_header_inner(inner_field, data_type), /* expect output of type `bytea` or `varchar` */ _ => None, diff --git a/src/connector/src/source/cdc/source/message.rs b/src/connector/src/source/cdc/source/message.rs index e74ed55ce1f9f..f12d18339b527 100644 --- a/src/connector/src/source/cdc/source/message.rs +++ b/src/connector/src/source/cdc/source/message.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::types::{Datum, Scalar, ScalarImpl, Timestamptz}; +use risingwave_common::types::{DatumRef, ScalarRefImpl, Timestamptz}; use risingwave_pb::connector_service::CdcMessage; use crate::source::base::SourceMessage; @@ -30,27 +30,22 @@ pub struct DebeziumCdcMeta { } impl DebeziumCdcMeta { - pub fn extract_timestamp(&self) -> Option { - Some( - Timestamptz::from_millis(self.source_ts_ms) - .unwrap() - .to_scalar_value(), - ) - .into() + pub fn extract_timestamp(&self) -> DatumRef<'_> { + Some(ScalarRefImpl::Timestamptz( + Timestamptz::from_millis(self.source_ts_ms).unwrap(), + )) } - pub fn extract_database_name(&self) -> Option { - Some(ScalarImpl::from( - self.full_table_name.as_str()[0..self.db_name_prefix_len].to_string(), + pub fn extract_database_name(&self) -> DatumRef<'_> { + Some(ScalarRefImpl::Utf8( + &self.full_table_name.as_str()[0..self.db_name_prefix_len], )) - .into() } - pub fn extract_table_name(&self) -> Option { - Some(ScalarImpl::from( - self.full_table_name.as_str()[self.db_name_prefix_len..].to_string(), + pub fn extract_table_name(&self) -> DatumRef<'_> { + Some(ScalarRefImpl::Utf8( + &self.full_table_name.as_str()[self.db_name_prefix_len..], )) - .into() } pub fn new(full_table_name: String, source_ts_ms: i64, is_transaction_meta: bool) -> Self { diff --git a/src/connector/src/source/kafka/source/message.rs b/src/connector/src/source/kafka/source/message.rs index 0ef55dc79132d..247166a156763 100644 --- a/src/connector/src/source/kafka/source/message.rs +++ b/src/connector/src/source/kafka/source/message.rs @@ -12,10 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::borrow::Cow; + use itertools::Itertools; use rdkafka::message::{BorrowedMessage, Headers, OwnedHeaders}; use rdkafka::Message; -use risingwave_common::types::{Datum, ListValue, Scalar, ScalarImpl, StructValue}; +use risingwave_common::types::{ + Datum, DatumCow, DatumRef, ListValue, ScalarImpl, ScalarRefImpl, StructValue, +}; use risingwave_pb::data::data_type::TypeName as PbTypeName; use risingwave_pb::data::DataType as PbDataType; @@ -31,39 +35,42 @@ pub struct KafkaMeta { } impl KafkaMeta { - pub fn extract_timestamp(&self) -> Option { - self.timestamp - .map(|ts| { - risingwave_common::cast::i64_to_timestamptz(ts) - .unwrap() - .to_scalar_value() - }) - .into() + pub fn extract_timestamp(&self) -> Option> { + self.timestamp.map(|ts| { + Some(ScalarRefImpl::Timestamptz( + risingwave_common::cast::i64_to_timestamptz(ts).unwrap(), + )) + }) } - pub fn extract_header_inner( - &self, + pub fn extract_header_inner<'a>( + &'a self, inner_field: &str, data_type: Option<&PbDataType>, - ) -> Option { - let target_value = self - .headers - .as_ref() - .iter() - .find_map(|headers| { - headers - .iter() - .find(|header| header.key == inner_field) - .map(|header| header.value) - }) - .unwrap_or(None); // if not found the specified column, return None - if let Some(data_type) = data_type + ) -> Option> { + let target_value = self.headers.as_ref().iter().find_map(|headers| { + headers + .iter() + .find(|header| header.key == inner_field) + .map(|header| header.value) + })?; // if not found the specified column, return None + + let Some(target_value) = target_value else { + return Some(Datum::None.into()); + }; + + let datum = if let Some(data_type) = data_type && data_type.type_name == PbTypeName::Varchar as i32 { - Some(target_value.map(|byte| ScalarImpl::Utf8(String::from_utf8_lossy(byte).into()))) + match String::from_utf8_lossy(target_value) { + Cow::Borrowed(str) => Some(ScalarRefImpl::Utf8(str)).into(), + Cow::Owned(string) => Some(ScalarImpl::Utf8(string.into())).into(), + } } else { - Some(target_value.map(|byte| ScalarImpl::Bytea(byte.into()))) - } + Some(ScalarRefImpl::Bytea(target_value)).into() + }; + + Some(datum) } pub fn extract_headers(&self) -> Option {